Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Kafka topic configuration #3282

Merged
merged 10 commits into from Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Kafka topic config supports default topic names as keys ([#3282](https://github.com/getsentry/relay/pull/3282))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Kafka topic config supports default topic names as keys ([#3282](https://github.com/getsentry/relay/pull/3282))
- Kafka topic config supports default topic names as keys. ([#3282](https://github.com/getsentry/relay/pull/3282))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, is this normally generated by hand?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, usually done by hand, but many PRs without impact have no changelog entry (this one should)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move it to the Internal section.


**Internal**:

Expand Down
58 changes: 46 additions & 12 deletions relay-kafka/src/config.rs
Expand Up @@ -88,38 +88,49 @@ impl KafkaTopic {
#[serde(default)]
pub struct TopicAssignments {
/// Simple events topic name.
#[serde(alias = "ingest-events")]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alias is supposed to be temporary, while we cut over config in ops. Later this should change to rename

pub events: TopicAssignment,
/// Events with attachments topic name.
#[serde(alias = "ingest-attachments")]
pub attachments: TopicAssignment,
/// Transaction events topic name.
#[serde(alias = "ingest-transactions")]
pub transactions: TopicAssignment,
/// Outcomes topic name.
#[serde(alias = "outcomes")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field is already called outcomes.

pub outcomes: TopicAssignment,
/// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`.
#[serde(alias = "outcomes-billing")]
pub outcomes_billing: Option<TopicAssignment>,
/// Session health topic name.
#[serde(alias = "ingest-sessions")]
pub sessions: TopicAssignment,
/// Default topic name for all aggregate metrics. Specialized topics for session-based and
/// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each.
pub metrics: TopicAssignment,
/// Topic name for metrics extracted from sessions. Defaults to the assignment of `metrics`.
pub metrics_sessions: Option<TopicAssignment>,
/// Topic name for metrics extracted from sessions, aka release health.
#[serde(alias = "metrics", alias = "ingest-metrics")]
pub metrics_sessions: TopicAssignment,
/// Topic name for all other kinds of metrics. Defaults to the assignment of `metrics`.
#[serde(alias = "metrics_transactions")]
#[serde(alias = "ingest-generic-metrics")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[serde(alias = "metrics_transactions")]
#[serde(alias = "ingest-generic-metrics")]
#[serde(alias = "metrics_transactions", alias = "ingest-generic-metrics")]

pub metrics_generic: TopicAssignment,
/// Stacktrace topic name
pub profiles: TopicAssignment,
/// Replay Events topic name.
#[serde(alias = "ingest-replay-events")]
pub replay_events: TopicAssignment,
/// Recordings topic name.
#[serde(alias = "ingest-replay-recordings")]
pub replay_recordings: TopicAssignment,
/// Monitor check-ins.
#[serde(alias = "ingest-monitors")]
pub monitors: TopicAssignment,
/// Standalone spans without a transaction.
#[serde(alias = "snuba-spans")]
pub spans: TopicAssignment,
/// Summary for metrics collected during a span.
#[serde(alias = "snuba-metrics-summaries")]
pub metrics_summaries: TopicAssignment,
/// COGS measurements.
#[serde(alias = "shared-resources-usage")]
pub cogs: TopicAssignment,
}

Expand All @@ -134,7 +145,7 @@ impl TopicAssignments {
KafkaTopic::Outcomes => &self.outcomes,
KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes),
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics),
KafkaTopic::MetricsSessions => &self.metrics_sessions,
KafkaTopic::MetricsGeneric => &self.metrics_generic,
KafkaTopic::Profiles => &self.profiles,
KafkaTopic::ReplayEvents => &self.replay_events,
Expand All @@ -156,8 +167,7 @@ impl Default for TopicAssignments {
outcomes: "outcomes".to_owned().into(),
outcomes_billing: None,
sessions: "ingest-sessions".to_owned().into(),
metrics: "ingest-metrics".to_owned().into(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this because I think it's confusing / error prone. There shouldn't be a metrics default. Only 2 completely separate topics for either release health metrics and generic metrics.

metrics_sessions: None,
metrics_sessions: "ingest-metrics".to_owned().into(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now there is duplication here between in default and the "alias" values, which I would like to avoid. Not sure the best way to tackle this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine.

metrics_generic: "ingest-performance-metrics".to_owned().into(),
profiles: "profiles".to_owned().into(),
replay_events: "ingest-replay-events".to_owned().into(),
Expand Down Expand Up @@ -344,11 +354,11 @@ mod tests {
#[test]
fn test_kafka_config() {
let yaml = r#"
events: "ingest-events-kafka-topic"
ingest-events: "ingest-events-kafka-topic"
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
profiles:
name: "ingest-profiles"
config: "profiles"
metrics:
ingest-metrics:
shards: 65000
mapping:
0:
Expand All @@ -360,6 +370,7 @@ metrics:
45000:
name: "ingest-metrics-3"
config: "metrics_3"
transactions: "ingest-transactions-kafka-topic"
"#;

let def_config = vec![KafkaConfigParam {
Expand Down Expand Up @@ -398,7 +409,8 @@ metrics:
let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
let events = topics.events;
let profiles = topics.profiles;
let metrics = topics.metrics;
let metrics = topics.metrics_sessions;
let transactions = topics.transactions;

assert!(matches!(events, TopicAssignment::Primary(_)));
assert!(matches!(profiles, TopicAssignment::Secondary { .. }));
Expand All @@ -412,7 +424,29 @@ metrics:
let events_config = events
.kafka_config(&def_config, &second_config)
.expect("Kafka config for events topic");
assert!(matches!(events_config, KafkaConfig::Single { .. }));
assert!(matches!(
events_config,
KafkaConfig::Single {
params: KafkaParams {
topic_name: "ingest-events-kafka-topic",
..
}
}
));

// Legacy keys are still supported
let transactions_config = transactions
.kafka_config(&def_config, &second_config)
.expect("Kafka config for transactions topic");
assert!(matches!(
transactions_config,
KafkaConfig::Single {
params: KafkaParams {
topic_name: "ingest-transactions-kafka-topic",
..
}
}
));

let (shards, mapping) =
if let TopicAssignment::Sharded(Sharded { shards, mapping }) = metrics {
Expand Down