Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load dumped cache to block cache #12571

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 24 additions & 20 deletions cache/lru_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2188,8 +2188,10 @@ TEST_P(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {

// we have a new cache it is empty, then, before we do the Get, we do the
// dumpload
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::shared_ptr<SecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048 * 1024, true);
std::shared_ptr<TestSecondaryCache> test_secondary_cache =
std::static_pointer_cast<TestSecondaryCache>(secondary_cache);
// This time with secondary cache
base_cache = NewCache(1024 * 1024 /* capacity */, 0 /* num_shard_bits */,
false /* strict_capacity_limit */, secondary_cache);
Expand All @@ -2201,8 +2203,8 @@ TEST_P(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
options.env = fault_env_.get();

// start to load the data to new block cache
start_insert = secondary_cache->num_inserts();
start_lookup = secondary_cache->num_lookups();
start_insert = test_secondary_cache->num_inserts();
start_lookup = test_secondary_cache->num_lookups();
std::unique_ptr<CacheDumpReader> dump_reader;
s = NewFromFileCacheDumpReader(fault_fs_, FileOptions(), dump_path,
&dump_reader);
Expand All @@ -2211,10 +2213,10 @@ TEST_P(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
s = NewDefaultCacheDumpedLoader(cd_options, table_options, secondary_cache,
std::move(dump_reader), &cache_loader);
ASSERT_OK(s);
s = cache_loader->RestoreCacheEntriesToSecondaryCache();
s = cache_loader->RestoreCacheEntriesToCache();
ASSERT_OK(s);
uint32_t load_insert = secondary_cache->num_inserts() - start_insert;
uint32_t load_lookup = secondary_cache->num_lookups() - start_lookup;
uint32_t load_insert = test_secondary_cache->num_inserts() - start_insert;
uint32_t load_lookup = test_secondary_cache->num_lookups() - start_lookup;
// check the number we inserted
ASSERT_EQ(64, static_cast<int>(load_insert));
ASSERT_EQ(0, static_cast<int>(load_lookup));
Expand All @@ -2223,16 +2225,16 @@ TEST_P(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
Reopen(options);

// After load, we do the Get again
start_insert = secondary_cache->num_inserts();
start_lookup = secondary_cache->num_lookups();
start_insert = test_secondary_cache->num_inserts();
start_lookup = test_secondary_cache->num_lookups();
uint32_t cache_insert = cache->GetInsertCount();
uint32_t cache_lookup = cache->GetLookupcount();
for (int i = 0; i < N; i++) {
v = Get(Key(i));
ASSERT_EQ(v, value[i]);
}
uint32_t final_insert = secondary_cache->num_inserts() - start_insert;
uint32_t final_lookup = secondary_cache->num_lookups() - start_lookup;
uint32_t final_insert = test_secondary_cache->num_inserts() - start_insert;
uint32_t final_lookup = test_secondary_cache->num_lookups() - start_lookup;
// no insert to secondary cache
ASSERT_EQ(0, static_cast<int>(final_insert));
// lookup the secondary to get all blocks
Expand Down Expand Up @@ -2344,8 +2346,10 @@ TEST_P(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {

// we have a new cache it is empty, then, before we do the Get, we do the
// dumpload
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::shared_ptr<SecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048 * 1024, true);
std::shared_ptr<TestSecondaryCache> test_secondary_cache =
std::static_pointer_cast<TestSecondaryCache>(secondary_cache);
// This time with secondary_cache
base_cache = NewCache(1024 * 1024 /* capacity */, 0 /* num_shard_bits */,
false /* strict_capacity_limit */, secondary_cache);
Expand All @@ -2357,8 +2361,8 @@ TEST_P(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
options.env = fault_env_.get();

// Start the cache loading process
start_insert = secondary_cache->num_inserts();
start_lookup = secondary_cache->num_lookups();
start_insert = test_secondary_cache->num_inserts();
start_lookup = test_secondary_cache->num_lookups();
std::unique_ptr<CacheDumpReader> dump_reader;
s = NewFromFileCacheDumpReader(fault_fs_, FileOptions(), dump_path,
&dump_reader);
Expand All @@ -2367,10 +2371,10 @@ TEST_P(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
s = NewDefaultCacheDumpedLoader(cd_options, table_options, secondary_cache,
std::move(dump_reader), &cache_loader);
ASSERT_OK(s);
s = cache_loader->RestoreCacheEntriesToSecondaryCache();
s = cache_loader->RestoreCacheEntriesToCache();
ASSERT_OK(s);
uint32_t load_insert = secondary_cache->num_inserts() - start_insert;
uint32_t load_lookup = secondary_cache->num_lookups() - start_lookup;
uint32_t load_insert = test_secondary_cache->num_inserts() - start_insert;
uint32_t load_lookup = test_secondary_cache->num_lookups() - start_lookup;
// check the number we inserted
ASSERT_EQ(64, static_cast<int>(load_insert));
ASSERT_EQ(0, static_cast<int>(load_lookup));
Expand All @@ -2384,16 +2388,16 @@ TEST_P(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
// I/O, so we set the file system to false.
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
fault_fs_->SetFilesystemActive(false, error_msg);
start_insert = secondary_cache->num_inserts();
start_lookup = secondary_cache->num_lookups();
start_insert = test_secondary_cache->num_inserts();
start_lookup = test_secondary_cache->num_lookups();
uint32_t cache_insert = cache->GetInsertCount();
uint32_t cache_lookup = cache->GetLookupcount();
for (int i = 0; i < N; i++) {
ASSERT_OK(db1->Get(ro, Key(i), &v));
ASSERT_EQ(v, value1[i]);
}
uint32_t final_insert = secondary_cache->num_inserts() - start_insert;
uint32_t final_lookup = secondary_cache->num_lookups() - start_lookup;
uint32_t final_insert = test_secondary_cache->num_inserts() - start_insert;
uint32_t final_lookup = test_secondary_cache->num_lookups() - start_lookup;
// no insert to secondary cache
ASSERT_EQ(0, static_cast<int>(final_insert));
// lookup the secondary to get all blocks
Expand Down
8 changes: 4 additions & 4 deletions include/rocksdb/utilities/cache_dump_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ class CacheDumper {
class CacheDumpedLoader {
public:
virtual ~CacheDumpedLoader() = default;
virtual IOStatus RestoreCacheEntriesToSecondaryCache() {
virtual IOStatus RestoreCacheEntriesToCache() {
return IOStatus::NotSupported(
"RestoreCacheEntriesToSecondaryCache is not supported");
"RestoreCacheEntriesToCache is not supported");
}
};

Expand All @@ -134,10 +134,10 @@ Status NewDefaultCacheDumper(const CacheDumpOptions& dump_options,
std::unique_ptr<CacheDumper>* cache_dumper);

// Get the default cache dump loader
template <typename T>
Status NewDefaultCacheDumpedLoader(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
const BlockBasedTableOptions& toptions, const std::shared_ptr<T>& cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Load dumped cache to block cache
18 changes: 15 additions & 3 deletions utilities/cache_dump_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,26 @@ Status NewDefaultCacheDumper(const CacheDumpOptions& dump_options,
return Status::OK();
}

template <>
Status NewDefaultCacheDumpedLoader(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
const std::shared_ptr<SecondaryCache>& cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader) {
cache_dump_loader->reset(new CacheDumpedLoaderImpl(
dump_options, toptions, secondary_cache, std::move(reader)));
cache_dump_loader->reset(new CacheDumpedLoaderSecondaryCacheImpl(
dump_options, toptions, cache, std::move(reader)));
return Status::OK();
}

template <>
Status NewDefaultCacheDumpedLoader(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions, const std::shared_ptr<Cache>& cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader) {
cache_dump_loader->reset(new CacheDumpedLoaderBlockCacheImpl(
dump_options, toptions, cache, std::move(reader)));
return Status::OK();
}

Expand Down
147 changes: 130 additions & 17 deletions utilities/cache_dump_load_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,25 @@ IOStatus CacheDumperImpl::WriteFooter() {
// This is the main function to restore the cache entries to secondary cache.
// First, we check if all the arguments are valid. Then, we read the block
// sequentially from the reader and insert them to the secondary cache.
IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
IOStatus CacheDumpedLoaderImplBase::RestoreCacheEntriesToCache() {
// TODO: remove this line when options are used in the loader
(void)options_;
// Step 1: we check if all the arguments are valid
if (secondary_cache_ == nullptr) {
return IOStatus::InvalidArgument("Secondary Cache is null");
Status s = Check();
if (!s.ok()) {
return status_to_io_status(std::move(s));
}
if (reader_ == nullptr) {
return IOStatus::InvalidArgument("CacheDumpReader is null");
}
// Set the system clock
if (options_.clock == nullptr) {
return IOStatus::InvalidArgument("System clock is null");
}

clock_ = options_.clock;

deadline_ = options_.deadline;

// Step 2: read the header
// TODO: we need to check the cache dump format version and RocksDB version
Expand All @@ -280,6 +289,20 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
while (io_s.ok()) {
dump_unit.reset();
data.clear();

if (options_.max_size_bytes > 0 &&
loaded_size_bytes_ > options_.max_size_bytes) {
return IOStatus::OK();
}

uint64_t timestamp = clock_->NowMicros();
if (deadline_.count()) {
std::chrono::microseconds now = std::chrono::microseconds(timestamp);
if (now >= deadline_) {
return IOStatus::OK();
}
}

// read the content and store in the dump_unit
io_s = ReadCacheBlock(&data, &dump_unit);
if (!io_s.ok()) {
Expand All @@ -288,14 +311,10 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
if (dump_unit.type == CacheDumpUnitType::kFooter) {
break;
}
loaded_size_bytes_ += dump_unit.value_len;
// Create the uncompressed_block based on the information in the dump_unit
// (There is no block trailer here compatible with block-based SST file.)
Slice content =
Slice(static_cast<char*>(dump_unit.value), dump_unit.value_len);
Status s = secondary_cache_->InsertSaved(dump_unit.key, content);
if (!s.ok()) {
io_s = status_to_io_status(std::move(s));
}
io_s = InsertDumpUnitToCache(dump_unit);
}
if (dump_unit.type == CacheDumpUnitType::kFooter) {
return IOStatus::OK();
Expand All @@ -306,8 +325,8 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {

// Read and copy the dump unit metadata to std::string data, decode and create
// the unit metadata based on the string
IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,
DumpUnitMeta* unit_meta) {
IOStatus CacheDumpedLoaderImplBase::ReadDumpUnitMeta(std::string* data,
DumpUnitMeta* unit_meta) {
assert(reader_ != nullptr);
assert(data != nullptr);
assert(unit_meta != nullptr);
Expand All @@ -321,8 +340,8 @@ IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,

// Read and copy the dump unit to std::string data, decode and create the unit
// based on the string
IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data,
DumpUnit* unit) {
IOStatus CacheDumpedLoaderImplBase::ReadDumpUnit(size_t len, std::string* data,
DumpUnit* unit) {
assert(reader_ != nullptr);
assert(data != nullptr);
assert(unit != nullptr);
Expand All @@ -339,8 +358,8 @@ IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data,
}

// Read the header
IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data,
DumpUnit* dump_unit) {
IOStatus CacheDumpedLoaderImplBase::ReadHeader(std::string* data,
DumpUnit* dump_unit) {
DumpUnitMeta header_meta;
header_meta.reset();
std::string meta_string;
Expand All @@ -361,8 +380,8 @@ IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data,
}

// Read the blocks after header is read out
IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data,
DumpUnit* dump_unit) {
IOStatus CacheDumpedLoaderImplBase::ReadCacheBlock(std::string* data,
DumpUnit* dump_unit) {
// According to the write process, we read the dump_unit_metadata first
DumpUnitMeta unit_meta;
unit_meta.reset();
Expand All @@ -386,4 +405,98 @@ IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data,
return io_s;
}

IOStatus CacheDumpedLoaderSecondaryCacheImpl::InsertDumpUnitToCache(
const DumpUnit& dump_unit) {
Slice content =
Slice(static_cast<char*>(dump_unit.value), dump_unit.value_len);
Status s = secondary_cache_->InsertSaved(dump_unit.key, content);
return status_to_io_status(std::move(s));
}

Status CacheDumpedLoaderSecondaryCacheImpl::Check() {
if (secondary_cache_ == nullptr) {
return Status::InvalidArgument("Secondary Cache is null");
}
return Status::OK();
}

IOStatus CacheDumpedLoaderBlockCacheImpl::InsertDumpUnitToCache(
const DumpUnit& dump_unit) {
Statistics* statistics = nullptr;
Slice data(static_cast<char*>(dump_unit.value), dump_unit.value_len);
BlockContents block =
BlockContents(AllocateAndCopyBlock(data, nullptr), data.size());
Status s = Status::OK();

switch (dump_unit.type) {
case CacheDumpUnitType::kData: {
const Cache::CacheItemHelper* helper =
GetCacheItemHelper(BlockType::kData, CacheTier::kVolatileTier);
std::unique_ptr<Block_kData> block_holder;
block_holder.reset(new Block_kData(
std::move(block), toptions_.read_amp_bytes_per_bit, statistics));
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache_->Insert(dump_unit.key, block_holder.get(), helper,
charge);
if (s.ok()) {
block_holder.release();
}
break;
}
case CacheDumpUnitType::kIndex: {
const Cache::CacheItemHelper* helper =
GetCacheItemHelper(BlockType::kIndex, CacheTier::kVolatileTier);
std::unique_ptr<Block_kIndex> block_holder;
block_holder.reset(new Block_kIndex(
std::move(block), toptions_.read_amp_bytes_per_bit, statistics));
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache_->Insert(dump_unit.key, block_holder.get(), helper,
charge);
if (s.ok()) {
block_holder.release();
}
break;
}
case CacheDumpUnitType::kFilter: {
const Cache::CacheItemHelper* helper =
GetCacheItemHelper(BlockType::kFilter, CacheTier::kVolatileTier);
std::unique_ptr<ParsedFullFilterBlock> block_holder;
block_holder.reset(new ParsedFullFilterBlock(
toptions_.filter_policy.get(), std::move(block)));
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache_->Insert(dump_unit.key, block_holder.get(), helper,
charge);
if (s.ok()) {
block_holder.release();
}
break;
}
case CacheDumpUnitType::kFilterMetaBlock: {
const Cache::CacheItemHelper* helper = GetCacheItemHelper(
BlockType::kFilterPartitionIndex, CacheTier::kVolatileTier);
std::unique_ptr<Block_kFilterPartitionIndex> block_holder;
block_holder.reset(new Block_kFilterPartitionIndex(
std::move(block), toptions_.read_amp_bytes_per_bit, statistics));
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache_->Insert(dump_unit.key, block_holder.get(), helper,
charge);
if (s.ok()) {
block_holder.release();
}
break;
}

default:
break;
}
return status_to_io_status(std::move(s));
}

Status CacheDumpedLoaderBlockCacheImpl::Check() {
if (block_cache_ == nullptr) {
return Status::InvalidArgument("Block Cache is null");
}
return Status::OK();
}

} // namespace ROCKSDB_NAMESPACE