feat: Add revised partition stats tables #22163
Conversation
partition UInt64,
offset UInt64,
Delta encoding for `partition` and delta or double-delta for `offset` would probably yield pretty significant compression here. We can either try those here, or add them later to get a more interesting before/after comparison.
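As a sketch of what those codecs could look like on this hunk's columns (hypothetical; not what the PR ships):

```sql
-- Hypothetical codec experiment, not the PR's actual DDL.
-- Delta stores differences between consecutive values; DoubleDelta
-- stores the difference of differences, which suits per-partition
-- offsets that mostly increase by 1.
partition UInt64 CODEC(Delta, ZSTD),
offset UInt64 CODEC(DoubleDelta, ZSTD),
```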
I wouldn't worry too much about size on these tables since we are TTLing after 30 days.
That being said, I do think this kind of codec could work nicely for something like this, especially `offset`, where it's generally going to be small and repeated values, especially compared to the original `offset` value.
ENGINE = {engine}
PARTITION BY (topic, toStartOfDay(timestamp))
ORDER BY (topic, partition, offset)
TTL timestamp + INTERVAL 30 DAY
30 days was a pretty arbitrary choice; no issues with this being longer (though I'm not sure why we'd need more data) or shorter (we'd probably want to keep at least 7 days).
I didn't really do any estimation on this; my expectation is that most of the table will compress pretty well aside from token, distinct ID, and IP address. We can always drop partitions if it becomes untenably large.
No, I think 30 days is just about perfect. We can always tune it down if we need to.
ip String,
event String,
data_length UInt64,
timestamp DateTime
Timestamp probably would compress relatively well with either delta or Gorilla encoding.
Note that this is `DateTime` and not `DateTime64`. I don't think the extra precision would be particularly useful here for anything we're realistically using this table for?
This would actually work out really nicely. I'm interested to try this 👀
Gorilla just uses delta encoding for timestamps, right?
Gorilla is XOR based and the docs say "The smaller the difference between consecutive values is, i.e. the slower the values of the series changes, the better the compression rate" which makes me think it might make sense here (these values will change slowly but don't have a constant stride.)
T64 also looks interesting: https://clickhouse.com/docs/en/sql-reference/statements/create/table#t64
I'll leave this and the other columns from #22163 (comment) alone for now — we can play with them later to see their impact.
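If we did experiment later, candidate declarations might look like this (a sketch; none of these codecs are applied in the PR):

```sql
-- Candidate codecs for a slowly-changing DateTime column (pick one):
timestamp DateTime CODEC(Delta, ZSTD)       -- deltas between consecutive values
-- timestamp DateTime CODEC(Gorilla, ZSTD)  -- XOR-based; small diffs compress well
-- timestamp DateTime CODEC(T64, ZSTD)      -- bit-transposes blocks of 64 values
```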
)
ENGINE = {engine}
PARTITION BY (topic, toStartOfDay(timestamp))
ORDER BY (topic, partition, offset)
This doesn't account for multiple clusters, which means that splitting existing topics off to new clusters or failing over to a new cluster will make queries wonky unless a timestamp predicate based on when the offset reset occurred is added to the query.
I'd prefer to have some sort of cluster ID here, but I'm not sure this is the right place to start designing better multi-cluster support than just having `KAFKA_HOSTS` and `SESSION_RECORDING_KAFKA_HOSTS`.
We could just have a static `cluster` string column for each Kafka table that we can use to trace back to which Kafka cluster the stats/data came from.
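A minimal sketch of that idea (the column name and the example cluster value are assumptions, not part of this PR):

```sql
-- Hypothetical: tag each row with the Kafka cluster it came from, so
-- offsets remain unambiguous after a topic is split off or failed over.
cluster LowCardinality(String),  -- e.g. filled with a fixed value per source cluster
-- Queries could then scope offset ranges to a single cluster:
-- WHERE cluster = 'us-east-1' AND topic = 'events_plugin_ingestion' AND partition = 0
```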
Makes sense.
I'd prefer to keep the naming consistent across different parts of the system if we can (rather than having the same cluster end up being named different things in different contexts), so I'll punt on this for now to avoid potentially introducing inconsistency.
token,
distinct_id,
ip,
if(startsWith(JSONExtractString(data, 'event') AS _event, '$'), _event, '') AS event,
I don't think there's much reason to track everybody's custom events as those don't (or shouldn't) have much effect on ingestion performance.
We've used it before to communicate to a customer that they are sending some `xyz` event with only one distinct ID a million times a second; basically saves them a bit of time looking into it themselves.
From a diagnostic perspective I agree, but from a comms perspective it has helped quite a bit: 'Please stop producing `xyz` event 🙏'
That makes sense — probably easy enough to be permissive with this to start and be more restrictive later if things get out of control.
I was going to say that also means `event` shouldn't be `LowCardinality`… but it already wasn't, so I guess that doesn't require any changes after all. 🙃
Removed this with f8d9671.
KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL),
KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS),
KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW),
Including mutable configuration in what should be an immutable migration seems like a dangerous thing to me, but not something I want to tackle right now. This is at least clearer than it was before, where `KAFKA_HOSTS` was added to the table engine as a default value several layers down the stack.
class PartitionStatsV2Table:
    table_name: str = "events_plugin_ingestion_partition_statistics_v2"
Not attached to this name (or any of the others in here, for that matter); just following the existing pattern. I'm not sure that this table really represents "partition statistics" any longer, but I don't have any better ideas on what to call it either.
We often want more information about the events in a Kafka topic for diagnostic purposes, particularly during incidents.
`events_plugin_ingestion_diagnostics_v2`?
Going to leave this as-is for now just to avoid introducing additional names/concepts even if this one isn't quite right anymore.
timestamp DateTime
)
ENGINE = {engine}
PARTITION BY (topic, toStartOfDay(timestamp))
The date range here was also fairly arbitrarily selected; this could be a week or a month. For the query patterns I'm anticipating, I'm not sure it matters that much.
I hope it's not by month if `TTL timestamp + INTERVAL 30 DAY`!
I think `toStartOfDay` is kind of perfect.
`uuid` String,
`distinct_id` String,
`ip` String,
`site_url` String,
`data` String,
`team_id` Int64,
`now` String,
`sent_at` String,
`token` String
I'm not sure how accurate this is (some of these values don't seem to be sent any longer, or at least not sent consistently), but I'm not sure there's any canonical definition of what is valid here beyond "whatever capture is sending"? (This section is just a refactor of the existing consumer anyway; I tried to ensure the fields used in the new destination table and materialized view were only those that seem to be consistently sent.)
I wish we could bring back 👏 Proto 👏 Buf 👏
offset UInt64,
token String,
distinct_id String,
ip String,
This could also be stored more compactly as IPv4 or IPv6, I guess?
100%. You also get more native functions against the data type directly, with no need for conversion:
https://clickhouse.com/docs/en/sql-reference/functions/ip-address-functions
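For example (a sketch; the column change and query are hypothetical): `toIPv6` accepts both address families, mapping IPv4 addresses into the IPv4-mapped IPv6 range, so a single `IPv6` column can hold both.

```sql
-- Hypothetical: a native IP column instead of String.
ip IPv6,
-- ...which native IP functions can then query without string parsing:
SELECT count()
FROM events_plugin_ingestion_partition_statistics_v2
WHERE ip = toIPv6('10.1.2.3')
```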
Updated this with 3bdfe63. I had hoped to use a `Variant` type, but it doesn't look like we're there yet.
I love the new `@dataclass` class structure for generating migrations. Much cleaner than what we had before with just string constants everywhere.
Answered a bunch of questions, but in general this is a great step forward!
distinct_id,
ip,
if(startsWith(JSONExtractString(data, 'event') AS _event, '$'), _event, '') AS event,
length(data) AS data_length,
nice!
To be fair, I just stole this from your original table DDL.
I love the new `@dataclass` class structure for generating migrations. Much cleaner than what we had before with just string constants everywhere.
To be honest, I just did this because the "all uppercase at module scope lambdas" approach gives me a headache. 😄 I don't love that the migrations have references back to the main codebase itself (versus rendered SQL statements that don't change over time), but that's a conversation for later.
Problem
We often want more information about the events in a Kafka topic for diagnostic purposes, particularly during incidents, and there is currently no good way to answer questions like these. `events_plugin_ingestion_partition_statistics` has some of this data, but its use is inconsistent across clusters, and the `events` data is not well-suited to answer these questions either.
Changes
Adds a new table (`events_plugin_ingestion_partition_statistics_v2`) that tracks several of the more relevant event characteristics. This allows us to run ad-hoc queries such as this example to find heavy hitter token and distinct ID pairs within an offset range:
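A sketch of that kind of query (the topic name and offset bounds here are hypothetical placeholders):

```sql
-- Hypothetical heavy-hitter query; topic and offset bounds are placeholders.
SELECT token, distinct_id, count() AS messages, sum(data_length) AS bytes
FROM events_plugin_ingestion_partition_statistics_v2
WHERE topic = 'events_plugin_ingestion'
  AND partition = 1
  AND offset BETWEEN 1000000 AND 2000000
GROUP BY token, distinct_id
ORDER BY messages DESC
LIMIT 10
```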
Does this work well for both Cloud and self-hosted?
It should.
How did you test this code?
See snapshots; I also ran some data through functional tests just to verify things were generally working as expected.