Backoff policy for failed mutation. #58036
Conversation
This is an automated comment for commit fa5747a with a description of existing CI statuses; it is updated for the latest CI run.
src/Storages/StorageMergeTree.cpp (Outdated)

@@ -516,6 +516,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr result_part
    for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
    {
        MergeTreeMutationEntry & entry = it->second;
        auto failed_part = result_part->parts.at(0);
Suggested change:
-        auto failed_part = result_part->parts.at(0);
+        auto & failed_part = result_part->parts.at(0);
src/Storages/StorageMergeTree.cpp (Outdated)

    if (exist_posponed_failed_part)
        mutation_wait_event.notify_all();
All other usages of mutation_wait_event notify it under std::lock_guard lock(mutation_wait_mutex). Generally you should not notify a condition variable while holding the mutex (waiting threads will be woken up just to block again on the mutex), but maybe there are some other semantics here you could investigate.
Force-pushed from cdf3c5f to ee5d8c0
Resolves: #57089
src/Interpreters/Context.cpp (Outdated)

    return config.getUInt("max_postpone_time_for_failed_mutations", 0ull);
}
You can add this setting here and query it like getSettingsRef().max_postpone_time...
explicit PartMutationBackoffPolicy(ContextPtr global_context_)
    : WithContext(global_context_)
{
    size_t max_pospone_time_ms = global_context_->getMaxPostponeTimeForFailedMutations();
Maybe we could store context.getSettings() instead of storing the context ptr itself? Except for the setting, we don't use the context.
    if (max_pospone_time_ms == 0)
        max_pospone_power = 0;
typo: posTpone
# Executing incorrect mutation.
node.query(
    "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
If this setting is present in Settings.h, we could change it at runtime via SETTINGS max_postpone_time=12345. Please consider adding a test case changing the setting at runtime.
)
time.sleep(5)
assert node_no_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == False
Suggested change:
-assert node_no_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == False
+assert not node_no_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG)
time.sleep(5)
assert node_no_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == False
assert node_with_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == True
Suggested change:
-assert node_with_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == True
+assert node_with_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG)
Force-pushed from 14b59f8 to 5e84517
Poco::Timestamp latest_fail_time;
UInt64 mutation_failure_version;
size_t max_postpone_time_ms;
Why do we have time both in Poco::Timestamp and size_t?
PartMutationInfo(UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
    : retry_count(0ull)
    , latest_fail_time(std::move(Poco::Timestamp()))
Isn't Poco::Timestamp a POD type? What's the point of moving it, then?
{
    if (max_postpone_time_ms == 0)
        return Poco::Timestamp();
    return latest_fail_time + (1 << retry_count) * 1000ul;
- Double whitespace after "return" and after "+".
- It's unclear what the precision of latest_fail_time is when it's implicitly converted to an integer (milliseconds? microseconds? nanoseconds?). Having * 1000ul doesn't make it clearer (although it's clear that it's not seconds). I would prefer having everything in integers with the same precision and a clear suffix like _ms. Poco::Timestamp (as well as Poco::Timespan) with implicit conversions are error-prone.
    }
}
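The reviewer's point about keeping every quantity in one explicit unit can be sketched with a small model. This is an illustrative Python version, not the C++ under review; the function name is hypothetical and the cap on max_postpone_time_ms is an assumed semantic:

```python
def next_attempt_time_ms(latest_fail_time_ms: int, retry_count: int,
                         max_postpone_time_ms: int, base_delay_ms: int = 1000) -> int:
    """Exponential backoff with every quantity in milliseconds.

    Using one unit with an explicit _ms suffix avoids the ambiguity of mixing
    Poco::Timestamp (implicitly convertible) with bare integers.
    """
    if max_postpone_time_ms == 0:
        return latest_fail_time_ms  # backoff disabled: retry immediately
    # Delay doubles with each retry, capped by the configured maximum.
    delay_ms = min(base_delay_ms * (1 << retry_count), max_postpone_time_ms)
    return latest_fail_time_ms + delay_ms

# Third retry after a failure at t=10000 ms, capped at 60 s:
print(next_attempt_time_ms(10_000, 3, 60_000))  # → 18000
```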
void removePartFromFailed(const String& part_name) |
Suggested change:
-void removePartFromFailed(const String& part_name)
+void removePartFromFailed(const String & part_name)
And in other places as well
(it's interesting why the Style Check did not fail)
@@ -1364,6 +1364,100 @@ class MergeTreeData : public IStorage, public WithMutableContext
    const MergeListEntry * merge_entry,
    std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);

class PartMutationBackoffPolicy : public WithContext
Looks like the context is never used
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
    "because recently it has failed. According to exponential backoff policy, put aside this log entry.";

LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
It would be nice to also print the number of (milli)seconds until the next attempt, but it's not necessary.
src/Storages/StorageMergeTree.cpp (Outdated)

if (exist_postponed_failed_part)
{
    std::lock_guard lock(mutation_wait_mutex);
    mutation_wait_event.notify_all();
}
This is unexpected, could you please add a comment explaining why we need to notify here? Introducing a backoff affects only timings and should not affect any synchronization
Yes, you are right. Removed it.
src/Storages/StorageMergeTree.cpp (Outdated)

@@ -835,6 +845,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
    }
}

mutation_backoff_policy.removeFromFailedByVersion(mutation_version);
- Do we need the same for ReplicatedMergeTree?
- Feels like it will not work, because multiple mutations may be merged together and executed at once. For example, if there's a faulty mutation with version 42 and another mutation with version 137, then a part all_1_2_3 can be mutated directly to all_1_2_3_137 (and this will fail due to mutation 42). So according to the if (static_cast<UInt64>(result_part->part_info.mutation) == it->first) condition, addPartMutationFailure will be called for version 137, and killing the faulty mutation 42 will not reset the backoff. Please add the corresponding test. Consider removing all the logic around the mutation version for simplicity and resetting all backoffs when killing any mutation.
assert node.contains_in_log(POSPONE_MUTATION_LOG) == found_in_log
node.rotate_logs()

time.sleep(5)
Tests with sleep are flaky and unreliable.
    REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
)

node.restart_clickhouse()
Please use functional tests whenever possible: #39359 (comment). In this case, restart_clickhouse can be replaced with DETACH/ATTACH TABLE, and wait_for_log_line with reading from system.text_log. It's just FYI, it's not necessary to rewrite the test.
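The suggested system.text_log approach might look like the sketch below. The helper name is hypothetical and the commented usage assumes the test cluster fixtures; only the query-building part is shown as runnable:

```python
def text_log_query(pattern: str) -> str:
    """Build a query that checks system.text_log instead of grepping server logs."""
    return (
        "SELECT count() > 0 FROM system.text_log "
        f"WHERE message LIKE '%{pattern}%'"
    )

# Hypothetical usage: DETACH/ATTACH instead of a full server restart.
# node.query("DETACH TABLE test_mutations")
# node.query("ATTACH TABLE test_mutations")
# assert node.query(text_log_query(POSPONE_MUTATION_LOG)).strip() == "1"
```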
Force-pushed from 07b4c2a to 94fa87d
@tavplubix Also there is one more thing to discuss: for sure it depends on the usage scenario, but I think in general 5s-10s is totally okay. Can I set 5s as the default value?
The CI is completely broken by #60013 (ClickHouse/docker/test/upgrade/run.sh, line 80 in cef109a).
Isn't it too small? Imagine we have a mutation that converts a String column to UInt64, and it fails because it cannot parse some number in the middle. If the part is quite big, the mutation may run for a few minutes before failing. So it will wait for 5 seconds and then run again for a few minutes, which will not reduce CPU usage. Maybe we can set it to 5 minutes.
Set 5 minutes as the default. Thanks for the help!
@@ -556,14 +559,21 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr result_part
    entry.latest_failed_part_info = MergeTreePartInfo();
    entry.latest_fail_time = 0;
    entry.latest_fail_reason.clear();
    if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
        mutation_backoff_policy.removePartFromFailed(failed_part->name);
Should this be called from killMutation as well (and likewise for ReplicatedMergeTree)?
No, we call resetMutationFailures instead, see the review comments. However, I've just realized that for RMT it will only be called on the initiator, and not on other replicas (I'm not sure what's the best way to fix it).
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Enabled a backoff logic for failed mutations (e.g. exponential). It reduces CPU usage, memory usage and log file sizes.
Documentation entry for user-facing changes
If a mutation query is incorrect or cannot be performed right now, it hangs in an infinite loop with high resource usage.
Example (from #55946):
A backoff logic (e.g. exponential) makes it possible to reduce CPU usage, memory usage and log file sizes.