From cf5517a0994182c7893513665e30322e2f7cc68d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Wed, 27 Mar 2024 11:31:41 +0000 Subject: [PATCH 1/4] Cancel merges during move/replace partition --- base/base/scope_guard.h | 8 ++-- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 42 ++++++++++++++++++- .../MergeTree/ReplicatedMergeTreeQueue.h | 12 ++++++ src/Storages/StorageReplicatedMergeTree.cpp | 29 ++++++++++++- 4 files changed, 84 insertions(+), 7 deletions(-) diff --git a/base/base/scope_guard.h b/base/base/scope_guard.h index 03670792d596..e6789c5cb1bb 100644 --- a/base/base/scope_guard.h +++ b/base/base/scope_guard.h @@ -29,11 +29,13 @@ class [[nodiscard]] BasicScopeGuard requires std::is_convertible_v constexpr BasicScopeGuard & operator=(BasicScopeGuard && src) // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved, cppcoreguidelines-noexcept-move-operations) { - if (this != &src) + if constexpr (std::is_same_v) { - invoke(); - function = src.release(); + if (this == &src) + return *this; } + invoke(); + function = src.release(); return *this; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index ee4ed87d456a..6dadada2e7fd 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -8,10 +8,8 @@ #include #include #include -#include "Storages/MutationCommands.h" #include #include - #include #include @@ -221,6 +219,43 @@ void ReplicatedMergeTreeQueue::createLogEntriesToFetchBrokenParts() broken_parts_to_enqueue_fetches_on_loading.clear(); } +void ReplicatedMergeTreeQueue::addDropReplaceIntent(const MergeTreePartInfo & intent) +{ + std::lock_guard lock{state_mutex}; + drop_replace_range_intents.push_back(intent); +} + +void ReplicatedMergeTreeQueue::removeDropReplaceIntent(const MergeTreePartInfo & intent) +{ + std::lock_guard lock{state_mutex}; + auto it = 
std::find(drop_replace_range_intents.begin(), drop_replace_range_intents.end(), intent); + chassert(it != drop_replace_range_intents.end()); + drop_replace_range_intents.erase(it); +} + +bool ReplicatedMergeTreeQueue::isIntersectingWithDropReplaceIntent( + const LogEntry & entry, const String & part_name, String & out_reason, std::unique_lock & /*state_mutex lock*/) const +{ + // TODO(antaljanosbenjamin): fill out out_reason + const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); + for (const auto & intent : drop_replace_range_intents) + { + if (!intent.isDisjoint(part_info)) + { + constexpr auto fmt_string = "Not executing {} of type {} for part {} (actual part {}) " + "because there is a drop or replace intent with part name {}."; + LOG_INFO( + LogToStr(out_reason, log), + fmt_string, + entry.znode_name, + entry.type, + entry.new_part_name, + part_name, + intent.getPartNameForLogs()); + } + } + return false; +} void ReplicatedMergeTreeQueue::insertUnlocked( const LogEntryPtr & entry, std::optional & min_unprocessed_insert_time_changed, @@ -1303,6 +1338,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( /// We can wait in worker threads, but not in scheduler.
if (isCoveredByFuturePartsImpl(entry, new_part_name, out_postpone_reason, state_lock, /* covered_entries_to_wait */ nullptr)) return false; + + if (isIntersectingWithDropReplaceIntent(entry, new_part_name, out_postpone_reason, state_lock)) + return false; } if (entry.type != LogEntry::DROP_RANGE && entry.type != LogEntry::DROP_PART) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index b17e78199463..95016d60ef10 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -107,6 +107,8 @@ class ReplicatedMergeTreeQueue */ ActiveDataPartSet virtual_parts; + /// Used to prevent operations from starting in ranges which will be affected by DROP_RANGE/REPLACE_RANGE + std::vector drop_replace_range_intents; /// We do not add DROP_PARTs to virtual_parts because they can intersect, /// so we store them separately in this structure. @@ -251,6 +253,10 @@ class ReplicatedMergeTreeQueue std::optional min_unprocessed_insert_time_changed, std::optional max_processed_insert_time_changed) const; + bool isIntersectingWithDropReplaceIntent( + const LogEntry & entry, + const String & part_name, String & out_reason, std::unique_lock & /*state_mutex lock*/) const; + /// Marks the element of the queue as running. class CurrentlyExecuting { @@ -490,6 +496,12 @@ class ReplicatedMergeTreeQueue void setBrokenPartsToEnqueueFetchesOnLoading(Strings && parts_to_fetch); /// Must be called right after queue loading. void createLogEntriesToFetchBrokenParts(); + + /// Add an intent to block operations from starting in the range. All intents must be removed by calling + /// removeDropReplaceIntent(). The same intent can be added multiple times, but it has to be removed exactly + /// the same number of times.
+ void addDropReplaceIntent(const MergeTreePartInfo& intent); + void removeDropReplaceIntent(const MergeTreePartInfo& intent); }; using CommittingBlocks = std::unordered_map>; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 905473302ba5..52847935a72d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7998,10 +7998,19 @@ void StorageReplicatedMergeTree::replacePartitionFrom( replace = false; } + scope_guard intent_guard; if (!replace) { /// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); + queue.addDropReplaceIntent(drop_range); + intent_guard = scope_guard{[this, my_drop_range = drop_range]() { queue.removeDropReplaceIntent(my_drop_range); }}; + + getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block); + { + auto pause_checking_parts = part_check_thread.pausePartsCheck(); + part_check_thread.cancelRemovedPartsCheck(drop_range); + } } assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range)); @@ -8174,8 +8183,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom( lock2.reset(); lock1.reset(); - /// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost) + /// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CheckThread may decide that parts are lost) queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC); + // No need to block operations further, especially since, if we have to wait for a mutation to finish, the intent would block + // the execution of REPLACE_RANGE + intent_guard.reset(); parts_holder.clear(); cleanup_thread.wakeup(); @@ -8227,11 +8239,21 @@ void
StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta Coordination::Stat alter_partition_version_stat; zookeeper->get(alter_partition_version_path, &alter_partition_version_stat); - MergeTreePartInfo drop_range; std::optional delimiting_block_lock; + MergeTreePartInfo drop_range; getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true); String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); + queue.addDropReplaceIntent(drop_range); + // Let's copy drop_range to make sure it doesn't get modified, otherwise we might run into an issue on removal + scope_guard intent_guard{[this, my_drop_range = drop_range]() { queue.removeDropReplaceIntent(my_drop_range); }}; + + getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block); + { + auto pause_checking_parts = part_check_thread.pausePartsCheck(); + part_check_thread.cancelRemovedPartsCheck(drop_range); + } + DataPartPtr covering_part; DataPartsVector src_all_parts; { @@ -8436,6 +8458,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta /// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost) queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC); + // No need to block operations further, especially since, if we have to wait for a mutation to finish, the intent would block + // the execution of DROP_RANGE + intent_guard.reset(); parts_holder.clear(); cleanup_thread.wakeup(); From 5f676999ede965c82e3cb14c5e62fe30e370dec6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Tue, 2 Apr 2024 10:48:08 +0000 Subject: [PATCH 2/4] Address review comments --- src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 6dadada2e7fd..7d7fd380887c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -236,7 +236,6 @@ void ReplicatedMergeTreeQueue::removeDropReplaceIntent(const MergeTreePartInfo & bool ReplicatedMergeTreeQueue::isIntersectingWithDropReplaceIntent( const LogEntry & entry, const String & part_name, String & out_reason, std::unique_lock & /*state_mutex lock*/) const { - // TODO(antaljanosbenjamin): fill out out_reason const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); for (const auto & intent : drop_replace_range_intents) { @@ -252,6 +251,7 @@ bool ReplicatedMergeTreeQueue::isIntersectingWithDropReplaceIntent( entry.new_part_name, part_name, intent.getPartNameForLogs()); + return true; } } return false; From 68320590edfa01664787b8aebf06e256af0ff88d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Tue, 2 Apr 2024 10:48:57 +0000 Subject: [PATCH 3/4] Wait for currently executing operations --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 25 +++++++++++++++++++ .../MergeTree/ReplicatedMergeTreeQueue.h | 3 +++ src/Storages/StorageReplicatedMergeTree.cpp | 13 +++++++--- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 7d7fd380887c..d2ec68186664 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1210,6 +1210,31 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange( entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; }); } +void ReplicatedMergeTreeQueue::waitForCurrentlyExecutingOpsInRange(const MergeTreePartInfo & part_info) const +{ + Queue to_wait; + + std::unique_lock lock(state_mutex); + + for (const auto& entry : queue) + { + if 
(!entry->currently_executing) + continue; + + const auto virtual_part_names = entry->getVirtualPartNames(format_version); + for(const auto& virtual_part_name: virtual_part_names) { + if (!part_info.isDisjoint(MergeTreePartInfo::fromPartName(virtual_part_name, format_version))){ + to_wait.push_back(entry); + break; + } + } + } + + LOG_DEBUG(log, "Waiting for {} entries that are currently executing.", to_wait.size()); + + for (LogEntryPtr & entry : to_wait) + entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; }); +} bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry, const String & new_part_name, String & out_reason, std::unique_lock & /* queue_lock */, diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 95016d60ef10..60b1a08912bc 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -355,6 +355,9 @@ class ReplicatedMergeTreeQueue const MergeTreePartInfo & part_info, const std::optional & covering_entry); + /// Wait for the execution of currently executing actions with virtual parts intersecting with part_info + void waitForCurrentlyExecutingOpsInRange(const MergeTreePartInfo & part_info) const; + /** In the case where there are not enough parts to perform the merge in part_name * - move actions with merged parts to the end of the queue * (in order to download a already merged part from another replica). diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 52847935a72d..1bcfd13e4917 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7998,23 +7998,28 @@ void StorageReplicatedMergeTree::replacePartitionFrom( replace = false; } - scope_guard intent_guard; if (!replace) { /// It's ATTACH PARTITION FROM, not REPLACE PARTITION. 
We have to reset drop range drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id); + } + + assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range)); + + scope_guard intent_guard; + if (replace) + { queue.addDropReplaceIntent(drop_range); intent_guard = scope_guard{[this, my_drop_range = drop_range]() { queue.removeDropReplaceIntent(my_drop_range); }}; getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block); + queue.waitForCurrentlyExecutingOpsInRange(drop_range); { auto pause_checking_parts = part_check_thread.pausePartsCheck(); part_check_thread.cancelRemovedPartsCheck(drop_range); } } - assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range)); - String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range); std::set replaced_parts; @@ -8249,6 +8254,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta scope_guard intent_guard{[this, my_drop_range = drop_range]() { queue.removeDropReplaceIntent(my_drop_range); }}; getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block); + + queue.waitForCurrentlyExecutingOpsInRange(drop_range); { auto pause_checking_parts = part_check_thread.pausePartsCheck(); part_check_thread.cancelRemovedPartsCheck(drop_range); From 34c5fbd4cfef424e6430376adc6168bdd28392bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Wed, 3 Apr 2024 07:53:28 +0000 Subject: [PATCH 4/4] Fix style --- src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index d2ec68186664..0c9d4cfe9efe 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ 
b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1222,8 +1222,10 @@ void ReplicatedMergeTreeQueue::waitForCurrentlyExecutingOpsInRange(const MergeTr continue; const auto virtual_part_names = entry->getVirtualPartNames(format_version); - for(const auto& virtual_part_name: virtual_part_names) { - if (!part_info.isDisjoint(MergeTreePartInfo::fromPartName(virtual_part_name, format_version))){ + for (const auto & virtual_part_name : virtual_part_names) + { + if (!part_info.isDisjoint(MergeTreePartInfo::fromPartName(virtual_part_name, format_version))) + { to_wait.push_back(entry); break; }