Skip to content

Commit

Permalink
Preserve TimedPut on penultimate level until it actually expires (#12543
Browse files Browse the repository at this point in the history
)

Summary:
To make sure `TimedPut` are placed on proper tier before and when it becomes eligible for cold tier
1) flush and compaction need to keep relevant seqno to time mapping for not just the sequence number contained in internal keys, but also preferred sequence number for `TimedPut` entries.

This PR also fix some bugs in for handling `TimedPut` during compaction:
1) dealing with an edge case when a `TimedPut` entry's internal key is the right bound for penultimate level, the internal key after swapping in its preferred sequence number will fall outside of the penultimate range because preferred sequence number is smaller than its original sequence number. The entry however is still safe to be placed on penultimate level, so we keep track of `TimedPut` entry's original sequence number for this check. The idea behind this is that as long as it's safe for the original key to be placed on penultimate level, it's safe for the entry with swapped preferred sequence number to be placed on penultimate level too. Because we only swap in preferred sequence number when that entry is visible to the earliest snapshot and there is no other data points with the same user key in lower levels. On the other hand, as long as it's not safe for the original key to be placed on penultimate level, we will not place the entry after swapping the preferred seqno on penultimate level either.

2) the assertion that preferred seqno is always bigger than original sequence number may fail if this logic is only exercised after sequence number is zeroed out. We adjust the assertion to handle that case too. In this case, we don't swap in the preferred seqno but will adjust the its type to `kTypeValue`.

3) there was a special case handling for when range deletion may end up incorrectly covering an entry if preferred seqno is swapped in. But it missed the case that if the original entry is already covered by range deletion. The original handling will mistakenly output the entry instead of omitting it.

Pull Request resolved: #12543

Test Plan:
./tiered_compaction_test --gtest_filter="PrecludeLastLevelTest.PreserveTimedPutOnPenultimateLevel"
./compaction_iterator_test --gtest_filter="*TimedPut*"

Reviewed By: pdillinger

Differential Revision: D56195096

Pulled By: jowlyzhang

fbshipit-source-id: 37ebb09d2513abbd9e90cda0217e26874584b8f3
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Apr 30, 2024
1 parent 45c1051 commit 2c02a9b
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 22 deletions.
10 changes: 7 additions & 3 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ Status BuildTable(
const bool logical_strip_timestamp =
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;

SequenceNumber smallest_preferred_seqno = kMaxSequenceNumber;
std::string key_after_flush_buf;
std::string value_buf;
c_iter.SeekToFirst();
Expand Down Expand Up @@ -242,6 +243,8 @@ Status BuildTable(
if (preferred_seqno < ikey.sequence) {
value_after_flush =
PackValueAndSeqno(unpacked_value, preferred_seqno, &value_buf);
smallest_preferred_seqno =
std::min(smallest_preferred_seqno, preferred_seqno);
} else {
// Cannot get a useful preferred seqno, convert it to a kTypeValue.
UpdateInternalKey(&key_after_flush_buf, ikey.sequence, kTypeValue);
Expand Down Expand Up @@ -326,9 +329,10 @@ Status BuildTable(
} else {
SeqnoToTimeMapping relevant_mapping;
if (seqno_to_time_mapping) {
relevant_mapping.CopyFromSeqnoRange(*seqno_to_time_mapping,
meta->fd.smallest_seqno,
meta->fd.largest_seqno);
relevant_mapping.CopyFromSeqnoRange(
*seqno_to_time_mapping,
std::min(meta->fd.smallest_seqno, smallest_preferred_seqno),
meta->fd.largest_seqno);
relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
relevant_mapping.Enforce(tboptions.file_creation_time);
}
Expand Down
58 changes: 40 additions & 18 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -997,25 +997,37 @@ void CompactionIterator::NextFromInput() {
// A special case involving range deletion is handled separately below.
auto [unpacked_value, preferred_seqno] =
ParsePackedValueWithSeqno(value_);
assert(preferred_seqno < ikey_.sequence);
InternalKey ikey_after_swap(ikey_.user_key, preferred_seqno, kTypeValue);
Slice ikey_after_swap_slice(*ikey_after_swap.rep());
assert(preferred_seqno < ikey_.sequence || ikey_.sequence == 0);
if (range_del_agg_->ShouldDelete(
ikey_after_swap_slice,
RangeDelPositioningMode::kForwardTraversal)) {
// A range tombstone that doesn't cover this kTypeValuePreferredSeqno
// entry may end up covering the entry, so it's not safe to swap
// preferred sequence number. In this case, we output the entry as is.
validity_info_.SetValid(ValidContext::kNewUserKey);
key_, RangeDelPositioningMode::kForwardTraversal)) {
++iter_stats_.num_record_drop_hidden;
++iter_stats_.num_record_drop_range_del;
AdvanceInputIter();
} else {
iter_stats_.num_timed_put_swap_preferred_seqno++;
ikey_.sequence = preferred_seqno;
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
value_ = unpacked_value;
validity_info_.SetValid(ValidContext::kSwapPreferredSeqno);
InternalKey ikey_after_swap(ikey_.user_key,
std::min(preferred_seqno, ikey_.sequence),
kTypeValue);
Slice ikey_after_swap_slice(*ikey_after_swap.rep());
if (range_del_agg_->ShouldDelete(
ikey_after_swap_slice,
RangeDelPositioningMode::kForwardTraversal)) {
// A range tombstone that doesn't cover this kTypeValuePreferredSeqno
// entry will end up covering the entry, so it's not safe to swap
// preferred sequence number. In this case, we output the entry as is.
validity_info_.SetValid(ValidContext::kNewUserKey);
} else {
if (ikey_.sequence != 0) {
iter_stats_.num_timed_put_swap_preferred_seqno++;
saved_seq_for_penul_check_ = ikey_.sequence;
ikey_.sequence = preferred_seqno;
}
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
value_ = unpacked_value;
validity_info_.SetValid(ValidContext::kSwapPreferredSeqno);
}
}
} else if (ikey_.type == kTypeMerge) {
if (!merge_helper_->HasOperator()) {
Expand Down Expand Up @@ -1275,8 +1287,18 @@ void CompactionIterator::DecideOutputLevel() {
// considered unsafe, because the key may conflict with higher-level SSTs
// not from this compaction.
// TODO: add statistic for declined output_to_penultimate_level
SequenceNumber seq_for_range_check =
(saved_seq_for_penul_check_.has_value() &&
saved_seq_for_penul_check_.value() != kMaxSequenceNumber)
? saved_seq_for_penul_check_.value()
: ikey_.sequence;
ParsedInternalKey ikey_for_range_check = ikey_;
if (seq_for_range_check != ikey_.sequence) {
ikey_for_range_check.sequence = seq_for_range_check;
saved_seq_for_penul_check_ = std::nullopt;
}
bool safe_to_penultimate_level =
compaction_->WithinPenultimateLevelOutputRange(ikey_);
compaction_->WithinPenultimateLevelOutputRange(ikey_for_range_check);
if (!safe_to_penultimate_level) {
output_to_penultimate_level_ = false;
// It could happen when disable/enable `last_level_temperature` while
Expand Down
11 changes: 11 additions & 0 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,17 @@ class CompactionIterator {
// iterator output (or current key in the underlying iterator during
// NextFromInput()).
ParsedInternalKey ikey_;

// When a kTypeValuePreferredSeqno entry's preferred seqno is safely swapped
// in in this compaction, this field saves its original sequence number for
// range checking whether it's safe to be placed on the penultimate level.
// This is to ensure when such an entry happens to be the right boundary of
// penultimate safe range, it won't get excluded because with the preferred
// seqno swapped in, it's now larger than the right boundary (itself before
// the swap). This is safe to do, because preferred seqno is swapped in only
// when no entries with the same user key exist on lower levels and this entry
// is already visible in the earliest snapshot.
std::optional<SequenceNumber> saved_seq_for_penul_check_ = kMaxSequenceNumber;
// Stores whether ikey_.user_key is valid. If set to false, the user key is
// not compared against the current key in the underlying iterator.
bool has_current_user_key_ = false;
Expand Down
35 changes: 35 additions & 0 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,28 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
false /*key_not_exists_beyond_output_level*/);
}

TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_ShouldBeCoverredByRangeDeletionBeforeSwap_NoOutput) {
InitIterators({test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
test::KeyStr("morning", 2, kTypeValuePreferredSeqno),
test::KeyStr("night", 6, kTypeValue)},
{ValueWithPreferredSeqno("zao", 3),
ValueWithPreferredSeqno("zao", 1), "wan"},
{test::KeyStr("ma", 6, kTypeRangeDeletion)}, {"mz"}, 6,
kMaxSequenceNumber /*last_committed_sequence*/,
nullptr /*merge_op*/, nullptr /*filter*/,
false /*bottommost_level*/,
kMaxSequenceNumber /*earliest_write_conflict_snapshot*/,
true /*key_not_exists_beyond_output_level*/);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("night", 6, kTypeValue), c_iter_->key().ToString());
ASSERT_EQ("wan", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_FALSE(c_iter_->Valid());
ASSERT_OK(c_iter_->status());
}

TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_WillBeHiddenByRangeDeletionAfterSwap_NoSwap) {
InitIterators({test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
Expand Down Expand Up @@ -1316,6 +1338,19 @@ TEST_F(
true /*key_not_exists_beyond_output_level*/);
}

TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_SequenceNumberAlreadyZeroedOut_ChangeType) {
RunTest(
{test::KeyStr("bar", 0, kTypeValuePreferredSeqno),
test::KeyStr("bar", 0, kTypeValuePreferredSeqno),
test::KeyStr("foo", 0, kTypeValue)},
{ValueWithPreferredSeqno("bv2", 2), ValueWithPreferredSeqno("bv1", 1),
"fv1"},
{test::KeyStr("bar", 0, kTypeValue), test::KeyStr("foo", 0, kTypeValue)},
{"bv2", "fv1"}, 6 /*last_committed_seq*/, nullptr /*merge_operator*/,
nullptr /*compaction_filter*/, true /*bottommost_level*/);
}

// Compaction filter should keep uncommitted key as-is, and
// * Convert the latest value to deletion, and/or
// * if latest value is a merge, apply filter to all subsequent merges.
Expand Down
9 changes: 8 additions & 1 deletion db/compaction/compaction_outputs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ Status CompactionOutputs::Finish(
if (s.ok()) {
SeqnoToTimeMapping relevant_mapping;
relevant_mapping.CopyFromSeqnoRange(
seqno_to_time_mapping, meta->fd.smallest_seqno, meta->fd.largest_seqno);
seqno_to_time_mapping,
std::min(smallest_preferred_seqno_, meta->fd.smallest_seqno),
meta->fd.largest_seqno);
relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
builder_->SetSeqnoTimeTableProperties(relevant_mapping,
meta->oldest_ancester_time);
Expand Down Expand Up @@ -422,6 +424,11 @@ Status CompactionOutputs::AddToOutput(
}

const ParsedInternalKey& ikey = c_iter.ikey();
if (ikey.type == kTypeValuePreferredSeqno) {
SequenceNumber preferred_seqno = ParsePackedValueForSeqno(value);
smallest_preferred_seqno_ =
std::min(smallest_preferred_seqno_, preferred_seqno);
}
s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
ikey.type);

Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ class CompactionOutputs {
std::unique_ptr<TableBuilder> builder_;
std::unique_ptr<WritableFileWriter> file_writer_;
uint64_t current_output_file_size_ = 0;
SequenceNumber smallest_preferred_seqno_ = kMaxSequenceNumber;

// all the compaction outputs so far
std::vector<Output> outputs_;
Expand Down
56 changes: 56 additions & 0 deletions db/compaction/tiered_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,62 @@ TEST_F(PrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
Close();
}

TEST_F(PrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.disable_auto_compactions = true;
options.preclude_last_level_data_seconds = 3 * 24 * 60 * 60;
int seconds_between_recording = (3 * 24 * 60 * 60) / kMaxSeqnoTimePairsPerCF;
options.env = mock_env_.get();
options.num_levels = 7;
options.last_level_temperature = Temperature::kCold;
options.default_write_temperature = Temperature::kHot;
DestroyAndReopen(options);

// Creating a snapshot to manually control when preferred sequence number is
// swapped in. An entry's preferred seqno won't get swapped in until it's
// visible to the earliest snapshot. With this, we can test relevant seqno to
// time mapping recorded in SST file also covers preferred seqno, not just
// the seqno in the internal keys.
auto* snap1 = db_->GetSnapshot();
// Start time: kMockStartTime = 10000000;
ASSERT_OK(TimedPut(0, Key(0), "v0", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(TimedPut(0, Key(1), "v1", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(TimedPut(0, Key(2), "v2", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(Flush());

// Should still be in penultimate level.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

// Wait one more day and release snapshot. Data's preferred seqno should be
// swapped in, but data should still stay in penultimate level. SST file's
// seqno to time mapping should continue to cover preferred seqno after
// compaction.
db_->ReleaseSnapshot(snap1);
mock_clock_->MockSleepForSeconds(1 * 24 * 60 * 60);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

// Wait one more day and data are eligible to be placed on last level.
// Instead of waiting exactly one more day, here we waited
// `seconds_between_recording` less seconds to show that it's not precise.
// Data could start to be placed on cold tier one recording interval before
// they exactly become cold based on the setting. For this one column family
// setting preserving 3 days of recording, it's about 43 minutes.
mock_clock_->MockSleepForSeconds(1 * 24 * 60 * 60 -
seconds_between_recording);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
Close();
}

TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
Expand Down

0 comments on commit 2c02a9b

Please sign in to comment.