Backoff policy for failed mutation. #58036

Merged
6 changes: 6 additions & 0 deletions src/Interpreters/Context.cpp
@@ -2902,6 +2902,12 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting
return task_settings;
}

size_t Context::getMaxPostponeTimeForFailedMutations() const
{
const auto & config = getConfigRef();
return config.getUInt("max_postpone_time_for_failed_mutations", 0ull);
}
Contributor: You can add this setting here and query it like getSettingsRef().max_postpone_time...


BackgroundSchedulePool & Context::getSchedulePool() const
{
callOnce(shared->schedule_pool_initialized, [&] {
3 changes: 3 additions & 0 deletions src/Interpreters/Context.h
@@ -1022,6 +1022,9 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;

// Setting for backoff policy for failed mutation tasks.
size_t getMaxPostponeTimeForFailedMutations() const;

BackgroundSchedulePool & getBufferFlushSchedulePool() const;
BackgroundSchedulePool & getSchedulePool() const;
BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeData.cpp
@@ -363,6 +363,7 @@ MergeTreeData::MergeTreeData(
, parts_mover(this)
, background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
, background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
, mutation_backoff_policy(getContext())
{
context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();

88 changes: 87 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
@@ -35,7 +35,7 @@
#include <Storages/PartitionCommands.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/threadPoolCallbackRunner.h>

#include <Poco/Timestamp.h>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@@ -1348,6 +1348,92 @@ class MergeTreeData : public IStorage, public WithMutableContext
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);

class PartMutationBackoffPolicy : public WithContext
Member: Looks like the context is never used.

{
struct PartMutationInfo
{
size_t retry_count = 0ul;
Poco::Timestamp latest_fail_time{};
UInt64 mutation_failure_version = 0ul;

Poco::Timestamp getNextMinExecutionTime() const
{
return latest_fail_time + (1 << retry_count) * 1000ul;
Member:
  1. Double whitespace after "return" and after "+".
  2. It's unclear what the precision of latest_fail_time is when it's implicitly converted to an integer (milliseconds? microseconds? nanoseconds?). Having * 1000ul doesn't make it more clear (although it's clear that it's not seconds). I would prefer having everything in integers with the same precision and a clear suffix like _ms. Poco::Timestamp (as well as Poco::Timespan) with implicit conversions are error-prone.

}
};

using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
DataPartsWithRetryInfo failed_mutation_parts;
size_t max_pospone_power;
mutable std::mutex parts_info_lock;

public:
explicit PartMutationBackoffPolicy(ContextPtr global_context_)
: WithContext(global_context_)
{
size_t max_pospone_time_ms = global_context_->getMaxPostponeTimeForFailedMutations();
Contributor: Maybe we could store context.getSettings() instead of storing the context ptr itself? Except for the setting, we don't use the context.

if (max_pospone_time_ms == 0)
max_pospone_power = 0;
Contributor: typo: posTpone

else
max_pospone_power = static_cast<size_t>(std::log2(max_pospone_time_ms));
}

void removeFromFailedByVersion(UInt64 mutation_version)
{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);

for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();)
{
if (failed_part_it->second.mutation_failure_version == mutation_version)
failed_part_it = failed_mutation_parts.erase(failed_part_it);
else
++failed_part_it;
}
}

void removePartFromFailed(const String& part_name)
Member: Suggested change:
    void removePartFromFailed(const String& part_name)
    void removePartFromFailed(const String & part_name)
And in other places as well (it's interesting why the Style Check did not fail).

{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);
failed_mutation_parts.erase(part_name);
}

void addPartMutationFailure (const String& part_name, UInt64 _mutation_failure_version)
{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);
auto part_info_it = failed_mutation_parts.find(part_name);
if (part_info_it == failed_mutation_parts.end())
{
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo());
std::swap(it, part_info_it);
}
auto& part_info = part_info_it->second;
part_info.retry_count = std::min(max_pospone_power, part_info.retry_count + 1);
part_info.latest_fail_time = Poco::Timestamp();
part_info.mutation_failure_version = _mutation_failure_version;
}

bool partCanBeMutated(const String& part_name)
{
if (max_pospone_power == 0)
return true;
std::unique_lock _lock(parts_info_lock);
auto iter = failed_mutation_parts.find(part_name);
if (iter == failed_mutation_parts.end())
return true;

auto current_time = Poco::Timestamp();
return current_time >= iter->second.getNextMinExecutionTime();
}
};
/// Controls postponing logic for failed mutations.
PartMutationBackoffPolicy mutation_backoff_policy;

/// If part is assigned to merge or mutation (possibly replicated)
/// Should be overridden by children, because they can have different
/// mechanisms for parts locking
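
To make the arithmetic above concrete, and in the spirit of the review comments (keep the delay in plain integer milliseconds instead of relying on Poco::Timestamp's implicit conversions, and keep only the configured value rather than the context), here is a minimal standalone sketch of the backoff computation. It assumes the intended delay is 2^retry_count milliseconds, reading the "* 1000ul" above as a milliseconds-to-microseconds conversion; the names and structure are illustrative, not the PR's implementation.

// Standalone sketch of the exponential backoff, with explicit millisecond integers.
// Assumption: the real code's "(1 << retry_count) * 1000ul" adds 2^retry_count ms
// (Poco::Timestamp counts microseconds), capped via log2(max_postpone_time_ms).
#include <cmath>
#include <cstdint>
#include <iostream>

int main()
{
    const uint64_t max_postpone_time_ms = 60000; // e.g. max_postpone_time_for_failed_mutations
    // Cap the exponent so that 2^retry_count never exceeds the configured maximum delay.
    const uint64_t max_postpone_power = max_postpone_time_ms == 0
        ? 0
        : static_cast<uint64_t>(std::log2(max_postpone_time_ms));

    const uint64_t latest_fail_time_ms = 0; // pretend the part failed at t = 0
    for (uint64_t retry_count = 1; retry_count <= max_postpone_power; ++retry_count)
    {
        const uint64_t delay_ms = uint64_t{1} << retry_count;
        std::cout << "retry " << retry_count << ": backoff " << delay_ms
                  << " ms, next attempt allowed at t = " << (latest_fail_time_ms + delay_ms) << " ms\n";
    }
    return 0;
}
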
11 changes: 10 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
@@ -1,6 +1,7 @@
#include <Storages/MergeTree/ReplicatedMergeMutateTaskBase.h>

#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Common/ProfileEventsScope.h>

@@ -110,11 +111,13 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
auto mutations_end_it = in_partition->second.upper_bound(result_data_version);
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
auto & src_part = log_entry->source_parts.at(0);
ReplicatedMergeTreeQueue::MutationStatus & status = *it->second;
status.latest_failed_part = log_entry->source_parts.at(0);
status.latest_failed_part = src_part;
status.latest_failed_part_info = source_part_info;
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
storage.mutation_backoff_policy.addPartMutationFailure(src_part, source_part_info.mutation + 1);
}
}
}
@@ -142,6 +145,12 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
{
storage.queue.removeProcessedEntry(storage.getZooKeeper(), selected_entry->log_entry);
state = State::SUCCESS;

auto & log_entry = selected_entry->log_entry;
if (log_entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART)
{
storage.mutation_backoff_policy.removePartFromFailed(log_entry->source_parts.at(0));
}
}
catch (...)
{
11 changes: 10 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -13,6 +13,7 @@
#include <base/sort.h>

#include <ranges>
#include <Poco/Timestamp.h>

namespace DB
{
@@ -1350,9 +1351,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();

if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because recently it has failed. According to exponential backoff policy, put aside this log entry.";

LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
Member, on lines +1360 to +1363: It would be nice to also print the number of (milli)seconds til the next attempt, but it's not necessary.

return false;
}
}
}

if (merger_mutator.merges_blocker.isCancelled())
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
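
The review comment in this file suggests also reporting how long the entry will stay postponed. A hypothetical sketch of that computation, again in plain milliseconds; the variable names and message format are stand-ins, not taken from the PR:

// Hypothetical: compute the remaining backoff and include it in the postpone message.
#include <cstdint>
#include <cstdio>

int main()
{
    const uint64_t now_ms = 10500;                      // current time (stand-in value)
    const uint64_t next_min_execution_time_ms = 12000;  // what the backoff policy computed
    const uint64_t wait_ms =
        next_min_execution_time_ms > now_ms ? next_min_execution_time_ms - now_ms : 0;

    std::printf("Not executing log entry for part %s because it recently failed; "
                "next attempt allowed in %llu ms.\n",
                "all_1_2_3", static_cast<unsigned long long>(wait_ms));
    return 0;
}
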
25 changes: 21 additions & 4 deletions src/Storages/StorageMergeTree.cpp
@@ -4,7 +4,7 @@

#include <optional>
#include <ranges>

#include <Poco/Timestamp.h>
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
@@ -534,6 +534,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MergeTreeMutationEntry & entry = it->second;
auto & failed_part = result_part->parts.at(0);
if (is_successful)
{
if (!entry.latest_failed_part.empty() && result_part->part_info.contains(entry.latest_failed_part_info))
@@ -542,14 +543,16 @@
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
mutation_backoff_policy.removePartFromFailed(failed_part->name);
}
}
else
{
entry.latest_failed_part = result_part->parts.at(0)->name;
entry.latest_failed_part_info = result_part->parts.at(0)->info;
entry.latest_failed_part = failed_part->name;
entry.latest_failed_part_info = failed_part->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;
mutation_backoff_policy.addPartMutationFailure(failed_part->name, sources_data_version + 1);
}
}
}
@@ -816,6 +819,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
}
}

mutation_backoff_policy.removeFromFailedByVersion(mutation_version);
Member:
  1. Do we need the same for ReplicatedMergeTree?
  2. Feels like it will not work because multiple mutations may be merged together and executed at once. For example, if there's a faulty mutation with version 42 and another mutation with version 137, then a part all_1_2_3 can be mutated directly to all_1_2_3_137 (and this will fail due to the mutation 42). So according to the if (static_cast<UInt64>(result_part->part_info.mutation) == it->first) condition, addPartMutationFailure will be called for the version 137, and killing the faulty mutation 42 will not reset the backoff. Please add the corresponding test. Consider removing all the logic around the mutation version for simplicity and resetting all backoffs when killing any mutation.


if (!to_kill)
return CancellationCode::NotFound;
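
The second point in the review comment above is easy to reproduce with a toy model of the bookkeeping: failures are keyed by part name but recorded against the version the part was mutated to, while killMutation clears them by the killed mutation's version. A minimal sketch under those assumptions (not the PR's code):

// Toy model: a failure caused by faulty mutation 42 is recorded under the merged-up
// target version 137, so erasing entries whose version equals 42 leaves the backoff in place.
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

int main()
{
    // part name -> mutation version the failure was recorded against
    std::unordered_map<std::string, uint64_t> failed_mutation_parts;

    // all_1_2_3 is mutated straight to all_1_2_3_137; it fails because of mutation 42,
    // but the failure is recorded for version 137.
    failed_mutation_parts.emplace("all_1_2_3", 137);

    // KILL MUTATION of the faulty version 42: remove entries recorded under that version.
    const uint64_t killed_version = 42;
    for (auto it = failed_mutation_parts.begin(); it != failed_mutation_parts.end();)
    {
        if (it->second == killed_version)
            it = failed_mutation_parts.erase(it);
        else
            ++it;
    }

    // The entry survives, so the part keeps being postponed even though the faulty
    // mutation was killed, which is the behaviour the reviewer warns about.
    std::cout << "entries left after killing mutation 42: " << failed_mutation_parts.size() << "\n";
    return 0;
}
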

@@ -1176,6 +1181,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(

CurrentlyMergingPartsTaggerPtr tagger;

bool exist_posponed_failed_part = false;
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVectorForInternalUsage())
{
@@ -1200,6 +1206,13 @@
TransactionID first_mutation_tid = mutations_begin_it->second.tid;
MergeTreeTransactionPtr txn;

if (!mutation_backoff_policy.partCanBeMutated(part->name))
{
exist_posponed_failed_part = true;
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name);
continue;
}

if (!first_mutation_tid.isPrehistoric())
{

@@ -1306,7 +1319,11 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
}
}

if (exist_posponed_failed_part)
{
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
return {};
}

2 changes: 2 additions & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -7438,6 +7438,8 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
if (!mutation_entry)
return CancellationCode::NotFound;

mutation_backoff_policy.removeFromFailedByVersion(static_cast<UInt64>(mutation_entry->alter_version));

/// After this point no new part mutations will start and part mutations that still exist
/// in the queue will be skipped.

Empty file.
@@ -0,0 +1,3 @@
<clickhouse>
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations>
</clickhouse>
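
For reference, with the 60000 ms value above and the log2-based cap in the constructor, max_pospone_power comes out as static_cast<size_t>(log2(60000)) = 15, so retry_count is capped at 15 and the longest backoff works out to 2^15 = 32768 ms, roughly half a minute, assuming the delay is 2^retry_count milliseconds as discussed in the review comments.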