Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache the mapping of sequence to log block index in transaction log iterator #12538

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions db/db_log_iter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,26 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
"Delete(0, key2)",
handler.seen);
}

// Opens a transaction log iterator at the same start sequence twice and
// checks that both opens yield an identical first write batch.
// NOTE(review): presumably the second open is served via the
// sequence-to-block-index cache (the test name suggests so) -- confirm that
// OpenTransactionLogIter() uses read options with the cache enabled.
TEST_F(DBTestXactLogIterator, TransactionIteratorCache) {
  Options opts = OptionsForLogIterTest();
  DestroyAndReopen(opts);
  CreateAndReopenWithCF({"pikachu"}, opts);

  // One 32 KiB value per column family.
  ASSERT_OK(dbfull()->Put({}, handles_[0], "key1", DummyString(1024 * 32)));
  ASSERT_OK(dbfull()->Put({}, handles_[1], "key2", DummyString(1024 * 32)));

  // First pass: remember the serialized batch found at sequence 2. The
  // iterator is scoped so that it is destroyed before the second open.
  std::string first_pass_data;
  {
    auto first_iter = OpenTransactionLogIter(2);
    ASSERT_TRUE(first_iter->Valid());
    first_pass_data = first_iter->GetBatch().writeBatchPtr->Data();
  }

  // Second pass: the same start sequence must produce the same batch bytes.
  {
    auto second_iter = OpenTransactionLogIter(2);
    ASSERT_TRUE(second_iter->Valid());
    ASSERT_EQ(second_iter->GetBatch().writeBatchPtr->Data(), first_pass_data);
  }
}

} // namespace ROCKSDB_NAMESPACE


Expand Down
17 changes: 12 additions & 5 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
compression_type_record_read_(false),
uncompress_(nullptr),
hash_state_(nullptr),
uncompress_hash_state_(nullptr){}
uncompress_hash_state_(nullptr),
skipped_(false) {}

Reader::~Reader() {
delete[] backing_store_;
Expand Down Expand Up @@ -112,6 +113,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
*record = fragment;
last_record_offset_ = prospective_record_offset;
first_record_read_ = true;
skipped_ = false;
return true;

case kFirstType:
Expand All @@ -130,13 +132,16 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
skipped_ = false;
break;

case kMiddleType:
case kRecyclableMiddleType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
if (!skipped_) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
}
} else {
if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
Expand All @@ -148,8 +153,10 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
case kLastType:
case kRecyclableLastType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
if (!skipped_) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
}
} else {
if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
Expand Down
19 changes: 19 additions & 0 deletions db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "db/log_format.h"
#include "file/sequence_file_reader.h"
#include "rocksdb/io_status.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
Expand Down Expand Up @@ -123,6 +124,20 @@ class Reader {
return !first_record_read_ && compression_type_record_read_;
}

// Advances the underlying file by `n` bytes without parsing them.
//
// On success the buffered data is discarded, the end-of-buffer offset is
// moved forward by `n`, and the reader is flagged as skipped so that
// ReadRecord() does not report "missing start of fragmented record" for
// middle/last fragments seen before the next record start.
// A request of zero bytes is a no-op that returns OK. If the file skip
// fails, its status is returned and the reader state is left untouched.
IOStatus Skip(uint64_t n) {
  if (n == 0) {
    return IOStatus::OK();
  }
  auto skip_status = file_->Skip(n);
  if (skip_status.ok()) {
    buffer_.clear();
    end_of_buffer_offset_ += n;
    skipped_ = true;
  }
  return skip_status;
}

protected:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;
Expand Down Expand Up @@ -170,6 +185,10 @@ class Reader {
// is only for WAL logs.
UnorderedMap<uint32_t, size_t> recorded_cf_to_ts_sz_;

// True after Skip() has repositioned the reader. Fragments read before the
// start of the next record are then dropped without reporting corruption.
bool skipped_;

// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
Expand Down
39 changes: 34 additions & 5 deletions db/transaction_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer,
const std::shared_ptr<TransactionLogSeqCache>& transaction_log_seq_cache)
: dir_(dir),
options_(options),
read_options_(read_options),
Expand All @@ -32,6 +33,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
started_(false),
is_valid_(false),
current_file_index_(0),
transaction_log_seq_cache_(transaction_log_seq_cache),
current_batch_seq_(0),
current_last_seq_(0) {
assert(files_ != nullptr);
Expand Down Expand Up @@ -113,8 +115,13 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
} else if (!current_status_.ok()) {
return;
}
Status s =
OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
auto& file = files_->at(static_cast<size_t>(start_file_index));
uint64_t hint_block_index{0};
if (read_options_.with_cache_) {
transaction_log_seq_cache_->Lookup(
file->LogNumber(), starting_sequence_number_, &hint_block_index);
}
Status s = OpenLogReader(file.get(), hint_block_index * log::kBlockSize);
if (!s.ok()) {
current_status_ = s;
reporter_.Info(current_status_.ToString().c_str());
Expand Down Expand Up @@ -207,7 +214,13 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
// Open the next file
if (current_file_index_ < files_->size() - 1) {
++current_file_index_;
Status s = OpenLogReader(files_->at(current_file_index_).get());
auto& file = files_->at(static_cast<size_t>(current_file_index_));
uint64_t hint_block_index{0};
if (read_options_.with_cache_) {
transaction_log_seq_cache_->Lookup(
file->LogNumber(), starting_sequence_number_, &hint_block_index);
}
Status s = OpenLogReader(file.get(), hint_block_index * log::kBlockSize);
if (!s.ok()) {
is_valid_ = false;
current_status_ = s;
Expand Down Expand Up @@ -276,12 +289,25 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
// currentBatchSeq_ can only change here
assert(current_last_seq_ <= versions_->LastSequence());

if (read_options_.with_cache_) {
// cache the mapping of sequence to log block index when seeking to the
// start or end sequence
if ((current_batch_seq_ <= starting_sequence_number_ &&
current_last_seq_ >= starting_sequence_number_) ||
current_last_seq_ == versions_->LastSequence()) {
transaction_log_seq_cache_->Insert(
current_log_reader_->GetLogNumber(), current_batch_seq_,
current_log_reader_->LastRecordOffset() / log::kBlockSize);
}
}

current_batch_ = std::move(batch);
is_valid_ = true;
current_status_ = Status::OK();
}

Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file,
uint64_t hint_offset) {
std::unique_ptr<SequentialFileReader> file;
Status s = OpenLogFile(log_file, &file);
if (!s.ok()) {
Expand All @@ -291,6 +317,9 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
current_log_reader_.reset(
new log::Reader(options_->info_log, std::move(file), &reporter_,
read_options_.verify_checksums_, log_file->LogNumber()));
if (hint_offset > 0) {
return current_log_reader_->Skip(hint_offset);
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE
60 changes: 58 additions & 2 deletions db/transaction_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rocksdb/options.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -54,14 +55,68 @@ class LogFileImpl : public LogFile {
uint64_t sizeFileBytes_;
};

// Bounded, mutex-protected cache that maps (WAL log number, sequence number)
// to the index of the log block in which that sequence was found. Entries are
// evicted at random once the capacity is reached.
class TransactionLogSeqCache {
 public:
  // `size` is the maximum number of entries kept. A size of 0 disables the
  // cache: Insert() becomes a no-op and Lookup() always misses.
  explicit TransactionLogSeqCache(size_t size)
      : size_(size), rand_(std::random_device{}()) {}

  struct SeqWithFileBlockIdx {
    uint64_t log_number;
    uint64_t seq_number;
    uint64_t block_index;
    SeqWithFileBlockIdx(uint64_t log_num, uint64_t seq_num, uint64_t block_idx)
        : log_number(log_num), seq_number(seq_num), block_index(block_idx) {}
    // Intentionally reversed: entries are kept in DESCENDING order of
    // (log_number, seq_number, block_index) so that lower_bound() in Lookup()
    // lands on the greatest entry that does not exceed the probe key.
    bool operator<(const SeqWithFileBlockIdx& other) const {
      return std::tie(other.log_number, other.seq_number, other.block_index) <
             std::tie(log_number, seq_number, block_index);
    }
    bool operator==(const SeqWithFileBlockIdx& other) const {
      return std::tie(log_number, seq_number, block_index) ==
             std::tie(other.log_number, other.seq_number, other.block_index);
    }
  };

  // Records that `seq_number` of WAL `log_number` lives in block
  // `block_index`. Inserting an entry that is already cached changes nothing.
  // When the cache is full, one randomly chosen entry is evicted first so the
  // number of entries never exceeds the configured size.
  void Insert(uint64_t log_number, uint64_t seq_number, uint64_t block_index) {
    if (size_ == 0) {
      return;
    }
    const SeqWithFileBlockIdx entry{log_number, seq_number, block_index};
    std::lock_guard<std::mutex> lk{mutex_};
    if (cache_.find(entry) != cache_.end()) {
      // Already present: do not evict another entry for a no-op insert.
      return;
    }
    if (cache_.size() >= size_) {
      auto victim = cache_.begin();
      std::advance(victim, rand_.Next() % cache_.size());
      cache_.erase(victim);
    }
    cache_.insert(entry);
  }

  // Finds the cached entry of WAL `log_number` with the greatest sequence
  // number that is <= `seq_number`. On a hit, stores its block index in
  // `*block_index` and returns true. On a miss, returns false and leaves
  // `*block_index` untouched.
  bool Lookup(uint64_t log_number, uint64_t seq_number, uint64_t* block_index) {
    constexpr uint64_t kMaxBlockIndex = std::numeric_limits<uint64_t>::max();
    std::lock_guard<std::mutex> lk{mutex_};
    // With the descending order, this is the first entry whose key is
    // <= (log_number, seq_number, kMaxBlockIndex).
    const auto iter = cache_.lower_bound(
        SeqWithFileBlockIdx{log_number, seq_number, kMaxBlockIndex});
    if (iter == cache_.end()) {
      return false;
    }
    // The candidate may belong to an older log file; that is a miss.
    if (log_number == iter->log_number && seq_number >= iter->seq_number) {
      *block_index = iter->block_index;
      return true;
    }
    return false;
  }

 private:
  size_t size_;
  std::mutex mutex_;
  Random32 rand_;
  std::set<SeqWithFileBlockIdx> cache_;
};

class TransactionLogIteratorImpl : public TransactionLogIterator {
public:
TransactionLogIteratorImpl(
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer);
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer,
const std::shared_ptr<TransactionLogSeqCache>& transaction_log_seq_cache);

bool Valid() override;

Expand Down Expand Up @@ -92,6 +147,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
std::unique_ptr<WriteBatch> current_batch_;
std::unique_ptr<log::Reader> current_log_reader_;
std::string scratch_;
std::shared_ptr<TransactionLogSeqCache> transaction_log_seq_cache_;
Status OpenLogFile(const LogFile* log_file,
std::unique_ptr<SequentialFileReader>* file);

Expand Down Expand Up @@ -123,6 +179,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
// Update current batch if a continuous batch is found.
void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file);
Status OpenLogReader(const LogFile* file, uint64_t hint_offset);
};
} // namespace ROCKSDB_NAMESPACE
3 changes: 2 additions & 1 deletion db/wal_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ Status WalManager::GetUpdatesSince(
}
iter->reset(new TransactionLogIteratorImpl(
wal_dir_, &db_options_, read_options, file_options_, seq,
std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
std::move(wal_files), version_set, seq_per_batch_, io_tracer_,
transaction_log_seq_cache_));
return (*iter)->status();
}

Expand Down
8 changes: 7 additions & 1 deletion db/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <utility>
#include <vector>

#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "file/file_util.h"
#include "options/db_options.h"
Expand Down Expand Up @@ -47,7 +48,9 @@ class WalManager {
seq_per_batch_(seq_per_batch),
wal_dir_(db_options_.GetWalDir()),
wal_in_db_path_(db_options_.IsWalDirSameAsDBPath()),
io_tracer_(io_tracer) {}
io_tracer_(io_tracer),
transaction_log_seq_cache_(
std::make_shared<TransactionLogSeqCache>(128)) {}

Status GetSortedWalFiles(VectorLogPtr& files);

Expand Down Expand Up @@ -132,6 +135,9 @@ class WalManager {
static constexpr uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;

std::shared_ptr<IOTracer> io_tracer_;

// cache for sequence to log block index
std::shared_ptr<TransactionLogSeqCache> transaction_log_seq_cache_;
};

} // namespace ROCKSDB_NAMESPACE
8 changes: 7 additions & 1 deletion include/rocksdb/transaction_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,13 @@ class TransactionLogIterator {
// Default: true
bool verify_checksums_;

ReadOptions() : verify_checksums_(true) {}
// If true, the mapping from DB sequence number to WAL file block index is
// cached. This avoids re-reading the target WAL file from its beginning
// each time GetUpdatesSince() is called.
// Default: true
bool with_cache_;

ReadOptions() : verify_checksums_(true), with_cache_(true) {}

// Sets checksum verification explicitly. with_cache_ is initialized to true,
// the same default the ReadOptions() constructor uses, so that it is never
// read uninitialized.
explicit ReadOptions(bool verify_checksums)
    : verify_checksums_(verify_checksums), with_cache_(true) {}
Expand Down