Commit 0f7dc2c

I hate REPLACE_RANGE and actual_part_name

tavplubix committed Mar 20, 2024
1 parent 2de9697 commit 0f7dc2c

Showing 4 changed files with 94 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h
@@ -93,6 +93,7 @@ struct ReplicatedMergeTreeLogEntryData
     MergeTreeDataPartFormat new_part_format;
     String block_id;    /// For parts of level zero, the block identifier for deduplication (node name in /blocks/).
     mutable String actual_new_part_name;    /// GET_PART could actually fetch a part covering 'new_part_name'.
+    mutable std::unordered_set<String> replace_range_actual_new_part_names;    /// Same as above, but for REPLACE_RANGE
     UUID new_part_uuid = UUIDHelpers::Nil;

     Strings source_parts;
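A note on the shape of this change: a GET_PART entry resolves to exactly one covering part, so a single string is enough, while one REPLACE_RANGE entry can fetch several parts for the range it replaces, hence the set. A minimal sketch of the intended usage, with invented names (EntryModel and recordActualPart are not the ClickHouse API):

#include <cassert>
#include <string>
#include <unordered_set>

enum class EntryType { GET_PART, REPLACE_RANGE };

struct EntryModel
{
    EntryType type = EntryType::GET_PART;
    std::string actual_new_part_name;                                     // single slot: at most one actual part
    std::unordered_set<std::string> replace_range_actual_new_part_names;  // REPLACE_RANGE: many actual parts

    void recordActualPart(const std::string & name)
    {
        if (type == EntryType::REPLACE_RANGE)
            replace_range_actual_new_part_names.insert(name);   // called once per fetched part
        else
        {
            assert(actual_new_part_name.empty());               // a second call would be a bug
            actual_new_part_name = name;
        }
    }
};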
60 changes: 39 additions & 21 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -342,6 +342,11 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
         /// NOTE actual_new_part_name is very confusing and error-prone. This approach must be fixed.
         removeCoveredPartsFromMutations(entry->actual_new_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
     }
+    for (const auto & actual_part : entry->replace_range_actual_new_part_names)
+    {
+        LOG_TEST(log, "Entry {} has actual new part name {}, removing it from mutations", entry->znode_name, actual_part);
+        removeCoveredPartsFromMutations(actual_part, /*remove_part = */ false, /*remove_covered_parts = */ true);
+    }

     LOG_TEST(log, "Adding parts [{}] to current parts", fmt::join(entry_virtual_parts, ", "));

@@ -1180,9 +1185,9 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
     if (entry_for_same_part_it != future_parts.end())
     {
         const LogEntry & another_entry = *entry_for_same_part_it->second;
-        constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
+        constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} (actual part {}) "
                                     "because another log entry {} of type {} for the same part ({}) is being processed.";
-        LOG_INFO(LogToStr(out_reason, log), fmt_string, entry.znode_name, entry.type, entry.new_part_name,
+        LOG_INFO(LogToStr(out_reason, log), fmt_string, entry.znode_name, entry.type, entry.new_part_name, new_part_name,
                  another_entry.znode_name, another_entry.type, another_entry.new_part_name);
         return true;

@@ -1198,6 +1203,7 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
     auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version);

     /// It can slow down when the size of `future_parts` is large. But it can not be large, since background pool is limited.
+    /// (well, it can actually, thanks to REPLACE_RANGE, but it's a rare case)
     for (const auto & future_part_elem : future_parts)
     {
         auto future_part = MergeTreePartInfo::fromPartName(future_part_elem.first, format_version);
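For readers unfamiliar with part names: fromPartName parses a string like all_2_3_1 into partition id all, block range [2, 3] and merge level 1, and two parts can only overlap when they share a partition and their block ranges intersect. A simplified model of that logic, assuming the common name format only (the real MergeTreePartInfo also understands mutation versions and legacy date-based names):

#include <cstdint>
#include <sstream>
#include <string>

struct PartInfoModel
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;
    uint32_t level = 0;

    // Parse "all_2_3_1" -> {partition_id = "all", min_block = 2, max_block = 3, level = 1}.
    static PartInfoModel fromPartName(const std::string & name)
    {
        PartInfoModel info;
        std::istringstream ss(name);
        std::string min_s, max_s, level_s;
        std::getline(ss, info.partition_id, '_');
        std::getline(ss, min_s, '_');
        std::getline(ss, max_s, '_');
        std::getline(ss, level_s, '_');
        info.min_block = std::stoll(min_s);
        info.max_block = std::stoll(max_s);
        info.level = static_cast<uint32_t>(std::stoul(level_s));
        return info;
    }

    // Disjoint = different partition, or non-intersecting block ranges.
    bool isDisjoint(const PartInfoModel & rhs) const
    {
        return partition_id != rhs.partition_id
            || max_block < rhs.min_block
            || rhs.max_block < min_block;
    }

    // Covers = same partition and a block range enclosing the other part's.
    bool contains(const PartInfoModel & rhs) const
    {
        return partition_id == rhs.partition_id
            && min_block <= rhs.min_block
            && rhs.max_block <= max_block;
    }
};

The queue uses such comparisons both in the loop above (to see whether a future part covers the candidate) and in setActualPartName below (to insist that REPLACE_RANGE's actual parts stay disjoint), which is why the cost grows with the size of future_parts and why the comment singles out REPLACE_RANGE.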
@@ -1608,26 +1614,39 @@ void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(
     std::unique_lock<std::mutex> & state_lock,
     std::vector<LogEntryPtr> & covered_entries_to_wait)
 {
+    if (actual_part_name.empty())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Actual part name is empty");
+
     if (!entry.actual_new_part_name.empty())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry actual part isn't empty yet. This is a bug.");
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} actual part isn't empty yet: '{}'. This is a bug.",
+                        entry.znode_name, entry.actual_new_part_name);

-    entry.actual_new_part_name = actual_part_name;
+    auto actual_part_info = MergeTreePartInfo::fromPartName(actual_part_name, queue.format_version);
+    for (const auto & other_part_name : entry.replace_range_actual_new_part_names)
+        if (!MergeTreePartInfo::fromPartName(other_part_name, queue.format_version).isDisjoint(actual_part_info))
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} already has actual part {} non-disjoint with {}. This is a bug.",
+                            entry.znode_name, other_part_name, actual_part_name);

     /// Check if it is the same (and already added) part.
-    if (entry.actual_new_part_name == entry.new_part_name)
+    if (actual_part_name == entry.new_part_name)
         return;

-    if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second)
+    if (!queue.future_parts.emplace(actual_part_name, entry.shared_from_this()).second)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Attaching already existing future part {}. This is a bug. "
                         "It happened on attempt to execute {}: {}",
-                        entry.actual_new_part_name, entry.znode_name, entry.toString());
+                        actual_part_name, entry.znode_name, entry.toString());

+    if (entry.type == LogEntry::REPLACE_RANGE)
+        entry.replace_range_actual_new_part_names.insert(actual_part_name);
+    else
+        entry.actual_new_part_name = actual_part_name;
+
     for (LogEntryPtr & covered_entry : covered_entries_to_wait)
     {
         if (&entry == covered_entry.get())
             continue;
-        LOG_TRACE(queue.log, "Waiting for {} producing {} to finish before executing {} producing not disjoint part {}",
-                  covered_entry->znode_name, covered_entry->new_part_name, entry.znode_name, entry.new_part_name);
+        LOG_TRACE(queue.log, "Waiting for {} producing {} to finish before executing {} producing not disjoint part {} (actual part {})",
+                  covered_entry->znode_name, covered_entry->new_part_name, entry.znode_name, entry.new_part_name, actual_part_name);
         covered_entry->execution_complete.wait(state_lock, [&covered_entry] { return !covered_entry->currently_executing; });
     }
 }
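The wait at the end of setActualPartName is the standard condition-variable handshake: the waiting thread releases the queue's state mutex while blocked and re-checks the predicate on every wakeup, and the destructor below wakes it by flipping currently_executing and notifying. A self-contained toy version of just that handshake (CoveredEntryModel and both function names are invented):

#include <condition_variable>
#include <mutex>

struct CoveredEntryModel
{
    bool currently_executing = true;
    std::condition_variable execution_complete;
};

// Blocks until the covered entry finishes; state_lock guards currently_executing.
void waitForCoveredEntry(CoveredEntryModel & covered, std::unique_lock<std::mutex> & state_lock)
{
    // Predicate form re-checks after every wakeup, so spurious wakeups are safe.
    covered.execution_complete.wait(state_lock, [&covered] { return !covered.currently_executing; });
}

// The executor side, mirroring ~CurrentlyExecuting() below: flip the flag
// under the same mutex, then wake all waiters.
void finishExecution(CoveredEntryModel & covered, std::mutex & state_mutex)
{
    {
        std::lock_guard<std::mutex> lock(state_mutex);
        covered.currently_executing = false;
    }
    covered.execution_complete.notify_all();
}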
@@ -1646,25 +1665,24 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
     entry->currently_executing = false;
     entry->execution_complete.notify_all();

-    for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
+    auto erase_and_check = [this](const String & part_name)
     {
-        if (!queue.future_parts.erase(new_part_name))
+        if (!queue.future_parts.erase(part_name))
         {
-            LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", new_part_name);
+            LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", part_name);
             assert(false);
         }
-    }
+    };

+    for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
+        erase_and_check(new_part_name);
+
     if (!entry->actual_new_part_name.empty())
-    {
-        if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name))
-        {
-            LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", entry->actual_new_part_name);
-            assert(false);
-        }
+        erase_and_check(entry->actual_new_part_name);

+    for (const auto & actual_part : entry->replace_range_actual_new_part_names)
+        erase_and_check(actual_part);

     entry->actual_new_part_name.clear();
-    }
 }


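The refactoring in this destructor preserves one invariant worth spelling out: every name tagged in future_parts when an entry starts executing must be untagged exactly once when it finishes, whether it came from getVirtualPartNames, actual_new_part_name, or the new REPLACE_RANGE set. A condensed model of that bookkeeping (FuturePartsModel and its methods are invented; the real map stores log-entry pointers, not ints):

#include <cassert>
#include <map>
#include <string>

struct FuturePartsModel
{
    std::map<std::string, int> future_parts;   // part name -> id of the entry producing it

    // Called when an entry starts producing a part; a duplicate means two
    // entries claim the same part, which setActualPartName treats as a bug.
    bool tag(const std::string & part_name, int entry_id)
    {
        return future_parts.emplace(part_name, entry_id).second;
    }

    // Called on completion for every name the entry tagged; erasing a name
    // that is not there is the "untagging already untagged" bug above.
    void eraseAndCheck(const std::string & part_name)
    {
        if (future_parts.erase(part_name) == 0)
            assert(false && "untagging already untagged future part");
    }
};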
@@ -0,0 +1,8 @@
+all_0_0_0 0
+all_0_1_1 1
+all_1_1_0 0
+all_2_2_0 0
+all_2_3_1 1
+all_3_3_0 0
+0
+40 1580
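The reference output above can be sanity-checked by hand: the lone 0 is the result of select sleep(3), and the four inserts in the test below cover the numbers 20 through 59, so the surviving replica should hold 40 rows summing to (20 + 59) * 40 / 2 = 1580. A throwaway verification:

#include <cstdio>

int main()
{
    long count = 0, sum = 0;
    for (int x = 20; x < 60; ++x)   // the test inserts number + 20/30/40/50 from numbers(10)
    {
        ++count;
        sum += x;
    }
    std::printf("%ld %ld\n", count, sum);   // prints "40 1580", matching the reference
    return 0;
}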
@@ -0,0 +1,46 @@
+-- Tags: no-parallel
+
+create database if not exists shard_0;
+create database if not exists shard_1;
+
+drop table if exists shard_0.from_1;
+drop table if exists shard_1.from_1;
+drop table if exists shard_0.to;
+drop table if exists shard_1.to;
+
+create table shard_0.from_1 (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/from_1_' || currentDatabase(), '0') order by x settings old_parts_lifetime=1, max_cleanup_delay_period=1, cleanup_delay_period=1;
+create table shard_1.from_1 (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/from_1_' || currentDatabase(), '1') order by x settings old_parts_lifetime=1, max_cleanup_delay_period=1, cleanup_delay_period=1;
+
+system stop merges shard_0.from_1;
+insert into shard_0.from_1 select number + 20 from numbers(10);
+insert into shard_0.from_1 select number + 30 from numbers(10);
+
+insert into shard_0.from_1 select number + 40 from numbers(10);
+insert into shard_0.from_1 select number + 50 from numbers(10);
+
+system sync replica shard_1.from_1;
+
+create table shard_0.to (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/to_' || currentDatabase(), '0') order by x settings old_parts_lifetime=1, max_cleanup_delay_period=1, cleanup_delay_period=1, max_parts_to_merge_at_once=2;
+
+create table shard_1.to (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/to_' || currentDatabase(), '1') order by x settings old_parts_lifetime=1, max_cleanup_delay_period=1, cleanup_delay_period=1, max_parts_to_merge_at_once=2;
+
+detach table shard_1.to;
+
+alter table shard_0.from_1 on cluster test_cluster_two_shards_different_databases move partition tuple() to table shard_0.to format Null settings distributed_ddl_output_mode='never_throw', distributed_ddl_task_timeout = 1;
+
+drop table if exists shard_0.from_1;
+drop table if exists shard_1.from_1;
+OPTIMIZE TABLE shard_0.to;
+OPTIMIZE TABLE shard_0.to;
+select name, active from system.parts where database='shard_0' and table='to' order by name;
+
+system restart replica shard_0.to;
+
+select sleep(3);
+
+attach table shard_1.to;
+system sync replica shard_1.to;
+select count(), sum(x) from shard_1.to;
+
+drop table if exists shard_0.to;
+drop table if exists shard_1.to;
