Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel merges before removing moved parts #61610

Merged
merged 4 commits into from Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions base/base/scope_guard.h
Expand Up @@ -29,11 +29,13 @@ class [[nodiscard]] BasicScopeGuard
requires std::is_convertible_v<G, F>
constexpr BasicScopeGuard & operator=(BasicScopeGuard<G> && src) // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved, cppcoreguidelines-noexcept-move-operations)
{
if (this != &src)
if constexpr (std::is_same_v<G, F>)
antaljanosbenjamin marked this conversation as resolved.
Show resolved Hide resolved
{
invoke();
function = src.release();
if (this == &src)
return *this;
}
invoke();
function = src.release();
return *this;
}

Expand Down
69 changes: 67 additions & 2 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Expand Up @@ -8,10 +8,8 @@
#include <IO/WriteHelpers.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/CurrentMetrics.h>
#include "Storages/MutationCommands.h"
#include <Parsers/formatAST.h>
#include <base/sort.h>

#include <ranges>
#include <Poco/Timestamp.h>

Expand Down Expand Up @@ -221,6 +219,43 @@ void ReplicatedMergeTreeQueue::createLogEntriesToFetchBrokenParts()
broken_parts_to_enqueue_fetches_on_loading.clear();
}

void ReplicatedMergeTreeQueue::addDropReplaceIntent(const MergeTreePartInfo & intent)
{
std::lock_guard lock{state_mutex};
drop_replace_range_intents.push_back(intent);
}

void ReplicatedMergeTreeQueue::removeDropReplaceIntent(const MergeTreePartInfo & intent)
{
std::lock_guard lock{state_mutex};
auto it = std::find(drop_replace_range_intents.begin(), drop_replace_range_intents.end(), intent);
antonio2368 marked this conversation as resolved.
Show resolved Hide resolved
chassert(it != drop_replace_range_intents.end());
drop_replace_range_intents.erase(it);
}

bool ReplicatedMergeTreeQueue::isIntersectingWithDropReplaceIntent(
const LogEntry & entry, const String & part_name, String & out_reason, std::unique_lock<std::mutex> & /*state_mutex lock*/) const
{
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
for (const auto & intent : drop_replace_range_intents)
{
if (!intent.isDisjoint(part_info))
{
constexpr auto fmt_string = "Not executing {} of type {} for part {} (actual part {})"
"because there is a drop or replace intent with part name {}.";
LOG_INFO(
LogToStr(out_reason, log),
fmt_string,
entry.znode_name,
entry.type,
entry.new_part_name,
part_name,
intent.getPartNameForLogs());
return true;
}
}
return false;
}

void ReplicatedMergeTreeQueue::insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
Expand Down Expand Up @@ -1175,6 +1210,33 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
}

void ReplicatedMergeTreeQueue::waitForCurrentlyExecutingOpsInRange(const MergeTreePartInfo & part_info) const
{
Queue to_wait;

std::unique_lock lock(state_mutex);

for (const auto& entry : queue)
{
if (!entry->currently_executing)
continue;

const auto virtual_part_names = entry->getVirtualPartNames(format_version);
for (const auto & virtual_part_name : virtual_part_names)
{
if (!part_info.isDisjoint(MergeTreePartInfo::fromPartName(virtual_part_name, format_version)))
{
to_wait.push_back(entry);
break;
}
}
}

LOG_DEBUG(log, "Waiting for {} entries that are currently executing.", to_wait.size());

for (LogEntryPtr & entry : to_wait)
entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
}

bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry, const String & new_part_name,
String & out_reason, std::unique_lock<std::mutex> & /* queue_lock */,
Expand Down Expand Up @@ -1303,6 +1365,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
/// We can wait in worker threads, but not in scheduler.
if (isCoveredByFuturePartsImpl(entry, new_part_name, out_postpone_reason, state_lock, /* covered_entries_to_wait */ nullptr))
return false;

if (isIntersectingWithDropReplaceIntent(entry, new_part_name, out_postpone_reason, state_lock))
return false;
}

if (entry.type != LogEntry::DROP_RANGE && entry.type != LogEntry::DROP_PART)
Expand Down
15 changes: 15 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
Expand Up @@ -107,6 +107,8 @@ class ReplicatedMergeTreeQueue
*/
ActiveDataPartSet virtual_parts;

/// Used to prevent operations to start in ranges which will be affected by DROP_RANGE/REPLACE_RANGE
std::vector<MergeTreePartInfo> drop_replace_range_intents;

/// We do not add DROP_PARTs to virtual_parts because they can intersect,
/// so we store them separately in this structure.
Expand Down Expand Up @@ -251,6 +253,10 @@ class ReplicatedMergeTreeQueue
std::optional<time_t> min_unprocessed_insert_time_changed,
std::optional<time_t> max_processed_insert_time_changed) const;

bool isIntersectingWithDropReplaceIntent(
const LogEntry & entry,
const String & part_name, String & out_reason, std::unique_lock<std::mutex> & /*state_mutex lock*/) const;

/// Marks the element of the queue as running.
class CurrentlyExecuting
{
Expand Down Expand Up @@ -349,6 +355,9 @@ class ReplicatedMergeTreeQueue
const MergeTreePartInfo & part_info,
const std::optional<ReplicatedMergeTreeLogEntryData> & covering_entry);

/// Wait for the execution of currently executing actions with virtual parts intersecting with part_info
void waitForCurrentlyExecutingOpsInRange(const MergeTreePartInfo & part_info) const;

/** In the case where there are not enough parts to perform the merge in part_name
* - move actions with merged parts to the end of the queue
* (in order to download a already merged part from another replica).
Expand Down Expand Up @@ -490,6 +499,12 @@ class ReplicatedMergeTreeQueue
void setBrokenPartsToEnqueueFetchesOnLoading(Strings && parts_to_fetch);
/// Must be called right after queue loading.
void createLogEntriesToFetchBrokenParts();

/// Add an intent to block operations to start in the range. All intents must be removed by calling
/// removeDropReplaceIntent(). The same intent can be added multiple times, but it has to be removed exactly
/// the same amount of times.
void addDropReplaceIntent(const MergeTreePartInfo& intent);
void removeDropReplaceIntent(const MergeTreePartInfo& intent);
};

using CommittingBlocks = std::unordered_map<String, std::set<Int64>>;
Expand Down
36 changes: 34 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.cpp
Expand Up @@ -8006,6 +8006,20 @@ void StorageReplicatedMergeTree::replacePartitionFrom(

assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range));

scope_guard intent_guard;
if (replace)
{
queue.addDropReplaceIntent(drop_range);
intent_guard = scope_guard{[this, my_drop_range = drop_range]() { queue.removeDropReplaceIntent(my_drop_range); }};

getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
queue.waitForCurrentlyExecutingOpsInRange(drop_range);
{
auto pause_checking_parts = part_check_thread.pausePartsCheck();
part_check_thread.cancelRemovedPartsCheck(drop_range);
}
}

String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);

std::set<String> replaced_parts;
Expand Down Expand Up @@ -8174,8 +8188,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
lock2.reset();
lock1.reset();

/// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
/// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
// No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block
// the execution of REPLACE_RANGE
intent_guard.reset();
parts_holder.clear();
cleanup_thread.wakeup();

Expand Down Expand Up @@ -8227,11 +8244,23 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);

MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
MergeTreePartInfo drop_range;
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);

queue.addDropReplaceIntent(drop_range);
// Let's copy drop_range to make sure it doesn't get modified, otherwise we might run into issue on removal
scope_guard intent_guard{[this, my_drop_range = drop_range]() { queue.removeDropReplaceIntent(my_drop_range); }};

getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
antaljanosbenjamin marked this conversation as resolved.
Show resolved Hide resolved

queue.waitForCurrentlyExecutingOpsInRange(drop_range);
{
auto pause_checking_parts = part_check_thread.pausePartsCheck();
part_check_thread.cancelRemovedPartsCheck(drop_range);
}

DataPartPtr covering_part;
DataPartsVector src_all_parts;
{
Expand Down Expand Up @@ -8436,6 +8465,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta

/// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
// No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block
// the execution of DROP_RANGE
intent_guard.reset();
parts_holder.clear();
cleanup_thread.wakeup();

Expand Down