New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Backoff policy for failed mutation. #58036
Changes from 8 commits
f6fb20d
be58c1f
97e0e5f
ee5d8c0
5e84517
778efb8
b90a5b9
97ee8c0
b227e5d
94fa87d
c3ab907
7cdfeda
c2d641c
fa5747a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -36,7 +36,7 @@ | |||||
#include <Storages/PartitionCommands.h> | ||||||
#include <Interpreters/PartLog.h> | ||||||
#include <Interpreters/threadPoolCallbackRunner.h> | ||||||
|
||||||
#include <Poco/Timestamp.h> | ||||||
|
||||||
#include <boost/multi_index_container.hpp> | ||||||
#include <boost/multi_index/ordered_index.hpp> | ||||||
|
@@ -1364,6 +1364,100 @@ class MergeTreeData : public IStorage, public WithMutableContext | |||||
const MergeListEntry * merge_entry, | ||||||
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters); | ||||||
|
||||||
class PartMutationBackoffPolicy : public WithContext | ||||||
{ | ||||||
struct PartMutationInfo | ||||||
{ | ||||||
size_t retry_count; | ||||||
Poco::Timestamp latest_fail_time; | ||||||
UInt64 mutation_failure_version; | ||||||
size_t max_postpone_time_ms; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we have time both in |
||||||
size_t max_postpone_power; | ||||||
|
||||||
/// Creates the bookkeeping record for a part whose mutation has failed.
/// mutation_failure_version_ is the version of the mutation that failed;
/// max_postpone_time_ms_ is the upper bound of the postpone time in ms, 0 disables the backoff.
PartMutationInfo(UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
    : retry_count(0)
    , latest_fail_time() /// No std::move around a temporary: it is already an rvalue.
    , mutation_failure_version(mutation_failure_version_)
    , max_postpone_time_ms(max_postpone_time_ms_)
    , max_postpone_power(0)
{
    /// max_postpone_power = floor(log2(max_postpone_time_ms)), and 0 when the backoff is disabled.
    /// Computed with integer arithmetic: going through `double` (std::log2) rounds values close
    /// to 2^64 up to 64, which is not a valid shift amount for a 64-bit value.
    for (size_t rest = max_postpone_time_ms_; rest > 1; rest >>= 1)
        ++max_postpone_power;
}
|
||||||
|
||||||
/// Returns the earliest moment at which the mutation of this part may be retried:
/// the time of the latest failure plus 2^retry_count milliseconds.
/// With the backoff disabled (max_postpone_time_ms == 0) a fresh Poco::Timestamp is returned.
Poco::Timestamp getNextMinExecutionTime() const
{
    if (max_postpone_time_ms == 0)
        return Poco::Timestamp();

    /// The shift must be done on a 64-bit value: `1 << retry_count` is an `int` shift,
    /// which overflows as soon as retry_count reaches 31.
    const UInt64 backoff_ms = UInt64{1} << retry_count;

    /// Poco::Timestamp arithmetic is in microseconds, hence the factor of 1000.
    return latest_fail_time + static_cast<Poco::Timestamp::TimeDiff>(backoff_ms * 1000);
}
|
||||||
void addPartFailure() | ||||||
{ | ||||||
if (max_postpone_time_ms == 0) | ||||||
return; | ||||||
retry_count = std::min(max_postpone_power, retry_count + 1); | ||||||
latest_fail_time = Poco::Timestamp(); | ||||||
} | ||||||
|
||||||
bool partCanBeMutated() | ||||||
{ | ||||||
if (max_postpone_time_ms == 0) | ||||||
return true; | ||||||
|
||||||
auto current_time = Poco::Timestamp(); | ||||||
return current_time >= getNextMinExecutionTime(); | ||||||
} | ||||||
}; | ||||||
|
||||||
using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>; | ||||||
DataPartsWithRetryInfo failed_mutation_parts; | ||||||
mutable std::mutex parts_info_lock; | ||||||
|
||||||
public: | ||||||
|
||||||
/// Drops every remembered part whose failure was recorded for the given mutation version.
void removeFromFailedByVersion(UInt64 mutation_version)
{
    std::unique_lock _lock(parts_info_lock);

    auto cursor = failed_mutation_parts.begin();
    while (cursor != failed_mutation_parts.end())
    {
        if (cursor->second.mutation_failure_version != mutation_version)
        {
            ++cursor;
            continue;
        }

        /// erase() hands back the iterator following the removed element.
        cursor = failed_mutation_parts.erase(cursor);
    }
}
|
||||||
void removePartFromFailed(const String& part_name) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
And in other places as well |
||||||
{ | ||||||
std::unique_lock _lock(parts_info_lock); | ||||||
failed_mutation_parts.erase(part_name); | ||||||
} | ||||||
|
||||||
void addPartMutationFailure (const String& part_name, UInt64 mutation_failure_version_, size_t max_postpone_time_ms_) | ||||||
{ | ||||||
std::unique_lock _lock(parts_info_lock); | ||||||
auto part_info_it = failed_mutation_parts.find(part_name); | ||||||
if (part_info_it == failed_mutation_parts.end()) | ||||||
{ | ||||||
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(mutation_failure_version_, max_postpone_time_ms_)); | ||||||
std::swap(it, part_info_it); | ||||||
} | ||||||
auto& part_info = part_info_it->second; | ||||||
part_info.addPartFailure(); | ||||||
} | ||||||
|
||||||
bool partCanBeMutated(const String& part_name) | ||||||
{ | ||||||
|
||||||
std::unique_lock _lock(parts_info_lock); | ||||||
auto iter = failed_mutation_parts.find(part_name); | ||||||
if (iter == failed_mutation_parts.end()) | ||||||
return true; | ||||||
return iter->second.partCanBeMutated(); | ||||||
} | ||||||
}; | ||||||
/// Controls postponing logic for failed mutations. | ||||||
PartMutationBackoffPolicy mutation_backoff_policy; | ||||||
|
||||||
/// If part is assigned to merge or mutation (possibly replicated) | ||||||
/// Should be overridden by children, because they can have different | ||||||
/// mechanisms for parts locking | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -146,6 +146,7 @@ struct Settings; | |||||
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \ | ||||||
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \ | ||||||
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \ | ||||||
M(UInt64, max_postpone_time_for_failed_mutations, 0ul, "The maximum postpone time for failed mutations in ms.", 0) \ | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general, the change is good and it makes sense to enable it by default. What will be a good default value in your opinion? We can start with enabling it in the CI. See There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
\ | ||||||
/** Compatibility settings */ \ | ||||||
M(Bool, allow_suspicious_indices, false, "Reject primary/secondary indexes and sorting keys with identical expressions", 0) \ | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
#include <base/sort.h> | ||
|
||
#include <ranges> | ||
#include <Poco/Timestamp.h> | ||
|
||
namespace DB | ||
{ | ||
|
@@ -1350,9 +1351,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( | |
sum_parts_size_in_bytes += part_in_memory->block.bytes(); | ||
else | ||
sum_parts_size_in_bytes += part->getBytesOnDisk(); | ||
|
||
if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name)) | ||
{ | ||
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} " | ||
"because recently it has failed. According to exponential backoff policy, put aside this log entry."; | ||
|
||
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name); | ||
Comment on lines
+1360
to
+1363
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be nice to also print the number of (milli)seconds until the next attempt, but it's not necessary. |
||
return false; | ||
} | ||
} | ||
} | ||
|
||
if (merger_mutator.merges_blocker.isCancelled()) | ||
{ | ||
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now."; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
#include <optional> | ||
#include <ranges> | ||
|
||
#include <Poco/Timestamp.h> | ||
#include <Backups/BackupEntriesCollector.h> | ||
#include <Databases/IDatabase.h> | ||
#include <IO/copyData.h> | ||
|
@@ -540,6 +541,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re | |
|
||
Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion(); | ||
Int64 result_data_version = result_part->part_info.getDataVersion(); | ||
auto & failed_part = result_part->parts.at(0); | ||
|
||
if (sources_data_version != result_data_version) | ||
{ | ||
std::lock_guard lock(currently_processing_in_background_mutex); | ||
|
@@ -557,14 +560,21 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re | |
entry.latest_failed_part_info = MergeTreePartInfo(); | ||
entry.latest_fail_time = 0; | ||
entry.latest_fail_reason.clear(); | ||
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first) | ||
mutation_backoff_policy.removePartFromFailed(failed_part->name); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be called from the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, we call However, I've just realized that for RMT it will be only called on the initiator, and not on other replicas (I'm not sure what's the best way to fix it) |
||
} | ||
} | ||
else | ||
{ | ||
entry.latest_failed_part = result_part->parts.at(0)->name; | ||
entry.latest_failed_part_info = result_part->parts.at(0)->info; | ||
entry.latest_failed_part = failed_part->name; | ||
entry.latest_failed_part_info = failed_part->info; | ||
entry.latest_fail_time = time(nullptr); | ||
entry.latest_fail_reason = exception_message; | ||
|
||
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first) | ||
{ | ||
mutation_backoff_policy.addPartMutationFailure(failed_part->name, it->first, getSettings()->max_postpone_time_for_failed_mutations); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -835,6 +845,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) | |
} | ||
} | ||
|
||
mutation_backoff_policy.removeFromFailedByVersion(mutation_version); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
if (!to_kill) | ||
return CancellationCode::NotFound; | ||
|
||
|
@@ -1195,6 +1207,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( | |
|
||
CurrentlyMergingPartsTaggerPtr tagger; | ||
|
||
bool exist_postponed_failed_part = false; | ||
auto mutations_end_it = current_mutations_by_version.end(); | ||
for (const auto & part : getDataPartsVectorForInternalUsage()) | ||
{ | ||
|
@@ -1219,6 +1232,13 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( | |
TransactionID first_mutation_tid = mutations_begin_it->second.tid; | ||
MergeTreeTransactionPtr txn; | ||
|
||
if (!mutation_backoff_policy.partCanBeMutated(part->name)) | ||
{ | ||
exist_postponed_failed_part = true; | ||
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name); | ||
continue; | ||
} | ||
|
||
if (!first_mutation_tid.isPrehistoric()) | ||
{ | ||
|
||
|
@@ -1325,7 +1345,11 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( | |
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn); | ||
} | ||
} | ||
|
||
if (exist_postponed_failed_part) | ||
{ | ||
std::lock_guard lock(mutation_wait_mutex); | ||
mutation_wait_event.notify_all(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is unexpected; could you please add a comment explaining why we need to notify here? Introducing a backoff affects only timings and should not affect any synchronization. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, you are right. Removed it. |
||
return {}; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
<clickhouse> | ||
<merge_tree> | ||
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations> | ||
</merge_tree> | ||
</clickhouse> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the context is never used