Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update load and dump impl #5

Open
wants to merge 1 commit into
base: 7.8.3.bili
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/rocksdb/utilities/cache_dump_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class CacheDumpReader {
// dump or load process related control variables can be added here.
struct CacheDumpOptions {
SystemClock* clock;
// Deadline for dumper or loader in microseconds
std::chrono::microseconds deadline = std::chrono::microseconds::zero();
// Max size bytes for dumper or loader
uint64_t max_size_bytes = 0;
};

// NOTE that: this class is EXPERIMENTAL! May be changed in the future!
Expand Down Expand Up @@ -110,6 +114,10 @@ class CacheDumpedLoader {
return IOStatus::NotSupported(
"RestoreCacheEntriesToSecondaryCache is not supported");
}
virtual IOStatus RestoreCacheEntriesToCache() {
return IOStatus::NotSupported(
"RestoreCacheEntriesToCache is not supported");
}
};

// Get the writer which stores all the metadata and data sequentially to a file
Expand All @@ -136,7 +144,8 @@ Status NewDefaultCacheDumpedLoader(
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader);
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader,
const std::shared_ptr<Cache>& cache);

} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
5 changes: 3 additions & 2 deletions utilities/cache_dump_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ Status NewDefaultCacheDumpedLoader(
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader) {
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader,
const std::shared_ptr<Cache>& cache) {
cache_dump_loader->reset(new CacheDumpedLoaderImpl(
dump_options, toptions, secondary_cache, std::move(reader)));
dump_options, toptions, secondary_cache, std::move(reader), cache));
return Status::OK();
}

Expand Down
178 changes: 172 additions & 6 deletions utilities/cache_dump_load_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "table/block_based/block_based_table_reader.h"
#ifndef ROCKSDB_LITE

#include <algorithm>
#include <limits>

#include "utilities/cache_dump_load_impl.h"

#include "cache/cache_entry_roles.h"
Expand All @@ -17,6 +20,7 @@
#include "rocksdb/utilities/ldb_cmd.h"
#include "table/format.h"
#include "util/crc32c.h"
#include "memory/memory_allocator.h"

namespace ROCKSDB_NAMESPACE {

Expand All @@ -26,6 +30,7 @@ namespace ROCKSDB_NAMESPACE {
// requirement.
Status CacheDumperImpl::SetDumpFilter(std::vector<DB*> db_list) {
Status s = Status::OK();
dump_all_keys_ = false;
for (size_t i = 0; i < db_list.size(); i++) {
assert(i < db_list.size());
TablePropertiesCollection ptc;
Expand Down Expand Up @@ -68,6 +73,7 @@ IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() {
return IOStatus::InvalidArgument("System clock is null");
}
clock_ = options_.clock;
deadline_ = options_.deadline;
// We copy the Cache Deleter Role Map as its member.
role_map_ = CopyCacheDeleterRoleMap();
// Set the sequence number
Expand Down Expand Up @@ -112,6 +118,19 @@ CacheDumperImpl::DumpOneBlockCallBack() {
return [&](const Slice& key, void* value, size_t /*charge*/,
Cache::DeleterFn deleter) {
// Step 1: get the type of the block from role_map_
if (options_.max_size_bytes > 0 &&
dumped_size_bytes_ > options_.max_size_bytes) {
return;
}

uint64_t timestamp = clock_->NowMicros();
if (deadline_.count()) {
std::chrono::microseconds now = std::chrono::microseconds(timestamp);
if (now >= deadline_) {
return;
}
}

auto e = role_map_.find(deleter);
CacheEntryRole role;
CacheDumpUnitType type = CacheDumpUnitType::kBlockTypeMax;
Expand All @@ -123,7 +142,7 @@ CacheDumperImpl::DumpOneBlockCallBack() {
bool filter_out = false;

// Step 2: based on the key prefix, check if the block should be filter out.
if (ShouldFilterOut(key)) {
if (!dump_all_keys_ && ShouldFilterOut(key)) {
filter_out = true;
}

Expand Down Expand Up @@ -175,8 +194,9 @@ CacheDumperImpl::DumpOneBlockCallBack() {
// Step 4: if the block should not be filter out, write the block to the
// CacheDumpWriter
if (!filter_out && block_start != nullptr) {
WriteBlock(type, key, Slice(block_start, block_len))
WriteBlock(type, key, Slice(block_start, block_len), timestamp)
.PermitUncheckedError();
dumped_size_bytes_ += block_len;
}
};
}
Expand All @@ -190,8 +210,7 @@ CacheDumperImpl::DumpOneBlockCallBack() {
// First, we write the metadata first, which is a fixed size string. Then, we
// Append the dump unit string to the writer.
IOStatus CacheDumperImpl::WriteBlock(CacheDumpUnitType type, const Slice& key,
const Slice& value) {
uint64_t timestamp = clock_->NowMicros();
const Slice& value, uint64_t timestamp) {
uint32_t value_checksum = crc32c::Value(value.data(), value.size());

// First, serialize the block information in a string
Expand Down Expand Up @@ -241,15 +260,17 @@ IOStatus CacheDumperImpl::WriteHeader() {
"block_size, block_data, block_checksum> cache_value\n";
std::string header_value(s.str());
CacheDumpUnitType type = CacheDumpUnitType::kHeader;
return WriteBlock(type, header_key, header_value);
uint64_t timestamp = clock_->NowMicros();
return WriteBlock(type, header_key, header_value, timestamp);
}

// Write the footer after all the blocks are stored to indicate the ending.
IOStatus CacheDumperImpl::WriteFooter() {
std::string footer_key = "footer";
std::string footer_value("cache dump completed");
CacheDumpUnitType type = CacheDumpUnitType::kFooter;
return WriteBlock(type, footer_key, footer_value);
uint64_t timestamp = clock_->NowMicros();
return WriteBlock(type, footer_key, footer_value, timestamp);
}

// This is the main function to restore the cache entries to secondary cache.
Expand Down Expand Up @@ -368,6 +389,151 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
}
}

// This is the main function to restore the cache entries to secondary cache.
// First, we check if all the arguments are valid. Then, we read the block
// sequentially from the reader and insert them to the secondary cache.
IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToCache() {
// TODO: remove this line when options are used in the loader
(void)options_;
// Step 1: we check if all the arguments are valid
if (!cache_) {
return IOStatus::InvalidArgument("Cache is null");
}
if (reader_ == nullptr) {
return IOStatus::InvalidArgument("CacheDumpReader is null");
}
// we copy the Cache Deleter Role Map as its member.
role_map_ = CopyCacheDeleterRoleMap();

clock_ = options_.clock;
deadline_ = options_.deadline;
// Step 2: read the header
// TODO: we need to check the cache dump format version and RocksDB version
// after the header is read out.
IOStatus io_s;
DumpUnit dump_unit;
std::string data;
io_s = ReadHeader(&data, &dump_unit);
if (!io_s.ok()) {
return io_s;
}

// Step 3: read out the rest of the blocks from the reader. The loop will stop
// either I/O status is not ok or we reach to the the end.
while (io_s.ok() && dump_unit.type != CacheDumpUnitType::kFooter) {
dump_unit.reset();
data.clear();
uint64_t timestamp = clock_->NowMicros();
if (deadline_.count()) {
std::chrono::microseconds now = std::chrono::microseconds(timestamp);
if (now >= deadline_) {
break;
}
}
if (loaded_size_bytes_ > options_.max_size_bytes) {
break;
}
// read the content and store in the dump_unit
io_s = ReadCacheBlock(&data, &dump_unit);
if (!io_s.ok()) {
break;
}
loaded_size_bytes_ += dump_unit.key.size();
loaded_size_bytes_ += dump_unit.value_len;
// Create the uncompressed_block based on the information in the dump_unit
// (There is no block trailer here compatible with block-based SST file.)
CacheAllocationPtr cache_ptr = AllocateBlock(dump_unit.value_len, nullptr);
std::copy_n((char*)dump_unit.value, dump_unit.value_len, cache_ptr.get());
BlockContents uncompressed_block(std::move(cache_ptr), dump_unit.value_len);
size_t charge = uncompressed_block.ApproximateMemoryUsage();
Cache::CacheItemHelper* helper = nullptr;
Statistics* statistics = nullptr;
Status s = Status::OK();
// according to the block type, get the helper callback function and create
// the corresponding block
switch (dump_unit.type) {
case CacheDumpUnitType::kFilter: {
helper = BlocklikeTraits<ParsedFullFilterBlock>::GetCacheItemHelper(
BlockType::kFilter);
std::unique_ptr<ParsedFullFilterBlock> block_holder;
block_holder.reset(BlocklikeTraits<ParsedFullFilterBlock>::Create(
std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper, charge);
if (s.ok()) {
block_holder.release();
}
}
break;
}
case CacheDumpUnitType::kData: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kData);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper, charge);
if (s.ok()) {
block_holder.release();
}
}
break;
}
case CacheDumpUnitType::kIndex: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kIndex);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(uncompressed_block), 0, statistics, false,
toptions_.filter_policy.get()));
if (helper != nullptr) {
s = cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper, charge);
if (s.ok()) {
block_holder.release();
}
}
break;
}
case CacheDumpUnitType::kFilterMetaBlock: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(
BlockType::kFilterPartitionIndex);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper, charge);
if (s.ok()) {
block_holder.release();
}
}
break;
}
case CacheDumpUnitType::kFooter:
break;
case CacheDumpUnitType::kDeprecatedFilterBlock:
// Obsolete
break;
default:
continue;
}
if (!s.ok()) {
io_s = status_to_io_status(std::move(s));
}
}
if (dump_unit.type == CacheDumpUnitType::kFooter) {
return IOStatus::OK();
} else {
return io_s;
}
}


// Read and copy the dump unit metadata to std::string data, decode and create
// the unit metadata based on the string
IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,
Expand Down
21 changes: 17 additions & 4 deletions utilities/cache_dump_load_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,16 @@ class CacheDumperImpl : public CacheDumper {
CacheDumperImpl(const CacheDumpOptions& dump_options,
const std::shared_ptr<Cache>& cache,
std::unique_ptr<CacheDumpWriter>&& writer)
: options_(dump_options), cache_(cache), writer_(std::move(writer)) {}
: options_(dump_options), cache_(cache), writer_(std::move(writer)) {
dumped_size_bytes_ = 0;
}
~CacheDumperImpl() { writer_.reset(); }
Status SetDumpFilter(std::vector<DB*> db_list) override;
IOStatus DumpCacheEntriesToWriter() override;

private:
IOStatus WriteBlock(CacheDumpUnitType type, const Slice& key,
const Slice& value);
const Slice& value, uint64_t timestamp);
IOStatus WriteHeader();
IOStatus WriteFooter();
bool ShouldFilterOut(const Slice& key);
Expand All @@ -122,6 +124,10 @@ class CacheDumperImpl : public CacheDumper {
// improvement can be applied like BloomFilter or others to speedup the
// filtering.
std::set<std::string> prefix_filter_;
// Deadline for dumper in microseconds.
std::chrono::microseconds deadline_;
uint64_t dumped_size_bytes_;
bool dump_all_keys_ = true;
};

// The default implementation of CacheDumpedLoader
Expand All @@ -130,13 +136,16 @@ class CacheDumpedLoaderImpl : public CacheDumpedLoader {
CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader)
std::unique_ptr<CacheDumpReader>&& reader,
const std::shared_ptr<Cache>& cache)
: options_(dump_options),
toptions_(toptions),
secondary_cache_(secondary_cache),
reader_(std::move(reader)) {}
reader_(std::move(reader)),
cache_(cache) {}
~CacheDumpedLoaderImpl() {}
IOStatus RestoreCacheEntriesToSecondaryCache() override;
IOStatus RestoreCacheEntriesToCache() override;

private:
IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta);
Expand All @@ -149,6 +158,10 @@ class CacheDumpedLoaderImpl : public CacheDumpedLoader {
std::shared_ptr<SecondaryCache> secondary_cache_;
std::unique_ptr<CacheDumpReader> reader_;
UnorderedMap<Cache::DeleterFn, CacheEntryRole> role_map_;
std::shared_ptr<Cache> cache_;
SystemClock* clock_;
std::chrono::microseconds deadline_;
uint64_t loaded_size_bytes_ = 0;
};

// The default implementation of CacheDumpWriter. We write the blocks to a file
Expand Down