New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Backoff policy for failed mutation. #58036
Changes from all commits
f6fb20d
be58c1f
97e0e5f
ee5d8c0
5e84517
778efb8
b90a5b9
97ee8c0
b227e5d
94fa87d
c3ab907
7cdfeda
c2d641c
fa5747a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
#include <optional> | ||
#include <ranges> | ||
|
||
#include <Poco/Timestamp.h> | ||
#include <base/sort.h> | ||
#include <Backups/BackupEntriesCollector.h> | ||
#include <Databases/IDatabase.h> | ||
|
@@ -539,6 +540,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re | |
|
||
Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion(); | ||
Int64 result_data_version = result_part->part_info.getDataVersion(); | ||
auto & failed_part = result_part->parts.at(0); | ||
|
||
if (sources_data_version != result_data_version) | ||
{ | ||
std::lock_guard lock(currently_processing_in_background_mutex); | ||
|
@@ -556,14 +559,21 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re | |
entry.latest_failed_part_info = MergeTreePartInfo(); | ||
entry.latest_fail_time = 0; | ||
entry.latest_fail_reason.clear(); | ||
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first) | ||
mutation_backoff_policy.removePartFromFailed(failed_part->name); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this be called from the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, we call […] However, I've just realized that for RMT it will only be called on the initiator, and not on the other replicas (I'm not sure what the best way to fix it is) |
||
} | ||
} | ||
else | ||
{ | ||
entry.latest_failed_part = result_part->parts.at(0)->name; | ||
entry.latest_failed_part_info = result_part->parts.at(0)->info; | ||
entry.latest_failed_part = failed_part->name; | ||
entry.latest_failed_part_info = failed_part->info; | ||
entry.latest_fail_time = time(nullptr); | ||
entry.latest_fail_reason = exception_message; | ||
|
||
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first) | ||
{ | ||
mutation_backoff_policy.addPartMutationFailure(failed_part->name, getSettings()->max_postpone_time_for_failed_mutations_ms); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -834,6 +844,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) | |
} | ||
} | ||
|
||
mutation_backoff_policy.resetMutationFailures(); | ||
|
||
if (!to_kill) | ||
return CancellationCode::NotFound; | ||
|
||
|
@@ -1218,6 +1230,12 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( | |
TransactionID first_mutation_tid = mutations_begin_it->second.tid; | ||
MergeTreeTransactionPtr txn; | ||
|
||
if (!mutation_backoff_policy.partCanBeMutated(part->name)) | ||
{ | ||
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name); | ||
continue; | ||
} | ||
|
||
if (!first_mutation_tid.isPrehistoric()) | ||
{ | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
<clickhouse> | ||
<merge_tree> | ||
<max_postpone_time_for_failed_mutations_ms>200</max_postpone_time_for_failed_mutations_ms> | ||
</merge_tree> | ||
</clickhouse> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
<clickhouse> | ||
<merge_tree> | ||
<max_postpone_time_for_failed_mutations_ms>60000</max_postpone_time_for_failed_mutations_ms> | ||
</merge_tree> | ||
</clickhouse> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
<clickhouse> | ||
<merge_tree> | ||
<max_postpone_time_for_failed_mutations_ms>0</max_postpone_time_for_failed_mutations_ms> | ||
</merge_tree> | ||
</clickhouse> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to also print the number of (milli)seconds until the next attempt, but it's not necessary.