Fix actual_part_name for REPLACE_RANGE (Entry actual part isn't empty yet) #61675

Merged: 4 commits, Mar 25, 2024
src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h (1 addition, 0 deletions)

@@ -93,6 +93,7 @@ struct ReplicatedMergeTreeLogEntryData
     MergeTreeDataPartFormat new_part_format;
     String block_id; /// For parts of level zero, the block identifier for deduplication (node name in /blocks/).
     mutable String actual_new_part_name; /// GET_PART could actually fetch a part covering 'new_part_name'.
+    mutable std::unordered_set<String> replace_range_actual_new_part_names; /// Same as above, but for REPLACE_RANGE
     UUID new_part_uuid = UUIDHelpers::Nil;

     Strings source_parts;
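
Context for why the new field is a set rather than a second string: a GET_PART entry produces exactly one part, so a single actual_new_part_name suffices, but one REPLACE_RANGE entry produces many parts, and each of them may be satisfied by fetching a different covering part. The following is a minimal self-contained model of that bookkeeping; LogEntryType and the free function setActualPartName are simplified stand-ins for the real ClickHouse types, not the PR's code:

#include <cassert>
#include <iostream>
#include <string>
#include <unordered_set>

// Simplified stand-ins for the real ClickHouse types (illustration only).
enum class LogEntryType { GET_PART, REPLACE_RANGE };

struct LogEntry
{
    LogEntryType type;
    std::string new_part_name;                 // single result part for GET_PART
    mutable std::string actual_new_part_name;  // the one covering part actually fetched
    mutable std::unordered_set<std::string> replace_range_actual_new_part_names;
};

// Mirrors the PR's branching: REPLACE_RANGE may report several actual parts,
// so they accumulate in a set instead of overwriting a single string.
void setActualPartName(const LogEntry & entry, const std::string & actual_part_name)
{
    if (entry.type == LogEntryType::REPLACE_RANGE)
        entry.replace_range_actual_new_part_names.insert(actual_part_name);
    else
    {
        // For GET_PART the old invariant still holds: set at most once.
        assert(entry.actual_new_part_name.empty());
        entry.actual_new_part_name = actual_part_name;
    }
}

int main()
{
    LogEntry entry{LogEntryType::REPLACE_RANGE, "all_0_3_1", {}, {}};
    // Two different covering parts fetched while executing one REPLACE_RANGE:
    setActualPartName(entry, "all_0_1_1");
    setActualPartName(entry, "all_2_3_1");  // would have tripped the old single-string check
    std::cout << entry.replace_range_actual_new_part_names.size() << " actual parts\n";
}
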
src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp (42 additions, 21 deletions)

@@ -342,6 +342,11 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
         /// NOTE actual_new_part_name is very confusing and error-prone. This approach must be fixed.
         removeCoveredPartsFromMutations(entry->actual_new_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
     }
+    for (const auto & actual_part : entry->replace_range_actual_new_part_names)
+    {
+        LOG_TEST(log, "Entry {} has actual new part name {}, removing it from mutations", entry->znode_name, actual_part);
+        removeCoveredPartsFromMutations(actual_part, /*remove_part = */ false, /*remove_covered_parts = */ true);
+    }

     LOG_TEST(log, "Adding parts [{}] to current parts", fmt::join(entry_virtual_parts, ", "));

@@ -1180,9 +1185,9 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
     if (entry_for_same_part_it != future_parts.end())
     {
         const LogEntry & another_entry = *entry_for_same_part_it->second;
-        constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
+        constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} (actual part {}) "
                                     "because another log entry {} of type {} for the same part ({}) is being processed.";
-        LOG_INFO(LogToStr(out_reason, log), fmt_string, entry.znode_name, entry.type, entry.new_part_name,
+        LOG_INFO(LogToStr(out_reason, log), fmt_string, entry.znode_name, entry.type, entry.new_part_name, new_part_name,
                  another_entry.znode_name, another_entry.type, another_entry.new_part_name);
         return true;

@@ -1198,6 +1203,7 @@ bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry
     auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version);

     /// It can slow down when the size of `future_parts` is large. But it can not be large, since background pool is limited.
+    /// (well, it can actually, thanks to REPLACE_RANGE, but it's a rare case)
     for (const auto & future_part_elem : future_parts)
     {
         auto future_part = MergeTreePartInfo::fromPartName(future_part_elem.first, format_version);
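
The covering and disjointness checks in this function and the next hunk operate on part names, which in the classic MergeTree format encode partition, min block, max block, and level (the test below produces names like all_0_1_1). A rough self-contained sketch of that arithmetic; this is a simplification, since the real MergeTreePartInfo also handles mutation versions and partition IDs containing underscores:

#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Simplified part info: partition id plus a block number range.
// Assumption: the partition id contains no underscores.
struct PartInfo
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;
    int64_t level = 0;

    static PartInfo fromPartName(const std::string & name)
    {
        std::vector<std::string> tokens;
        std::stringstream ss(name);
        for (std::string token; std::getline(ss, token, '_');)
            tokens.push_back(token);
        assert(tokens.size() == 4);
        return {tokens[0], std::stoll(tokens[1]), std::stoll(tokens[2]), std::stoll(tokens[3])};
    }

    // Disjoint: different partitions, or non-overlapping block ranges.
    bool isDisjoint(const PartInfo & rhs) const
    {
        return partition_id != rhs.partition_id
            || max_block < rhs.min_block
            || rhs.max_block < min_block;
    }

    // Covering: this part's block range fully contains the other's.
    bool contains(const PartInfo & rhs) const
    {
        return partition_id == rhs.partition_id
            && min_block <= rhs.min_block
            && rhs.max_block <= max_block;
    }
};

int main()
{
    auto merged = PartInfo::fromPartName("all_0_1_1");   // covers blocks 0..1
    auto original = PartInfo::fromPartName("all_0_0_0"); // a single insert, block 0
    auto other = PartInfo::fromPartName("all_2_3_1");    // blocks 2..3
    assert(merged.contains(original)); // fetching all_0_1_1 satisfies a request for all_0_0_0
    assert(merged.isDisjoint(other));  // the actual parts recorded in the set must not overlap
}
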
@@ -1608,26 +1614,39 @@ void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(
     std::unique_lock<std::mutex> & state_lock,
     std::vector<LogEntryPtr> & covered_entries_to_wait)
 {
+    if (actual_part_name.empty())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Actual part name is empty");
+
     if (!entry.actual_new_part_name.empty())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry actual part isn't empty yet. This is a bug.");
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} actual part isn't empty yet: '{}'. This is a bug.",
+                        entry.znode_name, entry.actual_new_part_name);

-    entry.actual_new_part_name = actual_part_name;
+    auto actual_part_info = MergeTreePartInfo::fromPartName(actual_part_name, queue.format_version);
+    for (const auto & other_part_name : entry.replace_range_actual_new_part_names)
+        if (!MergeTreePartInfo::fromPartName(other_part_name, queue.format_version).isDisjoint(actual_part_info))
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Entry {} already has actual part {} non-disjoint with {}. This is a bug.",
+                            entry.znode_name, other_part_name, actual_part_name);

     /// Check if it is the same (and already added) part.
-    if (entry.actual_new_part_name == entry.new_part_name)
+    if (actual_part_name == entry.new_part_name)
         return;

-    if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second)
+    if (!queue.future_parts.emplace(actual_part_name, entry.shared_from_this()).second)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Attaching already existing future part {}. This is a bug. "
                         "It happened on attempt to execute {}: {}",
-                        entry.actual_new_part_name, entry.znode_name, entry.toString());
+                        actual_part_name, entry.znode_name, entry.toString());
+
+    if (entry.type == LogEntry::REPLACE_RANGE)
+        entry.replace_range_actual_new_part_names.insert(actual_part_name);
+    else
+        entry.actual_new_part_name = actual_part_name;

     for (LogEntryPtr & covered_entry : covered_entries_to_wait)
     {
         if (&entry == covered_entry.get())
             continue;
-        LOG_TRACE(queue.log, "Waiting for {} producing {} to finish before executing {} producing not disjoint part {}",
-                  covered_entry->znode_name, covered_entry->new_part_name, entry.znode_name, entry.new_part_name);
+        LOG_TRACE(queue.log, "Waiting for {} producing {} to finish before executing {} producing not disjoint part {} (actual part {})",
+                  covered_entry->znode_name, covered_entry->new_part_name, entry.znode_name, entry.new_part_name, actual_part_name);
         covered_entry->execution_complete.wait(state_lock, [&covered_entry] { return !covered_entry->currently_executing; });
     }
 }
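
The wait at the end of this hunk uses the standard condition-variable idiom: state_lock is released while a covered entry is still executing, and the predicate is re-checked on every wake-up. A minimal sketch of that pattern; the worker/waiter roles are illustrative and not taken from the PR:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex state_mutex;
std::condition_variable execution_complete;
bool currently_executing = true;

int main()
{
    // Worker: finishes its entry, then notifies everyone blocked on it.
    std::thread worker([]
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard lock(state_mutex);
            currently_executing = false;
        }
        execution_complete.notify_all();
    });

    // Waiter: the predicate form re-checks the flag after every wake-up,
    // so spurious wake-ups and early notifications are handled correctly.
    std::unique_lock lock(state_mutex);
    execution_complete.wait(lock, [] { return !currently_executing; });
    std::cout << "covered entry finished, safe to proceed\n";

    worker.join();
}
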
@@ -1646,25 +1665,27 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
     entry->currently_executing = false;
     entry->execution_complete.notify_all();

-    for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
+    auto erase_and_check = [this](const String & part_name)
     {
-        if (!queue.future_parts.erase(new_part_name))
+        if (!queue.future_parts.erase(part_name))
         {
-            LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", new_part_name);
+            LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", part_name);
             assert(false);
         }
-    }
+    };
+
+    for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
+        erase_and_check(new_part_name);

     if (!entry->actual_new_part_name.empty())
-    {
-        if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name))
-        {
-            LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", entry->actual_new_part_name);
-            assert(false);
-        }
+        erase_and_check(entry->actual_new_part_name);

-        entry->actual_new_part_name.clear();
-    }
+    entry->actual_new_part_name.clear();
+
+    for (const auto & actual_part : entry->replace_range_actual_new_part_names)
+        erase_and_check(actual_part);
+
+    entry->replace_range_actual_new_part_names.clear();
 }


@@ -0,0 +1,4 @@ (new test reference file)
+all_0_1_1 1
+all_2_3_1 1
+0
+40 1580
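
The reference values follow directly from the test below: all_0_1_1 and all_2_3_1 are the two merged parts left active after the two OPTIMIZE calls (with max_parts_to_merge_at_once=2, each merge combines exactly two of the four moved parts), 0 is the result of select sleep(3), and "40 1580" is the count and sum of the 40 values 20..59 inserted into from_1, since 20 + 21 + ... + 59 = (20 + 59) * 40 / 2 = 1580. A quick check in ClickHouse SQL:

-- numbers(offset, count) generates 20..59, matching the four inserts of 10 rows each.
select count(), sum(number) from numbers(20, 40); -- returns 40 and 1580
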
@@ -0,0 +1,47 @@ (new test SQL file)
+-- Tags: no-parallel
+
+create database if not exists shard_0;
+create database if not exists shard_1;
+
+drop table if exists shard_0.from_1;
+drop table if exists shard_1.from_1;
+drop table if exists shard_0.to;
+drop table if exists shard_1.to;
+
+create table shard_0.from_1 (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/from_1_' || currentDatabase(), '0') order by x settings old_parts_lifetime=1, max_cleanup_delay_period=1, cleanup_delay_period=1;
+create table shard_1.from_1 (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/from_1_' || currentDatabase(), '1') order by x settings old_parts_lifetime=1, max_cleanup_delay_period=1, cleanup_delay_period=1;
+
+system stop merges shard_0.from_1;
+system stop merges shard_1.from_1;
+insert into shard_0.from_1 select number + 20 from numbers(10);
+insert into shard_0.from_1 select number + 30 from numbers(10);
+
+insert into shard_0.from_1 select number + 40 from numbers(10);
+insert into shard_0.from_1 select number + 50 from numbers(10);
+
+system sync replica shard_1.from_1;
+
+create table shard_0.to (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/to_' || currentDatabase(), '0') order by x settings old_parts_lifetime=1, max_cleanup_delay_period=1, cleanup_delay_period=1, max_parts_to_merge_at_once=2, shared_merge_tree_disable_merges_and_mutations_assignment=1;
+
+create table shard_1.to (x UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/to_' || currentDatabase(), '1') order by x settings old_parts_lifetime=1, max_cleanup_delay_period=1, cleanup_delay_period=1, max_parts_to_merge_at_once=2;
+
+detach table shard_1.to;
+
+alter table shard_0.from_1 on cluster test_cluster_two_shards_different_databases move partition tuple() to table shard_0.to format Null settings distributed_ddl_output_mode='never_throw', distributed_ddl_task_timeout = 1;
+
+drop table if exists shard_0.from_1;
+drop table if exists shard_1.from_1;
+OPTIMIZE TABLE shard_0.to;
+OPTIMIZE TABLE shard_0.to;
+select name, active from system.parts where database='shard_0' and table='to' and active order by name;
+
+system restart replica shard_0.to;
+
+select sleep(3);
+
+attach table shard_1.to;
+system sync replica shard_1.to;
+select count(), sum(x) from shard_1.to;
+
+drop table if exists shard_0.to;
+drop table if exists shard_1.to;
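
What this test exercises: the MOVE PARTITION creates a REPLACE_RANGE entry in the destination table's replication queue. Because shard_1.to is detached while the source tables are dropped and shard_0.to merges the moved parts, the re-attached replica can no longer fetch the original four parts and has to fetch the two merged covering parts instead. A single REPLACE_RANGE entry therefore ends up with two actual part names, which is exactly the case the new replace_range_actual_new_part_names set handles. One way to observe such queue entries while the replica catches up (output depends on timing, so this is for inspection rather than for the reference file):

select type, new_part_name, source_replica
from system.replication_queue
where database = 'shard_1' and table = 'to';
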