
Commit

Merge pull request #58036 from MikhailBurdukov/backoff_for_failed_mutations

Backoff policy for failed mutations.
tavplubix committed Feb 23, 2024
2 parents 83841c2 + fa5747a commit ef3b191
Showing 13 changed files with 350 additions and 5 deletions.
2 changes: 2 additions & 0 deletions docker/test/upgrade/run.sh
@@ -88,6 +88,7 @@ rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/config.d/storage_conf_02963.xml
rm /etc/clickhouse-server/config.d/backoff_failed_mutation.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml
@@ -134,6 +135,7 @@ rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
rm /etc/clickhouse-server/config.d/storage_conf_02963.xml
rm /etc/clickhouse-server/config.d/backoff_failed_mutation.xml
rm /etc/clickhouse-server/config.d/block_number.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
89 changes: 88 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
@@ -35,9 +35,9 @@
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Interpreters/PartLog.h>
#include <Poco/Timestamp.h>
#include <Common/threadPoolCallbackRunner.h>


#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/global_fun.hpp>
@@ -1353,6 +1353,93 @@ class MergeTreeData : public IStorage, public WithMutableContext
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);

/// Postpones mutations of parts whose previous mutation attempt failed: the postpone interval
/// doubles with each consecutive failure (2^retry_count ms) and is capped by the merge tree
/// setting max_postpone_time_for_failed_mutations_ms (a value of 0 disables the backoff).
class PartMutationBackoffPolicy
{
    struct PartMutationInfo
    {
        size_t retry_count;
        size_t latest_fail_time_us;
        size_t max_postpone_time_ms;
        size_t max_postpone_power;

        PartMutationInfo(size_t max_postpone_time_ms_)
            : retry_count(0ull)
            , latest_fail_time_us(static_cast<size_t>(Poco::Timestamp().epochMicroseconds()))
            , max_postpone_time_ms(max_postpone_time_ms_)
            , max_postpone_power((max_postpone_time_ms_) ? (static_cast<size_t>(std::log2(max_postpone_time_ms_))) : (0ull))
        {}

        /// Earliest time (in microseconds) at which the part may be mutated again.
        size_t getNextMinExecutionTimeUsResolution() const
        {
            if (max_postpone_time_ms == 0)
                return static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
            size_t current_backoff_interval_us = (1 << retry_count) * 1000ul;
            return latest_fail_time_us + current_backoff_interval_us;
        }

        void addPartFailure()
        {
            if (max_postpone_time_ms == 0)
                return;
            /// Cap retry_count so the interval never exceeds max_postpone_time_ms.
            retry_count = std::min(max_postpone_power, retry_count + 1);
            latest_fail_time_us = static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
        }

        bool partCanBeMutated()
        {
            if (max_postpone_time_ms == 0)
                return true;

            auto current_time_us = static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
            return current_time_us >= getNextMinExecutionTimeUsResolution();
        }
    };

    using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
    DataPartsWithRetryInfo failed_mutation_parts;
    mutable std::mutex parts_info_lock;

public:
    void resetMutationFailures()
    {
        std::unique_lock _lock(parts_info_lock);
        failed_mutation_parts.clear();
    }

    void removePartFromFailed(const String & part_name)
    {
        std::unique_lock _lock(parts_info_lock);
        failed_mutation_parts.erase(part_name);
    }

    void addPartMutationFailure(const String & part_name, size_t max_postpone_time_ms_)
    {
        std::unique_lock _lock(parts_info_lock);
        auto part_info_it = failed_mutation_parts.find(part_name);
        if (part_info_it == failed_mutation_parts.end())
        {
            auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(max_postpone_time_ms_));
            std::swap(it, part_info_it);
        }
        auto & part_info = part_info_it->second;
        part_info.addPartFailure();
    }

    bool partCanBeMutated(const String & part_name)
    {
        std::unique_lock _lock(parts_info_lock);
        auto iter = failed_mutation_parts.find(part_name);
        if (iter == failed_mutation_parts.end())
            return true;
        return iter->second.partCanBeMutated();
    }
};
/// Controls postponing logic for failed mutations.
PartMutationBackoffPolicy mutation_backoff_policy;

/// If part is assigned to merge or mutation (possibly replicated)
/// Should be overridden by children, because they can have different
/// mechanisms for parts locking
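For intuition about the schedule this class produces: the postpone interval is 2^retry_count milliseconds and retry_count is capped at log2(max_postpone_time_for_failed_mutations_ms), so the delay doubles after every consecutive failure and never exceeds the configured maximum. A minimal standalone sketch of that schedule (illustrative only, not part of the commit; the names mirror the class above):

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <iostream>

int main()
{
    // 5 minutes is the default of max_postpone_time_for_failed_mutations_ms.
    const size_t max_postpone_time_ms = 5 * 60 * 1000;
    const size_t max_postpone_power = static_cast<size_t>(std::log2(max_postpone_time_ms)); // 18

    size_t retry_count = 0;
    for (int failure = 1; failure <= 20; ++failure)
    {
        retry_count = std::min(max_postpone_power, retry_count + 1);
        std::cout << "after failure " << failure << ": postpone next attempt by "
                  << (1ull << retry_count) << " ms\n";
    }
}

With the default of five minutes the delay tops out at 2^18 ms (about 262 seconds); setting the value to 0 disables the backoff entirely, since partCanBeMutated() then always returns true.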
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
@@ -146,6 +146,7 @@ struct Settings;
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \
M(UInt64, max_postpone_time_for_failed_mutations_ms, 5ULL * 60 * 1000, "The maximum postpone time for failed mutations.", 0) \
\
/** Compatibility settings */ \
M(Bool, allow_suspicious_indices, false, "Reject primary/secondary indexes and sorting keys with identical expressions", 0) \
12 changes: 11 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
@@ -1,6 +1,7 @@
#include <Storages/MergeTree/ReplicatedMergeMutateTaskBase.h>

#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Common/ProfileEventsScope.h>

@@ -110,11 +111,14 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
auto mutations_end_it = in_partition->second.upper_bound(result_data_version);
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
auto & src_part = log_entry->source_parts.at(0);
ReplicatedMergeTreeQueue::MutationStatus & status = *it->second;
status.latest_failed_part = log_entry->source_parts.at(0);
status.latest_failed_part = src_part;
status.latest_failed_part_info = source_part_info;
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
if (result_data_version == it->first)
storage.mutation_backoff_policy.addPartMutationFailure(src_part, storage.getSettings()->max_postpone_time_for_failed_mutations_ms);
}
}
}
@@ -142,6 +146,12 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
{
storage.queue.removeProcessedEntry(storage.getZooKeeper(), selected_entry->log_entry);
state = State::SUCCESS;

auto & log_entry = selected_entry->log_entry;
if (log_entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART)
{
storage.mutation_backoff_policy.removePartFromFailed(log_entry->source_parts.at(0));
}
}
catch (...)
{
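The bookkeeping added in this file follows a simple pattern: when a MUTATE_PART log entry throws, executeStep() registers the source part with the backoff policy; when such an entry later completes, executeImpl() removes the part again so it is no longer throttled. A rough standalone sketch of that pattern (hypothetical names, not the ClickHouse classes):

#include <iostream>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for MergeTreeData::PartMutationBackoffPolicy.
struct BackoffBook
{
    std::unordered_map<std::string, int> failed;
    void addPartMutationFailure(const std::string & part) { ++failed[part]; }
    void removePartFromFailed(const std::string & part) { failed.erase(part); }
};

int main()
{
    BackoffBook book;

    book.addPartMutationFailure("all_0_0_0");   // executeStep(): mutating the part threw an exception
    book.addPartMutationFailure("all_0_0_0");   // a later retry failed as well
    book.removePartFromFailed("all_0_0_0");     // executeImpl(): the MUTATE_PART entry finally succeeded

    std::cout << "parts still throttled: " << book.failed.size() << "\n";   // prints 0
}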
11 changes: 10 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -13,6 +13,7 @@
#include <base/sort.h>

#include <ranges>
#include <Poco/Timestamp.h>

namespace DB
{
@@ -1353,9 +1354,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();

if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because it recently failed. According to the exponential backoff policy, this log entry is put aside.";

LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
}
}

if (merger_mutator.merges_blocker.isCancelled())
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
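The new branch in shouldExecuteLogEntry() is a plain deadline check: a MUTATE_PART entry stays postponed (with out_postpone_reason set) while the current time is still before "latest failure time + current backoff interval". A minimal sketch of that comparison, using std::chrono instead of Poco::Timestamp (an illustrative substitution, not the commit's code):

#include <chrono>
#include <iostream>

int main()
{
    using Clock = std::chrono::steady_clock;

    const auto latest_fail_time = Clock::now();
    const auto backoff_interval = std::chrono::milliseconds(1 << 3);    // retry_count == 3 -> 8 ms

    // Equivalent of partCanBeMutated(): execute only once the deadline has passed.
    const bool can_mutate = Clock::now() >= latest_fail_time + backoff_interval;

    std::cout << (can_mutate ? "execute the MUTATE_PART entry"
                             : "postpone: the part failed recently, backoff still active")
              << "\n";
}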
22 changes: 20 additions & 2 deletions src/Storages/StorageMergeTree.cpp
@@ -5,6 +5,7 @@
#include <optional>
#include <ranges>

#include <Poco/Timestamp.h>
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
@@ -538,6 +539,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re

Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion();
Int64 result_data_version = result_part->part_info.getDataVersion();
auto & failed_part = result_part->parts.at(0);

if (sources_data_version != result_data_version)
{
std::lock_guard lock(currently_processing_in_background_mutex);
@@ -555,14 +558,21 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0;
entry.latest_fail_reason.clear();
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
}
}
else
{
entry.latest_failed_part = result_part->parts.at(0)->name;
entry.latest_failed_part_info = result_part->parts.at(0)->info;
entry.latest_failed_part = failed_part->name;
entry.latest_failed_part_info = failed_part->info;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message;

if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
{
mutation_backoff_policy.addPartMutationFailure(failed_part->name, getSettings()->max_postpone_time_for_failed_mutations_ms);
}
}
}
}
@@ -833,6 +843,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
}
}

mutation_backoff_policy.resetMutationFailures();

if (!to_kill)
return CancellationCode::NotFound;

@@ -1217,6 +1229,12 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
TransactionID first_mutation_tid = mutations_begin_it->second.tid;
MergeTreeTransactionPtr txn;

if (!mutation_backoff_policy.partCanBeMutated(part->name))
{
LOG_DEBUG(log, "According to the exponential backoff policy, do not perform mutations for part {} yet. Put it aside.", part->name);
continue;
}

if (!first_mutation_tid.isPrehistoric())
{

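Note that killMutation() (both here and in StorageReplicatedMergeTree below) calls resetMutationFailures() rather than removePartFromFailed(): killing a mutation clears the whole failure map, so every postponed part becomes schedulable again at once. A tiny sketch of that distinction (illustrative only; the map stands in for failed_mutation_parts):

#include <cassert>
#include <map>
#include <string>

int main()
{
    std::map<std::string, int> failed_mutation_parts = {{"all_1_1_0", 3}, {"all_2_2_0", 1}};

    failed_mutation_parts.erase("all_1_1_0");   // removePartFromFailed(): one part, after its mutation succeeds
    failed_mutation_parts.clear();              // resetMutationFailures(): all parts, after KILL MUTATION

    assert(failed_mutation_parts.empty());
}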
1 change: 1 addition & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -7460,6 +7460,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
Int64 block_number = pair.second;
getContext()->getMergeList().cancelPartMutations(getStorageID(), partition_id, block_number);
}
mutation_backoff_policy.resetMutationFailures();
return CancellationCode::CancelSent;
}

5 changes: 5 additions & 0 deletions tests/config/config.d/backoff_failed_mutation.xml
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations_ms>200</max_postpone_time_for_failed_mutations_ms>
</merge_tree>
</clickhouse>
1 change: 1 addition & 0 deletions tests/config/install.sh
@@ -30,6 +30,7 @@ ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/backoff_failed_mutation.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_old_dirs_cleanup.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/
Empty file.
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations_ms>60000</max_postpone_time_for_failed_mutations_ms>
</merge_tree>
</clickhouse>
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations_ms>0</max_postpone_time_for_failed_mutations_ms>
</merge_tree>
</clickhouse>
