Skip to content

Commit

Permalink
[BACKPORT 2.14.10][#19730] docdb: added SST write retries on corruption
Browse files Browse the repository at this point in the history
Summary:
Added support for the `rocksdb_max_sst_write_retries` flag: the maximum allowed number of retries when
writing an SST file if corruption is detected after the write (0 by default, which means no retries).
Implemented for both flushes and compactions.

For now, we only support retries when a sub-compaction results in a single output file (which is
the case for yugabyte-db as of today). If, in the future, we detect corruption after the first output
file when a sub-compaction produces multiple output files and retries are enabled, a DFATAL will be
logged and no retry will be done.

Original commit: f58c809 / D29784
Backported from 2.14 commit: daf620d / D29953

Jira: DB-8560

Test Plan:
Jenkins: urgent

DBTest.SstTailZerosCheckFlushRetries and DBTest.SstTailZerosCheckCompactionRetries

Reviewers: bogdan, timur, rthallam

Reviewed By: bogdan

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D30001
  • Loading branch information
ttyusupov authored and arybochkin committed Nov 7, 2023
1 parent a717259 commit 769ad57
Show file tree
Hide file tree
Showing 6 changed files with 486 additions and 306 deletions.
256 changes: 144 additions & 112 deletions src/yb/rocksdb/db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@

DECLARE_uint64(rocksdb_check_sst_file_tail_for_zeros);

// Upper bound on how many times writing an SST file is retried after corruption is detected in the
// freshly written file. BuildTable copies this value into its retry counter and stops looping once
// the counter reaches zero, so the default of 0 means a single write attempt with no retries.
DEFINE_uint64(rocksdb_max_sst_write_retries, 0,
"Maximum allowed number of retries to write SST file in case of detected corruption after "
"write.");
// NOTE(review): the `runtime` tag presumably allows changing the flag without a restart, and
// `advanced` presumably marks it as an expert-level option -- confirm against the flag tag docs.
TAG_FLAG(rocksdb_max_sst_write_retries, runtime);
TAG_FLAG(rocksdb_max_sst_write_retries, advanced);

namespace rocksdb {

class TableFactory;
Expand Down Expand Up @@ -128,138 +134,164 @@ Status BuildTable(const std::string& dbname,
TableProperties* table_properties) {
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;

auto* env = db_options.env;

const bool is_split_sst = ioptions.table_factory->IsSplitSstForWriteSupported();
const std::string base_fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
const std::string data_fname = TableBaseToDataFileName(base_fname);

Status s;
meta->fd.total_file_size = 0;
meta->fd.base_file_size = 0;
iter->SeekToFirst();
auto num_retries_left = FLAGS_rocksdb_max_sst_write_retries;
for (;;) {
s = Status::OK();
meta->fd.total_file_size = 0;
meta->fd.base_file_size = 0;
iter->SeekToFirst();

const bool is_split_sst = ioptions.table_factory->IsSplitSstForWriteSupported();
bool do_retry = false;

const std::string base_fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
const std::string data_fname = is_split_sst ? TableBaseToDataFileName(base_fname) : "";
if (iter->Valid()) {
std::shared_ptr<WritableFileWriter> base_file_writer;
std::shared_ptr<WritableFileWriter> data_file_writer;
s = CreateWritableFileWriter(base_fname, env_options, io_priority, env, &base_file_writer);
if (s.ok() && is_split_sst) {
s = CreateWritableFileWriter(data_fname, env_options, io_priority, env, &data_file_writer);
}
if (!s.ok()) {
return s;
}
std::unique_ptr<TableBuilder> builder(NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
column_family_id, base_file_writer.get(), data_file_writer.get(), compression,
compression_opts));

MergeHelper merge(env, internal_comparator->user_comparator(),
ioptions.merge_operator, nullptr, ioptions.info_log,
ioptions.min_partial_merge_operands,
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back());

CompactionIterator c_iter(iter, internal_comparator->user_comparator(),
&merge, kMaxSequenceNumber, &snapshots,
earliest_write_conflict_snapshot,
true /* internal key corruption is not ok */);
c_iter.SeekToFirst();
const bool non_empty = c_iter.Valid();
if (non_empty) {
meta->UpdateKey(c_iter.key(), UpdateBoundariesType::kSmallest);
}
if (iter->Valid()) {
std::shared_ptr<WritableFileWriter> base_file_writer;
std::shared_ptr<WritableFileWriter> data_file_writer;
s = CreateWritableFileWriter(base_fname, env_options, io_priority, env, &base_file_writer);
if (s.ok() && is_split_sst) {
s = CreateWritableFileWriter(
data_fname, env_options, io_priority, env, &data_file_writer);
}
if (!s.ok()) {
return s;
}
std::unique_ptr<TableBuilder> builder(NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories, column_family_id,
base_file_writer.get(), data_file_writer.get(), compression, compression_opts));

MergeHelper merge(
env, internal_comparator->user_comparator(), ioptions.merge_operator, nullptr,
ioptions.info_log, ioptions.min_partial_merge_operands,
true /* internal key corruption is not ok */, snapshots.empty() ? 0 : snapshots.back());

boost::container::small_vector<UserBoundaryValueRef, 0x10> user_values;
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
builder->Add(key, value);
meta->UpdateBoundarySeqNo(GetInternalKeySeqno(key));
if (db_options.boundary_extractor) {
user_values.clear();
auto status = db_options.boundary_extractor->Extract(ExtractUserKey(key), &user_values);
if (!status.ok()) {
CompactionIterator c_iter(
iter, internal_comparator->user_comparator(), &merge, kMaxSequenceNumber, &snapshots,
earliest_write_conflict_snapshot, true /* internal key corruption is not ok */);
c_iter.SeekToFirst();
const bool non_empty = c_iter.Valid();
if (non_empty) {
meta->UpdateKey(c_iter.key(), UpdateBoundariesType::kSmallest);
}

boost::container::small_vector<UserBoundaryValueRef, 0x10> user_values;
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
builder->Add(key, value);
meta->UpdateBoundarySeqNo(GetInternalKeySeqno(key));
if (db_options.boundary_extractor) {
user_values.clear();
auto status = db_options.boundary_extractor->Extract(ExtractUserKey(key), &user_values);
if (!status.ok()) {
builder->Abandon();
return status;
}
meta->UpdateBoundaryUserValues(user_values, UpdateBoundariesType::kAll);
}
}

if (non_empty) {
meta->UpdateKey(builder->LastKey(), UpdateBoundariesType::kLargest);
}

// Finish and check for builder errors
bool empty = builder->NumEntries() == 0;
s = c_iter.status();
if (!s.ok() || empty) {
builder->Abandon();
return status;
} else {
s = builder->Finish();
}
meta->UpdateBoundaryUserValues(user_values, UpdateBoundariesType::kAll);
}
}

if (non_empty) {
meta->UpdateKey(builder->LastKey(), UpdateBoundariesType::kLargest);
}
if (s.ok() && !empty) {
meta->fd.total_file_size = builder->TotalFileSize();
meta->fd.base_file_size = builder->BaseFileSize();
meta->marked_for_compaction = builder->NeedCompact();
assert(meta->fd.GetTotalFileSize() > 0);
if (table_properties) {
*table_properties = builder->GetTableProperties();
}
}

// Finish and check for builder errors
bool empty = builder->NumEntries() == 0;
s = c_iter.status();
if (!s.ok() || empty) {
builder->Abandon();
} else {
s = builder->Finish();
}
// Finish and check for file errors
if (s.ok() && !empty && !ioptions.disable_data_sync) {
if (is_split_sst) {
RETURN_NOT_OK(data_file_writer->Sync(ioptions.use_fsync));
}
RETURN_NOT_OK(base_file_writer->Sync(ioptions.use_fsync));
}
if (s.ok() && !empty && is_split_sst) {
s = data_file_writer->Close();
}
if (s.ok() && !empty) {
s = base_file_writer->Close();
}

if (s.ok() && !empty) {
meta->fd.total_file_size = builder->TotalFileSize();
meta->fd.base_file_size = builder->BaseFileSize();
meta->marked_for_compaction = builder->NeedCompact();
assert(meta->fd.GetTotalFileSize() > 0);
if (table_properties) {
*table_properties = builder->GetTableProperties();
}
}
const auto rocksdb_check_sst_file_tail_for_zeros =
FLAGS_rocksdb_check_sst_file_tail_for_zeros;
if (s.ok() && !empty && PREDICT_FALSE(rocksdb_check_sst_file_tail_for_zeros > 0)) {
s = CheckSstTailForZeros(
db_options, env_options, is_split_sst ? data_fname : base_fname,
rocksdb_check_sst_file_tail_for_zeros);
if (!s.ok()) {
// Retry flush if more attempts are allowed.
do_retry = true;
}
}

// Finish and check for file errors
if (s.ok() && !empty && !ioptions.disable_data_sync) {
if (is_split_sst) {
RETURN_NOT_OK(data_file_writer->Sync(ioptions.use_fsync));
if (s.ok() && !empty) {
// Verify that the table is usable
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
ReadOptions(), env_options, internal_comparator, meta->fd, meta->UserFilter(),
nullptr, (internal_stats == nullptr) ? nullptr : internal_stats->GetFileReadHist(0),
false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {
}
s = it->status();
}
if (!s.ok()) {
// Retry flush if more attempts are allowed.
do_retry = true;
}
}
}
RETURN_NOT_OK(base_file_writer->Sync(ioptions.use_fsync));
}
if (s.ok() && !empty && is_split_sst) {
s = data_file_writer->Close();
}
if (s.ok() && !empty) {
s = base_file_writer->Close();
}

const auto rocksdb_check_sst_file_tail_for_zeros =
FLAGS_rocksdb_check_sst_file_tail_for_zeros;
if (s.ok() && !empty && PREDICT_FALSE(rocksdb_check_sst_file_tail_for_zeros > 0)) {
s = CheckSstTailForZeros(
db_options, env_options, is_split_sst ? data_fname : base_fname,
rocksdb_check_sst_file_tail_for_zeros);
}
// Check for input iterator errors
if (!iter->status().ok()) {
s = iter->status();
}

if (s.ok() && !empty) {
// Verify that the table is usable
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
ReadOptions(), env_options, internal_comparator, meta->fd, meta->UserFilter(), nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {
if (!s.ok() || meta->fd.GetTotalFileSize() == 0) {
if (!env->CleanupFile(base_fname, db_options.log_prefix)) {
do_retry = false;
}
if (is_split_sst) {
if (!env->CleanupFile(data_fname, db_options.log_prefix)) {
do_retry = false;
}
}
s = it->status();
}
}
}

// Check for input iterator errors
if (!iter->status().ok()) {
s = iter->status();
}
if (!do_retry || num_retries_left == 0) {
break;
}

if (!s.ok() || meta->fd.GetTotalFileSize() == 0) {
env->CleanupFile(base_fname);
if (is_split_sst) {
env->CleanupFile(data_fname);
}
--num_retries_left;
RLOG(
InfoLogLevel::INFO_LEVEL, db_options.info_log, "Retrying flush to %s",
base_fname.c_str());
}

return s;
}

Expand Down

0 comments on commit 769ad57

Please sign in to comment.