Skip to content

Commit

Permalink
Cancel merges during move/replace partition
Browse files Browse the repository at this point in the history
  • Loading branch information
antaljanosbenjamin committed Mar 27, 2024
1 parent 2f382ac commit 1c3a847
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 3 deletions.
42 changes: 40 additions & 2 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Expand Up @@ -8,10 +8,8 @@
#include <IO/WriteHelpers.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/CurrentMetrics.h>
#include "Storages/MutationCommands.h"
#include <Parsers/formatAST.h>
#include <base/sort.h>

#include <ranges>
#include <Poco/Timestamp.h>

Expand Down Expand Up @@ -221,6 +219,43 @@ void ReplicatedMergeTreeQueue::createLogEntriesToFetchBrokenParts()
broken_parts_to_enqueue_fetches_on_loading.clear();
}

void ReplicatedMergeTreeQueue::addDropReplaceIntent(const MergeTreePartInfo & intent)
{
std::lock_guard lock{state_mutex};
drop_replace_range_intents.push_back(intent);
}

void ReplicatedMergeTreeQueue::removeDropReplaceIntent(const MergeTreePartInfo & intent)
{
std::lock_guard lock{state_mutex};
auto it = std::find(drop_replace_range_intents.begin(), drop_replace_range_intents.end(), intent);
chassert(it != drop_replace_range_intents.end());
drop_replace_range_intents.erase(it);
}

bool ReplicatedMergeTreeQueue::isIntersectingWithDropReplaceIntent(
const LogEntry & entry, const String & part_name, String & out_reason, std::unique_lock<std::mutex> & /*state_mutex lock*/) const
{
// TODO(antaljanosbenjamin): fill out out_reason
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
for (const auto & intent : drop_replace_range_intents)
{
if (!intent.isDisjoint(part_info))
{
constexpr auto fmt_string = "Not executing {} of type {} for part {} (actual part {})"
"because there is a drop or replace intent with part name {}.";
LOG_INFO(
LogToStr(out_reason, log),
fmt_string,
entry.znode_name,
entry.type,
entry.new_part_name,
part_name,
intent.getPartNameForLogs());
}
}
return false;
}

void ReplicatedMergeTreeQueue::insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
Expand Down Expand Up @@ -1303,6 +1338,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
/// We can wait in worker threads, but not in scheduler.
if (isCoveredByFuturePartsImpl(entry, new_part_name, out_postpone_reason, state_lock, /* covered_entries_to_wait */ nullptr))
return false;

if (isIntersectingWithDropReplaceIntent(entry, new_part_name, out_postpone_reason, state_lock))
return false;
}

if (entry.type != LogEntry::DROP_RANGE && entry.type != LogEntry::DROP_PART)
Expand Down
12 changes: 12 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
Expand Up @@ -107,6 +107,8 @@ class ReplicatedMergeTreeQueue
*/
ActiveDataPartSet virtual_parts;

/// Used to prevent merges to start in ranges which will be affected by DROP_RANGE/REPLACE_RANGE
std::vector<MergeTreePartInfo> drop_replace_range_intents;

/// We do not add DROP_PARTs to virtual_parts because they can intersect,
/// so we store them separately in this structure.
Expand Down Expand Up @@ -251,6 +253,10 @@ class ReplicatedMergeTreeQueue
std::optional<time_t> min_unprocessed_insert_time_changed,
std::optional<time_t> max_processed_insert_time_changed) const;

bool isIntersectingWithDropReplaceIntent(
const LogEntry & entry,
const String & part_name, String & out_reason, std::unique_lock<std::mutex> & /*state_mutex lock*/) const;

/// Marks the element of the queue as running.
class CurrentlyExecuting
{
Expand Down Expand Up @@ -490,6 +496,12 @@ class ReplicatedMergeTreeQueue
void setBrokenPartsToEnqueueFetchesOnLoading(Strings && parts_to_fetch);
/// Must be called right after queue loading.
void createLogEntriesToFetchBrokenParts();

/// Add an intent to block merges to start in the range. All intents must be removed by calling
/// removeDropReplaceIntent(). The same intent can be added multiple times, but it has to be removed exactly
/// the same amount of times.
void addDropReplaceIntent(const MergeTreePartInfo& intent);
void removeDropReplaceIntent(const MergeTreePartInfo& intent);
};

using CommittingBlocks = std::unordered_map<String, std::set<Int64>>;
Expand Down
25 changes: 24 additions & 1 deletion src/Storages/StorageReplicatedMergeTree.cpp
Expand Up @@ -7998,10 +7998,19 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
replace = false;
}

std::optional<scope_guard> intent_guard;
if (!replace)
{
/// It's ATTACH PARTITION FROM, not REPLACE PARTITION. We have to reset drop range
drop_range = makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(partition_id);
queue.addDropReplaceIntent(drop_range);
intent_guard = make_scope_guard([this, drop_range = drop_range]() { queue.removeDropReplaceIntent(drop_range); });

getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
{
auto pause_checking_parts = part_check_thread.pausePartsCheck();
part_check_thread.cancelRemovedPartsCheck(drop_range);
}
}

assert(replace == !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range));
Expand Down Expand Up @@ -8176,6 +8185,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(

/// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
// No need to block the merges anymore
intent_guard.reset();
parts_holder.clear();
cleanup_thread.wakeup();

Expand Down Expand Up @@ -8218,6 +8229,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
auto src_data_id = src_data.getStorageID();
String partition_id = getPartitionIDFromQuery(partition, query_context);

std::optional<scope_guard> intent_guard;
/// A range for log entry to remove parts from the source table (myself).
auto zookeeper = getZooKeeper();
/// Retry if alter_partition_version changes
Expand All @@ -8227,11 +8239,20 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
Coordination::Stat alter_partition_version_stat;
zookeeper->get(alter_partition_version_path, &alter_partition_version_stat);

MergeTreePartInfo drop_range;
std::optional<EphemeralLockInZooKeeper> delimiting_block_lock;
MergeTreePartInfo drop_range;
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, delimiting_block_lock, true);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);

queue.addDropReplaceIntent(drop_range);
intent_guard = make_scope_guard([this, drop_range = drop_range]() { queue.removeDropReplaceIntent(drop_range); });

getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
{
auto pause_checking_parts = part_check_thread.pausePartsCheck();
part_check_thread.cancelRemovedPartsCheck(drop_range);
}

DataPartPtr covering_part;
DataPartsVector src_all_parts;
{
Expand Down Expand Up @@ -8436,6 +8457,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta

/// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
// No need to block merges anymore
intent_guard.reset();
parts_holder.clear();
cleanup_thread.wakeup();

Expand Down

0 comments on commit 1c3a847

Please sign in to comment.