Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backoff policy for failed mutation. #58036

Merged
2 changes: 2 additions & 0 deletions docker/test/upgrade/run.sh
Expand Up @@ -82,6 +82,7 @@ rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/config.d/storage_conf_02963.xml
rm /etc/clickhouse-server/config.d/backoff_failed_mutation.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml
Expand Down Expand Up @@ -122,6 +123,7 @@ rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/config.d/storage_conf_02963.xml
rm /etc/clickhouse-server/config.d/backoff_failed_mutation.xml
rm /etc/clickhouse-server/config.d/block_number.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
Expand Down
89 changes: 88 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Expand Up @@ -35,9 +35,9 @@
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Interpreters/PartLog.h>
#include <Poco/Timestamp.h>
#include <Common/threadPoolCallbackRunner.h>


#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/global_fun.hpp>
Expand Down Expand Up @@ -1351,6 +1351,93 @@ class MergeTreeData : public IStorage, public WithMutableContext
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);

/// Tracks parts whose mutation has failed and decides when such a part may be retried.
///
/// After each recorded failure the wait before the next attempt doubles (2 ms, 4 ms, 8 ms, ...)
/// until it reaches the largest power of two that does not exceed the configured
/// `max_postpone_time_ms`. A value of 0 for `max_postpone_time_ms` disables postponing.
///
/// Every public method takes `parts_info_lock` before touching `failed_mutation_parts`.
class PartMutationBackoffPolicy
{
    /// Failure statistics for a single part.
    struct PartMutationInfo
    {
        /// Exponent of the current backoff interval: the part waits 2^retry_count milliseconds.
        size_t retry_count;
        /// Time of the most recent failure, microseconds since the epoch.
        size_t latest_fail_time_us;
        /// Upper bound for the backoff interval as passed by the caller; 0 turns the policy off.
        size_t max_postpone_time_ms;
        /// floor(log2(max_postpone_time_ms)); retry_count never grows beyond this value.
        size_t max_postpone_power;

        explicit PartMutationInfo(size_t max_postpone_time_ms_)
            : retry_count(0)
            , latest_fail_time_us(currentTimeUs())
            , max_postpone_time_ms(max_postpone_time_ms_)
            , max_postpone_power(floorLog2(max_postpone_time_ms_))
        {}

        /// Current time in microseconds since the epoch.
        static size_t currentTimeUs()
        {
            return static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
        }

        /// Integer floor(log2(value)); returns 0 for value == 0.
        /// Done in integer arithmetic so that the result is always below the bit width of size_t
        /// (a floating point log2 may round very large values up to the full width).
        static size_t floorLog2(size_t value)
        {
            size_t power = 0;
            while (value >>= 1)
                ++power;
            return power;
        }

        /// Earliest time (microseconds since the epoch) at which the part may be mutated again.
        size_t getNextMinExecutionTimeUsResolution() const
        {
            if (max_postpone_time_ms == 0)
                return currentTimeUs();

            /// The shift is performed on size_t, not on int: retry_count may legitimately be 31 or more
            /// when max_postpone_time_ms is large, which would overflow a 32-bit signed shift.
            const size_t backoff_interval_ms = size_t{1} << retry_count;

            /// Saturate instead of wrapping around if the deadline does not fit into size_t.
            const size_t max_value = ~size_t{0};
            if (backoff_interval_ms > (max_value - latest_fail_time_us) / 1000)
                return max_value;

            return latest_fail_time_us + backoff_interval_ms * 1000;
        }

        /// Registers one more failure: doubles the backoff interval (up to the cap) and restarts the timer.
        void addPartFailure()
        {
            if (max_postpone_time_ms == 0)
                return;
            retry_count = std::min(max_postpone_power, retry_count + 1);
            latest_fail_time_us = currentTimeUs();
        }

        /// True when the backoff interval since the latest failure has elapsed (or the policy is off).
        bool partCanBeMutated() const
        {
            if (max_postpone_time_ms == 0)
                return true;

            return currentTimeUs() >= getNextMinExecutionTimeUsResolution();
        }
    };

    using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
    /// Failure info keyed by part name.
    DataPartsWithRetryInfo failed_mutation_parts;
    /// Guards failed_mutation_parts.
    mutable std::mutex parts_info_lock;

public:

    /// Forgets all recorded failures.
    void resetMutationFailures()
    {
        std::lock_guard _lock(parts_info_lock);
        failed_mutation_parts.clear();
    }

    /// Forgets the failures recorded for `part_name`; does nothing if there are none.
    void removePartFromFailed(const String & part_name)
    {
        std::lock_guard _lock(parts_info_lock);
        failed_mutation_parts.erase(part_name);
    }

    /// Records a mutation failure for `part_name`.
    /// `max_postpone_time_ms_` is only used when the part is seen for the first time;
    /// an already tracked part keeps the limit it was created with.
    void addPartMutationFailure (const String& part_name, size_t max_postpone_time_ms_)
    {
        std::lock_guard _lock(parts_info_lock);
        /// try_emplace leaves an existing entry untouched and returns an iterator to it.
        auto part_info_it = failed_mutation_parts.try_emplace(part_name, max_postpone_time_ms_).first;
        part_info_it->second.addPartFailure();
    }

    /// Returns true if `part_name` has no recorded failures or its backoff interval has elapsed.
    bool partCanBeMutated(const String& part_name) const
    {
        std::lock_guard _lock(parts_info_lock);
        auto iter = failed_mutation_parts.find(part_name);
        if (iter == failed_mutation_parts.end())
            return true;
        return iter->second.partCanBeMutated();
    }
};
/// Controls postponing logic for failed mutations.
PartMutationBackoffPolicy mutation_backoff_policy;

/// If part is assigned to merge or mutation (possibly replicated)
/// Should be overridden by children, because they can have different
/// mechanisms for parts locking
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Expand Up @@ -146,6 +146,7 @@ struct Settings;
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \
M(UInt64, max_postpone_time_for_failed_mutations_ms, 5ULL * 60 * 1000, "The maximum postpone time for failed mutations.", 0) \
\
/** Compatibility settings */ \
M(Bool, allow_suspicious_indices, false, "Reject primary/secondary indexes and sorting keys with identical expressions", 0) \
Expand Down
12 changes: 11 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
@@ -1,6 +1,7 @@
#include <Storages/MergeTree/ReplicatedMergeMutateTaskBase.h>

#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Common/ProfileEventsScope.h>

Expand Down Expand Up @@ -110,11 +111,14 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
auto mutations_end_it = in_partition->second.upper_bound(result_data_version);
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
auto & src_part = log_entry->source_parts.at(0);
ReplicatedMergeTreeQueue::MutationStatus & status = *it->second;
status.latest_failed_part = log_entry->source_parts.at(0);
status.latest_failed_part = src_part;
status.latest_failed_part_info = source_part_info;
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
if (result_data_version == it->first)
storage.mutation_backoff_policy.addPartMutationFailure(src_part, storage.getSettings()->max_postpone_time_for_failed_mutations_ms);
}
}
}
Expand Down Expand Up @@ -142,6 +146,12 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
{
storage.queue.removeProcessedEntry(storage.getZooKeeper(), selected_entry->log_entry);
state = State::SUCCESS;

auto & log_entry = selected_entry->log_entry;
if (log_entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART)
{
storage.mutation_backoff_policy.removePartFromFailed(log_entry->source_parts.at(0));
}
}
catch (...)
{
Expand Down
11 changes: 10 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Expand Up @@ -13,6 +13,7 @@
#include <base/sort.h>

#include <ranges>
#include <Poco/Timestamp.h>

namespace DB
{
Expand Down Expand Up @@ -1353,9 +1354,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();

if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because recently it has failed. According to exponential backoff policy, put aside this log entry.";

LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
Comment on lines +1360 to +1363
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to also print the number of (milli)seconds until the next attempt, but it's not necessary.

return false;
}
}
}

if (merger_mutator.merges_blocker.isCancelled())
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
Expand Down
22 changes: 20 additions & 2 deletions src/Storages/StorageMergeTree.cpp
Expand Up @@ -5,6 +5,7 @@
#include <optional>
#include <ranges>

#include <Poco/Timestamp.h>
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
Expand Down Expand Up @@ -539,6 +540,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re

Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion();
Int64 result_data_version = result_part->part_info.getDataVersion();
auto & failed_part = result_part->parts.at(0);

if (sources_data_version != result_data_version)
{
std::lock_guard lock(currently_processing_in_background_mutex);
Expand All @@ -556,14 +559,21 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be called from killMutation as well (and likewise for ReplicatedMergeTree)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we call resetMutationFailures instead, see the review comments.

However, I've just realized that for RMT it will only be called on the initiator, and not on other replicas (I'm not sure what the best way to fix it is).

}
}
else
{
entry.latest_failed_part = result_part->parts.at(0)->name;
entry.latest_failed_part_info = result_part->parts.at(0)->info;
entry.latest_failed_part = failed_part->name;
entry.latest_failed_part_info = failed_part->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;

if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
{
mutation_backoff_policy.addPartMutationFailure(failed_part->name, getSettings()->max_postpone_time_for_failed_mutations_ms);
}
}
}
}
Expand Down Expand Up @@ -834,6 +844,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
}
}

mutation_backoff_policy.resetMutationFailures();

if (!to_kill)
return CancellationCode::NotFound;

Expand Down Expand Up @@ -1218,6 +1230,12 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
TransactionID first_mutation_tid = mutations_begin_it->second.tid;
MergeTreeTransactionPtr txn;

if (!mutation_backoff_policy.partCanBeMutated(part->name))
{
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name);
continue;
}

if (!first_mutation_tid.isPrehistoric())
{

Expand Down
1 change: 1 addition & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
Expand Up @@ -7438,6 +7438,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
Int64 block_number = pair.second;
getContext()->getMergeList().cancelPartMutations(getStorageID(), partition_id, block_number);
}
mutation_backoff_policy.resetMutationFailures();
return CancellationCode::CancelSent;
}

Expand Down
5 changes: 5 additions & 0 deletions tests/config/config.d/backoff_failed_mutation.xml
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations_ms>200</max_postpone_time_for_failed_mutations_ms>
</merge_tree>
</clickhouse>
1 change: 1 addition & 0 deletions tests/config/install.sh
Expand Up @@ -30,6 +30,7 @@ ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/backoff_failed_mutation.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_old_dirs_cleanup.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/
Expand Down
Empty file.
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations_ms>60000</max_postpone_time_for_failed_mutations_ms>
</merge_tree>
</clickhouse>
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations_ms>0</max_postpone_time_for_failed_mutations_ms>
</merge_tree>
</clickhouse>