Cache the mapping of sequence to log block index in transaction log iterator
HypenZou committed Apr 15, 2024
1 parent 6fbd02f commit de39db5
Showing 7 changed files with 136 additions and 15 deletions.
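In outline, the change has the transaction log iterator cache, for write batches it has already located, the index of the 32 KiB WAL block in which the batch's record starts (LastRecordOffset() / log::kBlockSize). A later GetUpdatesSince() call for a sequence at or past a cached one can then skip whole blocks instead of re-reading the WAL from its beginning. A small worked sketch of that arithmetic (the offset is hypothetical; the constant matches log::kBlockSize):

#include <cassert>
#include <cstdint>

int main() {
  const uint64_t kBlockSize = 32768;      // log::kBlockSize in the WAL format
  uint64_t last_record_offset = 1000000;  // hypothetical offset of a cached batch
  // Block index stored in the cache for (log number, batch start sequence):
  uint64_t block_index = last_record_offset / kBlockSize;
  assert(block_index == 30);
  // On a later cache hit, the reader skips this many bytes before scanning:
  uint64_t skip_bytes = block_index * kBlockSize;
  assert(skip_bytes == 983040);
  return 0;
}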
17 changes: 12 additions & 5 deletions db/log_reader.cc
@@ -43,7 +43,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
compression_type_record_read_(false),
uncompress_(nullptr),
hash_state_(nullptr),
uncompress_hash_state_(nullptr){}
uncompress_hash_state_(nullptr),
skipped_(false) {}

Reader::~Reader() {
delete[] backing_store_;
@@ -112,6 +113,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
*record = fragment;
last_record_offset_ = prospective_record_offset;
first_record_read_ = true;
skipped_ = false;
return true;

case kFirstType:
@@ -130,13 +132,16 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
skipped_ = false;
break;

case kMiddleType:
case kRecyclableMiddleType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
if (!skipped_) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
}
} else {
if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
@@ -148,8 +153,10 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
case kLastType:
case kRecyclableLastType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
if (!skipped_) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
}
} else {
if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
18 changes: 18 additions & 0 deletions db/log_reader.h
@@ -16,6 +16,7 @@

#include "db/log_format.h"
#include "file/sequence_file_reader.h"
#include "rocksdb/io_status.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
@@ -123,6 +124,20 @@ class Reader {
return !first_record_read_ && compression_type_record_read_;
}

rocksdb::IOStatus Skip(uint64_t n) {
if (n == 0) {
return IOStatus::OK();
}
auto s = file_->Skip(n);
if (!s.ok()) {
return s;
}
end_of_buffer_offset_ += n;
skipped_ = true;
buffer_.clear();
return s;
}

protected:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;
@@ -170,6 +185,9 @@
// is only for WAL logs.
UnorderedMap<uint32_t, size_t> recorded_cf_to_ts_sz_;

  // If the reader has skipped ahead via Skip(), fragments may need to be
  // dropped until the start of the next record is reached.
bool skipped_;

// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
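A sketch of how the new Reader::Skip() is meant to be driven by a block hint (a hypothetical helper; the actual call site is in db/transaction_log_impl.cc below):

// Position `reader` at a whole-block offset suggested by the sequence cache.
// After this, ReadRecord() silently drops fragments (skipped_ == true) until
// it reaches the start of the next complete record.
bool SeekReaderToBlock(ROCKSDB_NAMESPACE::log::Reader* reader,
                       uint64_t hint_block_index) {
  return reader->Skip(hint_block_index * ROCKSDB_NAMESPACE::log::kBlockSize).ok();
}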
39 changes: 34 additions & 5 deletions db/transaction_log_impl.cc
@@ -19,7 +19,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer,
const std::shared_ptr<TransactionLogSeqCache>& transaction_log_seq_cache)
: dir_(dir),
options_(options),
read_options_(read_options),
@@ -32,6 +33,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
started_(false),
is_valid_(false),
current_file_index_(0),
transaction_log_seq_cache_(transaction_log_seq_cache),
current_batch_seq_(0),
current_last_seq_(0) {
assert(files_ != nullptr);
@@ -113,8 +115,13 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
} else if (!current_status_.ok()) {
return;
}
Status s =
OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
auto& file = files_->at(static_cast<size_t>(start_file_index));
uint64_t hint_block_index{0};
if (read_options_.with_cache_) {
transaction_log_seq_cache_->Lookup(
file->LogNumber(), starting_sequence_number_, &hint_block_index);
}
Status s = OpenLogReader(file.get(), hint_block_index * log::kBlockSize);
if (!s.ok()) {
current_status_ = s;
reporter_.Info(current_status_.ToString().c_str());
@@ -207,7 +214,13 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
// Open the next file
if (current_file_index_ < files_->size() - 1) {
++current_file_index_;
Status s = OpenLogReader(files_->at(current_file_index_).get());
auto& file = files_->at(static_cast<size_t>(current_file_index_));
uint64_t hint_block_index{0};
if (read_options_.with_cache_) {
transaction_log_seq_cache_->Lookup(
file->LogNumber(), starting_sequence_number_, &hint_block_index);
}
Status s = OpenLogReader(file.get(), hint_block_index * log::kBlockSize);
if (!s.ok()) {
is_valid_ = false;
current_status_ = s;
@@ -276,12 +289,25 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
// currentBatchSeq_ can only change here
assert(current_last_seq_ <= versions_->LastSequence());

if (read_options_.with_cache_) {
// cache the mapping of sequence to log block index when seeking to the
// start or end sequence
if ((current_batch_seq_ <= starting_sequence_number_ &&
current_last_seq_ >= starting_sequence_number_) ||
current_last_seq_ == versions_->LastSequence()) {
transaction_log_seq_cache_->Insert(
current_log_reader_->GetLogNumber(), current_batch_seq_,
current_log_reader_->LastRecordOffset() / log::kBlockSize);
}
}

current_batch_ = std::move(batch);
is_valid_ = true;
current_status_ = Status::OK();
}

Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file,
uint64_t hint_offset) {
std::unique_ptr<SequentialFileReader> file;
Status s = OpenLogFile(log_file, &file);
if (!s.ok()) {
@@ -291,6 +317,9 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
current_log_reader_.reset(
new log::Reader(options_->info_log, std::move(file), &reporter_,
read_options_.verify_checksums_, log_file->LogNumber()));
if (hint_offset > 0) {
return current_log_reader_->Skip(hint_offset);
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE
60 changes: 58 additions & 2 deletions db/transaction_log_impl.h
@@ -16,6 +16,7 @@
#include "rocksdb/options.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

@@ -54,14 +55,68 @@ class LogFileImpl : public LogFile {
uint64_t sizeFileBytes_;
};

class TransactionLogSeqCache {
public:
TransactionLogSeqCache(size_t size)
: size_(size), rand_(std::random_device{}()) {}
struct SeqWithFileBlockIdx {
uint64_t log_number;
uint64_t seq_number;
uint64_t block_index;
SeqWithFileBlockIdx(uint64_t log_num, uint64_t seq_num, uint64_t block_idx)
: log_number(log_num), seq_number(seq_num), block_index(block_idx){};
bool operator<(const SeqWithFileBlockIdx& other) const {
return std::tie(other.log_number, other.seq_number, other.block_index) <
std::tie(log_number, seq_number, block_index);
}
bool operator==(const SeqWithFileBlockIdx& other) const {
return std::tie(log_number, seq_number, block_index) ==
std::tie(other.log_number, other.seq_number, other.block_index);
}
};

void Insert(uint64_t log_number, uint64_t seq_number, uint64_t block_index) {
std::lock_guard<std::mutex> lk{mutex_};
if (cache_.size() > size_) {
auto iter = cache_.begin();
std::advance(iter, rand_.Next() % cache_.size());
cache_.erase(iter);
}
cache_.emplace(log_number, seq_number, block_index);
}

bool Lookup(uint64_t log_number, uint64_t seq_number, uint64_t* block_index) {
std::lock_guard<std::mutex> lk{mutex_};
const static uint64_t max_block_index =
std::numeric_limits<uint64_t>::max();
auto iter = cache_.lower_bound(
SeqWithFileBlockIdx{log_number, seq_number, max_block_index});
if (iter == cache_.end()) {
return false;
}
if (log_number == iter->log_number && seq_number >= iter->seq_number) {
*block_index = iter->block_index;
return true;
}
return false;
}

private:
size_t size_;
std::mutex mutex_;
Random32 rand_;
std::set<SeqWithFileBlockIdx> cache_;
};

class TransactionLogIteratorImpl : public TransactionLogIterator {
public:
TransactionLogIteratorImpl(
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer);
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer,
const std::shared_ptr<TransactionLogSeqCache>& transaction_log_seq_cache);

bool Valid() override;

@@ -92,6 +147,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
std::unique_ptr<WriteBatch> current_batch_;
std::unique_ptr<log::Reader> current_log_reader_;
std::string scratch_;
std::shared_ptr<TransactionLogSeqCache> transaction_log_seq_cache_;
Status OpenLogFile(const LogFile* log_file,
std::unique_ptr<SequentialFileReader>* file);

@@ -123,6 +179,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
// Update current batch if a continuous batch is found.
void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile* file);
Status OpenLogReader(const LogFile* file, uint64_t hint_offset);
};
} // namespace ROCKSDB_NAMESPACE
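A minimal usage sketch of the TransactionLogSeqCache declared above (log and sequence numbers are made up; assumes it compiles inside the RocksDB tree):

#include <cassert>
#include <cstdint>

void TransactionLogSeqCacheExample() {
  ROCKSDB_NAMESPACE::TransactionLogSeqCache cache(/*size=*/128);
  // While iterating WAL 12, a batch starting at sequence 500 was found whose
  // record begins in block 7, so the iterator caches that mapping.
  cache.Insert(/*log_number=*/12, /*seq_number=*/500, /*block_index=*/7);

  uint64_t hint = 0;
  // A later GetUpdatesSince(520) on the same WAL hits the entry with the
  // largest cached sequence <= 520, so the reader can start from block 7.
  bool hit = cache.Lookup(/*log_number=*/12, /*seq_number=*/520, &hint);
  assert(hit && hint == 7);

  // A sequence that precedes every cached entry (or a different WAL) misses.
  assert(!cache.Lookup(/*log_number=*/12, /*seq_number=*/400, &hint));
}

When the cache exceeds its configured size, Insert() evicts a randomly chosen entry, so the structure stays a plain std::set guarded by a single mutex rather than an LRU list.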
2 changes: 1 addition & 1 deletion db/wal_manager.cc
@@ -125,7 +125,7 @@ Status WalManager::GetUpdatesSince(
}
iter->reset(new TransactionLogIteratorImpl(
wal_dir_, &db_options_, read_options, file_options_, seq,
std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
std::move(wal_files), version_set, seq_per_batch_, io_tracer_, transaction_log_seq_cache_));
return (*iter)->status();
}

7 changes: 6 additions & 1 deletion db/wal_manager.h
@@ -17,6 +17,7 @@
#include <utility>
#include <vector>

#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "file/file_util.h"
#include "options/db_options.h"
@@ -47,7 +48,8 @@ class WalManager {
seq_per_batch_(seq_per_batch),
wal_dir_(db_options_.GetWalDir()),
wal_in_db_path_(db_options_.IsWalDirSameAsDBPath()),
io_tracer_(io_tracer) {}
io_tracer_(io_tracer),
transaction_log_seq_cache_(std::make_shared<TransactionLogSeqCache>(128)) {}

Status GetSortedWalFiles(VectorLogPtr& files);

@@ -132,6 +134,9 @@
static constexpr uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;

std::shared_ptr<IOTracer> io_tracer_;

  // Cache mapping sequence numbers to WAL log block indexes.
std::shared_ptr<TransactionLogSeqCache> transaction_log_seq_cache_;
};

} // namespace ROCKSDB_NAMESPACE
8 changes: 7 additions & 1 deletion include/rocksdb/transaction_log.h
@@ -113,7 +113,13 @@ class TransactionLogIterator {
// Default: true
bool verify_checksums_;

ReadOptions() : verify_checksums_(true) {}
  // If true, the mapping of DB sequence number to WAL block index will be
  // cached. This avoids reading the target WAL from the beginning when
  // GetUpdatesSince() is called.
// Default: true
bool with_cache_;

ReadOptions() : verify_checksums_(true), with_cache_(true) {}

explicit ReadOptions(bool verify_checksums)
: verify_checksums_(verify_checksums) {}
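For reference, a caller-side sketch of the new option via the public GetUpdatesSince() API (the database handle and start sequence are placeholders):

#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/transaction_log.h"

void TailWal(rocksdb::DB* db, rocksdb::SequenceNumber since) {
  rocksdb::TransactionLogIterator::ReadOptions read_opts;
  read_opts.verify_checksums_ = true;
  read_opts.with_cache_ = true;  // option added by this commit (default: true)
  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
  rocksdb::Status s = db->GetUpdatesSince(since, &iter, read_opts);
  if (!s.ok()) {
    return;
  }
  for (; iter->Valid(); iter->Next()) {
    rocksdb::BatchResult batch = iter->GetBatch();
    // batch.sequence is the first sequence number covered by batch.writeBatchPtr.
    (void)batch;  // process the batch here
  }
}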
