Skip to content

Commit

Permalink
update load and dump impl
Browse files Browse the repository at this point in the history
  • Loading branch information
lhsoft committed Apr 17, 2024
1 parent e582791 commit 2d73f7f
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 13 deletions.
11 changes: 10 additions & 1 deletion include/rocksdb/utilities/cache_dump_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class CacheDumpReader {
// dump or load process related control variables can be added here.
struct CacheDumpOptions {
SystemClock* clock;
// Deadline for dumper or loader in microseconds
std::chrono::microseconds deadline = std::chrono::microseconds::zero();
// Max size bytes for dumper or loader
uint64_t max_size_bytes = 0;
};

// NOTE that: this class is EXPERIMENTAL! May be changed in the future!
Expand Down Expand Up @@ -110,6 +114,10 @@ class CacheDumpedLoader {
return IOStatus::NotSupported(
"RestoreCacheEntriesToSecondaryCache is not supported");
}
virtual IOStatus RestoreCacheEntriesToCache() {
return IOStatus::NotSupported(
"RestoreCacheEntriesToCache is not supported");
}
};

// Get the writer which stores all the metadata and data sequentially to a file
Expand All @@ -136,7 +144,8 @@ Status NewDefaultCacheDumpedLoader(
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader);
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader,
const std::shared_ptr<Cache>& cache);

} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
5 changes: 3 additions & 2 deletions utilities/cache_dump_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ Status NewDefaultCacheDumpedLoader(
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader) {
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader,
const std::shared_ptr<Cache>& cache) {
cache_dump_loader->reset(new CacheDumpedLoaderImpl(
dump_options, toptions, secondary_cache, std::move(reader)));
dump_options, toptions, secondary_cache, std::move(reader), cache));
return Status::OK();
}

Expand Down
179 changes: 173 additions & 6 deletions utilities/cache_dump_load_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include "table/block_based/block_based_table_reader.h"
#ifndef ROCKSDB_LITE

#include <algorithm>
#include <limits>
#include <iostream>

#include "utilities/cache_dump_load_impl.h"

#include "cache/cache_entry_roles.h"
Expand All @@ -17,6 +21,7 @@
#include "rocksdb/utilities/ldb_cmd.h"
#include "table/format.h"
#include "util/crc32c.h"
#include "memory/memory_allocator.h"

namespace ROCKSDB_NAMESPACE {

Expand All @@ -26,6 +31,7 @@ namespace ROCKSDB_NAMESPACE {
// requirement.
Status CacheDumperImpl::SetDumpFilter(std::vector<DB*> db_list) {
Status s = Status::OK();
dump_all_keys_ = false;
for (size_t i = 0; i < db_list.size(); i++) {
assert(i < db_list.size());
TablePropertiesCollection ptc;
Expand Down Expand Up @@ -68,6 +74,7 @@ IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() {
return IOStatus::InvalidArgument("System clock is null");
}
clock_ = options_.clock;
deadline_ = options_.deadline;
// We copy the Cache Deleter Role Map as its member.
role_map_ = CopyCacheDeleterRoleMap();
// Set the sequence number
Expand Down Expand Up @@ -112,6 +119,19 @@ CacheDumperImpl::DumpOneBlockCallBack() {
return [&](const Slice& key, void* value, size_t /*charge*/,
Cache::DeleterFn deleter) {
// Step 1: get the type of the block from role_map_
if (options_.max_size_bytes > 0 &&
dumped_size_bytes_ > options_.max_size_bytes) {
return;
}

uint64_t timestamp = clock_->NowMicros();
if (deadline_.count()) {
std::chrono::microseconds now = std::chrono::microseconds(timestamp);
if (now >= deadline_) {
return;
}
}

auto e = role_map_.find(deleter);
CacheEntryRole role;
CacheDumpUnitType type = CacheDumpUnitType::kBlockTypeMax;
Expand All @@ -123,7 +143,7 @@ CacheDumperImpl::DumpOneBlockCallBack() {
bool filter_out = false;

// Step 2: based on the key prefix, check if the block should be filter out.
if (ShouldFilterOut(key)) {
if (!dump_all_keys_ && ShouldFilterOut(key)) {
filter_out = true;
}

Expand Down Expand Up @@ -175,8 +195,9 @@ CacheDumperImpl::DumpOneBlockCallBack() {
// Step 4: if the block should not be filter out, write the block to the
// CacheDumpWriter
if (!filter_out && block_start != nullptr) {
WriteBlock(type, key, Slice(block_start, block_len))
WriteBlock(type, key, Slice(block_start, block_len), timestamp)
.PermitUncheckedError();
dumped_size_bytes_ += block_len;
}
};
}
Expand All @@ -190,8 +211,7 @@ CacheDumperImpl::DumpOneBlockCallBack() {
// First, we write the metadata first, which is a fixed size string. Then, we
// Append the dump unit string to the writer.
IOStatus CacheDumperImpl::WriteBlock(CacheDumpUnitType type, const Slice& key,
const Slice& value) {
uint64_t timestamp = clock_->NowMicros();
const Slice& value, uint64_t timestamp) {
uint32_t value_checksum = crc32c::Value(value.data(), value.size());

// First, serialize the block information in a string
Expand Down Expand Up @@ -241,15 +261,17 @@ IOStatus CacheDumperImpl::WriteHeader() {
"block_size, block_data, block_checksum> cache_value\n";
std::string header_value(s.str());
CacheDumpUnitType type = CacheDumpUnitType::kHeader;
return WriteBlock(type, header_key, header_value);
uint64_t timestamp = clock_->NowMicros();
return WriteBlock(type, header_key, header_value, timestamp);
}

// Write the footer after all the blocks are stored to indicate the ending.
IOStatus CacheDumperImpl::WriteFooter() {
std::string footer_key = "footer";
std::string footer_value("cache dump completed");
CacheDumpUnitType type = CacheDumpUnitType::kFooter;
return WriteBlock(type, footer_key, footer_value);
uint64_t timestamp = clock_->NowMicros();
return WriteBlock(type, footer_key, footer_value, timestamp);
}

// This is the main function to restore the cache entries to secondary cache.
Expand Down Expand Up @@ -368,6 +390,151 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
}
}

// This is the main function to restore the cache entries to secondary cache.
// First, we check if all the arguments are valid. Then, we read the block
// sequentially from the reader and insert them to the secondary cache.
IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToCache() {
// TODO: remove this line when options are used in the loader
(void)options_;
// Step 1: we check if all the arguments are valid
if (!cache_) {
return IOStatus::InvalidArgument("Cache is null");
}
if (reader_ == nullptr) {
return IOStatus::InvalidArgument("CacheDumpReader is null");
}
// we copy the Cache Deleter Role Map as its member.
role_map_ = CopyCacheDeleterRoleMap();

clock_ = options_.clock;
deadline_ = options_.deadline;
// Step 2: read the header
// TODO: we need to check the cache dump format version and RocksDB version
// after the header is read out.
IOStatus io_s;
DumpUnit dump_unit;
std::string data;
io_s = ReadHeader(&data, &dump_unit);
if (!io_s.ok()) {
return io_s;
}

// Step 3: read out the rest of the blocks from the reader. The loop will stop
// either I/O status is not ok or we reach to the the end.
while (io_s.ok() && dump_unit.type != CacheDumpUnitType::kFooter) {
dump_unit.reset();
data.clear();
uint64_t timestamp = clock_->NowMicros();
if (deadline_.count()) {
std::chrono::microseconds now = std::chrono::microseconds(timestamp);
if (now >= deadline_) {
break;
}
}
if (loaded_size_bytes_ > options_.max_size_bytes) {
break;
}
// read the content and store in the dump_unit
io_s = ReadCacheBlock(&data, &dump_unit);
if (!io_s.ok()) {
break;
}
loaded_size_bytes_ += dump_unit.key.size();
loaded_size_bytes_ += dump_unit.value_len;
// Create the uncompressed_block based on the information in the dump_unit
// (There is no block trailer here compatible with block-based SST file.)
CacheAllocationPtr cache_ptr = AllocateBlock(dump_unit.value_len, nullptr);
std::copy_n((char*)dump_unit.value, dump_unit.value_len, cache_ptr.get());
BlockContents uncompressed_block(std::move(cache_ptr), dump_unit.value_len);
size_t charge = uncompressed_block.ApproximateMemoryUsage();
Cache::CacheItemHelper* helper = nullptr;
Statistics* statistics = nullptr;
Status s = Status::OK();
// according to the block type, get the helper callback function and create
// the corresponding block
switch (dump_unit.type) {
case CacheDumpUnitType::kFilter: {
helper = BlocklikeTraits<ParsedFullFilterBlock>::GetCacheItemHelper(
BlockType::kFilter);
std::unique_ptr<ParsedFullFilterBlock> block_holder;
block_holder.reset(BlocklikeTraits<ParsedFullFilterBlock>::Create(
std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper, charge);
if (s.ok()) {
block_holder.release();
}
}
break;
}
case CacheDumpUnitType::kData: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kData);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper, charge);
if (s.ok()) {
block_holder.release();
}
}
break;
}
case CacheDumpUnitType::kIndex: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kIndex);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(uncompressed_block), 0, statistics, false,
toptions_.filter_policy.get()));
if (helper != nullptr) {
s = cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper, charge);
if (s.ok()) {
block_holder.release();
}
}
break;
}
case CacheDumpUnitType::kFilterMetaBlock: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(
BlockType::kFilterPartitionIndex);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper, charge);
if (s.ok()) {
block_holder.release();
}
}
break;
}
case CacheDumpUnitType::kFooter:
break;
case CacheDumpUnitType::kDeprecatedFilterBlock:
// Obsolete
break;
default:
continue;
}
if (!s.ok()) {
io_s = status_to_io_status(std::move(s));
}
}
if (dump_unit.type == CacheDumpUnitType::kFooter) {
return IOStatus::OK();
} else {
return io_s;
}
}


// Read and copy the dump unit metadata to std::string data, decode and create
// the unit metadata based on the string
IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,
Expand Down
37 changes: 37 additions & 0 deletions utilities/cache_dump_load_impl.cc.rej
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
diff a/utilities/cache_dump_load_impl.cc b/utilities/cache_dump_load_impl.cc (rejected hunks)
@@ -6,6 +6,8 @@
#include "cache/cache_key.h"
#include "table/block_based/block_based_table_reader.h"

+#include <limits>
+
#include "cache/cache_entry_roles.h"
#include "file/writable_file_writer.h"
#include "port/lang.h"
@@ -117,6 +126,16 @@ CacheDumperImpl::DumpOneBlockCallBack(std::string& buf) {
return;
}

+ if (options_.max_size_bytes > 0 &&
+ dumped_size_bytes_ > options_.max_size_bytes) {
+ return;
+ }
+
+ uint64_t timestamp = clock_->NowMicros();
+ if (timestamp > deadline_ts_) {
+ return;
+ }
+
CacheEntryRole role = helper->role;
CacheDumpUnitType type = CacheDumpUnitType::kBlockTypeMax;

@@ -154,7 +173,8 @@ CacheDumperImpl::DumpOneBlockCallBack(std::string& buf) {

if (s.ok()) {
// Write it out
- WriteBlock(type, key, buf).PermitUncheckedError();
+ WriteBlock(type, key, buf, timestamp).PermitUncheckedError();
+ dumped_size_bytes_ += len;
}
};
}
21 changes: 17 additions & 4 deletions utilities/cache_dump_load_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,16 @@ class CacheDumperImpl : public CacheDumper {
CacheDumperImpl(const CacheDumpOptions& dump_options,
const std::shared_ptr<Cache>& cache,
std::unique_ptr<CacheDumpWriter>&& writer)
: options_(dump_options), cache_(cache), writer_(std::move(writer)) {}
: options_(dump_options), cache_(cache), writer_(std::move(writer)) {
dumped_size_bytes_ = 0;
}
~CacheDumperImpl() { writer_.reset(); }
Status SetDumpFilter(std::vector<DB*> db_list) override;
IOStatus DumpCacheEntriesToWriter() override;

private:
IOStatus WriteBlock(CacheDumpUnitType type, const Slice& key,
const Slice& value);
const Slice& value, uint64_t timestamp);
IOStatus WriteHeader();
IOStatus WriteFooter();
bool ShouldFilterOut(const Slice& key);
Expand All @@ -122,6 +124,10 @@ class CacheDumperImpl : public CacheDumper {
// improvement can be applied like BloomFilter or others to speedup the
// filtering.
std::set<std::string> prefix_filter_;
// Deadline for dumper in microseconds.
std::chrono::microseconds deadline_;
uint64_t dumped_size_bytes_;
bool dump_all_keys_ = true;
};

// The default implementation of CacheDumpedLoader
Expand All @@ -130,13 +136,16 @@ class CacheDumpedLoaderImpl : public CacheDumpedLoader {
CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader)
std::unique_ptr<CacheDumpReader>&& reader,
const std::shared_ptr<Cache>& cache)
: options_(dump_options),
toptions_(toptions),
secondary_cache_(secondary_cache),
reader_(std::move(reader)) {}
reader_(std::move(reader)),
cache_(cache) {}
~CacheDumpedLoaderImpl() {}
IOStatus RestoreCacheEntriesToSecondaryCache() override;
IOStatus RestoreCacheEntriesToCache() override;

private:
IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta);
Expand All @@ -149,6 +158,10 @@ class CacheDumpedLoaderImpl : public CacheDumpedLoader {
std::shared_ptr<SecondaryCache> secondary_cache_;
std::unique_ptr<CacheDumpReader> reader_;
UnorderedMap<Cache::DeleterFn, CacheEntryRole> role_map_;
std::shared_ptr<Cache> cache_;
SystemClock* clock_;
std::chrono::microseconds deadline_;
uint64_t loaded_size_bytes_ = 0;
};

// The default implementation of CacheDumpWriter. We write the blocks to a file
Expand Down

0 comments on commit 2d73f7f

Please sign in to comment.