Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve TimedPut on penultimate level until it actually expires #12543

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ Status BuildTable(
const bool logical_strip_timestamp =
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;

SequenceNumber smallest_preferred_seqno = kMaxSequenceNumber;
std::string key_after_flush_buf;
std::string value_buf;
c_iter.SeekToFirst();
Expand Down Expand Up @@ -242,6 +243,8 @@ Status BuildTable(
if (preferred_seqno < ikey.sequence) {
value_after_flush =
PackValueAndSeqno(unpacked_value, preferred_seqno, &value_buf);
smallest_preferred_seqno =
std::min(smallest_preferred_seqno, preferred_seqno);
} else {
// Cannot get a useful preferred seqno, convert it to a kTypeValue.
UpdateInternalKey(&key_after_flush_buf, ikey.sequence, kTypeValue);
Expand Down Expand Up @@ -326,9 +329,10 @@ Status BuildTable(
} else {
SeqnoToTimeMapping relevant_mapping;
if (seqno_to_time_mapping) {
relevant_mapping.CopyFromSeqnoRange(*seqno_to_time_mapping,
meta->fd.smallest_seqno,
meta->fd.largest_seqno);
relevant_mapping.CopyFromSeqnoRange(
*seqno_to_time_mapping,
std::min(meta->fd.smallest_seqno, smallest_preferred_seqno),
meta->fd.largest_seqno);
relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
relevant_mapping.Enforce(tboptions.file_creation_time);
}
Expand Down
58 changes: 40 additions & 18 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -997,25 +997,37 @@ void CompactionIterator::NextFromInput() {
// A special case involving range deletion is handled separately below.
auto [unpacked_value, preferred_seqno] =
ParsePackedValueWithSeqno(value_);
assert(preferred_seqno < ikey_.sequence);
InternalKey ikey_after_swap(ikey_.user_key, preferred_seqno, kTypeValue);
Slice ikey_after_swap_slice(*ikey_after_swap.rep());
assert(preferred_seqno < ikey_.sequence || ikey_.sequence == 0);
if (range_del_agg_->ShouldDelete(
ikey_after_swap_slice,
RangeDelPositioningMode::kForwardTraversal)) {
// A range tombstone that doesn't cover this kTypeValuePreferredSeqno
// entry may end up covering the entry, so it's not safe to swap
// preferred sequence number. In this case, we output the entry as is.
validity_info_.SetValid(ValidContext::kNewUserKey);
key_, RangeDelPositioningMode::kForwardTraversal)) {
++iter_stats_.num_record_drop_hidden;
++iter_stats_.num_record_drop_range_del;
AdvanceInputIter();
} else {
iter_stats_.num_timed_put_swap_preferred_seqno++;
ikey_.sequence = preferred_seqno;
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
value_ = unpacked_value;
validity_info_.SetValid(ValidContext::kSwapPreferredSeqno);
InternalKey ikey_after_swap(ikey_.user_key,
std::min(preferred_seqno, ikey_.sequence),
kTypeValue);
Slice ikey_after_swap_slice(*ikey_after_swap.rep());
if (range_del_agg_->ShouldDelete(
ikey_after_swap_slice,
RangeDelPositioningMode::kForwardTraversal)) {
// A range tombstone that doesn't cover this kTypeValuePreferredSeqno
// entry will end up covering the entry, so it's not safe to swap
// preferred sequence number. In this case, we output the entry as is.
validity_info_.SetValid(ValidContext::kNewUserKey);
} else {
if (ikey_.sequence != 0) {
iter_stats_.num_timed_put_swap_preferred_seqno++;
saved_seq_for_penul_check_ = ikey_.sequence;
ikey_.sequence = preferred_seqno;
}
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
value_ = unpacked_value;
validity_info_.SetValid(ValidContext::kSwapPreferredSeqno);
}
}
} else if (ikey_.type == kTypeMerge) {
if (!merge_helper_->HasOperator()) {
Expand Down Expand Up @@ -1275,8 +1287,18 @@ void CompactionIterator::DecideOutputLevel() {
// considered unsafe, because the key may conflict with higher-level SSTs
// not from this compaction.
// TODO: add statistic for declined output_to_penultimate_level
SequenceNumber seq_for_range_check =
(saved_seq_for_penul_check_.has_value() &&
saved_seq_for_penul_check_.value() != kMaxSequenceNumber)
? saved_seq_for_penul_check_.value()
: ikey_.sequence;
ParsedInternalKey ikey_for_range_check = ikey_;
if (seq_for_range_check != ikey_.sequence) {
ikey_for_range_check.sequence = seq_for_range_check;
saved_seq_for_penul_check_ = std::nullopt;
}
bool safe_to_penultimate_level =
compaction_->WithinPenultimateLevelOutputRange(ikey_);
compaction_->WithinPenultimateLevelOutputRange(ikey_for_range_check);
if (!safe_to_penultimate_level) {
output_to_penultimate_level_ = false;
// It could happen when disable/enable `last_level_temperature` while
Expand Down
11 changes: 11 additions & 0 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,17 @@ class CompactionIterator {
// iterator output (or current key in the underlying iterator during
// NextFromInput()).
ParsedInternalKey ikey_;

// When a kTypeValuePreferredSeqno entry has its preferred seqno safely
// swapped in during this compaction, this field saves the entry's original
// sequence number. The original sequence number is then used for the range
// check that decides whether it's safe to place the entry on the penultimate
// level. This ensures that when such an entry happens to be the right
// boundary of the penultimate safe range, it won't get excluded merely
// because, with the preferred seqno swapped in, it's now larger than the
// right boundary (which is the entry itself before the swap). This is safe to
// do, because the preferred seqno is swapped in only when no entries with the
// same user key exist on lower levels and this entry is already visible in
// the earliest snapshot.
std::optional<SequenceNumber> saved_seq_for_penul_check_ = kMaxSequenceNumber;
// Stores whether ikey_.user_key is valid. If set to false, the user key is
// not compared against the current key in the underlying iterator.
bool has_current_user_key_ = false;
Expand Down
35 changes: 35 additions & 0 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,28 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
false /*key_not_exists_beyond_output_level*/);
}

// Verifies that kTypeValuePreferredSeqno entries are dropped, not emitted,
// when a range deletion is supplied alongside them.
//
// Input: two "morning" entries of type kTypeValuePreferredSeqno (seqno 5 and
// 2, carrying preferred seqnos 3 and 1) and one "night" kTypeValue entry at
// seqno 6. A range deletion keyed at "ma" with seqno 6 and value "mz" is also
// passed in (presumably the tombstone [ma, mz) -- confirm against
// InitIterators).
//
// Expected: the iterator yields only the "night" entry and then becomes
// invalid with an OK status, i.e. neither "morning" entry is output.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_ShouldBeCoverredByRangeDeletionBeforeSwap_NoOutput) {
InitIterators({test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
test::KeyStr("morning", 2, kTypeValuePreferredSeqno),
test::KeyStr("night", 6, kTypeValue)},
{ValueWithPreferredSeqno("zao", 3),
ValueWithPreferredSeqno("zao", 1), "wan"},
{test::KeyStr("ma", 6, kTypeRangeDeletion)}, {"mz"}, 6,
kMaxSequenceNumber /*last_committed_sequence*/,
nullptr /*merge_op*/, nullptr /*filter*/,
false /*bottommost_level*/,
kMaxSequenceNumber /*earliest_write_conflict_snapshot*/,
true /*key_not_exists_beyond_output_level*/);
c_iter_->SeekToFirst();
// The first (and only) valid position must be the "night" entry.
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("night", 6, kTypeValue), c_iter_->key().ToString());
ASSERT_EQ("wan", c_iter_->value().ToString());
c_iter_->Next();
// Nothing else is output, and iteration ended without an error.
ASSERT_FALSE(c_iter_->Valid());
ASSERT_OK(c_iter_->status());
}

TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_WillBeHiddenByRangeDeletionAfterSwap_NoSwap) {
InitIterators({test::KeyStr("morning", 5, kTypeValuePreferredSeqno),
Expand Down Expand Up @@ -1316,6 +1338,19 @@ TEST_F(
true /*key_not_exists_beyond_output_level*/);
}

// Covers kTypeValuePreferredSeqno entries whose sequence number is already 0.
//
// Input: two "bar" entries of type kTypeValuePreferredSeqno at seqno 0
// (packed values "bv2"/preferred seqno 2 and "bv1"/preferred seqno 1) and a
// "foo" kTypeValue entry at seqno 0, run with bottommost_level = true.
//
// The third and fourth arguments are presumably the expected output keys and
// values (confirm against RunTest): "bar" at seqno 0 converted to kTypeValue
// with the unpacked value "bv2", followed by "foo" with "fv1". The sequence
// number stays 0, only the type changes, and the second "bar" entry does not
// appear in the output.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
TimedPut_SequenceNumberAlreadyZeroedOut_ChangeType) {
RunTest(
{test::KeyStr("bar", 0, kTypeValuePreferredSeqno),
test::KeyStr("bar", 0, kTypeValuePreferredSeqno),
test::KeyStr("foo", 0, kTypeValue)},
{ValueWithPreferredSeqno("bv2", 2), ValueWithPreferredSeqno("bv1", 1),
"fv1"},
{test::KeyStr("bar", 0, kTypeValue), test::KeyStr("foo", 0, kTypeValue)},
{"bv2", "fv1"}, 6 /*last_committed_seq*/, nullptr /*merge_operator*/,
nullptr /*compaction_filter*/, true /*bottommost_level*/);
}

// Compaction filter should keep uncommitted key as-is, and
// * Convert the latest value to deletion, and/or
// * if latest value is a merge, apply filter to all subsequent merges.
Expand Down
9 changes: 8 additions & 1 deletion db/compaction/compaction_outputs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ Status CompactionOutputs::Finish(
if (s.ok()) {
SeqnoToTimeMapping relevant_mapping;
relevant_mapping.CopyFromSeqnoRange(
seqno_to_time_mapping, meta->fd.smallest_seqno, meta->fd.largest_seqno);
seqno_to_time_mapping,
std::min(smallest_preferred_seqno_, meta->fd.smallest_seqno),
meta->fd.largest_seqno);
relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
builder_->SetSeqnoTimeTableProperties(relevant_mapping,
meta->oldest_ancester_time);
Expand Down Expand Up @@ -422,6 +424,11 @@ Status CompactionOutputs::AddToOutput(
}

const ParsedInternalKey& ikey = c_iter.ikey();
if (ikey.type == kTypeValuePreferredSeqno) {
SequenceNumber preferred_seqno = ParsePackedValueForSeqno(value);
smallest_preferred_seqno_ =
std::min(smallest_preferred_seqno_, preferred_seqno);
}
s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
ikey.type);

Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ class CompactionOutputs {
std::unique_ptr<TableBuilder> builder_;
std::unique_ptr<WritableFileWriter> file_writer_;
uint64_t current_output_file_size_ = 0;
SequenceNumber smallest_preferred_seqno_ = kMaxSequenceNumber;

// all the compaction outputs so far
std::vector<Output> outputs_;
Expand Down
56 changes: 56 additions & 0 deletions db/compaction/tiered_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,62 @@ TEST_F(PrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
Close();
}

// End-to-end check that TimedPut data stays on the penultimate level (hot
// temperature) until it is old enough for the last level (cold temperature),
// including after its preferred seqno has been swapped in.
//
// The test runs three full manual compactions:
//   1. With a snapshot held: data written with a write time one day in the
//      past is expected on the penultimate level.
//   2. After releasing the snapshot and advancing the mock clock one day:
//      data is still expected on the penultimate level.
//   3. After advancing the mock clock almost one more day: data is expected
//      on the last level.
TEST_F(PrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.disable_auto_compactions = true;
// Keep data out of the last level for three days.
options.preclude_last_level_data_seconds = 3 * 24 * 60 * 60;
int seconds_between_recording = (3 * 24 * 60 * 60) / kMaxSeqnoTimePairsPerCF;
options.env = mock_env_.get();
options.num_levels = 7;
// The temperatures distinguish the two tiers in the size assertions below:
// last level is cold, everything else is hot.
options.last_level_temperature = Temperature::kCold;
options.default_write_temperature = Temperature::kHot;
DestroyAndReopen(options);

// Creating a snapshot to manually control when preferred sequence number is
// swapped in. An entry's preferred seqno won't get swapped in until it's
// visible to the earliest snapshot. With this, we can test relevant seqno to
// time mapping recorded in SST file also covers preferred seqno, not just
// the seqno in the internal keys.
auto* snap1 = db_->GetSnapshot();
// Start time: kMockStartTime = 10000000;
// Each entry is written with a write time one day before the start time.
ASSERT_OK(TimedPut(0, Key(0), "v0", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(TimedPut(0, Key(1), "v1", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(TimedPut(0, Key(2), "v2", kMockStartTime - 1 * 24 * 60 * 60));
ASSERT_OK(Flush());

// Should still be in penultimate level.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

// Wait one more day and release snapshot. Data's preferred seqno should be
// swapped in, but data should still stay in penultimate level. SST file's
// seqno to time mapping should continue to cover preferred seqno after
// compaction.
db_->ReleaseSnapshot(snap1);
mock_clock_->MockSleepForSeconds(1 * 24 * 60 * 60);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

// Wait one more day and data are eligible to be placed on last level.
// Instead of waiting exactly one more day, here we wait
// `seconds_between_recording` fewer seconds to show that it's not precise.
// Data could start to be placed on cold tier one recording interval before
// they exactly become cold based on the setting. For this one column family
// setting preserving 3 days of recording, it's about 43 minutes.
mock_clock_->MockSleepForSeconds(1 * 24 * 60 * 60 -
seconds_between_recording);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// The single file has moved to the last level and is now cold.
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
Close();
}

TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
Expand Down