From 24d3f8973d381131fd975cd37691c0c9d87da907 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 18 Mar 2024 12:01:24 -0700 Subject: [PATCH] only skip produce --- relay-dynamic-config/src/metrics.rs | 7 +- relay-kafka/src/config.rs | 9 +- relay-server/src/services/store.rs | 198 +--------------------------- 3 files changed, 6 insertions(+), 208 deletions(-) diff --git a/relay-dynamic-config/src/metrics.rs b/relay-dynamic-config/src/metrics.rs index f566403497..4f8490e5f1 100644 --- a/relay-dynamic-config/src/metrics.rs +++ b/relay-dynamic-config/src/metrics.rs @@ -78,9 +78,6 @@ pub struct SessionMetricsConfig { /// /// Version `0` (default) disables extraction. version: u16, - - /// Drop sessions after successfully extracting metrics. - drop: bool, } impl SessionMetricsConfig { @@ -99,9 +96,9 @@ impl SessionMetricsConfig { self.version >= EXTRACT_ABNORMAL_MECHANISM_VERSION } - /// Returns `true` if the session should be dropped after extracting metrics. + /// Session is dropped after extracting metrics. pub fn should_drop(&self) -> bool { - self.drop + true } } diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index 5feb3353d7..644361bce2 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -35,8 +35,6 @@ pub enum KafkaTopic { Outcomes, /// Override for billing critical outcomes. OutcomesBilling, - /// Session health updates. - Sessions, /// Any metric that is extracted from sessions. MetricsSessions, /// Generic metrics topic, excluding sessions (release health). @@ -62,13 +60,12 @@ impl KafkaTopic { /// It will have to be adjusted if the new variants are added. pub fn iter() -> std::slice::Iter<'static, Self> { use KafkaTopic::*; - static TOPICS: [KafkaTopic; 15] = [ + static TOPICS: [KafkaTopic; 14] = [ Events, Attachments, Transactions, Outcomes, OutcomesBilling, - Sessions, MetricsSessions, MetricsGeneric, Profiles, @@ -97,8 +94,6 @@ pub struct TopicAssignments { pub outcomes: TopicAssignment, /// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`. pub outcomes_billing: Option, - /// Session health topic name. - pub sessions: TopicAssignment, /// Default topic name for all aggregate metrics. Specialized topics for session-based and /// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each. pub metrics: TopicAssignment, @@ -133,7 +128,6 @@ impl TopicAssignments { KafkaTopic::Transactions => &self.transactions, KafkaTopic::Outcomes => &self.outcomes, KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes), - KafkaTopic::Sessions => &self.sessions, KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics), KafkaTopic::MetricsGeneric => &self.metrics_generic, KafkaTopic::Profiles => &self.profiles, @@ -155,7 +149,6 @@ impl Default for TopicAssignments { transactions: "ingest-transactions".to_owned().into(), outcomes: "outcomes".to_owned().into(), outcomes_billing: None, - sessions: "ingest-sessions".to_owned().into(), metrics: "ingest-metrics".to_owned().into(), metrics_sessions: None, metrics_generic: "ingest-performance-metrics".to_owned().into(), diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 364387989a..44e0e13ade 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -8,14 +8,11 @@ use std::sync::Arc; use std::time::Instant; use bytes::Bytes; -use once_cell::sync::OnceCell; use relay_base_schema::data_category::DataCategory; use relay_base_schema::project::ProjectId; use relay_common::time::{instant_to_date_time, UnixTimestamp}; use relay_config::Config; -use relay_event_schema::protocol::{ - self, EventId, SessionAggregates, SessionStatus, SessionUpdate, VALID_PLATFORMS, -}; +use relay_event_schema::protocol::{EventId, SessionStatus, VALID_PLATFORMS}; use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message}; use relay_metrics::{ @@ -37,9 +34,6 @@ use crate::services::processor::Processed; use crate::statsd::RelayCounters; use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope}; -/// The maximum number of individual session updates generated for each aggregate item. -const MAX_EXPLODED_SESSIONS: usize = 100; - /// Fallback name used for attachment items without a `filename` header. const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment"; @@ -53,15 +47,6 @@ pub enum StoreError { NoEventId, } -fn make_distinct_id(s: &str) -> Uuid { - static NAMESPACE: OnceCell = OnceCell::new(); - let namespace = - NAMESPACE.get_or_init(|| Uuid::new_v5(&Uuid::NAMESPACE_URL, b"https://sentry.io/#did")); - - s.parse() - .unwrap_or_else(|_| Uuid::new_v5(namespace, s.as_bytes())) -} - struct Producer { client: KafkaClient, } @@ -242,13 +227,7 @@ impl StoreService { )?; } ItemType::Session | ItemType::Sessions => { - self.produce_sessions( - scoping.organization_id, - scoping.project_id, - retention, - client, - item, - )?; + // Do nothing } ItemType::MetricBuckets => self.produce_metrics( scoping.organization_id, @@ -614,158 +593,6 @@ impl StoreService { self.produce(KafkaTopic::Attachments, organization_id, message) } - fn produce_sessions( - &self, - org_id: u64, - project_id: ProjectId, - event_retention: u16, - client: Option<&str>, - item: &Item, - ) -> Result<(), StoreError> { - match item.ty() { - ItemType::Session => { - let mut session = match SessionUpdate::parse(&item.payload()) { - Ok(session) => session, - Err(error) => { - relay_log::error!( - error = &error as &dyn std::error::Error, - "failed to store session" - ); - return Ok(()); - } - }; - - if session.status == SessionStatus::Errored { - // Individual updates should never have the status `errored` - session.status = SessionStatus::Exited; - } - self.produce_session_update(org_id, project_id, event_retention, client, session) - } - ItemType::Sessions => { - let aggregates = match SessionAggregates::parse(&item.payload()) { - Ok(aggregates) => aggregates, - Err(_) => return Ok(()), - }; - - self.produce_sessions_from_aggregate( - org_id, - project_id, - event_retention, - client, - aggregates, - ) - } - _ => Ok(()), - } - } - - fn produce_sessions_from_aggregate( - &self, - org_id: u64, - project_id: ProjectId, - event_retention: u16, - client: Option<&str>, - aggregates: SessionAggregates, - ) -> Result<(), StoreError> { - let SessionAggregates { - aggregates, - attributes, - } = aggregates; - let message = SessionKafkaMessage { - org_id, - project_id, - session_id: Uuid::nil(), - distinct_id: Uuid::nil(), - quantity: 1, - seq: 0, - received: protocol::datetime_to_timestamp(chrono::Utc::now()), - started: 0f64, - duration: None, - errors: 0, - release: attributes.release, - environment: attributes.environment, - sdk: client.map(str::to_owned), - retention_days: event_retention, - status: SessionStatus::Exited, - }; - - if aggregates.len() > MAX_EXPLODED_SESSIONS { - relay_log::warn!("aggregated session items exceed threshold"); - } - - for item in aggregates.into_iter().take(MAX_EXPLODED_SESSIONS) { - let mut message = message.clone(); - message.started = protocol::datetime_to_timestamp(item.started); - message.distinct_id = item - .distinct_id - .as_deref() - .map(make_distinct_id) - .unwrap_or_default(); - - if item.exited > 0 { - message.errors = 0; - message.quantity = item.exited; - self.send_session_message(org_id, message.clone())?; - } - if item.errored > 0 { - message.errors = 1; - message.status = SessionStatus::Errored; - message.quantity = item.errored; - self.send_session_message(org_id, message.clone())?; - } - if item.abnormal > 0 { - message.errors = 1; - message.status = SessionStatus::Abnormal; - message.quantity = item.abnormal; - self.send_session_message(org_id, message.clone())?; - } - if item.crashed > 0 { - message.errors = 1; - message.status = SessionStatus::Crashed; - message.quantity = item.crashed; - self.send_session_message(org_id, message)?; - } - } - Ok(()) - } - - fn produce_session_update( - &self, - org_id: u64, - project_id: ProjectId, - event_retention: u16, - client: Option<&str>, - session: SessionUpdate, - ) -> Result<(), StoreError> { - self.send_session_message( - org_id, - SessionKafkaMessage { - org_id, - project_id, - session_id: session.session_id, - distinct_id: session - .distinct_id - .as_deref() - .map(make_distinct_id) - .unwrap_or_default(), - quantity: 1, - seq: if session.init { 0 } else { session.sequence }, - received: protocol::datetime_to_timestamp(session.timestamp), - started: protocol::datetime_to_timestamp(session.started), - duration: session.duration, - status: session.status.clone(), - errors: session.errors.clamp( - (session.status == SessionStatus::Crashed) as _, - u16::MAX.into(), - ) as _, - release: session.attributes.release, - environment: session.attributes.environment, - sdk: client.map(str::to_owned), - retention_days: event_retention, - }, - ) - } - fn send_metric_message( &self, namespace: MetricNamespace, @@ -834,19 +661,6 @@ impl StoreService { Ok(()) } - fn send_session_message( - &self, - organization_id: u64, - message: SessionKafkaMessage, - ) -> Result<(), StoreError> { - self.produce( - KafkaTopic::Sessions, - organization_id, - KafkaMessage::Session(message), - )?; - Ok(()) - } - fn produce_profile( &self, organization_id: u64, @@ -1628,7 +1442,6 @@ enum KafkaMessage<'a> { Attachment(AttachmentKafkaMessage), AttachmentChunk(AttachmentChunkKafkaMessage), UserReport(UserReportKafkaMessage), - Session(SessionKafkaMessage), Metric { #[serde(skip)] headers: BTreeMap, @@ -1651,7 +1464,6 @@ impl Message for KafkaMessage<'_> { KafkaMessage::Attachment(_) => "attachment", KafkaMessage::AttachmentChunk(_) => "attachment_chunk", KafkaMessage::UserReport(_) => "user_report", - KafkaMessage::Session(_) => "session", KafkaMessage::Metric { .. } => "metric", KafkaMessage::Profile(_) => "profile", KafkaMessage::ReplayEvent(_) => "replay_event", @@ -1679,8 +1491,7 @@ impl Message for KafkaMessage<'_> { Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil), // Random partitioning - Self::Session(_) - | Self::Profile(_) + Self::Profile(_) | Self::ReplayRecordingNotChunked(_) | Self::Span(_) | Self::MetricsSummary(_) @@ -1718,9 +1529,6 @@ impl Message for KafkaMessage<'_> { /// Serializes the message into its binary format. fn serialize(&self) -> Result, ClientError> { match self { - KafkaMessage::Session(message) => serde_json::to_vec(message) - .map(Cow::Owned) - .map_err(ClientError::InvalidJson), KafkaMessage::Metric { message, .. } => serde_json::to_vec(message) .map(Cow::Owned) .map_err(ClientError::InvalidJson),