New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ref: Kafka topic configuration #3282
Changes from 7 commits
5363ffb
0db7d1b
d98acb5
399d78f
897c360
8b7b3ce
fe939da
99e1b4f
bb3ce9f
6b324ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
# Changelog | ||
|
||
## Unreleased | ||
- Kafka topic config supports default topic names as keys ([#3282](https://github.com/getsentry/relay/pull/3282)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please move it to the |
||
|
||
**Internal**: | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -88,38 +88,49 @@ impl KafkaTopic { | |||||||
#[serde(default)] | ||||||||
pub struct TopicAssignments { | ||||||||
/// Simple events topic name. | ||||||||
#[serde(alias = "ingest-events")] | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. alias is supposed to be temporary, while we cut over config in ops. Later this should change to rename |
||||||||
pub events: TopicAssignment, | ||||||||
/// Events with attachments topic name. | ||||||||
#[serde(alias = "ingest-attachments")] | ||||||||
pub attachments: TopicAssignment, | ||||||||
/// Transaction events topic name. | ||||||||
#[serde(alias = "ingest-transactions")] | ||||||||
pub transactions: TopicAssignment, | ||||||||
/// Outcomes topic name. | ||||||||
#[serde(alias = "outcomes")] | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Field is already called outcomes. |
||||||||
pub outcomes: TopicAssignment, | ||||||||
/// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`. | ||||||||
#[serde(alias = "outcomes-billing")] | ||||||||
pub outcomes_billing: Option<TopicAssignment>, | ||||||||
/// Session health topic name. | ||||||||
#[serde(alias = "ingest-sessions")] | ||||||||
pub sessions: TopicAssignment, | ||||||||
/// Default topic name for all aggregate metrics. Specialized topics for session-based and | ||||||||
/// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each. | ||||||||
pub metrics: TopicAssignment, | ||||||||
/// Topic name for metrics extracted from sessions. Defaults to the assignment of `metrics`. | ||||||||
pub metrics_sessions: Option<TopicAssignment>, | ||||||||
/// Topic name for metrics extracted from sessions, aka release health. | ||||||||
#[serde(alias = "metrics", alias = "ingest-metrics")] | ||||||||
pub metrics_sessions: TopicAssignment, | ||||||||
/// Topic name for all other kinds of metrics. Defaults to the assignment of `metrics`. | ||||||||
#[serde(alias = "metrics_transactions")] | ||||||||
#[serde(alias = "ingest-generic-metrics")] | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
pub metrics_generic: TopicAssignment, | ||||||||
/// Stacktrace topic name | ||||||||
pub profiles: TopicAssignment, | ||||||||
/// Replay Events topic name. | ||||||||
#[serde(alias = "ingest-replay-events")] | ||||||||
pub replay_events: TopicAssignment, | ||||||||
/// Recordings topic name. | ||||||||
#[serde(alias = "ingest-replay-recordings")] | ||||||||
pub replay_recordings: TopicAssignment, | ||||||||
/// Monitor check-ins. | ||||||||
#[serde(alias = "ingest-monitors")] | ||||||||
pub monitors: TopicAssignment, | ||||||||
/// Standalone spans without a transaction. | ||||||||
#[serde(alias = "snuba-spans")] | ||||||||
pub spans: TopicAssignment, | ||||||||
/// Summary for metrics collected during a span. | ||||||||
#[serde(alias = "snuba-metrics-summaries")] | ||||||||
pub metrics_summaries: TopicAssignment, | ||||||||
/// COGS measurements. | ||||||||
#[serde(alias = "shared-resources-usage")] | ||||||||
pub cogs: TopicAssignment, | ||||||||
} | ||||||||
|
||||||||
|
@@ -134,7 +145,7 @@ impl TopicAssignments { | |||||||
KafkaTopic::Outcomes => &self.outcomes, | ||||||||
KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes), | ||||||||
KafkaTopic::Sessions => &self.sessions, | ||||||||
KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics), | ||||||||
KafkaTopic::MetricsSessions => &self.metrics_sessions, | ||||||||
KafkaTopic::MetricsGeneric => &self.metrics_generic, | ||||||||
KafkaTopic::Profiles => &self.profiles, | ||||||||
KafkaTopic::ReplayEvents => &self.replay_events, | ||||||||
|
@@ -156,8 +167,7 @@ impl Default for TopicAssignments { | |||||||
outcomes: "outcomes".to_owned().into(), | ||||||||
outcomes_billing: None, | ||||||||
sessions: "ingest-sessions".to_owned().into(), | ||||||||
metrics: "ingest-metrics".to_owned().into(), | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed this because I think it's confusing / error prone. There shouldn't be a metrics default. Only 2 completely separate topics for either release health metrics and generic metrics. |
||||||||
metrics_sessions: None, | ||||||||
metrics_sessions: "ingest-metrics".to_owned().into(), | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now there is duplication here between in default and the "alias" values, which I would like to avoid. Not sure the best way to tackle this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's fine. |
||||||||
metrics_generic: "ingest-performance-metrics".to_owned().into(), | ||||||||
profiles: "profiles".to_owned().into(), | ||||||||
replay_events: "ingest-replay-events".to_owned().into(), | ||||||||
|
@@ -344,11 +354,11 @@ mod tests { | |||||||
#[test] | ||||||||
fn test_kafka_config() { | ||||||||
let yaml = r#" | ||||||||
events: "ingest-events-kafka-topic" | ||||||||
ingest-events: "ingest-events-kafka-topic" | ||||||||
Dav1dde marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
profiles: | ||||||||
name: "ingest-profiles" | ||||||||
config: "profiles" | ||||||||
metrics: | ||||||||
ingest-metrics: | ||||||||
shards: 65000 | ||||||||
mapping: | ||||||||
0: | ||||||||
|
@@ -360,6 +370,7 @@ metrics: | |||||||
45000: | ||||||||
name: "ingest-metrics-3" | ||||||||
config: "metrics_3" | ||||||||
transactions: "ingest-transactions-kafka-topic" | ||||||||
"#; | ||||||||
|
||||||||
let def_config = vec![KafkaConfigParam { | ||||||||
|
@@ -398,7 +409,8 @@ metrics: | |||||||
let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap(); | ||||||||
let events = topics.events; | ||||||||
let profiles = topics.profiles; | ||||||||
let metrics = topics.metrics; | ||||||||
let metrics = topics.metrics_sessions; | ||||||||
let transactions = topics.transactions; | ||||||||
|
||||||||
assert!(matches!(events, TopicAssignment::Primary(_))); | ||||||||
assert!(matches!(profiles, TopicAssignment::Secondary { .. })); | ||||||||
|
@@ -412,7 +424,29 @@ metrics: | |||||||
let events_config = events | ||||||||
.kafka_config(&def_config, &second_config) | ||||||||
.expect("Kafka config for events topic"); | ||||||||
assert!(matches!(events_config, KafkaConfig::Single { .. })); | ||||||||
assert!(matches!( | ||||||||
events_config, | ||||||||
KafkaConfig::Single { | ||||||||
params: KafkaParams { | ||||||||
topic_name: "ingest-events-kafka-topic", | ||||||||
.. | ||||||||
} | ||||||||
} | ||||||||
)); | ||||||||
|
||||||||
// Legacy keys are still supported | ||||||||
let transactions_config = transactions | ||||||||
.kafka_config(&def_config, &second_config) | ||||||||
.expect("Kafka config for transactions topic"); | ||||||||
assert!(matches!( | ||||||||
transactions_config, | ||||||||
KafkaConfig::Single { | ||||||||
params: KafkaParams { | ||||||||
topic_name: "ingest-transactions-kafka-topic", | ||||||||
.. | ||||||||
} | ||||||||
} | ||||||||
)); | ||||||||
|
||||||||
let (shards, mapping) = | ||||||||
if let TopicAssignment::Sharded(Sharded { shards, mapping }) = metrics { | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, is this normally generated by hand?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, usually done by hand, but many PRs without impact have no changelog entry (this one should)