Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backoff policy for failed mutation. #58036

Merged
96 changes: 95 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Expand Up @@ -36,7 +36,7 @@
#include <Storages/PartitionCommands.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/threadPoolCallbackRunner.h>

#include <Poco/Timestamp.h>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
Expand Down Expand Up @@ -1364,6 +1364,100 @@ class MergeTreeData : public IStorage, public WithMutableContext
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);

class PartMutationBackoffPolicy : public WithContext
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the context is never used

{
struct PartMutationInfo
{
size_t retry_count;
Poco::Timestamp latest_fail_time;
UInt64 mutation_failure_version;
size_t max_postpone_time_ms;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have time both in Poco::Timestamp and size_t?

size_t max_postpone_power;

PartMutationInfo(UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
: retry_count(0ull)
, latest_fail_time(std::move(Poco::Timestamp()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't Poco::Timestamp a POD type? What's the point of moving it, then?

, mutation_failure_version(mutation_failure_version_)
, max_postpone_time_ms(max_postpone_time_ms_)
, max_postpone_power((max_postpone_time_ms_) ? (static_cast<size_t>(std::log2(max_postpone_time_ms_))) : (0ull))
{}


Poco::Timestamp getNextMinExecutionTime() const
{
if (max_postpone_time_ms == 0)
return Poco::Timestamp();
return latest_fail_time + (1 << retry_count) * 1000ul;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Double whitespace after "return" and after "+"
  2. It's unclear what the precision of latest_fail_time is when it's implicitly converted to an integer (milliseconds? microseconds? nanoseconds?). Having * 1000ul doesn't make it more clear (although it's clear that it's not seconds). I would prefer having everything in integers with the same precision and a clear suffix like _ms. Poco::Timestamp (as well as Poco::Timespan) with implicit conversions are error-prone

}

void addPartFailure()
{
if (max_postpone_time_ms == 0)
return;
retry_count = std::min(max_postpone_power, retry_count + 1);
latest_fail_time = Poco::Timestamp();
}

bool partCanBeMutated()
{
if (max_postpone_time_ms == 0)
return true;

auto current_time = Poco::Timestamp();
return current_time >= getNextMinExecutionTime();
}
};

using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
DataPartsWithRetryInfo failed_mutation_parts;
mutable std::mutex parts_info_lock;

public:

void removeFromFailedByVersion(UInt64 mutation_version)
{
std::unique_lock _lock(parts_info_lock);
for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();)
{
if (failed_part_it->second.mutation_failure_version == mutation_version)
failed_part_it = failed_mutation_parts.erase(failed_part_it);
else
++failed_part_it;
}
}

void removePartFromFailed(const String& part_name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void removePartFromFailed(const String& part_name)
void removePartFromFailed(const String & part_name)

And in other places as well
(it's interesting why the Style Check did not fail)

{
std::unique_lock _lock(parts_info_lock);
failed_mutation_parts.erase(part_name);
}

void addPartMutationFailure (const String& part_name, UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
{
std::unique_lock _lock(parts_info_lock);
auto part_info_it = failed_mutation_parts.find(part_name);
if (part_info_it == failed_mutation_parts.end())
{
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(mutation_failure_version_, max_postpone_time_ms_));
std::swap(it, part_info_it);
}
auto& part_info = part_info_it->second;
part_info.addPartFailure();
}

bool partCanBeMutated(const String& part_name)
{

std::unique_lock _lock(parts_info_lock);
auto iter = failed_mutation_parts.find(part_name);
if (iter == failed_mutation_parts.end())
return true;
return iter->second.partCanBeMutated();
}
};
/// Controls postponing logic for failed mutations.
PartMutationBackoffPolicy mutation_backoff_policy;

/// If part is assigned to merge or mutation (possibly replicated)
/// Should be overridden by children, because they can have different
/// mechanisms for parts locking
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Expand Up @@ -146,6 +146,7 @@ struct Settings;
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \
M(UInt64, max_postpone_time_for_failed_mutations, 0ul, "The maximum postpone time for failed mutations in ms.", 0) \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, the change is good and it makes sense to enable it by default. What will be a good default value in your opinion?

We can start with enabling it in the CI. See install.sh. 50-100ms should be fine for the CI (so the new code will be executed in functional tests, but will not affect tests significantly)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
M(UInt64, max_postpone_time_for_failed_mutations, 0ul, "The maximum postpone time for failed mutations in ms.", 0) \
M(UInt64, max_postpone_time_for_failed_mutations_ms, 0ul, "The maximum postpone time for failed mutations.", 0) \

\
/** Compatibility settings */ \
M(Bool, allow_suspicious_indices, false, "Reject primary/secondary indexes and sorting keys with identical expressions", 0) \
Expand Down
12 changes: 11 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
@@ -1,6 +1,7 @@
#include <Storages/MergeTree/ReplicatedMergeMutateTaskBase.h>

#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Common/ProfileEventsScope.h>

Expand Down Expand Up @@ -110,11 +111,14 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
auto mutations_end_it = in_partition->second.upper_bound(result_data_version);
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
auto & src_part = log_entry->source_parts.at(0);
ReplicatedMergeTreeQueue::MutationStatus & status = *it->second;
status.latest_failed_part = log_entry->source_parts.at(0);
status.latest_failed_part = src_part;
status.latest_failed_part_info = source_part_info;
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
if (result_data_version == it->first)
storage.mutation_backoff_policy.addPartMutationFailure(src_part, result_data_version, storage.getSettings()->max_postpone_time_for_failed_mutations);
}
}
}
Expand Down Expand Up @@ -142,6 +146,12 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
{
storage.queue.removeProcessedEntry(storage.getZooKeeper(), selected_entry->log_entry);
state = State::SUCCESS;

auto & log_entry = selected_entry->log_entry;
if (log_entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART)
{
storage.mutation_backoff_policy.removePartFromFailed(log_entry->source_parts.at(0));
}
}
catch (...)
{
Expand Down
11 changes: 10 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Expand Up @@ -13,6 +13,7 @@
#include <base/sort.h>

#include <ranges>
#include <Poco/Timestamp.h>

namespace DB
{
Expand Down Expand Up @@ -1350,9 +1351,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();

if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because recently it has failed. According to exponential backoff policy, put aside this log entry.";

LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
Comment on lines +1360 to +1363
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to also print the number of (milli)seconds til the next attempt, but it's not necessary

return false;
}
}
}

if (merger_mutator.merges_blocker.isCancelled())
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
Expand Down
30 changes: 27 additions & 3 deletions src/Storages/StorageMergeTree.cpp
Expand Up @@ -5,6 +5,7 @@
#include <optional>
#include <ranges>

#include <Poco/Timestamp.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include <IO/copyData.h>
Expand Down Expand Up @@ -540,6 +541,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re

Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion();
Int64 result_data_version = result_part->part_info.getDataVersion();
auto & failed_part = result_part->parts.at(0);

if (sources_data_version != result_data_version)
{
std::lock_guard lock(currently_processing_in_background_mutex);
Expand All @@ -557,14 +560,21 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be called from the killMutation as well? (so as for ReplicatedMergeTree)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we call resetMutationFailures instead, see the review comments.

However, I've just realized that for RMT it will be only called on the initiator, and not on other replicas (I'm not sure what's the best way to fix it)

}
}
else
{
entry.latest_failed_part = result_part->parts.at(0)->name;
entry.latest_failed_part_info = result_part->parts.at(0)->info;
entry.latest_failed_part = failed_part->name;
entry.latest_failed_part_info = failed_part->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;

if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
{
mutation_backoff_policy.addPartMutationFailure(failed_part->name, it->first, getSettings()->max_postpone_time_for_failed_mutations);
}
}
}
}
Expand Down Expand Up @@ -835,6 +845,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
}
}

mutation_backoff_policy.removeFromFailedByVersion(mutation_version);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Do we need the same for ReplicatedMergeTree?
  2. Feels like it will not work because multiple mutations may be merged together and executed at once. For example, if there's a faulty mutation with version 42 and another mutation with version 137, then a part all_1_2_3 can be mutated directly to all_1_2_3_137 (and this will fail due to the mutation 42). So according to the if (static_cast<UInt64>(result_part->part_info.mutation) == it->first) condition, addPartMutationFailure will be called for the version 137, and killing the faulty mutation 42 will not reset the backoff. Please add the corresponding test. Consider removing all the logic around the mutation version for simplicity and resetting all backoffs when killing any mutation


if (!to_kill)
return CancellationCode::NotFound;

Expand Down Expand Up @@ -1195,6 +1207,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(

CurrentlyMergingPartsTaggerPtr tagger;

bool exist_postponed_failed_part = false;
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVectorForInternalUsage())
{
Expand All @@ -1219,6 +1232,13 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
TransactionID first_mutation_tid = mutations_begin_it->second.tid;
MergeTreeTransactionPtr txn;

if (!mutation_backoff_policy.partCanBeMutated(part->name))
{
exist_postponed_failed_part = true;
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name);
continue;
}

if (!first_mutation_tid.isPrehistoric())
{

Expand Down Expand Up @@ -1325,7 +1345,11 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
}
}

if (exist_postponed_failed_part)
{
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unexpected, could you please add a comment explaining why we need to notify here? Introducing a backoff affects only timings and should not affect any synchronization

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. Removed it.

return {};
}

Expand Down
Empty file.
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations>
</merge_tree>
</clickhouse>