Skip to content

Commit

Permalink
Enable per query setting
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed Jan 30, 2024
1 parent ee5d8c0 commit 14b59f8
Show file tree
Hide file tree
Showing 18 changed files with 161 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/Core/Settings.h
Expand Up @@ -636,6 +636,7 @@ class IColumn;
M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
M(UInt64, max_postpone_time_for_failed_mutations, 0ul, "The maximum time in milliseconds for which the retry of a failed mutation of a part can be postponed. 0 disables postponing", 0) \
\
M(Float, create_replicated_merge_tree_fault_injection_probability, 0.0f, "The probability of a fault injection during table creation after creating metadata in ZooKeeper", 0) \
\
Expand Down
16 changes: 16 additions & 0 deletions src/IO/ReadHelpers.cpp
Expand Up @@ -104,6 +104,22 @@ bool checkString(const char * s, ReadBuffer & buf)
return true;
}

/// Tests whether the upcoming bytes of `buf` spell out the C string `s`.
/// Unlike checkString, the read position is put back to where it was on entry
/// whether or not the string matched, so the caller can still consume the data.
/// NOTE(review): the saved position is reused after calls to buf.eof(); this presumably
/// relies on the underlying buffer not being refilled in between - confirm.
bool checkStringWithPositionSaving(const char * s, ReadBuffer & buf)
{
    const auto start = buf.position();
    bool matched = true;

    while (*s != '\0')
    {
        if (buf.eof() || *buf.position() != *s)
        {
            matched = false;
            break;
        }
        ++buf.position();
        ++s;
    }

    /// Single rewind point shared by the match and the mismatch path.
    buf.position() = start;
    return matched;
}


bool checkStringCaseInsensitive(const char * s, ReadBuffer & buf)
{
Expand Down
3 changes: 3 additions & 0 deletions src/IO/ReadHelpers.h
Expand Up @@ -216,6 +216,9 @@ inline void assertString(const String & s, ReadBuffer & buf)
}

bool checkString(const char * s, ReadBuffer & buf);

/// Same check as checkString, but the buffer position is restored to its initial value
/// before returning, both when the string matched and when it did not.
bool checkStringWithPositionSaving(const char * s, ReadBuffer & buf);

inline bool checkString(const String & s, ReadBuffer & buf)
{
return checkString(s.c_str(), buf);
Expand Down
64 changes: 38 additions & 26 deletions src/Storages/MergeTree/MergeTreeData.h
Expand Up @@ -1352,38 +1352,59 @@ class MergeTreeData : public IStorage, public WithMutableContext
{
struct PartMutationInfo
{
size_t retry_count = 0ul;
Poco::Timestamp latest_fail_time{};
UInt64 mutation_failure_version = 0ul;
size_t retry_count;
Poco::Timestamp latest_fail_time;
UInt64 mutation_failure_version;
size_t max_postpone_time_ms;
size_t max_postpone_power;

/// Creates the retry record for a part.
/// `mutation_failure_version_` is stored as the version associated with the failure;
/// `max_postpone_time_ms_` limits the backoff, and 0 switches postponing off for this record.
PartMutationInfo(UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
    : retry_count(0)
    , latest_fail_time() /// Default construction is enough; wrapping a temporary in std::move only prevents elision.
    , mutation_failure_version(mutation_failure_version_)
    , max_postpone_time_ms(max_postpone_time_ms_)
    /// retry_count is used as a power-of-two exponent, so cap it at floor(log2(limit)).
    , max_postpone_power(max_postpone_time_ms_ ? static_cast<size_t>(std::log2(max_postpone_time_ms_)) : 0)
{}


/// Returns the earliest time at which the mutation of this part may be tried again.
/// When postponing is disabled (limit of 0) a freshly constructed timestamp is returned.
Poco::Timestamp getNextMinExecutionTime() const
{
    if (max_postpone_time_ms == 0)
        return Poco::Timestamp();

    /// The shift must be done on a size_t: retry_count is capped by log2 of a size_t limit
    /// and can reach 31 or more, where `1 << retry_count` on a plain int is undefined behaviour.
    return latest_fail_time + (static_cast<size_t>(1) << retry_count) * 1000ul;
}

/// Registers one more failed attempt: bumps the backoff exponent (capped by
/// max_postpone_power) and stamps the failure time. Does nothing when postponing is disabled.
void addPartFailure()
{
    if (max_postpone_time_ms != 0)
    {
        retry_count = std::min(max_postpone_power, retry_count + 1);
        latest_fail_time = Poco::Timestamp();
    }
}

bool partCanBeMutated()
{
if (max_postpone_time_ms == 0)
return true;

auto current_time = Poco::Timestamp();
return current_time >= getNextMinExecutionTime();
}
};

using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
DataPartsWithRetryInfo failed_mutation_parts;
size_t max_pospone_power;
mutable std::mutex parts_info_lock;

public:
explicit PartMutationBackoffPolicy(ContextPtr global_context_)
: WithContext(global_context_)
{
size_t max_pospone_time_ms = global_context_->getMaxPostponeTimeForFailedMutations();
if (max_pospone_time_ms == 0)
max_pospone_power = 0;
else
max_pospone_power = static_cast<size_t>(std::log2(max_pospone_time_ms));
}

void removeFromFailedByVersion(UInt64 mutation_version)
{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);

for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();)
{
if (failed_part_it->second.mutation_failure_version == mutation_version)
Expand All @@ -1395,40 +1416,31 @@ class MergeTreeData : public IStorage, public WithMutableContext

void removePartFromFailed(const String& part_name)
{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);
failed_mutation_parts.erase(part_name);
}

void addPartMutationFailure (const String& part_name, UInt64 _mutation_failure_version)
void addPartMutationFailure (const String& part_name, UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
{
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock);
auto part_info_it = failed_mutation_parts.find(part_name);
if (part_info_it == failed_mutation_parts.end())
{
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo());
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(mutation_failure_version_, max_postpone_time_ms_));
std::swap(it, part_info_it);
}
auto& part_info = part_info_it->second;
part_info.retry_count = std::min(max_pospone_power, part_info.retry_count + 1);
part_info.latest_fail_time = Poco::Timestamp();
part_info.mutation_failure_version = _mutation_failure_version;
part_info.addPartFailure();
}

bool partCanBeMutated(const String& part_name)
{
if (max_pospone_power == 0)
return true;

std::unique_lock _lock(parts_info_lock);
auto iter = failed_mutation_parts.find(part_name);
if (iter == failed_mutation_parts.end())
return true;

auto current_time = Poco::Timestamp();
return current_time >= iter->second.getNextMinExecutionTime();
return iter->second.partCanBeMutated();
}
};
/// Controls postponing logic for failed mutations.
Expand Down
12 changes: 11 additions & 1 deletion src/Storages/MergeTree/MergeTreeMutationEntry.cpp
Expand Up @@ -48,13 +48,14 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_)
}

MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings)
const TransactionID & tid_, const WriteSettings & settings, size_t max_postpone_time_)
: create_time(time(nullptr))
, commands(std::move(commands_))
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name("tmp_mutation_" + toString(tmp_number) + ".txt")
, is_temp(true)
, max_postpone_time(max_postpone_time_)
, tid(tid_)
{
try
Expand All @@ -65,6 +66,10 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
*out << "commands: ";
commands.writeText(*out, /* with_pure_metadata_commands = */ false);
*out << "\n";
*out << "max postpone time: ";
*out << max_postpone_time;
*out << "\n";

if (tid.isPrehistoric())
{
csn = Tx::PrehistoricCSN;
Expand Down Expand Up @@ -136,6 +141,11 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat
commands.readText(*buf);
*buf >> "\n";

if (!buf->eof() && checkStringWithPositionSaving("max postpone time: ", *buf))
{
*buf >> "max postpone time: " >> max_postpone_time >> "\n";
}

if (buf->eof())
{
tid = Tx::PrehistoricTID;
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeMutationEntry.h
Expand Up @@ -23,6 +23,7 @@ struct MergeTreeMutationEntry
String file_name;
bool is_temp = false;

/// Limit for postponing retries of this mutation after a failure.
/// Zero-initialised because an entry loaded from disk only sets it when the file
/// contains a "max postpone time: " line; without the initializer it would be indeterminate.
size_t max_postpone_time = 0;
UInt64 block_number = 0;

String latest_failed_part;
Expand All @@ -38,7 +39,7 @@ struct MergeTreeMutationEntry

/// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings);
const TransactionID & tid_, const WriteSettings & settings, size_t max_postpone_time);
MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete;
MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default;

Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
Expand Up @@ -117,7 +117,8 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
status.latest_failed_part_info = source_part_info;
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
storage.mutation_backoff_policy.addPartMutationFailure(src_part, source_part_info.mutation + 1);
if (result_data_version == it->first)
storage.mutation_backoff_policy.addPartMutationFailure(src_part, source_part_info.mutation + 1, log_entry->max_postpone_time);
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp
Expand Up @@ -143,6 +143,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const

if (isAlterMutation())
out << "\nalter_version\n" << alter_version;
out << "\nmax_postpone_time\n" << max_postpone_time;
break;

case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global
Expand Down Expand Up @@ -318,7 +319,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in, MergeTreeDataFor
{
in >> "\n";

if (checkString("alter_version\n", in))
if (checkString("max_postpone_time\n", in))
in >> max_postpone_time;
else if (checkString("alter_version\n", in))
in >> alter_version;
else if (checkString("to_uuid\n", in))
in >> new_part_uuid;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h
Expand Up @@ -165,6 +165,7 @@ struct ReplicatedMergeTreeLogEntryData
size_t num_postponed = 0; /// The number of times the action was postponed.
String postpone_reason; /// The reason why the action was postponed, if it was postponed.
time_t last_postpone_time = 0; /// The time of the last time the action was postponed.
size_t max_postpone_time = 0;

/// Creation time or the time to copy from the general log to the queue of a particular replica.
time_t create_time = 0;
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp
Expand Up @@ -29,7 +29,10 @@ void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const

out << "alter version: ";
out << alter_version;
out << "\n";

out << "max postpone time: ";
out << max_postpone_time;
}

void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
Expand Down Expand Up @@ -58,6 +61,9 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
commands.readText(in);
if (checkString("\nalter version: ", in))
in >> alter_version;
if (checkString("\nmax postpone time: ", in))
in >> max_postpone_time;

}

String ReplicatedMergeTreeMutationEntry::toString() const
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h
Expand Up @@ -56,6 +56,8 @@ struct ReplicatedMergeTreeMutationEntry
std::shared_ptr<const IBackupEntry> backup() const;

String getBlockNumbersForLogs() const;

size_t max_postpone_time = 0;
};

using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>;
Expand Down
6 changes: 4 additions & 2 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Expand Up @@ -2481,7 +2481,7 @@ bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const Mer
}


std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const
std::optional<ReplicatedMergeTreeMergePredicate::DesiredMutationDescription> ReplicatedMergeTreeMergePredicate::getDesiredMutationDescription(const MergeTreeData::DataPartPtr & part) const
{
/// Assigning mutations is easier than assigning merges because mutations appear in the same order as
/// the order of their version numbers (see StorageReplicatedMergeTree::mutate).
Expand Down Expand Up @@ -2509,6 +2509,7 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir

Int64 current_version = queue.getCurrentMutationVersion(part->info.partition_id, part->info.getDataVersion());
Int64 max_version = in_partition->second.begin()->first;
size_t mutation_postpone_time = 0ul;

int alter_version = -1;
bool barrier_found = false;
Expand All @@ -2527,6 +2528,7 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
}

max_version = mutation_version;
mutation_postpone_time = mutation_status->entry->max_postpone_time;
if (current_version < max_version)
++mutations_count;

Expand Down Expand Up @@ -2560,7 +2562,7 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
LOG_TRACE(queue.log, "Will apply {} mutations and mutate part {} to version {} (the last version is {})",
mutations_count, part->name, max_version, in_partition->second.rbegin()->first);

return std::make_pair(max_version, alter_version);
return DesiredMutationDescription({max_version, alter_version, mutation_postpone_time});
}


Expand Down
9 changes: 8 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
Expand Up @@ -563,12 +563,19 @@ class ReplicatedMergeTreeMergePredicate : public BaseMergePredicate<ActiveDataPa
/// We should not drop part in this case, because replication queue may stuck without that part.
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String & out_reason) const;


/// Result of getDesiredMutationDescription: which version a part should be mutated to.
/// Still an aggregate, so `{version, alter_version, postpone_time}` initialisation keeps working;
/// the defaults only make a default-constructed value well defined.
struct DesiredMutationDescription
{
    /// Mutation version the part should be brought to.
    Int64 mutation_version = 0;
    /// Alter version tied to the chosen mutation; -1 when no alter mutation is involved.
    int32_t alter_version = -1;
    /// max_postpone_time taken from the entry of the chosen mutation.
    size_t max_postpone_time = 0;
};
/// Return nonempty optional of desired mutation version and alter version.
/// If we have no alter (modify/drop) mutations in mutations queue, then we return biggest possible
/// mutation version (and -1 as alter version). In other case, we return biggest mutation version with
/// smallest alter version. This is required, because we have to execute alter mutations sequentially and
/// don't glue them together. Alter is a rare operation, so it shouldn't affect performance.
std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;
std::optional<DesiredMutationDescription> getDesiredMutationDescription(const MergeTreeData::DataPartPtr & part) const;

bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
std::unordered_set<String> & checked_partitions_cache) const;
Expand Down

0 comments on commit 14b59f8

Please sign in to comment.