[Backport 2.20.1] [#20398, #20648] DocDB: Restore bloom filter usage during multi-row insert

Summary:
Revert commits:
beeebbe/D26549
7b582dd/D26561
These were preventing bloom filters from being used during multi-row inserts
(both the check for an existing row and conflict resolution).

This is expected to regress PgSingleTServerTest.ScanWithPackedRow (and could show up in any other insert-heavy test). Insert TServer time:
master - 7.89s
this diff - 13.99s

Also fixed the missing-conflict bug that beeebbe/D26549 had originally fixed, and added a test for it: PgSingleTServerTest.RangeConflict.
We did not recreate the iterator when checking for an overwritten value during conflict resolution with range-only keys, so the overwritten value could be filtered out by the bloom filter built for the previously checked entry.
Fixed by adjusting the check to recreate the iterator whenever the filter prefix changes (see the sketch below).
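
For illustration, a minimal standalone C++ sketch of the pattern the fix applies in StrongConflictChecker::Check. The Iterator and Checker types and the '/'-delimited prefix are simplified stand-ins, not the actual DocDB classes (the real code derives the prefix with ExtractFilterPrefixFromKey):
```
#include <iostream>
#include <optional>
#include <string>
#include <string_view>

// Stand-in for BoundedRocksDbIterator: its bloom filter is fixed at creation.
struct Iterator {
  std::string filter_prefix;
  void Seek(std::string_view key) {
    bool covered = key.substr(0, filter_prefix.size()) == filter_prefix;
    std::cout << "seek " << key
              << (covered ? " (bloom filter matches)\n"
                          : " (stale bloom filter would wrongly skip data!)\n");
  }
};

class Checker {
 public:
  void Check(std::string_view intent_key) {
    // Fake prefix extraction: first '/'-separated component. The real code
    // uses DocKey::EncodedSize(key, DocKeyPart::kUpToHashOrFirstRange).
    std::string prefix{intent_key.substr(0, intent_key.find('/'))};
    if (!iter_ || prefix != iter_prefix_) {
      // Recreate the iterator so its bloom filter matches the current key.
      iter_ = Iterator{prefix};
      iter_prefix_ = prefix;
    }
    iter_->Seek(intent_key);  // always seek; never rely on a stale filter
  }

 private:
  std::optional<Iterator> iter_;
  std::string iter_prefix_;
};

int main() {
  Checker checker;
  checker.Check("row1/col1");  // creates iterator for prefix "row1"
  checker.Check("row1/col2");  // same prefix: iterator reused safely
  checker.Check("row2/col1");  // prefix changed: iterator recreated
}
```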

This diff also fixes GH #20648, since that issue was introduced by beeebbe/D26549, which is reverted here.
Jira: DB-9391, DB-9644

Test Plan:
Launched an n2-standard-4 instance with an RF1 cluster and ran a script that loads 10M rows in chunks:
```
drop table if exists test_table;
create extension if not exists pgcrypto;
create table test_table(k text, v text, PRIMARY KEY(k ASC));
create index index_v_test_table on test_table(v);

-- Load 10M rows
do $$
begin
  for counter in 1..1000 loop
    raise notice 'counter: %', (counter * 10000);
    insert into test_table (select gen_random_uuid(), gen_random_bytes(50)::text
                        from generate_series(1, 10000) i);
    commit;
  end loop;
end $$;
```

Execution time:
master (5923bb6) - 8m41s
this diff - 6m57s
2.18.5 (ef3afb2) - 10m00s
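(In other words, this diff is roughly 1.25x faster than master (521s vs. 417s) and about 1.44x faster than 2.18.5 (600s vs. 417s).)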

---

Also loaded 30M rows using a similar set of scripts at https://github.com/mbautin/ysql-bulk-load-test, on a GCP VM with 7693 MiB of RAM, 4 vCPUs (Xeon(R) CPU @ 2.80GHz), and a balanced persistent disk:

https://gist.githubusercontent.com/mbautin/160fc5b72ec6aae476b02238529a170a/raw

This diff results in a 3.8x speedup (1489.6 sec instead of 5689.3 sec) and a reduction in disk read throughput at the end of the load from 17600.9 to 11901.5 KB/sec (a factor of ~1.5); read throughput generally increases over the course of the bulk load.

Re-running the 30M-row load test on 2.18.5.1 results in a load time of 1990.8 sec, confirming that the code before this diff (with its load time of 5689.3 sec) represents a significant regression compared to that release.

https://gist.githubusercontent.com/mbautin/84073c28fb2aa3ae92c2146eda55e4cc/raw

Original commit: 78efbc0/D31635

Reviewers: mbautin, timur

Reviewed By: mbautin

Subscribers: kannan, ybase, yql, bogdan

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D31809
spolitov committed Jan 19, 2024
1 parent b77dfa0 commit 1e9aa06
Showing 14 changed files with 71 additions and 138 deletions.
@@ -60,8 +60,12 @@ public void testFastpathIntentdbSeeks() throws Exception {
       updateCounter(counter);
       final int seeks = counter.intentdbSeeks.get("t").value();

-      // - Expect one seek per conflict resolution.
-      assertEquals(seeks, 1);
+      // - Expect four seeks for the columns being updated (kStrongWrite intents)
+      // - Expect four seeks for the weak intents
+      //   - Two seeks for the range components
+      //   - One seek for the hash component
+      //   - One seek for the tablet/relation
+      assertEquals(seeks, 8);
       stmt.execute("COMMIT");
     }
   }
63 changes: 17 additions & 46 deletions src/yb/docdb/conflict_resolution.cc
@@ -26,6 +26,7 @@
 #include "yb/docdb/docdb.messages.h"
 #include "yb/docdb/doc_read_context.h"
 #include "yb/docdb/docdb_rocksdb_util.h"
+#include "yb/docdb/docdb_filter_policy.h"
 #include "yb/docdb/iter_util.h"
 #include "yb/docdb/shared_lock_manager.h"
 #include "yb/docdb/transaction_dump.h"
@@ -197,7 +198,7 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
   }

   // Reads conflicts for specified intent from DB.
-  Status ReadIntentConflicts(IntentTypeSet type, bool first, KeyBytes* intent_key_prefix) {
+  Status ReadIntentConflicts(IntentTypeSet type, KeyBytes* intent_key_prefix) {
     EnsureIntentIteratorCreated();

     const auto conflicting_intent_types = kIntentTypeSetConflicts[type.ToUIntPtr()];
@@ -221,12 +222,7 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
     VLOG_WITH_PREFIX_AND_FUNC(4) << "Check conflicts in intents DB; Seek: "
                                  << intent_key_prefix->AsSlice().ToDebugHexString() << " for type "
                                  << ToString(type);
-    if (first) {
-      intent_iter_.Seek(intent_key_prefix->AsSlice());
-    } else {
-      intent_iter_.RevalidateAfterUpperBoundChange();
-      SeekForward(intent_key_prefix->AsSlice(), &intent_iter_);
-    }
+    intent_iter_.Seek(intent_key_prefix->AsSlice());
     int64_t num_keys_scanned = 0;
     while (intent_iter_.Valid()) {
       auto existing_key = intent_iter_.key();
@@ -765,20 +761,19 @@ class StrongConflictChecker {
         buffer_(*buffer)
   {}

-  Status Check(
-      Slice intent_key, bool strong, ConflictManagementPolicy conflict_management_policy,
-      BloomFilterMode bloom_filter_mode) {
-    if (PREDICT_FALSE(!value_iter_.Initialized())) {
+  Status Check(
+      Slice intent_key, bool strong, ConflictManagementPolicy conflict_management_policy) {
+    const auto bloom_filter_prefix = VERIFY_RESULT(ExtractFilterPrefixFromKey(intent_key));
+    if (!value_iter_.Initialized() || bloom_filter_prefix != value_iter_bloom_filter_prefix_) {
       value_iter_ = CreateRocksDBIterator(
           resolver_.doc_db().regular,
           resolver_.doc_db().key_bounds,
-          bloom_filter_mode,
+          BloomFilterMode::USE_BLOOM_FILTER,
           intent_key,
           rocksdb::kDefaultQueryId);
-      value_iter_.Seek(intent_key);
-    } else {
-      SeekForward(intent_key, &value_iter_);
+      value_iter_bloom_filter_prefix_ = bloom_filter_prefix;
     }
+    value_iter_.Seek(intent_key);

     VLOG_WITH_PREFIX_AND_FUNC(4)
         << "Overwrite; Seek: " << intent_key.ToDebugString() << " ("
@@ -839,7 +834,7 @@ class StrongConflictChecker {
       buffer_.Reset(existing_key);
       // Already have ValueType::kHybridTime at the end
       buffer_.AppendHybridTime(DocHybridTime::kMin);
-      SeekForward(buffer_.AsSlice(), &value_iter_);
+      ROCKSDB_SEEK(&value_iter_, buffer_.AsSlice());
     }

     return value_iter_.status();
@@ -858,6 +853,7 @@ class StrongConflictChecker {

   // RocksDb iterator with bloom filter can be reused in case keys has same hash component.
   BoundedRocksDbIterator value_iter_;
+  Slice value_iter_bloom_filter_prefix_;
 };

 class ConflictResolverContextBase : public ConflictResolverContext {
@@ -1058,43 +1054,20 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase {
     // DB where the provisional record has already been removed.
     resolver->EnsureIntentIteratorCreated();

-    // Check if we could use bloom filter for value_iter_.
-    auto bloom_filter_mode = BloomFilterMode::USE_BLOOM_FILTER;
-    // Whether we plan to instantiate value_iter_.
-    if (read_time_ != HybridTime::kMax) {
-      Slice bloom_filter_component;
-      for (const auto& [key_buffer, data] : container) {
-        const Slice intent_key = key_buffer.AsSlice();
-        if (data.full_doc_key || HasStrong(data.types)) {
-          if (bloom_filter_component.empty()) {
-            auto size_result = VERIFY_RESULT(dockv::DocKey::EncodedSize(
-                intent_key, dockv::DocKeyPart::kUpToHashOrFirstRange));
-            bloom_filter_component = intent_key.Prefix(size_result);
-          } else if (!intent_key.starts_with(bloom_filter_component)) {
-            bloom_filter_mode = BloomFilterMode::DONT_USE_BLOOM_FILTER;
-            break;
-          }
-        }
-      }
-    }
-
-    bool first = true;
     for (const auto& i : container) {
+      const Slice intent_key = i.first.AsSlice();
       if (read_time_ != HybridTime::kMax) {
-        const Slice intent_key = i.first.AsSlice();
         bool strong = HasStrong(i.second.types);
         // For strong intents or weak intents at a full document key level (i.e. excluding intents
         // that omit some final range components of the document key), check for conflicts with
         // records in regular RocksDB. We need this because the row might have been deleted
         // concurrently by a single-shard transaction or a committed and applied transaction.
         if (strong || i.second.full_doc_key) {
-          RETURN_NOT_OK(checker.Check(
-              intent_key, strong, GetConflictManagementPolicy(), bloom_filter_mode));
+          RETURN_NOT_OK(checker.Check(intent_key, strong, GetConflictManagementPolicy()));
         }
       }
-      buffer.Reset(i.first.AsSlice());
-      RETURN_NOT_OK(resolver->ReadIntentConflicts(i.second.types, first, &buffer));
-      first = false;
+      buffer.Reset(intent_key);
+      RETURN_NOT_OK(resolver->ReadIntentConflicts(i.second.types, &buffer));
     }

     return Status::OK();
@@ -1248,11 +1221,9 @@ class OperationConflictResolverContext : public ConflictResolverContextBase {
       return Status::OK();
     }

-    bool first = true;
     for (const auto& [key, intent_data] : container) {
       encoded_key_buffer.Reset(key.AsSlice());
-      RETURN_NOT_OK(resolver->ReadIntentConflicts(intent_data.types, first, &encoded_key_buffer));
-      first = false;
+      RETURN_NOT_OK(resolver->ReadIntentConflicts(intent_data.types, &encoded_key_buffer));
     }

     return Status::OK();
4 changes: 0 additions & 4 deletions src/yb/docdb/doc_operation-test.cc
@@ -206,8 +206,6 @@ class DocOperationTest : public DocDBTestBase {
         .doc_write_batch = &doc_write_batch,
         .read_operation_data = ReadOperationData(),
         .restart_read_ht = &restart_read_ht,
-        .iterator = nullptr,
-        .restart_seek = true,
         .schema_packing_provider = nullptr,
       }));
       ASSERT_OK(WriteToRocksDB(doc_write_batch, hybrid_time));
@@ -391,8 +389,6 @@ TEST_F(DocOperationTest, TestRedisSetKVWithTTL) {
         .doc_write_batch = &doc_write_batch,
         .read_operation_data = {},
         .restart_read_ht = nullptr,
-        .iterator = nullptr,
-        .restart_seek = true,
         .schema_packing_provider = nullptr,
       }));

13 changes: 0 additions & 13 deletions src/yb/docdb/doc_operation.h
@@ -31,9 +31,6 @@ struct DocOperationApplyData {
   DocWriteBatch* doc_write_batch;
   ReadOperationData read_operation_data;
   HybridTime* restart_read_ht;
-  DocRowwiseIterator* iterator;
-  // Whether we should restart seek while fetching entry from doc key.
-  bool restart_seek;
   SchemaPackingProvider* schema_packing_provider; // null okay

   CoarseTimePoint deadline() const {
@@ -84,16 +81,6 @@ class DocOperation {
   virtual Type OpType() = 0;
   virtual void ClearResponse() = 0;

-  // Update iterator stored in iterator, and setup data to use it.
-  // prev - The operation before this one, that works with the current iterator.
-  //        Should be used to check whether iterators are compatible.
-  // single_operation - is this operation is the only operation in batch.
-  virtual Status UpdateIterator(
-      DocOperationApplyData* data, DocOperation* prev, SingleOperation single_operation,
-      std::optional<DocRowwiseIterator>* iterator) {
-    return Status::OK();
-  }
-
   virtual std::string ToString() const = 0;
 };

7 changes: 1 addition & 6 deletions src/yb/docdb/doc_pgsql_scanspec.cc
@@ -98,8 +98,7 @@ DocPgsqlScanSpec::DocPgsqlScanSpec(
     bool is_forward_scan,
     const DocKey& lower_doc_key,
     const DocKey& upper_doc_key,
-    const size_t prefix_length,
-    AddHighestToUpperDocKey add_highest_to_upper_doc_key)
+    const size_t prefix_length)
     : PgsqlScanSpec(
           schema, is_forward_scan, query_id,
           condition ? std::make_unique<qlexpr::QLScanRange>(schema, *condition) : nullptr,
@@ -112,10 +111,6 @@ DocPgsqlScanSpec::DocPgsqlScanSpec(
       start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()),
       lower_doc_key_(lower_doc_key.Encode()),
       upper_doc_key_(upper_doc_key.Encode()) {
-  if (add_highest_to_upper_doc_key) {
-    upper_doc_key_.AppendKeyEntryTypeBeforeGroupEnd(KeyEntryType::kHighest);
-  }
-
   if (!hashed_components_->empty() && schema.num_hash_key_columns() > 0) {
     options_ = std::make_shared<std::vector<qlexpr::OptionList>>(schema.num_dockey_components());
     options_col_ids_.reserve(schema.num_dockey_components());
6 changes: 1 addition & 5 deletions src/yb/docdb/doc_pgsql_scanspec.h
@@ -31,8 +31,6 @@

 namespace yb::docdb {

-YB_STRONGLY_TYPED_BOOL(AddHighestToUpperDocKey);
-
 // DocDB variant of scanspec.
 class DocPgsqlScanSpec : public qlexpr::PgsqlScanSpec {
  public:
@@ -64,9 +62,7 @@ class DocPgsqlScanSpec : public qlexpr::PgsqlScanSpec {
       bool is_forward_scan = true,
       const dockv::DocKey& lower_doc_key = DefaultStartDocKey(),
       const dockv::DocKey& upper_doc_key = DefaultStartDocKey(),
-      const size_t prefix_length = 0,
-      AddHighestToUpperDocKey add_highest_to_upper_doc_key =
-          AddHighestToUpperDocKey::kFalse);
+      const size_t prefix_length = 0);

   //------------------------------------------------------------------------------------------------
   // Filters.
7 changes: 0 additions & 7 deletions src/yb/docdb/docdb.cc
@@ -291,17 +291,10 @@ Status AssembleDocWriteBatch(const vector<unique_ptr<DocOperation>>& doc_write_ops,
       .doc_write_batch = &doc_write_batch,
       .read_operation_data = read_operation_data,
       .restart_read_ht = restart_read_ht,
-      .iterator = nullptr,
-      .restart_seek = true,
       .schema_packing_provider = schema_packing_provider,
   };

-  std::optional<DocRowwiseIterator> iterator;
-  DocOperation* prev_operation = nullptr;
-  SingleOperation single_operation(doc_write_ops.size() == 1);
   for (const unique_ptr<DocOperation>& doc_op : doc_write_ops) {
-    RETURN_NOT_OK(doc_op->UpdateIterator(&data, prev_operation, single_operation, &iterator));
-    prev_operation = doc_op.get();
     Status s = doc_op->Apply(data);
     if (s.IsQLError() && doc_op->OpType() == DocOperation::Type::QL_WRITE_OPERATION) {
       std::string error_msg;
5 changes: 5 additions & 0 deletions src/yb/docdb/docdb_filter_policy.cc
@@ -111,4 +111,9 @@ DocDbAwareV3FilterPolicy::GetKeyTransformer() const {
   return &DocKeyComponentsExtractor<dockv::DocKeyPart::kUpToHashOrFirstRange>::GetInstance();
 }

+Result<Slice> ExtractFilterPrefixFromKey(Slice key) {
+  return key.Prefix(VERIFY_RESULT(dockv::DocKey::EncodedSize(
+      key, dockv::DocKeyPart::kUpToHashOrFirstRange)));
+}
+
 } // namespace yb::docdb
2 changes: 2 additions & 0 deletions src/yb/docdb/docdb_filter_policy.h
@@ -78,4 +78,6 @@ class DocDbAwareV3FilterPolicy : public DocDbAwareFilterPolicyBase {
   const KeyTransformer* GetKeyTransformer() const override;
 };

+Result<Slice> ExtractFilterPrefixFromKey(Slice key);
+
 } // namespace yb::docdb
58 changes: 11 additions & 47 deletions src/yb/docdb/pgsql_operation.cc
@@ -1365,58 +1365,22 @@ const dockv::ReaderProjection& PgsqlWriteOperation::projection() const {
   return *projection_;
 }

-Status PgsqlWriteOperation::UpdateIterator(
-    DocOperationApplyData* data, DocOperation* prev_op, SingleOperation single_operation,
-    std::optional<DocRowwiseIterator>* iterator) {
-  if (prev_op) {
-    auto* prev = down_cast<PgsqlWriteOperation*>(prev_op);
-    if (request_.table_id() == prev->request_.table_id() &&
-        projection() == prev->projection()) {
-      data->restart_seek = doc_key_ <= prev->doc_key_;
-      return Status::OK();
-    }
-  }
-
-  iterator->emplace(
-      projection(),
-      *doc_read_context_,
-      txn_op_context_,
-      data->doc_write_batch->doc_db(),
-      data->read_operation_data,
-      data->doc_write_batch->pending_op());
-
-  static const dockv::DocKey kEmptyDocKey;
-  auto& key = single_operation ? doc_key_ : kEmptyDocKey;
-  static const dockv::KeyEntryValues kEmptyVec;
-  DocPgsqlScanSpec scan_spec(
-      doc_read_context_->schema(),
-      request_.stmt_id(),
-      /* hashed_components= */ kEmptyVec,
-      /* range_components= */ kEmptyVec,
-      /* condition= */ nullptr ,
-      /* hash_code= */ boost::none,
-      /* max_hash_code= */ boost::none,
-      key,
-      /* is_forward_scan= */ true ,
-      key,
-      key,
-      0,
-      AddHighestToUpperDocKey::kTrue);
-
-  data->iterator = &**iterator;
-  data->restart_seek = true;
-
-  return (**iterator).Init(scan_spec, SkipSeek::kTrue);
-}

 Result<bool> PgsqlWriteOperation::ReadColumns(
     const DocOperationApplyData& data, dockv::PgTableRow* table_row) {
-  // Filter the columns using primary key.
-  if (!VERIFY_RESULT(data.iterator->PgFetchRow(
-          encoded_doc_key_.as_slice(), data.restart_seek, table_row))) {
+  DocPgsqlScanSpec spec(doc_read_context_->schema(), request_.stmt_id(), doc_key_);
+  auto iterator = DocRowwiseIterator(
+      projection(),
+      *doc_read_context_,
+      txn_op_context_,
+      data.doc_write_batch->doc_db(),
+      data.read_operation_data,
+      data.doc_write_batch->pending_op());
+  RETURN_NOT_OK(iterator.Init(spec));
+  if (!VERIFY_RESULT(iterator.PgFetchNext(table_row))) {
     return false;
   }
-  data.restart_read_ht->MakeAtLeast(VERIFY_RESULT(data.iterator->RestartReadHt()));
+  data.restart_read_ht->MakeAtLeast(VERIFY_RESULT(iterator.RestartReadHt()));

   return true;
 }
4 changes: 0 additions & 4 deletions src/yb/docdb/pgsql_operation.h
@@ -78,10 +78,6 @@ class PgsqlWriteOperation :
   // Execute write.
   Status Apply(const DocOperationApplyData& data) override;

-  Status UpdateIterator(
-      DocOperationApplyData* data, DocOperation* prev, SingleOperation single_operation,
-      std::optional<DocRowwiseIterator>* iterator) final;
-
  private:
   void ClearResponse() override {
     if (response_) {
2 changes: 0 additions & 2 deletions src/yb/master/restore_sys_catalog_state.cc
@@ -67,8 +67,6 @@ Status ApplyWriteRequest(
     .doc_write_batch = write_batch,
     .read_operation_data = {},
     .restart_read_ht = nullptr,
-    .iterator = nullptr,
-    .restart_seek = true,
     .schema_packing_provider = schema_packing_provider,
   };
   qlexpr::IndexMap index_map;
2 changes: 0 additions & 2 deletions src/yb/tools/yb-bulk_load.cc
@@ -339,8 +339,6 @@ Status BulkLoadTask::InsertRow(const string &row,
       .read_operation_data = docdb::ReadOperationData::FromSingleReadTime(
          HybridTime::FromMicros(kYugaByteMicrosecondEpoch)),
       .restart_read_ht = nullptr,
-      .iterator = nullptr,
-      .restart_seek = true,
       .schema_packing_provider = db_fixture,
     }));
   return Status::OK();
