Merge branch 'main' into get-update-since
HypenZou committed Apr 16, 2024
2 parents de39db5 + d41e568 commit f26e11d
Showing 11 changed files with 228 additions and 36 deletions.
3 changes: 3 additions & 0 deletions db_stress_tool/db_stress_common.h
@@ -402,6 +402,9 @@ DECLARE_uint32(lowest_used_cache_tier);
DECLARE_bool(enable_custom_split_merge);
DECLARE_uint32(adm_policy);
DECLARE_bool(enable_memtable_insert_with_hint_prefix_extractor);
DECLARE_bool(check_multiget_consistency);
DECLARE_bool(check_multiget_entity_consistency);
DECLARE_bool(inplace_update_support);

constexpr long KB = 1024;
constexpr int kRandomValueMaxFactor = 3;
12 changes: 12 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
@@ -1345,4 +1345,16 @@ DEFINE_bool(enable_memtable_insert_with_hint_prefix_extractor,
"If true and FLAGS_prefix_size > 0, set "
"Options.memtable_insert_with_hint_prefix_extractor to "
"be Options.prefix_extractor");

DEFINE_bool(check_multiget_consistency, true,
"If true, check consistency of MultiGet result by comparing it "
"with Get's under a snapshot");

DEFINE_bool(check_multiget_entity_consistency, true,
"If true, check consistency of MultiGetEntity result by comparing "
"it GetEntity's under a snapshot");

DEFINE_bool(inplace_update_support,
ROCKSDB_NAMESPACE::Options().inplace_update_support,
"Options.inplace_update_support");
#endif // GFLAGS
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_test_base.cc
@@ -3693,6 +3693,7 @@ void InitializeOptionsFromFlags(
}
options.lowest_used_cache_tier =
static_cast<CacheTier>(FLAGS_lowest_used_cache_tier);
options.inplace_update_support = FLAGS_inplace_update_support;
}

void InitializeOptionsGeneral(
9 changes: 6 additions & 3 deletions db_stress_tool/db_stress_tool.cc
@@ -249,14 +249,17 @@ int db_stress_tool(int argc, char** argv) {
exit(1);
}
}
if (FLAGS_enable_compaction_filter &&
if ((FLAGS_enable_compaction_filter || FLAGS_inplace_update_support) &&
(FLAGS_acquire_snapshot_one_in > 0 || FLAGS_compact_range_one_in > 0 ||
FLAGS_iterpercent > 0 || FLAGS_test_batches_snapshots ||
FLAGS_test_cf_consistency)) {
FLAGS_test_cf_consistency || FLAGS_check_multiget_consistency ||
FLAGS_check_multiget_entity_consistency)) {
fprintf(
stderr,
"Error: acquire_snapshot_one_in, compact_range_one_in, iterpercent, "
"test_batches_snapshots must all be 0 when using compaction filter\n");
"test_batches_snapshots, test_cf_consistency, "
"check_multiget_consistency, check_multiget_entity_consistency must "
"all be 0 when using compaction filter or inplace update support\n");
exit(1);
}
if (FLAGS_test_multi_ops_txns) {
15 changes: 4 additions & 11 deletions db_stress_tool/no_batched_ops_stress.cc
@@ -580,13 +580,8 @@ class NonBatchedOpsStressTest : public StressTest {
int column_family = rand_column_families[0];
ColumnFamilyHandle* cfh = column_families_[column_family];
int error_count = 0;
// Do a consistency check between Get and MultiGet. Don't do it too
// often as it will slow db_stress down
//
// CompactionFilter can make snapshot non-repeatable by removing keys
// protected by snapshot
bool do_consistency_check =
!FLAGS_enable_compaction_filter && thread->rand.OneIn(4);

bool do_consistency_check = FLAGS_check_multiget_consistency;

ReadOptions readoptionscopy = read_opts;

@@ -1075,10 +1070,8 @@
fault_fs_guard->DisableErrorInjection();
}

// CompactionFilter can make snapshot non-repeatable by removing keys
// protected by snapshot
const bool check_get_entity = !FLAGS_enable_compaction_filter &&
!error_count && thread->rand.OneIn(4);
const bool check_get_entity =
!error_count && FLAGS_check_multiget_entity_consistency;

for (size_t i = 0; i < num_keys; ++i) {
const Status& s = statuses[i];
35 changes: 34 additions & 1 deletion include/rocksdb/utilities/write_batch_with_index.h
@@ -268,7 +268,23 @@ class WriteBatchWithIndex : public WriteBatchBase {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value);

// TODO: implement GetEntityFromBatchAndDB
// Similar to DB::GetEntity() but also reads writes from this batch.
//
// This method queries the batch for the key and if the result can be
// determined based on the batch alone, it is returned (assuming the key is
// found, in the form of a wide-column entity). If the batch does not contain
// enough information to determine the result (the key is not present in the
// batch at all or a merge is in progress), the DB is queried and the result
// is merged with the entries from the batch if necessary.
//
// Setting read_options.snapshot will affect what is read from the DB
// but will NOT change which keys are read from the batch (the keys in
// this batch do not yet belong to any snapshot and will be fetched
// regardless).
Status GetEntityFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key,
PinnableWideColumns* columns);
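
For illustration, here is a minimal sketch of a read-your-own-writes lookup through this API. It is not part of this change; the handle names, the staged entity, and the availability of PutEntity on WriteBatchWithIndex in this version are assumptions of the example.

#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/write_batch_with_index.h"

using namespace ROCKSDB_NAMESPACE;

// Sketch: stage an entity in an indexed batch, then read it back through
// GetEntityFromBatchAndDB before anything is committed to the DB. Assumes
// `db` and `cfh` are valid and that this version supports PutEntity on the
// batch.
Status ReadYourOwnEntity(DB* db, ColumnFamilyHandle* cfh) {
  // overwrite_key=true keeps only the latest batch entry per key, the same
  // configuration transactions use.
  WriteBatchWithIndex batch(BytewiseComparator(), 0, /*overwrite_key=*/true);

  // Stage an uncommitted wide-column write in the batch.
  WideColumns columns{{"attr1", "v1"}, {"attr2", "v2"}};
  Status s = batch.PutEntity(cfh, "key", columns);
  if (!s.ok()) {
    return s;
  }

  // The result is served from the batch alone: the staged entity is
  // returned even though it has not been written to the DB yet.
  PinnableWideColumns result;
  s = batch.GetEntityFromBatchAndDB(db, ReadOptions(), cfh, "key", &result);
  // On success, result.columns() mirrors the staged columns.
  return s;
}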

void MultiGetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
@@ -314,11 +330,23 @@ class WriteBatchWithIndex : public WriteBatchBase {
// last sub-batch.
size_t SubBatchCnt();

void MergeAcrossBatchAndDBImpl(ColumnFamilyHandle* column_family,
const Slice& key,
const PinnableWideColumns& existing,
const MergeContext& merge_context,
std::string* value,
PinnableWideColumns* columns, Status* status);
void MergeAcrossBatchAndDB(ColumnFamilyHandle* column_family,
const Slice& key,
const PinnableWideColumns& existing,
const MergeContext& merge_context,
PinnableSlice* value, Status* status);
void MergeAcrossBatchAndDB(ColumnFamilyHandle* column_family,
const Slice& key,
const PinnableWideColumns& existing,
const MergeContext& merge_context,
PinnableWideColumns* columns, Status* status);

Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, ReadCallback* callback);
@@ -327,6 +355,11 @@
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input, ReadCallback* callback);
Status GetEntityFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableWideColumns* columns,
ReadCallback* callback);

struct Rep;
std::unique_ptr<Rep> rep;
};
14 changes: 11 additions & 3 deletions tools/db_crashtest.py
@@ -74,6 +74,7 @@
"destroy_db_initially": 0,
"enable_pipelined_write": lambda: random.randint(0, 1),
"enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
"inplace_update_support": lambda: random.randint(0, 1),
"expected_values_dir": lambda: setup_expected_values_dir(),
"fail_if_options_file_error": lambda: random.randint(0, 1),
"flush_one_in": lambda: random.choice([1000, 1000000]),
@@ -280,6 +281,8 @@
# TODO(hx235): enable `enable_memtable_insert_with_hint_prefix_extractor`
# after fixing the surfaced issue with delete range
"enable_memtable_insert_with_hint_prefix_extractor": 0,
"check_multiget_consistency": lambda: random.choice([0, 0, 0, 1]),
"check_multiget_entity_consistency": lambda: random.choice([0, 0, 0, 1]),
}
_TEST_DIR_ENV_VAR = "TEST_TMPDIR"
# If TEST_TMPDIR_EXPECTED is not specified, default value will be TEST_TMPDIR
@@ -437,8 +440,9 @@ def is_direct_io_supported(dbname):
"write_buffer_size": 1024 * 1024,
"enable_pipelined_write": lambda: random.randint(0, 1),
# Snapshots are used heavily in this test mode, while they are incompatible
# with compaction filter and inplace_update_support.
# with compaction filter, inplace_update_support
"enable_compaction_filter": 0,
"inplace_update_support": 0,
# `CfConsistencyStressTest::TestIngestExternalFile()` is not implemented.
"ingest_external_file_one_in": 0,
# `CfConsistencyStressTest::TestIterateAgainstExpected()` is not implemented.
@@ -639,6 +643,7 @@ def finalize_and_sanitize(src_params):

if dest_params["test_batches_snapshots"] == 1:
dest_params["enable_compaction_filter"] = 0
dest_params["inplace_update_support"] = 0
if dest_params["prefix_size"] < 0:
dest_params["prefix_size"] = 1

@@ -699,15 +704,18 @@ def finalize_and_sanitize(src_params):
dest_params["enable_pipelined_write"] = 0
if dest_params.get("sst_file_manager_bytes_per_sec", 0) == 0:
dest_params["sst_file_manager_bytes_per_truncate"] = 0
if dest_params.get("enable_compaction_filter", 0) == 1:
# Compaction filter is incompatible with snapshots. Need to avoid taking
if (dest_params.get("enable_compaction_filter", 0) == 1
or dest_params.get("inplace_update_support", 0) == 1):
# Compaction filter and inplace update support are incompatible with snapshots. Need to avoid taking
# snapshots, as well as avoid operations that use snapshots for
# verification.
dest_params["acquire_snapshot_one_in"] = 0
dest_params["compact_range_one_in"] = 0
# Give the iterator ops away to reads.
dest_params["readpercent"] += dest_params.get("iterpercent", 10)
dest_params["iterpercent"] = 0
dest_params["check_multiget_consistency"] = 0
dest_params["check_multiget_entity_consistency"] = 0
if dest_params.get("prefix_size") == -1:
dest_params["readpercent"] += dest_params.get("prefixpercent", 20)
dest_params["prefixpercent"] = 0
@@ -0,0 +1 @@
Added a new API `GetEntityFromBatchAndDB` to `WriteBatchWithIndex` that can be used for wide-column point lookups with read-your-own-writes consistency. Similarly to `GetFromBatchAndDB`, the API can combine data from the write batch with data from the underlying database if needed. See the API comments for more details.
131 changes: 117 additions & 14 deletions utilities/write_batch_with_index/write_batch_with_index.cc
Original file line number Diff line number Diff line change
@@ -567,43 +567,62 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,
nullptr);
}

void WriteBatchWithIndex::MergeAcrossBatchAndDB(
void WriteBatchWithIndex::MergeAcrossBatchAndDBImpl(
ColumnFamilyHandle* column_family, const Slice& key,
const PinnableWideColumns& existing, const MergeContext& merge_context,
PinnableSlice* value, Status* status) {
assert(value);
std::string* value, PinnableWideColumns* columns, Status* status) {
assert(value || columns);
assert(!value || !columns);
assert(status);
assert(status->ok() || status->IsNotFound());

std::string result_value;

if (status->ok()) {
if (WideColumnsHelper::HasDefaultColumnOnly(existing.columns())) {
*status = WriteBatchWithIndexInternal::MergeKeyWithBaseValue(
column_family, key, MergeHelper::kPlainBaseValue,
WideColumnsHelper::GetDefaultColumn(existing.columns()),
merge_context, &result_value,
static_cast<PinnableWideColumns*>(nullptr));
merge_context, value, columns);
} else {
*status = WriteBatchWithIndexInternal::MergeKeyWithBaseValue(
column_family, key, MergeHelper::kWideBaseValue, existing.columns(),
merge_context, &result_value,
static_cast<PinnableWideColumns*>(nullptr));
merge_context, value, columns);
}
} else {
assert(status->IsNotFound());
*status = WriteBatchWithIndexInternal::MergeKeyWithNoBaseValue(
column_family, key, merge_context, &result_value,
static_cast<PinnableWideColumns*>(nullptr));
column_family, key, merge_context, value, columns);
}
}

void WriteBatchWithIndex::MergeAcrossBatchAndDB(
ColumnFamilyHandle* column_family, const Slice& key,
const PinnableWideColumns& existing, const MergeContext& merge_context,
PinnableSlice* value, Status* status) {
assert(value);
assert(status);

std::string result_value;
constexpr PinnableWideColumns* result_entity = nullptr;
MergeAcrossBatchAndDBImpl(column_family, key, existing, merge_context,
&result_value, result_entity, status);

if (status->ok()) {
value->Reset();
*value->GetSelf() = std::move(result_value);
value->PinSelf();
}
}

void WriteBatchWithIndex::MergeAcrossBatchAndDB(
ColumnFamilyHandle* column_family, const Slice& key,
const PinnableWideColumns& existing, const MergeContext& merge_context,
PinnableWideColumns* columns, Status* status) {
assert(columns);
assert(status);

constexpr std::string* value = nullptr;
MergeAcrossBatchAndDBImpl(column_family, key, existing, merge_context, value,
columns, status);
}

Status WriteBatchWithIndex::GetFromBatchAndDB(
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) {
@@ -620,6 +639,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
return Status::InvalidArgument("Must specify timestamp");
}

pinnable_val->Reset();

// Since the lifetime of the WriteBatch is the same as that of the transaction
// we cannot pin it as otherwise the returned value will not be available
// after the transaction finishes.
@@ -634,7 +655,8 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
return s;
}

if (!s.ok() || result == WBWIIteratorImpl::kError) {
assert(!s.ok() == (result == WBWIIteratorImpl::kError));
if (result == WBWIIteratorImpl::kError) {
return s;
}

@@ -800,6 +822,87 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB(
}
}

Status WriteBatchWithIndex::GetEntityFromBatchAndDB(
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableWideColumns* columns, ReadCallback* callback) {
assert(db);
assert(column_family);
assert(columns);

const Comparator* const ucmp = rep->comparator.GetComparator(column_family);
size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0;
if (ts_sz > 0 && !read_options.timestamp) {
return Status::InvalidArgument("Must specify timestamp");
}

columns->Reset();

MergeContext merge_context;
Status s;

auto result = WriteBatchWithIndexInternal::GetEntityFromBatch(
this, column_family, key, &merge_context, columns, &s);

assert(!s.ok() == (result == WBWIIteratorImpl::kError));

if (result == WBWIIteratorImpl::kFound ||
result == WBWIIteratorImpl::kError) {
return s;
}

if (result == WBWIIteratorImpl::kDeleted) {
return Status::NotFound();
}

assert(result == WBWIIteratorImpl::kMergeInProgress ||
result == WBWIIteratorImpl::kNotFound);

PinnableWideColumns existing;

DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.columns =
(result == WBWIIteratorImpl::kMergeInProgress) ? &existing : columns;
get_impl_options.callback = callback;

s = static_cast_with_check<DBImpl>(db->GetRootDB())
->GetImpl(read_options, key, get_impl_options);

if (result == WBWIIteratorImpl::kMergeInProgress) {
if (s.ok() || s.IsNotFound()) { // DB lookup succeeded
MergeAcrossBatchAndDB(column_family, key, existing, merge_context,
columns, &s);
}
}

return s;
}

Status WriteBatchWithIndex::GetEntityFromBatchAndDB(
DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableWideColumns* columns) {
if (!db) {
return Status::InvalidArgument(
"Cannot call GetEntityFromBatchAndDB without a DB object");
}

if (!column_family) {
return Status::InvalidArgument(
"Cannot call GetEntityFromBatchAndDB without a column family handle");
}

if (!columns) {
return Status::InvalidArgument(
"Cannot call GetEntityFromBatchAndDB without a PinnableWideColumns "
"object");
}

constexpr ReadCallback* callback = nullptr;

return GetEntityFromBatchAndDB(db, read_options, column_family, key, columns,
callback);
}
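
As a sketch of the merge path above (a hypothetical setup, not part of the patch; it assumes the column family was opened with a merge operator that understands the default column, e.g. a string-append operator, and that `db` and `cfh` are valid):

// Base version of the entity is already committed to the DB; an uncommitted
// merge operand for the same key is staged in the indexed batch.
Status MergeAcrossBatchAndDBExample(DB* db, ColumnFamilyHandle* cfh) {
  WideColumns base{{kDefaultWideColumnName, "base"}};
  Status s = db->PutEntity(WriteOptions(), cfh, "key", base);
  if (!s.ok()) {
    return s;
  }

  WriteBatchWithIndex batch(BytewiseComparator(), 0, /*overwrite_key=*/true);
  s = batch.Merge(cfh, "key", "operand");
  if (!s.ok()) {
    return s;
  }

  // The batch lookup reports kMergeInProgress, so the base entity is read
  // from the DB and the staged operand is applied on top of it via
  // MergeAcrossBatchAndDB.
  PinnableWideColumns result;
  return batch.GetEntityFromBatchAndDB(db, ReadOptions(), cfh, "key", &result);
}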

void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }

Status WriteBatchWithIndex::RollbackToSavePoint() {
@@ -838,6 +838,9 @@ WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatchImpl(
Traits::ClearOutput(output);
result = WBWIIteratorImpl::Result::kError;
}
} else {
Traits::ClearOutput(output);
*s = Status::OK();
}

return result;
