
Sharded compaction #3537

Status: Draft. Wants to merge 17 commits into base branch main.
Conversation

@mdisibio (Contributor) commented Apr 3, 2024:

What this PR does:
This is the largest change to compaction in years. It is a new sharding compactor that splits blocks up by trace ID divisions. For example, instead of having 2 blocks that each cover the entire trace ID range 00..FF, the sharding compactor can combine and split them into 2 blocks covering the ranges 00..80 and 80..FF, so that each block contains half of the range it did before.
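For illustration, here is a minimal Go sketch of mapping a trace ID to one of N equal divisions of the ID space by its leading byte. The function name and the single-byte shortcut are hypothetical and not code from this PR:

```go
// Hypothetical sketch: map a trace ID to one of shardCount equal divisions
// of the ID space 00..FF by its leading byte.
package main

import "fmt"

func shardOf(traceID []byte, shardCount int) int {
	// For illustration only the first byte is inspected, which is enough to
	// pick among up to 256 divisions.
	return int(traceID[0]) * shardCount / 256
}

func main() {
	// With 2 shards, IDs beginning 00..7F fall in shard 0 and 80..FF in shard 1.
	fmt.Println(shardOf([]byte{0x7f, 0x01}, 2)) // 0
	fmt.Println(shardOf([]byte{0x80, 0x01}, 2)) // 1
}
```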

There are several benefits from this approach:

  • The read path is more efficient. Trace lookup can filter out more blocks faster by checking the Min/Max ID in the block meta, without having to resort to the bloom filter (a sketch follows this list). Metrics queries inspect fewer blocks to find the traces in the job shard.
  • Deduplication of data in the backend is more effective. Because the sharding compactor clusters traces with nearby IDs together, it is better at finding and deduplicating the copies of a trace created by the replication factor. A recent internal analysis measured that compactors were only bringing the replication factor from 3 down to ~2.5. The sharding compactor should achieve ~1.5 or better.
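As a concrete illustration of the read-path benefit, this is a minimal sketch (hypothetical blockMeta type and function, not Tempo's actual BlockMeta API) of rejecting a block by its Min/Max ID before ever consulting its bloom filter:

```go
package main

import (
	"bytes"
	"fmt"
)

// blockMeta is a hypothetical stand-in for per-block metadata holding the
// minimum and maximum trace ID stored in the block.
type blockMeta struct {
	MinID, MaxID []byte
}

// blockMayContain reports whether traceID falls inside the block's ID range.
// When it returns false the block can be skipped without touching its bloom filter.
func blockMayContain(meta blockMeta, traceID []byte) bool {
	return bytes.Compare(traceID, meta.MinID) >= 0 &&
		bytes.Compare(traceID, meta.MaxID) <= 0
}

func main() {
	m := blockMeta{MinID: []byte{0x00}, MaxID: []byte{0x80}}
	fmt.Println(blockMayContain(m, []byte{0x40})) // true: inside 00..80
	fmt.Println(blockMayContain(m, []byte{0xA0})) // false: block skipped
}
```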

The shard count is configurable globally and per-tenant. Setting it to 2 or higher activates the new sharding compactor; 0 or 1 falls back to the existing compactor. Useful values are expected to be 2 through 8. Values higher than 8 could be interesting, but at the cost of greatly increasing the size of the blocklist. A sketch of the activation rule follows.
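A minimal sketch of the activation rule, assuming a hypothetical config shape rather than Tempo's real overrides API:

```go
package main

import "fmt"

// compactionConfig is a hypothetical shape for the global and per-tenant settings.
type compactionConfig struct {
	GlobalShardCount    int
	PerTenantShardCount map[string]int
}

// shardCountFor resolves the effective shard count: a per-tenant override wins
// over the global default.
func (c compactionConfig) shardCountFor(tenant string) int {
	if v, ok := c.PerTenantShardCount[tenant]; ok {
		return v
	}
	return c.GlobalShardCount
}

// shardingEnabledFor applies the rule described above: 2 or higher enables the
// sharding compactor, 0 or 1 keeps the existing compactor.
func (c compactionConfig) shardingEnabledFor(tenant string) bool {
	return c.shardCountFor(tenant) >= 2
}

func main() {
	cfg := compactionConfig{
		GlobalShardCount:    1,
		PerTenantShardCount: map[string]int{"tenant-a": 4},
	}
	fmt.Println(cfg.shardingEnabledFor("tenant-a")) // true
	fmt.Println(cfg.shardingEnabledFor("tenant-b")) // false
}
```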

Description
How the sharding compactor works can be broken down into 3 phases:

  1. The first phase takes fresh blocks from the ingesters and splits them. Ingesters are not changed in this PR and continue to flush blocks covering the full range 00..FF. The sharding compactor identifies these by CompactionLevel=0 and splits them. This is a combination of combine and split as described above, so that up to 4 (maxInputBlocks) ingester blocks are rewritten as N sharded blocks. This step feeds work to the next phase.
  2. The second phase is the typical compaction/reduction of sharded blocks with other blocks in the same shard. This is the same as the existing compaction, but shard-aware. The compactor identifies these blocks by CompactionLevel>0 and by their min/max trace IDs falling within the same shard (i.e. the block is "well-sharded").
  3. A third phase upgrades/resplits older blocks too, but it has the lowest priority and runs only when the first 2 phases have no outstanding work (see the priority sketch after this list).
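A minimal sketch of the phase prioritization described above; the names and counters are hypothetical and do not reproduce the PR's actual switch statement:

```go
package main

import "fmt"

// phase identifies which group of blocks the compactor should work on next.
type phase int

const (
	phaseNone    phase = iota
	phaseSplit         // phase 1: split fresh CompactionLevel=0 ingester blocks
	phaseCompact       // phase 2: shard-aware compaction of well-sharded blocks
	phaseResplit       // phase 3: upgrade/resplit older, unsharded blocks
)

// nextPhase picks work in priority order: splitting feeds compaction, and
// resplitting older blocks only happens when the first two phases are idle.
func nextPhase(level0, wellSharded, legacy int) phase {
	switch {
	case level0 > 0:
		return phaseSplit
	case wellSharded > 0:
		return phaseCompact
	case legacy > 0:
		return phaseResplit
	default:
		return phaseNone
	}
}

func main() {
	fmt.Println(nextPhase(3, 10, 50)) // 1: splitting wins even with other work pending
	fmt.Println(nextPhase(0, 0, 50))  // 3: resplit only when nothing else is outstanding
}
```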

Which issue(s) this PR fixes:
Fixes n/a

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

@@ -75,6 +75,7 @@ func (i *rawIterator) Next(context.Context) (common.ID, parquet.Row, error) {
	}

	if errors.Is(err, io.EOF) {
		i.pool.Put(rows[0])

@mdisibio (Contributor, Author) commented on this diff:

This fixes a "memory leak" where the buffer wasn't returned to the pool. It actually has a significant effect on memory because it isn't happening just once at the end, but on every loop of the multiblock iterator: once 1 block is exhausted, it begins leaking a pooled buffer on every call. The better fix would be to make the multiblock iterator stop calling Next() once an iterator is exhausted, but that was going to be a lot more involved.
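A minimal sketch of the pattern the fix restores, using a hypothetical iterator rather than the PR's rawIterator: a buffer borrowed from the pool must also be returned on the EOF path, otherwise each call made after exhaustion leaks one buffer.

```go
package multiblock

import (
	"errors"
	"io"
	"sync"
)

// iter is a hypothetical iterator that reads into buffers borrowed from a pool.
type iter struct {
	r    io.Reader
	pool *sync.Pool // pool of []byte buffers
}

// Next borrows a buffer on every call. On the EOF path the buffer is unused,
// so it must be handed back to the pool; otherwise each call made after the
// underlying source is exhausted leaks one pooled buffer.
func (i *iter) Next() ([]byte, error) {
	buf := i.pool.Get().([]byte)
	n, err := i.r.Read(buf)
	if errors.Is(err, io.EOF) && n == 0 {
		i.pool.Put(buf) // the essence of the fix: return the unused buffer
		return nil, io.EOF
	}
	return buf[:n], err
}
```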

@mapno (Member) left a comment:

Nice work!

Review threads:
  • modules/overrides/config_legacy.go
  • tempodb/encoding/common/interfaces.go (outdated)
  • tempodb/compactor_test.go (outdated)
  • tempodb/sharding_block_selector.go (outdated)
  • tempodb/tempodb.go (outdated)
@knylander-grafana (Contributor) left a comment:

Thank you for adding some information to the configuration docs. Will we need additional information in the docs about when to use this configuration?

@joe-elliott (Member) left a comment:

long overdue work. looks great. added some Qs but fine with things as is.

if you need my approval please ask. the only reason i'm not approving is b/c you and @mapno have been handling these PRs

Review threads:
  • modules/overrides/config_legacy.go
  • modules/querier/querier_query_range.go
  • tempodb/encoding/common/interfaces.go
// ship block to backend if done
if currentBlock != nil && cmd.CutBlock(currentBlock.meta, lowestID) {
	currentBlockPtrCopy := currentBlock
	currentBlockPtrCopy.meta.StartTime = minBlockStart

A reviewer (Member) commented:

won't this adjust the original "currentBlock" since they point to the same thing?

mdisibio (Contributor, Author) replied:

Hmm, this block was moved from the bottom of the loop to the top so that the CutBlock callback can inspect the next trace (lowestID). You're right, it does seem to be out of date and the copy is no longer needed. I'll take a look.
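A standalone example of the aliasing the reviewer points out, with hypothetical stand-in types: copying a pointer variable copies only the pointer, so a write through the copy is visible through the original.

```go
package main

import "fmt"

type blockMeta struct{ StartTime int64 }

type compactedBlock struct{ meta *blockMeta }

func main() {
	currentBlock := &compactedBlock{meta: &blockMeta{StartTime: 1}}

	// Copying the pointer does not copy the struct: both variables point at the
	// same block, so mutating through the copy mutates the original too.
	currentBlockPtrCopy := currentBlock
	currentBlockPtrCopy.meta.StartTime = 42

	fmt.Println(currentBlock.meta.StartTime) // 42
}
```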

Review threads:
  • tempodb/sharding_block_selector.go (two threads)
@mdisibio mdisibio marked this pull request as draft April 16, 2024 11:17
@mdisibio (Contributor, Author) commented:

Hi everyone: Putting this back to draft because it is not yet ready for large scale. Testing in a large cluster shows it is difficult to achieve the right balance between splitting and combining blocks, even with tuning. I would like to revisit the switch statement and the priorities of each group of blocks. When there is an imbalance, compactors are less effective and the blocklist grows more than is acceptable: if the compactor spends too much time splitting blocks, the backend ends up with (too) many small blocks, and if it spends too much time combining, then ingester level-0 blocks are neglected.
