
feat: Add revised partition stats tables #22163

Merged: 9 commits merged into master from partition-stats-simple on May 23, 2024

Conversation

@tkaemming (Contributor, Author) commented on May 7, 2024:

Problem

We often want more information about the events in a Kafka topic for diagnostic purposes, particularly during incidents. There is currently no good way to answer questions like:

  • Has a particular token recently sent a large volume of events?
  • For a particular token, is one distinct ID more prevalent than others over some timestamp or offset range?
  • In the case of an accumulating backlog, is one token, distinct ID, or event type overrepresented within a partition?

events_plugin_ingestion_partition_statistics has some of this data, but its use is inconsistent across clusters.

The events data is not well-suited to answering these questions, as it:

  1. only contains data that has made it through the entire ingestion pipeline (often we want to see what data sits between the current ingestion consumer offset and the latest offset for a partition, for example to identify the impact of data drops), and
  2. does not contain any record of the originating topic or partition for a message (just the offset), so even retroactively identifying what messages were within a partition during an incident is difficult.

Changes

Adds a new table (events_plugin_ingestion_partition_statistics_v2) that tracks several of the more relevant event characteristics.
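
For reference, assembling the column and engine fragments quoted in the review threads below gives roughly the following schema. This is a sketch: the topic column and its String type are inferred from the partitioning and ordering keys, and {engine} is templated per deployment.

    CREATE TABLE events_plugin_ingestion_partition_statistics_v2
    (
        topic String,
        partition UInt64,
        offset UInt64,
        token String,
        distinct_id String,
        ip String,
        event String,
        data_length UInt64,
        timestamp DateTime
    )
    ENGINE = {engine}
    PARTITION BY (topic, toStartOfDay(timestamp))
    ORDER BY (topic, partition, offset)
    TTL timestamp + INTERVAL 30 DAY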

This allows us to run ad-hoc queries such as the following to find heavy-hitter (token, distinct ID) pairs within an offset range:

SELECT
    token,
    distinct_id,
    count()
FROM events_plugin_ingestion_partition_statistics_v2
WHERE (topic = 'events_plugin_ingestion') AND (partition = 0) AND (offset >= 10)
GROUP BY ALL
ORDER BY count() DESC
LIMIT 5

Does this work well for both Cloud and self-hosted?

It should.

How did you test this code?

See snapshots; I also ran some data through the functional tests to verify things were generally working as expected.

Comment on lines +112 to +113
partition UInt64,
offset UInt64,
@tkaemming (Contributor, Author) commented on May 7, 2024:
Delta encoding for partition and delta or double delta for offset would probably yield pretty significant compression here; we can either try those here or add them later to get a more interesting before/after comparison.

@fuziontech (Member) replied:

I wouldn't worry too much about size on these tables since we are TTLing after 30 days. That being said, I do think this kind of codec could work nicely for something like this, especially offset, where the encoded values will generally be small and repetitive compared to the raw offset values.
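
A minimal sketch of what those codecs might look like in the column definitions (Delta and DoubleDelta are standard ClickHouse codecs; pairing them with ZSTD is an assumption, not something settled in this thread):

    -- deltas between consecutive partition/offset values are small and repetitive
    partition UInt64 CODEC(Delta, ZSTD),
    offset UInt64 CODEC(DoubleDelta, ZSTD),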

ENGINE = {engine}
PARTITION BY (topic, toStartOfDay(timestamp))
ORDER BY (topic, partition, offset)
TTL timestamp + INTERVAL 30 DAY
@tkaemming (Contributor, Author) commented on May 7, 2024:

30 days was a pretty arbitrary choice; no issues with this being longer (though I'm not sure why we'd need more data) or shorter (we'd probably want to keep at least 7 days).

I didn't really do any size estimation here; my expectation is that most of the table will compress pretty well aside from token, distinct ID, and IP address. We can always drop partitions if it becomes untenably large.
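
For reference, dropping a single day's data by hand would look something like this, a sketch assuming the (topic, day) partition key defined in this PR (the topic and date are example values):

    ALTER TABLE events_plugin_ingestion_partition_statistics_v2
        DROP PARTITION tuple('events_plugin_ingestion', toDateTime('2024-05-07 00:00:00'))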

@fuziontech (Member) replied:

No, I think 30 days is just about perfect. We can always tune it down if we need to.

ip String,
event String,
data_length UInt64,
timestamp DateTime
@tkaemming (Contributor, Author) commented on May 7, 2024:

The timestamp would probably compress relatively well with either Delta or Gorilla encoding.

Note that this is DateTime and not DateTime64; I don't think the extra precision would be particularly useful for anything we're realistically using this table for.

@fuziontech (Member) replied:

This would actually work out really nicely. I'm interested to try this 👀

Gorilla just uses delta encoding for timestamps, right?

@tkaemming (Contributor, Author) replied:

Gorilla is XOR-based, and the docs say "The smaller the difference between consecutive values is, i.e. the slower the values of the series changes, the better the compression rate", which makes me think it might make sense here (these values will change slowly but don't have a constant stride).

T64 also looks interesting: https://clickhouse.com/docs/en/sql-reference/statements/create/table#t64

I'll leave this and the other columns from #22163 (comment) alone for now — we can play with them later to see their impact.
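
Sketching the candidate timestamp codecs for a later comparison (syntax assumed per the ClickHouse codec docs; none of these are applied in this PR):

    timestamp DateTime CODEC(Delta, ZSTD)    -- plain delta of the underlying seconds
    -- timestamp DateTime CODEC(DoubleDelta) -- best when the stride is near-constant
    -- timestamp DateTime CODEC(Gorilla)     -- XOR-based; favors slowly changing values
    -- timestamp DateTime CODEC(T64)         -- crops unused high bits per block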

)
ENGINE = {engine}
PARTITION BY (topic, toStartOfDay(timestamp))
ORDER BY (topic, partition, offset)
@tkaemming (Contributor, Author) commented on May 7, 2024:

This doesn't account for multiple clusters, which means that splitting existing topics off to new clusters, or failing over to a new cluster, will make queries wonky unless a timestamp predicate based on when the offset reset occurred is added to the query.

I'd prefer to have some sort of cluster ID here, but I'm not sure this is the right place to start designing better multi-cluster support than just having KAFKA_HOSTS and SESSION_RECORDING_KAFKA_HOSTS.

@fuziontech (Member) replied:

We could just have a static cluster string column on each Kafka table that we can use to trace the stats/data back to the Kafka cluster they came from.
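
A minimal sketch of that idea (the column name and the 'primary' default are hypothetical placeholders):

    ALTER TABLE events_plugin_ingestion_partition_statistics_v2
        ADD COLUMN cluster LowCardinality(String) DEFAULT 'primary'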

@tkaemming (Contributor, Author) replied:

Makes sense.

I'd prefer to keep the naming consistent across different parts of the system if we can (rather than having the same cluster end up being named different things in different contexts), so I'll punt on this for now to avoid introducing inconsistency.

token,
distinct_id,
ip,
if(startsWith(JSONExtractString(data, 'event') AS _event, '$'), _event, '') AS event,
@tkaemming (Contributor, Author) commented on May 7, 2024:

I don't think there's much reason to track everybody's custom events, as those don't (or shouldn't) have much effect on ingestion performance.

@fuziontech (Member) replied:

We've used it before to tell a customer that they were sending the xyz event with only one distinct ID a million times a second; it basically saves them a bit of time looking into it themselves.

From a diagnostic perspective I agree, but from a comms perspective it has helped quite a bit: 'Please stop producing xyz event 🙏'

@tkaemming (Contributor, Author) replied:

That makes sense — probably easy enough to be permissive with this to start and be more restrictive later if things get out of control.

I was going to say that also means event shouldn't be LowCardinality … but it already wasn't, so I guess that doesn't require any changes after all. 🙃

Removed this with f8d9671.
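
After f8d9671, the materialized view presumably selects the event name unconditionally, i.e. something like this sketch (not the literal diff):

    token,
    distinct_id,
    ip,
    JSONExtractString(data, 'event') AS event,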

Comment on lines +26 to +28
KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL),
KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS),
KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW),
@tkaemming (Contributor, Author) commented:

Including mutable configuration in what should be an immutable migration seems dangerous to me, but it's not something I want to tackle right now. This is at least clearer than it was before, when KAFKA_HOSTS was added to the table engine as a default value several layers down the stack.



class PartitionStatsV2Table:
table_name: str = "events_plugin_ingestion_partition_statistics_v2"
@tkaemming (Contributor, Author) commented:

Not attached to this name (or any of the others in here, for that matter); I'm just following the existing pattern. I'm not sure this table really represents "partition statistics" any longer, but I don't have any better ideas about what to call it either.

@fuziontech (Member) replied:

> We often want more information about the events in a Kafka topic for diagnostic purposes, particularly during incidents.

events_plugin_ingestion_diagnostics_v2?

@tkaemming (Contributor, Author) replied:

Going to leave this as-is for now, just to avoid introducing additional names/concepts, even if this one isn't quite right anymore.

timestamp DateTime
)
ENGINE = {engine}
PARTITION BY (topic, toStartOfDay(timestamp))
@tkaemming (Contributor, Author) commented:

The date range here was also fairly arbitrarily selected; this could be a week or a month. For the query patterns I'm anticipating, I'm not sure it matters that much.

@fuziontech (Member) replied:

I hope it's not by month if the TTL is timestamp + INTERVAL 30 DAY!

I think toStartOfDay is kind of perfect.
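
With daily partitions, checking the on-disk layout (and what the TTL has already pruned) is a quick query against system.parts:

    SELECT partition, sum(rows) AS rows
    FROM system.parts
    WHERE table = 'events_plugin_ingestion_partition_statistics_v2' AND active
    GROUP BY partition
    ORDER BY partition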

Comment on lines +22 to +30
`uuid` String,
`distinct_id` String,
`ip` String,
`site_url` String,
`data` String,
`team_id` Int64,
`now` String,
`sent_at` String,
`token` String
@tkaemming (Contributor, Author) commented:

I'm not sure how accurate this is (some of these values don't seem to be sent any longer, or at least not consistently), but I'm also not sure there's any canonical definition of what's valid beyond "whatever capture is sending". (This section is just a refactor of the existing consumer anyway; I tried to ensure the fields used in the new destination table and materialized view were only those that seem to be consistently sent.)

@fuziontech (Member) replied:

I wish we could bring back
👏 Proto 👏 Buf 👏

#10932

offset UInt64,
token String,
distinct_id String,
ip String,
@tkaemming (Contributor, Author) commented:

This could also be stored more compactly as IPv4 or IPv6, I guess?

@fuziontech (Member) replied:

100%. There are also more native functions that work against the data type directly, with no need for conversion: https://clickhouse.com/docs/en/sql-reference/functions/ip-address-functions

@tkaemming (Contributor, Author) replied:

Updated this with 3bdfe63; I had hoped to use a Variant type, but it doesn't look like we're there yet.
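
A sketch of what the typed column and a lookup against it might look like; the actual change is in 3bdfe63, so treat the details here as assumptions:

    -- column definition: IPv4 addresses are stored as IPv4-mapped IPv6
    ip Nullable(IPv6),

    -- example lookup; toIPv6 parses a string literal into an IPv6 value
    SELECT count()
    FROM events_plugin_ingestion_partition_statistics_v2
    WHERE ip = toIPv6('203.0.113.7')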

@tkaemming marked this pull request as ready for review on May 7, 2024 at 22:55
@tkaemming requested a review from fuziontech as a code owner on May 7, 2024 at 22:55

@posthog-bot (Contributor) commented:

This PR hasn't seen activity in a week! Should it be merged, closed, or further worked on? If you want to keep it open, post a comment or remove the stale label – otherwise this will be closed in another week.

@fuziontech (Member) left a review:

I love the new @dataclass class structure for generating migrations. Much cleaner than what we had before with just string constants everywhere.

Answered a bunch of questions, but in general this is a great step forward!


distinct_id,
ip,
if(startsWith(JSONExtractString(data, 'event') AS _event, '$'), _event, '') AS event,
length(data) AS data_length,
@fuziontech (Member) commented:

nice!

@tkaemming (Contributor, Author) replied:

To be fair, I just stole this from your original table DDL.

@posthog-bot removed the stale label on May 21, 2024
@tkaemming (Contributor, Author) left a review:

> I love the new @dataclass class structure for generating migrations. Much cleaner than what we had before with just string constants everywhere.

To be honest, I just did this because the "all-uppercase lambdas at module scope" approach gives me a headache. 😄 I don't love that the migrations have references back to the main codebase itself (versus rendered SQL statements that don't change over time), but that's a conversation for later.


@tkaemming merged commit 6e13f17 into master on May 23, 2024 (77 checks passed)
@tkaemming deleted the partition-stats-simple branch on May 23, 2024 at 15:17
xrdt pushed a commit that referenced this pull request on May 24, 2024