Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/tiflash into optimize_dup…
Browse files Browse the repository at this point in the history
…licated_agg_func
  • Loading branch information
guo-shaoge committed May 7, 2024
2 parents ede3a80 + 8e50de8 commit b62ed01
Show file tree
Hide file tree
Showing 12 changed files with 464 additions and 88 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ class DeltaValueReader

bool shouldPlace(
const DMContext & context,
DeltaIndexPtr my_delta_index,
size_t placed_rows,
size_t placed_delete_ranges,
const RowKeyRange & segment_range,
const RowKeyRange & relevant_range,
UInt64 start_ts);
Expand Down
25 changes: 14 additions & 11 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ size_t DeltaValueReader::readRows(
//
// So here, we should filter out those out-of-range rows.

auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
auto total_delta_rows = delta_snap->getRows();
const auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
const auto total_delta_rows = delta_snap->getRows();

auto persisted_files_start = std::min(offset, mem_table_rows_offset);
auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
auto mem_table_start = offset <= mem_table_rows_offset
const auto persisted_files_start = std::min(offset, mem_table_rows_offset);
const auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
const auto mem_table_start = offset <= mem_table_rows_offset
? 0
: std::min(offset - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);
auto mem_table_end = offset + limit <= mem_table_rows_offset
const auto mem_table_end = offset + limit <= mem_table_rows_offset
? 0
: std::min(offset + limit - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset);

Expand All @@ -146,8 +146,12 @@ size_t DeltaValueReader::readRows(
}
if (mem_table_start < mem_table_end)
{
actual_read += mem_table_reader
->readRows(output_cols, mem_table_start, mem_table_end - mem_table_start, range, row_ids);
actual_read += mem_table_reader->readRows( //
output_cols,
mem_table_start,
mem_table_end - mem_table_start,
range,
row_ids);
}

if (row_ids != nullptr)
Expand Down Expand Up @@ -212,13 +216,12 @@ BlockOrDeletes DeltaValueReader::getPlaceItems(

bool DeltaValueReader::shouldPlace(
const DMContext & context,
DeltaIndexPtr my_delta_index,
const size_t placed_rows,
const size_t placed_delete_ranges,
const RowKeyRange & segment_range_,
const RowKeyRange & relevant_range,
UInt64 start_ts)
{
auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus();

// The placed_rows, placed_delete_range already contains the data in delta_snap
if (placed_rows >= delta_snap->getRows() && placed_delete_ranges == delta_snap->getDeletes())
return false;
Expand Down
28 changes: 16 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ class DeltaIndex
}
}

DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
DeltaIndexPtr tryCloneInner(size_t rows_limit, size_t placed_deletes_limit, const Updates * updates = nullptr)
{
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy = 0;
size_t placed_deletes_copy = 0;
// Make sure the delta index do not place more deletes than `placed_deletes_limit`.
// Because delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= placed_deletes_limit)
// Make sure the MVCC view will not be broken by the mismatch of delta index and snapshot:
// - First, make sure the delta index do not place more deletes than `placed_deletes_limit`.
// - Second, make sure the snapshot includes all duplicated tuples in the delta index.
if (placed_deletes <= placed_deletes_limit && delta_tree->maxDupTupleID() < static_cast<Int64>(rows_limit))
{
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
Expand Down Expand Up @@ -194,8 +194,9 @@ class DeltaIndex
{
std::scoped_lock lock(mutex);

if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes)
&& !(maybe_advanced.placed_rows == placed_rows && maybe_advanced.placed_deletes == placed_deletes))
if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes) // advance
// not excatly the same
&& (maybe_advanced.placed_rows != placed_rows || maybe_advanced.placed_deletes != placed_deletes))
{
delta_tree = maybe_advanced.delta_tree;
placed_rows = maybe_advanced.placed_rows;
Expand All @@ -205,14 +206,17 @@ class DeltaIndex
return false;
}

DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes) { return tryCloneInner(deletes); }
/**
* Try to get a clone of current instance.
* Return an empty DeltaIndex if `deletes < this->placed_deletes` because the advanced delta-index will break
* the MVCC view.
*/
DeltaIndexPtr tryClone(size_t rows, size_t deletes) { return tryCloneInner(rows, deletes); }

DeltaIndexPtr cloneWithUpdates(const Updates & updates)
{
if (unlikely(updates.empty()))
throw Exception("Unexpected empty updates");

return tryCloneInner(updates.front().delete_ranges_offset, &updates);
RUNTIME_CHECK_MSG(!updates.empty(), "Unexpected empty updates");
return tryCloneInner(updates.front().rows_offset, updates.front().delete_ranges_offset, &updates);
}

const std::optional<Remote::RNDeltaIndexCache::CacheKey> & getRNCacheKey() const { return rn_cache_key; }
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaPlace.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ bool placeInsert(
tuple_id = delta_value_space_offset + (offset + i);

if (dup)
{
delta_tree.addDelete(rid);
delta_tree.setMaxDupTupleID(tuple_id);
}
delta_tree.addInsert(rid, tuple_id);
}

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,7 @@ class DeltaTree
size_t num_inserts = 0;
size_t num_deletes = 0;
size_t num_entries = 0;
Int64 max_dup_tuple_id = -1;

std::unique_ptr<Allocator> allocator;
size_t bytes = 0;
Expand Down Expand Up @@ -1039,6 +1040,8 @@ class DeltaTree
size_t numEntries() const { return num_entries; }
size_t numInserts() const { return num_inserts; }
size_t numDeletes() const { return num_deletes; }
Int64 maxDupTupleID() const { return max_dup_tuple_id; }
void setMaxDupTupleID(Int64 tuple_id) { max_dup_tuple_id = std::max(tuple_id, max_dup_tuple_id); }

void addDelete(UInt64 rid);
void addInsert(UInt64 rid, UInt64 tuple_id);
Expand All @@ -1055,6 +1058,7 @@ DT_CLASS::DeltaTree(const DT_CLASS::Self & o)
, num_inserts(o.num_inserts)
, num_deletes(o.num_deletes)
, num_entries(o.num_entries)
, max_dup_tuple_id(o.max_dup_tuple_id)
, allocator(std::make_unique<Allocator>())
{
// If exception is thrown before clear copying_nodes, all nodes will be destroyed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

namespace DB::DM
{
namespace tests
{
class DeltaMergeStoreRWTest;
}
class UnorderedInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "UnorderedInputStream";
Expand Down Expand Up @@ -151,5 +155,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
// runtime filter
std::vector<RuntimeFilterPtr> runtime_filter_list;
int max_wait_time_ms;

friend class tests::DeltaMergeStoreRWTest;
};
} // namespace DB::DM
12 changes: 10 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2563,7 +2563,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
{
const auto & stable_snap = segment_snap->stable;
auto delta_snap = delta_reader->getDeltaSnap();
// Clone a new delta index.
// Try to clone from the sahred delta index, if it fails to reuse the shared delta index,
// it will return an empty delta index and we should place it in the following branch.
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
auto my_delta_tree = my_delta_index->getDeltaTree();

Expand All @@ -2579,8 +2580,15 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(
auto [my_placed_rows, my_placed_deletes] = my_delta_index->getPlacedStatus();

// Let's do a fast check, determine whether we need to do place or not.
if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, start_ts))
if (!delta_reader->shouldPlace( //
dm_context,
my_placed_rows,
my_placed_deletes,
rowkey_range,
relevant_range,
start_ts))
{
// We can reuse the shared-delta-index
return {my_delta_index, false};
}

Expand Down
183 changes: 183 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <algorithm>
#include <future>
#include <iterator>
#include <memory>
#include <random>

namespace DB
Expand Down Expand Up @@ -3780,6 +3781,188 @@ try
}
CATCH

void DeltaMergeStoreRWTest::dupHandleVersionAndDeltaIndexAdvancedThanSnapshot()
{
auto table_column_defines = DMTestEnv::getDefaultColumns();
store = reload(table_column_defines);

auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts);
block.checkNumberOfRows();
return block;
};

auto write_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
auto block = create_block(beg, end, ts);
store->write(*db_context, db_context->getSettingsRef(), block);
};

auto create_stream = [&]() {
return store->read(
*db_context,
db_context->getSettingsRef(),
store->getTableColumns(),
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* start_ts= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
std::vector<RuntimeFilterPtr>{},
/* rf_max_wait_time_ms= */ 0,
TRACING_NAME,
/* keep_order= */ false,
/* is_fast_scan= */ false,
DEFAULT_BLOCK_SIZE)[0];
};

auto count_rows = [](BlockInputStreamPtr stream) {
std::size_t count = 0;
stream->readPrefix();
for (;;)
{
auto block = stream->read();
if (!block)
{
break;
}
count += block.rows();
}
stream->readSuffix();
return count;
};

auto get_seg_read_task = [&](BlockInputStreamPtr stream) {
auto unordered_stream = std::dynamic_pointer_cast<UnorderedInputStream>(stream);
const auto & tasks = unordered_stream->task_pool->getTasks();
RUNTIME_CHECK(tasks.size() == 1, tasks.size());
return tasks.begin()->second;
};

auto clone_delta_index = [](SegmentReadTaskPtr seg_read_task) {
auto delta_snap = seg_read_task->read_snapshot->delta;
return delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
};

auto check_delta_index
= [](DeltaIndexPtr delta_index, size_t expect_rows, size_t expect_deletes, Int64 expect_max_dup_tuple_id) {
auto [placed_rows, placed_deletes] = delta_index->getPlacedStatus();
ASSERT_EQ(placed_rows, expect_rows);
ASSERT_EQ(placed_deletes, expect_deletes);
ASSERT_EQ(delta_index->getDeltaTree()->maxDupTupleID(), expect_max_dup_tuple_id);
};

auto ensure_place = [&](SegmentReadTaskPtr seg_read_task) {
auto pk_ver_col_defs = std::make_shared<ColumnDefines>(
ColumnDefines{getExtraHandleColumnDefine(dm_context->is_common_handle), getVersionColumnDefine()});
auto delta_reader = std::make_shared<DeltaValueReader>(
*dm_context,
seg_read_task->read_snapshot->delta,
pk_ver_col_defs,
RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()),
ReadTag::MVCC);
return seg_read_task->segment->ensurePlace(
*dm_context,
seg_read_task->read_snapshot,
delta_reader,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
std::numeric_limits<UInt64>::max());
};

// Write [0, 128) with ts 1 for initializing stable.
write_block(0, 128, 1);
store->mergeDeltaAll(*db_context);

// Write [50, 60) with ts 2 for initializing delta.
write_block(50, 60, 2);

// Scan table normally.
{
auto stream = create_stream();
auto count = count_rows(stream);
ASSERT_EQ(count, 128);
}

// The snapshot does not include all the duplicated tuples of the delta index.
// This snapshot should rebuild delta index for itself.
// https://github.com/pingcap/tiflash/issues/8845
{
// Create snapshot but not place index
auto stream1 = create_stream();

// !!!Duplicated!!!: Write [50, 60) with ts 2
write_block(50, 60, 2);

// Place index with newest data.
auto stream2 = create_stream();
auto count2 = count_rows(stream2);
ASSERT_EQ(count2, 128);

// stream1 should not resue delta index of stream2

// Check cloning delta index
{
auto seg_read_task = get_seg_read_task(stream1);

// Shared delta index has been placed to the newest by `count_rows(stream2)`.
auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
check_delta_index(shared_delta_index, 20, 0, 19);

// Cannot clone delta index because it contains duplicated records in the gap of snapshot and the shared delta index.
auto cloned_delta_index = clone_delta_index(seg_read_task);
check_delta_index(cloned_delta_index, 0, 0, -1);
}
// Check scanning result of stream1
auto count1 = count_rows(stream1);
ASSERT_EQ(count1, count2);
}

// Make sure shared delta index can be reused by new snapshot
{
auto stream = create_stream();
auto seg_read_task = get_seg_read_task(stream);
auto cloned_delta_index = clone_delta_index(seg_read_task);
check_delta_index(cloned_delta_index, 20, 0, 19);
}

// The snapshot includes all the duplicated tuples of the delta index.
// Delta index can be reused safely.
{
write_block(70, 80, 2);
auto stream = create_stream();
auto seg_read_task = get_seg_read_task(stream);
auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
check_delta_index(shared_delta_index, 20, 0, 19);
auto cloned_delta_index = clone_delta_index(seg_read_task);
check_delta_index(cloned_delta_index, 20, 0, 19);
auto [placed_delta_index, fully_indexed] = ensure_place(seg_read_task);
ASSERT_TRUE(fully_indexed);
check_delta_index(placed_delta_index, 30, 0, 19);
auto count = count_rows(stream);
ASSERT_EQ(count, 128);
}

{
write_block(75, 85, 2);
auto stream = create_stream();
auto seg_read_task = get_seg_read_task(stream);
auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
check_delta_index(shared_delta_index, 30, 0, 19);
auto cloned_delta_index = clone_delta_index(seg_read_task);
check_delta_index(cloned_delta_index, 30, 0, 19);
auto [placed_delta_index, fully_indexed] = ensure_place(seg_read_task);
ASSERT_TRUE(fully_indexed);
check_delta_index(placed_delta_index, 40, 0, 34);
auto count = count_rows(stream);
ASSERT_EQ(count, 128);
}
}

TEST_P(DeltaMergeStoreRWTest, DupHandleVersionAndDeltaIndexAdvancedThanSnapshot)
try
{
dupHandleVersionAndDeltaIndexAdvancedThanSnapshot();
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB

0 comments on commit b62ed01

Please sign in to comment.