New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Backoff policy for failed mutation. #58036
Changes from 4 commits
f6fb20d
be58c1f
97e0e5f
ee5d8c0
5e84517
778efb8
b90a5b9
97ee8c0
b227e5d
94fa87d
c3ab907
7cdfeda
c2d641c
fa5747a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -35,7 +35,7 @@ | |||||
#include <Storages/PartitionCommands.h> | ||||||
#include <Interpreters/PartLog.h> | ||||||
#include <Interpreters/threadPoolCallbackRunner.h> | ||||||
|
||||||
#include <Poco/Timestamp.h> | ||||||
|
||||||
#include <boost/multi_index_container.hpp> | ||||||
#include <boost/multi_index/ordered_index.hpp> | ||||||
|
@@ -1348,6 +1348,92 @@ class MergeTreeData : public IStorage, public WithMutableContext | |||||
const MergeListEntry * merge_entry, | ||||||
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters); | ||||||
|
||||||
class PartMutationBackoffPolicy : public WithContext | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like the context is never used |
||||||
{ | ||||||
struct PartMutationInfo | ||||||
{ | ||||||
size_t retry_count = 0ul; | ||||||
Poco::Timestamp latest_fail_time{}; | ||||||
UInt64 mutation_failure_version = 0ul; | ||||||
|
||||||
Poco::Timestamp getNextMinExecutionTime() const | ||||||
{ | ||||||
return latest_fail_time + (1 << retry_count) * 1000ul; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
} | ||||||
}; | ||||||
|
||||||
using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>; | ||||||
DataPartsWithRetryInfo failed_mutation_parts; | ||||||
size_t max_pospone_power; | ||||||
mutable std::mutex parts_info_lock; | ||||||
|
||||||
public: | ||||||
explicit PartMutationBackoffPolicy(ContextPtr global_context_) | ||||||
: WithContext(global_context_) | ||||||
{ | ||||||
size_t max_pospone_time_ms = global_context_->getMaxPostponeTimeForFailedMutations(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we could store |
||||||
if (max_pospone_time_ms == 0) | ||||||
max_pospone_power = 0; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo: |
||||||
else | ||||||
max_pospone_power = static_cast<size_t>(std::log2(max_pospone_time_ms)); | ||||||
} | ||||||
|
||||||
void removeFromFailedByVersion(UInt64 mutation_version) | ||||||
{ | ||||||
if (max_pospone_power == 0) | ||||||
return; | ||||||
std::unique_lock _lock(parts_info_lock); | ||||||
|
||||||
for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();) | ||||||
{ | ||||||
if (failed_part_it->second.mutation_failure_version == mutation_version) | ||||||
failed_part_it = failed_mutation_parts.erase(failed_part_it); | ||||||
else | ||||||
++failed_part_it; | ||||||
} | ||||||
} | ||||||
|
||||||
void removePartFromFailed(const String& part_name) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
And in other places as well |
||||||
{ | ||||||
if (max_pospone_power == 0) | ||||||
return; | ||||||
std::unique_lock _lock(parts_info_lock); | ||||||
failed_mutation_parts.erase(part_name); | ||||||
} | ||||||
|
||||||
void addPartMutationFailure (const String& part_name, UInt64 _mutation_failure_version) | ||||||
{ | ||||||
if (max_pospone_power == 0) | ||||||
return; | ||||||
std::unique_lock _lock(parts_info_lock); | ||||||
auto part_info_it = failed_mutation_parts.find(part_name); | ||||||
if (part_info_it == failed_mutation_parts.end()) | ||||||
{ | ||||||
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo()); | ||||||
std::swap(it, part_info_it); | ||||||
} | ||||||
auto& part_info = part_info_it->second; | ||||||
part_info.retry_count = std::min(max_pospone_power, part_info.retry_count + 1); | ||||||
part_info.latest_fail_time = Poco::Timestamp(); | ||||||
part_info.mutation_failure_version = _mutation_failure_version; | ||||||
} | ||||||
|
||||||
bool partCanBeMutated(const String& part_name) | ||||||
{ | ||||||
if (max_pospone_power == 0) | ||||||
return true; | ||||||
std::unique_lock _lock(parts_info_lock); | ||||||
auto iter = failed_mutation_parts.find(part_name); | ||||||
if (iter == failed_mutation_parts.end()) | ||||||
return true; | ||||||
|
||||||
auto current_time = Poco::Timestamp(); | ||||||
return current_time >= iter->second.getNextMinExecutionTime(); | ||||||
} | ||||||
}; | ||||||
/// Controls postponing logic for failed mutations. | ||||||
PartMutationBackoffPolicy mutation_backoff_policy; | ||||||
|
||||||
/// If part is assigned to merge or mutation (possibly replicated) | ||||||
/// Should be overridden by children, because they can have different | ||||||
/// mechanisms for parts locking | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
#include <base/sort.h> | ||
|
||
#include <ranges> | ||
#include <Poco/Timestamp.h> | ||
|
||
namespace DB | ||
{ | ||
|
@@ -1350,9 +1351,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( | |
sum_parts_size_in_bytes += part_in_memory->block.bytes(); | ||
else | ||
sum_parts_size_in_bytes += part->getBytesOnDisk(); | ||
|
||
if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name)) | ||
{ | ||
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} " | ||
"because recently it has failed. According to exponential backoff policy, put aside this log entry."; | ||
|
||
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name); | ||
Comment on lines
+1360
to
+1363
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to also print the number of (milli)seconds til the next attempt, but it's not necessary |
||
return false; | ||
} | ||
} | ||
} | ||
|
||
if (merger_mutator.merges_blocker.isCancelled()) | ||
{ | ||
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now."; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ | |
|
||
#include <optional> | ||
#include <ranges> | ||
|
||
#include <Poco/Timestamp.h> | ||
#include <base/sort.h> | ||
#include <Backups/BackupEntriesCollector.h> | ||
#include <Databases/IDatabase.h> | ||
|
@@ -534,6 +534,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re | |
for (auto it = mutations_begin_it; it != mutations_end_it; ++it) | ||
{ | ||
MergeTreeMutationEntry & entry = it->second; | ||
auto & failed_part = result_part->parts.at(0); | ||
if (is_successful) | ||
{ | ||
if (!entry.latest_failed_part.empty() && result_part->part_info.contains(entry.latest_failed_part_info)) | ||
|
@@ -542,14 +543,16 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re | |
entry.latest_failed_part_info = MergeTreePartInfo(); | ||
entry.latest_fail_time = 0; | ||
entry.latest_fail_reason.clear(); | ||
mutation_backoff_policy.removePartFromFailed(failed_part->name); | ||
} | ||
} | ||
else | ||
{ | ||
entry.latest_failed_part = result_part->parts.at(0)->name; | ||
entry.latest_failed_part_info = result_part->parts.at(0)->info; | ||
entry.latest_failed_part = failed_part->name; | ||
entry.latest_failed_part_info = failed_part->info; | ||
entry.latest_fail_time = time(nullptr); | ||
entry.latest_fail_reason = exception_message; | ||
mutation_backoff_policy.addPartMutationFailure(failed_part->name, sources_data_version + 1); | ||
} | ||
} | ||
} | ||
|
@@ -816,6 +819,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) | |
} | ||
} | ||
|
||
mutation_backoff_policy.removeFromFailedByVersion(mutation_version); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
if (!to_kill) | ||
return CancellationCode::NotFound; | ||
|
||
|
@@ -1176,6 +1181,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( | |
|
||
CurrentlyMergingPartsTaggerPtr tagger; | ||
|
||
bool exist_posponed_failed_part = false; | ||
auto mutations_end_it = current_mutations_by_version.end(); | ||
for (const auto & part : getDataPartsVectorForInternalUsage()) | ||
{ | ||
|
@@ -1200,6 +1206,13 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( | |
TransactionID first_mutation_tid = mutations_begin_it->second.tid; | ||
MergeTreeTransactionPtr txn; | ||
|
||
if (!mutation_backoff_policy.partCanBeMutated(part->name)) | ||
{ | ||
exist_posponed_failed_part = true; | ||
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name); | ||
continue; | ||
} | ||
|
||
if (!first_mutation_tid.isPrehistoric()) | ||
{ | ||
|
||
|
@@ -1306,7 +1319,11 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( | |
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn); | ||
} | ||
} | ||
|
||
if (exist_posponed_failed_part) | ||
{ | ||
std::lock_guard lock(mutation_wait_mutex); | ||
mutation_wait_event.notify_all(); | ||
} | ||
return {}; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
<clickhouse> | ||
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations> | ||
</clickhouse> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can add this setting here
and query it like
getSettingsRef().max_postpone_time...