From 24d3f8973d381131fd975cd37691c0c9d87da907 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Mon, 18 Mar 2024 12:01:24 -0700
Subject: [PATCH 01/13] only skip produce

---
 relay-dynamic-config/src/metrics.rs |   7 +-
 relay-kafka/src/config.rs           |   9 +-
 relay-server/src/services/store.rs  | 198 +---------------------------
 3 files changed, 6 insertions(+), 208 deletions(-)

diff --git a/relay-dynamic-config/src/metrics.rs b/relay-dynamic-config/src/metrics.rs
index f566403497..4f8490e5f1 100644
--- a/relay-dynamic-config/src/metrics.rs
+++ b/relay-dynamic-config/src/metrics.rs
@@ -78,9 +78,6 @@ pub struct SessionMetricsConfig {
     ///
     /// Version `0` (default) disables extraction.
     version: u16,
-
-    /// Drop sessions after successfully extracting metrics.
-    drop: bool,
 }
 
 impl SessionMetricsConfig {
@@ -99,9 +96,9 @@ impl SessionMetricsConfig {
         self.version >= EXTRACT_ABNORMAL_MECHANISM_VERSION
     }
 
-    /// Returns `true` if the session should be dropped after extracting metrics.
+    /// Session is dropped after extracting metrics.
     pub fn should_drop(&self) -> bool {
-        self.drop
+        true
     }
 }
 
diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs
index 5feb3353d7..644361bce2 100644
--- a/relay-kafka/src/config.rs
+++ b/relay-kafka/src/config.rs
@@ -35,8 +35,6 @@ pub enum KafkaTopic {
     Outcomes,
     /// Override for billing critical outcomes.
     OutcomesBilling,
-    /// Session health updates.
-    Sessions,
     /// Any metric that is extracted from sessions.
     MetricsSessions,
     /// Generic metrics topic, excluding sessions (release health).
@@ -62,13 +60,12 @@ impl KafkaTopic {
     /// It will have to be adjusted if the new variants are added.
     pub fn iter() -> std::slice::Iter<'static, Self> {
         use KafkaTopic::*;
-        static TOPICS: [KafkaTopic; 15] = [
+        static TOPICS: [KafkaTopic; 14] = [
            Events,
            Attachments,
            Transactions,
            Outcomes,
            OutcomesBilling,
-           Sessions,
            MetricsSessions,
            MetricsGeneric,
            Profiles,
@@ -97,8 +94,6 @@ pub struct TopicAssignments {
     pub outcomes: TopicAssignment,
     /// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`.
     pub outcomes_billing: Option<TopicAssignment>,
-    /// Session health topic name.
-    pub sessions: TopicAssignment,
     /// Default topic name for all aggregate metrics. Specialized topics for session-based and
     /// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each.
     pub metrics: TopicAssignment,
@@ -133,7 +128,6 @@ impl TopicAssignments {
             KafkaTopic::Transactions => &self.transactions,
             KafkaTopic::Outcomes => &self.outcomes,
             KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes),
-            KafkaTopic::Sessions => &self.sessions,
             KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics),
             KafkaTopic::MetricsGeneric => &self.metrics_generic,
             KafkaTopic::Profiles => &self.profiles,
@@ -155,7 +149,6 @@ impl Default for TopicAssignments {
             transactions: "ingest-transactions".to_owned().into(),
             outcomes: "outcomes".to_owned().into(),
             outcomes_billing: None,
-            sessions: "ingest-sessions".to_owned().into(),
             metrics: "ingest-metrics".to_owned().into(),
             metrics_sessions: None,
             metrics_generic: "ingest-performance-metrics".to_owned().into(),
diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index 364387989a..44e0e13ade 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -8,14 +8,11 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use bytes::Bytes;
-use once_cell::sync::OnceCell;
 use relay_base_schema::data_category::DataCategory;
 use relay_base_schema::project::ProjectId;
 use relay_common::time::{instant_to_date_time, UnixTimestamp};
 use relay_config::Config;
-use relay_event_schema::protocol::{
-    self, EventId, SessionAggregates, SessionStatus, SessionUpdate, VALID_PLATFORMS,
-};
+use relay_event_schema::protocol::{EventId, SessionStatus, VALID_PLATFORMS};
 use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
 use relay_metrics::{
@@ -37,9 +34,6 @@ use crate::services::processor::Processed;
 use crate::statsd::RelayCounters;
 use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope};
 
-/// The maximum number of individual session updates generated for each aggregate item.
-const MAX_EXPLODED_SESSIONS: usize = 100;
-
 /// Fallback name used for attachment items without a `filename` header.
 const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
@@ -53,15 +47,6 @@ pub enum StoreError {
     NoEventId,
 }
 
-fn make_distinct_id(s: &str) -> Uuid {
-    static NAMESPACE: OnceCell<Uuid> = OnceCell::new();
-    let namespace =
-        NAMESPACE.get_or_init(|| Uuid::new_v5(&Uuid::NAMESPACE_URL, b"https://sentry.io/#did"));
-
-    s.parse()
-        .unwrap_or_else(|_| Uuid::new_v5(namespace, s.as_bytes()))
-}
-
 struct Producer {
     client: KafkaClient,
 }
@@ -242,13 +227,7 @@ impl StoreService {
                 )?;
             }
             ItemType::Session | ItemType::Sessions => {
-                self.produce_sessions(
-                    scoping.organization_id,
-                    scoping.project_id,
-                    retention,
-                    client,
-                    item,
-                )?;
+                // Do nothing
             }
             ItemType::MetricBuckets => self.produce_metrics(
                 scoping.organization_id,
@@ -614,158 +593,6 @@ impl StoreService {
         self.produce(KafkaTopic::Attachments, organization_id, message)
     }
 
-    fn produce_sessions(
-        &self,
-        org_id: u64,
-        project_id: ProjectId,
-        event_retention: u16,
-        client: Option<&str>,
-        item: &Item,
-    ) -> Result<(), StoreError> {
-        match item.ty() {
-            ItemType::Session => {
-                let mut session = match SessionUpdate::parse(&item.payload()) {
-                    Ok(session) => session,
-                    Err(error) => {
-                        relay_log::error!(
-                            error = &error as &dyn std::error::Error,
-                            "failed to store session"
-                        );
-                        return Ok(());
-                    }
-                };
-
-                if session.status == SessionStatus::Errored {
-                    // Individual updates should never have the status `errored`
-                    session.status = SessionStatus::Exited;
-                }
-                self.produce_session_update(org_id, project_id, event_retention, client, session)
-            }
-            ItemType::Sessions => {
-                let aggregates = match SessionAggregates::parse(&item.payload()) {
-                    Ok(aggregates) => aggregates,
-                    Err(_) => return Ok(()),
-                };
-
-                self.produce_sessions_from_aggregate(
-                    org_id,
-                    project_id,
-                    event_retention,
-                    client,
-                    aggregates,
-                )
-            }
-            _ => Ok(()),
-        }
-    }
-
-    fn produce_sessions_from_aggregate(
-        &self,
-        org_id: u64,
-        project_id: ProjectId,
-        event_retention: u16,
-        client: Option<&str>,
-        aggregates: SessionAggregates,
-    ) -> Result<(), StoreError> {
-        let SessionAggregates {
-            aggregates,
-            attributes,
-        } = aggregates;
-        let message = SessionKafkaMessage {
-            org_id,
-            project_id,
-            session_id: Uuid::nil(),
-            distinct_id: Uuid::nil(),
-            quantity: 1,
-            seq: 0,
-            received: protocol::datetime_to_timestamp(chrono::Utc::now()),
-            started: 0f64,
-            duration: None,
-            errors: 0,
-            release: attributes.release,
-            environment: attributes.environment,
-            sdk: client.map(str::to_owned),
-            retention_days: event_retention,
-            status: SessionStatus::Exited,
-        };
-
-        if aggregates.len() > MAX_EXPLODED_SESSIONS {
-            relay_log::warn!("aggregated session items exceed threshold");
-        }
-
-        for item in aggregates.into_iter().take(MAX_EXPLODED_SESSIONS) {
-            let mut message = message.clone();
-            message.started = protocol::datetime_to_timestamp(item.started);
-            message.distinct_id = item
-                .distinct_id
-                .as_deref()
-                .map(make_distinct_id)
-                .unwrap_or_default();
-
-            if item.exited > 0 {
-                message.errors = 0;
-                message.quantity = item.exited;
-                self.send_session_message(org_id, message.clone())?;
-            }
-            if item.errored > 0 {
-                message.errors = 1;
-                message.status = SessionStatus::Errored;
-                message.quantity = item.errored;
-                self.send_session_message(org_id, message.clone())?;
-            }
-            if item.abnormal > 0 {
-                message.errors = 1;
-                message.status = SessionStatus::Abnormal;
-                message.quantity = item.abnormal;
-                self.send_session_message(org_id, message.clone())?;
-            }
-            if item.crashed > 0 {
-                message.errors = 1;
-                message.status = SessionStatus::Crashed;
-                message.quantity = item.crashed;
-                self.send_session_message(org_id, message)?;
-            }
-        }
-        Ok(())
-    }
-
-    fn produce_session_update(
-        &self,
-        org_id: u64,
-        project_id: ProjectId,
-        event_retention: u16,
-        client: Option<&str>,
-        session: SessionUpdate,
-    ) -> Result<(), StoreError> {
-        self.send_session_message(
-            org_id,
-            SessionKafkaMessage {
-                org_id,
-                project_id,
-                session_id: session.session_id,
-                distinct_id: session
-                    .distinct_id
-                    .as_deref()
-                    .map(make_distinct_id)
-                    .unwrap_or_default(),
-                quantity: 1,
-                seq: if session.init { 0 } else { session.sequence },
-                received: protocol::datetime_to_timestamp(session.timestamp),
-                started: protocol::datetime_to_timestamp(session.started),
-                duration: session.duration,
-                status: session.status.clone(),
-                errors: session.errors.clamp(
-                    (session.status == SessionStatus::Crashed) as _,
-                    u16::MAX.into(),
-                ) as _,
-                release: session.attributes.release,
-                environment: session.attributes.environment,
-                sdk: client.map(str::to_owned),
-                retention_days: event_retention,
-            },
-        )
-    }
-
     fn send_metric_message(
         &self,
         namespace: MetricNamespace,
@@ -834,19 +661,6 @@ impl StoreService {
         Ok(())
     }
 
-    fn send_session_message(
-        &self,
-        organization_id: u64,
-        message: SessionKafkaMessage,
-    ) -> Result<(), StoreError> {
-        self.produce(
-            KafkaTopic::Sessions,
-            organization_id,
-            KafkaMessage::Session(message),
-        )?;
-        Ok(())
-    }
-
     fn produce_profile(
         &self,
         organization_id: u64,
@@ -1628,7 +1442,6 @@ enum KafkaMessage<'a> {
     Attachment(AttachmentKafkaMessage),
     AttachmentChunk(AttachmentChunkKafkaMessage),
     UserReport(UserReportKafkaMessage),
-    Session(SessionKafkaMessage),
     Metric {
         #[serde(skip)]
         headers: BTreeMap<String, String>,
@@ -1651,7 +1464,6 @@ impl Message for KafkaMessage<'_> {
             KafkaMessage::Attachment(_) => "attachment",
             KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
             KafkaMessage::UserReport(_) => "user_report",
-            KafkaMessage::Session(_) => "session",
             KafkaMessage::Metric { .. } => "metric",
             KafkaMessage::Profile(_) => "profile",
             KafkaMessage::ReplayEvent(_) => "replay_event",
@@ -1679,8 +1491,7 @@ impl Message for KafkaMessage<'_> {
             Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil),
 
             // Random partitioning
-            Self::Session(_)
-            | Self::Profile(_)
+            Self::Profile(_)
             | Self::ReplayRecordingNotChunked(_)
             | Self::Span(_)
             | Self::MetricsSummary(_)
@@ -1718,9 +1529,6 @@ impl Message for KafkaMessage<'_> {
     /// Serializes the message into its binary format.
     fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> {
         match self {
-            KafkaMessage::Session(message) => serde_json::to_vec(message)
-                .map(Cow::Owned)
-                .map_err(ClientError::InvalidJson),
             KafkaMessage::Metric { message, .. } => serde_json::to_vec(message)
                 .map(Cow::Owned)
                 .map_err(ClientError::InvalidJson),

From 0fb63c89ffb40d5b4111e413d6f0f575a6eaf97c Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Mon, 18 Mar 2024 14:44:54 -0700
Subject: [PATCH 02/13] update test

---
 tests/integration/test_session.py | 204 ++----------------------------
 1 file changed, 10 insertions(+), 194 deletions(-)

diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
index 126ff5be2b..cd20a19c51 100644
--- a/tests/integration/test_session.py
+++ b/tests/integration/test_session.py
@@ -70,25 +70,7 @@ def test_session_with_processing(mini_sentry, relay_with_processing, sessions_co
         },
     )
 
-    session = sessions_consumer.get_session()
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 1,
-        # seq is forced to 0 when init is true
-        "seq": 0,
-        "received": timestamp.timestamp(),
-        "started": started.timestamp(),
-        "duration": 1947.49,
-        "status": "exited",
-        "errors": 0,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
+    sessions_consumer.assert_empty()
 
 
 def test_session_with_processing_two_events(
@@ -118,25 +100,8 @@ def test_session_with_processing_two_events(
             },
         },
     )
-    session = sessions_consumer.get_session()
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 1,
-        # seq is forced to 0 when init is true
-        "seq": 0,
-        "received": timestamp.timestamp(),
-        "started": started.timestamp(),
-        "duration": None,
-        "status": "ok",
-        "errors": 0,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
+
+    sessions_consumer.assert_empty()
 
     relay.send_session(
         project_id,
         {
             "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
             "did": "foobarbaz",
             "seq": 43,
             "timestamp": timestamp.isoformat(),
             "started": started.isoformat(),
             "duration": 1947.49,
             "status": "exited",
             "attrs": {
                 "release": "sentry-test@1.0.0",
                 "environment": "production",
             },
         },
     )
@@ -154,24 +119,7 @@ def test_session_with_processing_two_events(
         },
     )
-    session = sessions_consumer.get_session()
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 1,
-        "seq": 43,
-        "received": timestamp.timestamp(),
-        "started": started.timestamp(),
-        "duration": 1947.49,
-        "status": "exited",
-        "errors": 0,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
+    sessions_consumer.assert_empty()
 
 
 def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consumer):
@@ -206,62 +154,7 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
         },
     )
 
-    session = sessions_consumer.get_session()
-    del session["received"]
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "00000000-0000-0000-0000-000000000000",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 2,
-        "seq": 0,
-        "started": started1.timestamp(),
-        "duration": None,
-        "status": "exited",
-        "errors": 0,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
-
-    session = sessions_consumer.get_session()
-    del session["received"]
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "00000000-0000-0000-0000-000000000000",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 3,
-        "seq": 0,
-        "started": started1.timestamp(),
-        "duration": None,
-        "status": "errored",
-        "errors": 1,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
-
-    session = sessions_consumer.get_session()
-    del session["received"]
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "00000000-0000-0000-0000-000000000000",
-        "distinct_id": "00000000-0000-0000-0000-000000000000",
-        "quantity": 1,
-        "seq": 0,
-        "started": started2.timestamp(),
-        "duration": None,
-        "status": "abnormal",
-        "errors": 1,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
+    sessions_consumer.assert_empty()
 
 
 def test_session_with_custom_retention(
@@ -285,8 +178,7 @@ def test_session_with_custom_retention(
         },
     )
 
-    session = sessions_consumer.get_session()
-    assert session["retention_days"] == 17
+    sessions_consumer.assert_empty()
 
 
 def test_session_age_discard(mini_sentry, relay_with_processing, sessions_consumer):
@@ -371,26 +263,7 @@ def test_session_force_errors_on_crash(
             "attrs": {"release": "sentry-test@1.0.0", "environment": "production"},
         },
     )
-
-    session = sessions_consumer.get_session()
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 1,
-        # seq is forced to 0 when init is true
-        "seq": 0,
-        "received": timestamp.timestamp(),
-        "started": started.timestamp(),
-        "duration": None,
-        "status": "crashed",
-        "errors": 1,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
+    sessions_consumer.assert_empty()
 
 
 def test_session_release_required(
@@ -481,14 +354,13 @@ def test_session_quotas(mini_sentry, relay_with_processing, sessions_consumer):
 
     for i in range(5):
         relay.send_session(project_id, session)
-        sessions_consumer.get_session()
+        sessions_consumer.assert_empty()
 
     # Rate limited, but responds with 200 because of deferred processing
     relay.send_session(project_id, session)
     sessions_consumer.assert_empty()
 
-    with pytest.raises(HTTPError):
-        relay.send_session(project_id, session)
+    relay.send_session(project_id, session)
     sessions_consumer.assert_empty()
 
@@ -545,8 +417,7 @@ def test_session_auto_ip(mini_sentry, relay_with_processing, sessions_consumer):
     )
 
     # Can't test ip_address since it's not posted to Kafka. Just test that it is accepted.
-    session = sessions_consumer.get_session()
-    assert session
+    sessions_consumer.assert_empty()
 
 
 def test_session_invalid_release(mini_sentry, relay_with_processing, sessions_consumer):
@@ -598,58 +469,3 @@ def test_session_aggregates_invalid_release(
     )
 
     sessions_consumer.assert_empty()
-
-
-def test_session_invalid_environment(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    PROJECT_ID = 42
-    project_config = mini_sentry.add_full_project_config(PROJECT_ID)
-    project_config["config"]["eventRetention"] = 17
-
-    timestamp = datetime.now(tz=timezone.utc)
-    relay.send_session(
-        PROJECT_ID,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "timestamp": timestamp.isoformat(),
-            "started": timestamp.isoformat(),
-            "attrs": {"release": "sentry-test@1.0.0", "environment": "none"},
-        },
-    )
-
-    session = sessions_consumer.get_session()
-    assert session.get("environment") is None
-
-
-def test_session_aggregates_invalid_environment(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    project_id = 42
-    project_config = mini_sentry.add_full_project_config(project_id)
-    project_config["config"]["eventRetention"] = 17
-
-    timestamp = datetime.now(tz=timezone.utc)
-    relay.send_session_aggregates(
-        project_id,
-        {
-            "aggregates": [
-                {
-                    "started": timestamp.isoformat(),
-                    "did": "foobarbaz",
-                    "exited": 2,
-                    "errored": 3,
-                },
-            ],
-            "attrs": {"release": "sentry-test@1.0.0", "environment": "."},
-        },
-    )
-
-    session = sessions_consumer.get_session()
-    assert session.get("environment") is None

From b8ef1c5f68088c798cb89ebfd1e78ff77d8a8291 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Tue, 19 Mar 2024 09:49:43 -0700
Subject: [PATCH 03/13] remove should_drop

---
 relay-dynamic-config/src/metrics.rs            | 5 -----
 relay-server/src/services/processor/session.rs | 4 ++--
 tests/integration/test_session.py              | 2 --
 3 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/relay-dynamic-config/src/metrics.rs b/relay-dynamic-config/src/metrics.rs
index 4f8490e5f1..310314f0d4 100644
--- a/relay-dynamic-config/src/metrics.rs
+++ b/relay-dynamic-config/src/metrics.rs
@@ -95,11 +95,6 @@ impl SessionMetricsConfig {
     pub fn should_extract_abnormal_mechanism(&self) -> bool {
         self.version >= EXTRACT_ABNORMAL_MECHANISM_VERSION
     }
-
-    /// Session is dropped after extracting metrics.
-    pub fn should_drop(&self) -> bool {
-        true
-    }
 }
 
 /// Configuration for extracting custom measurements from transaction payloads.
diff --git a/relay-server/src/services/processor/session.rs b/relay-server/src/services/processor/session.rs
index ce764050ec..2a1c6717c7 100644
--- a/relay-server/src/services/processor/session.rs
+++ b/relay-server/src/services/processor/session.rs
@@ -220,7 +220,7 @@ fn process_session(
     }
 
     // Drop the session if metrics have been extracted in this or a prior Relay
-    if metrics_config.should_drop() && item.metrics_extracted() {
+    if item.metrics_extracted() {
         return false;
     }
 
@@ -312,7 +312,7 @@ fn process_session_aggregates(
     }
 
     // Drop the aggregate if metrics have been extracted in this or a prior Relay
-    if metrics_config.should_drop() && item.metrics_extracted() {
+    if item.metrics_extracted() {
         return false;
     }
 
diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
index cd20a19c51..933bb4b8b2 100644
--- a/tests/integration/test_session.py
+++ b/tests/integration/test_session.py
@@ -1,7 +1,5 @@
 from datetime import datetime, timedelta, timezone
 import json
-import pytest
-from requests.exceptions import HTTPError
 import uuid

From 7054dba2bb3557d66d0edf77596b14057b27b28c Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Tue, 19 Mar 2024 10:13:39 -0700
Subject: [PATCH 04/13] cleanup

---
 relay-server/src/services/store.rs | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index 44e0e13ade..86dc79c5b5 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -226,9 +226,6 @@ impl StoreService {
                 item,
             )?;
         }
-        ItemType::Session | ItemType::Sessions => {
-            // Do nothing
-        }
         ItemType::MetricBuckets => self.produce_metrics(
             scoping.organization_id,
             scoping.project_id,

From 55fede5933aff09489a752001504bc68a22df345 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Tue, 19 Mar 2024 13:08:03 -0700
Subject: [PATCH 05/13] remove tests

---
 tests/integration/test_metrics.py | 200 ------------------------------
 tests/integration/test_session.py |  49 --------
 2 files changed, 249 deletions(-)

diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py
index 5294404734..6c180c15ca 100644
--- a/tests/integration/test_metrics.py
+++ b/tests/integration/test_metrics.py
@@ -634,121 +634,6 @@ def test_metrics_full(mini_sentry, relay, relay_with_processing, metrics_consume
     metrics_consumer.assert_empty()
 
 
-@pytest.mark.parametrize(
-    "extract_metrics", [True, False], ids=["extract", "don't extract"]
-)
-@pytest.mark.parametrize(
-    "metrics_extracted", [True, False], ids=["extracted", "not extracted"]
-)
-def test_session_metrics_non_processing(
-    mini_sentry, relay, extract_metrics, metrics_extracted
-):
-    """
-    Tests metrics extraction in a non processing relay
-
-    If and only if the metrics-extraction feature is enabled and the metrics from the session were not already
-    extracted the relay should extract the metrics from the session and mark the session item as "metrics extracted"
-    """
-
-    relay = relay(mini_sentry, options=TEST_CONFIG)
-
-    if extract_metrics:
-        # enable metrics extraction for the project
-        extra_config = {"config": {"sessionMetrics": {"version": 1}}}
-    else:
-        extra_config = {}
-
-    project_id = 42
-    mini_sentry.add_basic_project_config(project_id, extra=extra_config)
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-    session_payload = _session_payload(timestamp=timestamp, started=started)
-
-    relay.send_session(
-        project_id,
-        session_payload,
-        item_headers={"metrics_extracted": metrics_extracted},
-    )
-
-    # Get session envelope
-    first_envelope = mini_sentry.captured_events.get(timeout=2)
-
-    try:
-        second_envelope = mini_sentry.captured_events.get(timeout=2)
-    except Exception:
-        second_envelope = None
-
-    assert first_envelope is not None
-    assert len(first_envelope.items) == 1
-    first_item = first_envelope.items[0]
-
-    if extract_metrics and not metrics_extracted:
-        # here we have not yet extracted metrics and metric extraction is enabled
-        # we expect to have two messages a session message and a metrics message
-        assert second_envelope is not None
-        assert len(second_envelope.items) == 1
-
-        second_item = second_envelope.items[0]
-
-        if first_item.type == "session":
-            session_item = first_item
-            metrics_item = second_item
-        else:
-            session_item = second_item
-            metrics_item = first_item
-
-        # check the metrics item
-        assert metrics_item.type == "metric_buckets"
-
-        session_metrics = json.loads(metrics_item.get_bytes().decode())
-        session_metrics = sorted(session_metrics, key=lambda x: x["name"])
-
-        ts = int(started.timestamp())
-        assert session_metrics == [
-            {
-                "name": "c:sessions/session@none",
-                "tags": {
-                    "sdk": "raven-node/2.6.3",
-                    "environment": "production",
-                    "release": "sentry-test@1.0.0",
-                    "session.status": "init",
-                },
-                "timestamp": ts,
-                "width": 1,
-                "type": "c",
-                "value": 1.0,
-            },
-            {
-                "name": "s:sessions/user@none",
-                "tags": {
-                    "sdk": "raven-node/2.6.3",
-                    "environment": "production",
-                    "release": "sentry-test@1.0.0",
-                },
-                "timestamp": ts,
-                "width": 1,
-                "type": "s",
-                "value": [1617781333],
-            },
-        ]
-    else:
-        # either the metrics are already extracted or we have metric extraction disabled
-        # only the session message should be present
-        assert second_envelope is None
-        session_item = first_item
-
-    assert session_item is not None
-    assert session_item.type == "session"
-
-    # we have marked the item as "metrics extracted" properly
-    # already extracted metrics should keep the flag, newly extracted metrics should set the flag
-    assert (
-        session_item.headers.get("metrics_extracted", False) is extract_metrics
-        or metrics_extracted
-    )
-
-
 def test_session_metrics_extracted_only_once(
     mini_sentry, relay, relay_with_processing, metrics_consumer
 ):
@@ -1733,91 +1618,6 @@ def test_custom_metrics_disabled(mini_sentry, relay_with_processing, metrics_con
     assert "c:custom/bar@second" not in metrics
 
 
-@pytest.mark.parametrize(
-    "denied_tag", ["sdk", "release"], ids=["remove sdk tag", "remove release tag"]
-)
-@pytest.mark.parametrize(
-    "denied_names", ["*user*", ""], ids=["deny user", "no denied names"]
-)
-def test_block_metrics_and_tags(mini_sentry, relay, denied_names, denied_tag):
-    relay = relay(mini_sentry, options=TEST_CONFIG)
-
-    extra_config = {
-        "config": {
-            "sessionMetrics": {"version": 1},
-            "metrics": {
-                "deniedNames": [denied_names],
-                "deniedTags": [{"name": ["*"], "tags": [denied_tag]}],
-            },
-        }
-    }
-
-    project_id = 42
-    mini_sentry.add_basic_project_config(project_id, extra=extra_config)
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-    session_payload = _session_payload(timestamp=timestamp, started=started)
-
-    relay.send_session(
-        project_id,
-        session_payload,
-    )
-
-    envelope = mini_sentry.captured_events.get(timeout=2)
-    assert len(envelope.items) == 1
-    first_item = envelope.items[0]
-
-    second_envelope = mini_sentry.captured_events.get(timeout=2)
-    assert len(second_envelope.items) == 1
-    second_item = second_envelope.items[0]
-
-    if first_item.type == "session":
-        metrics_item = second_item
-    else:
-        metrics_item = first_item
-
-    assert metrics_item.type == "metric_buckets"
-
-    session_metrics = json.loads(metrics_item.get_bytes().decode())
-    session_metrics = sorted(session_metrics, key=lambda x: x["name"])
-
-    if denied_names == "*user*":
-        assert len(session_metrics) == 1
-        assert session_metrics[0]["name"] == "c:sessions/session@none"
-    elif denied_names == "":
-        assert len(session_metrics) == 2
-        assert session_metrics[0]["name"] == "c:sessions/session@none"
-        assert session_metrics[1]["name"] == "s:sessions/user@none"
-    else:
-        assert False, "add new else-branch if you add another denied name"
-
-    if denied_tag == "sdk":
-        assert session_metrics[0]["tags"] == {
-            "environment": "production",
-            "release": "sentry-test@1.0.0",
-            "session.status": "init",
-        }
-        if denied_names == "":
-            assert session_metrics[1]["tags"] == {
-                "environment": "production",
-                "release": "sentry-test@1.0.0",
-            }
-    elif denied_tag == "release":
-        assert session_metrics[0]["tags"] == {
-            "sdk": "raven-node/2.6.3",
-            "environment": "production",
-            "session.status": "init",
-        }
-        if denied_names == "":
-            assert session_metrics[1]["tags"] == {
-                "sdk": "raven-node/2.6.3",
-                "environment": "production",
-            }
-    else:
-        assert False, "add new else-branch if you add another denied tag"
-
-
 @pytest.mark.parametrize("is_processing_relay", (False, True))
 @pytest.mark.parametrize(
     "global_generic_filters",
diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
index 933bb4b8b2..4af28dc7d7 100644
--- a/tests/integration/test_session.py
+++ b/tests/integration/test_session.py
@@ -71,55 +71,6 @@ def test_session_with_processing(mini_sentry, relay_with_processing, sessions_co
     sessions_consumer.assert_empty()
 
 
-def test_session_with_processing_two_events(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-
-    project_id = 42
-    mini_sentry.add_full_project_config(project_id)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "did": "foobarbaz",
-            "seq": 42,
-            "init": True,
-            "timestamp": timestamp.isoformat(),
-            "started": started.isoformat(),
-            "status": "ok",
-            "attrs": {
-                "release": "sentry-test@1.0.0",
-                "environment": "production",
-            },
-        },
-    )
-
-    sessions_consumer.assert_empty()
-
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "did": "foobarbaz",
-            "seq": 43,
-            "timestamp": timestamp.isoformat(),
-            "started": started.isoformat(),
-            "duration": 1947.49,
-            "status": "exited",
-            "attrs": {
-                "release": "sentry-test@1.0.0",
-                "environment": "production",
-            },
-        },
-    )
-    sessions_consumer.assert_empty()
-
-
 def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consumer):

From 8999575688c990d6e9bfe2d3df9cc5ebb51202a1 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Tue, 19 Mar 2024 13:10:22 -0700
Subject: [PATCH 06/13] changelog

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2beae000fa..d4fe202c57 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 ## Unreleased
 
+- Stop producing to sessions topic, the feature is now fully migrated to metrics
+
 **Features**:
 
 - Extend GPU context with data for Unreal Engine crash reports. ([#3144](https://github.com/getsentry/relay/pull/3144))

From 42113578732ea3c5e32643f7f5e3fd1f5633bf2a Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Wed, 20 Mar 2024 15:52:25 -0700
Subject: [PATCH 07/13] fix bad merge

---
 relay-kafka/src/config.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs
index 325df8016e..afe6a05cdc 100644
--- a/relay-kafka/src/config.rs
+++ b/relay-kafka/src/config.rs
@@ -101,6 +101,8 @@ pub struct TopicAssignments {
     /// Topic name for metrics extracted from sessions, aka release health.
     #[serde(alias = "metrics", alias = "ingest-metrics")]
     pub metrics_sessions: TopicAssignment,
+    /// Topic name for all other kinds of metrics. Defaults to the assignment of `metrics`.
+    #[serde(alias = "metrics_transactions", alias = "ingest-generic-metrics")]
     pub metrics_generic: TopicAssignment,
     /// Stacktrace topic name
     pub profiles: TopicAssignment,

From aa9c851fb6dcd5643384039dde37203d3b80d585 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Thu, 21 Mar 2024 14:25:24 -0700
Subject: [PATCH 08/13] just delete the failing test

---
 tests/integration/test_session.py | 41 -------------------------------
 1 file changed, 41 deletions(-)

diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
index 4af28dc7d7..69ca9cf7df 100644
--- a/tests/integration/test_session.py
+++ b/tests/integration/test_session.py
@@ -272,47 +272,6 @@ def test_session_aggregates_release_required(
     sessions_consumer.assert_empty()
 
 
-def test_session_quotas(mini_sentry, relay_with_processing, sessions_consumer):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    project_id = 42
-    project_config = mini_sentry.add_full_project_config(project_id)
-    project_config["config"]["eventRetention"] = 17
-    project_config["config"]["quotas"] = [
-        {
-            "id": f"test_rate_limiting_{uuid.uuid4().hex}",
-            "categories": ["session"],
-            "scope": "key",
-            "scopeId": str(project_config["publicKeys"][0]["numericId"]),
-            "window": 3600,
-            "limit": 5,
-            "reasonCode": "sessions_exceeded",
-        }
-    ]
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-
-    session = {
-        "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-        "timestamp": timestamp.isoformat(),
-        "started": started.isoformat(),
-        "attrs": {"release": "sentry-test@1.0.0"},
-    }
-
-    for i in range(5):
-        relay.send_session(project_id, session)
-        sessions_consumer.assert_empty()
-
-    # Rate limited, but responds with 200 because of deferred processing
-    relay.send_session(project_id, session)
-    sessions_consumer.assert_empty()
-
-    relay.send_session(project_id, session)
-    sessions_consumer.assert_empty()
-
-
 def test_session_disabled(mini_sentry, relay_with_processing, sessions_consumer):

From f8b44d5087b95fef5305d8a877a9c65a6bec10ee Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Thu, 21 Mar 2024 14:54:51 -0700
Subject: [PATCH 09/13] lint

---
 tests/integration/test_session.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
index 69ca9cf7df..1fbd9cdd12 100644
--- a/tests/integration/test_session.py
+++ b/tests/integration/test_session.py
@@ -1,6 +1,5 @@
 from datetime import datetime, timedelta, timezone
 import json
-import uuid
 
 
 def test_sessions(mini_sentry, relay_chain):

From 16c506243e3f9f3aec4b6bc22a99654275320f22 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Fri, 22 Mar 2024 12:46:37 -0700
Subject: [PATCH 10/13] .

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65ee105170..73be6a8d85 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,7 +16,7 @@
 - Implement volume metric stats. ([#3281](https://github.com/getsentry/relay/pull/3281))
 - Scrub transactions before enforcing quotas. ([#3248](https://github.com/getsentry/relay/pull/3248))
 - Kafka topic config supports default topic names as keys. ([#3282](https://github.com/getsentry/relay/pull/3282))
-- Stop producing to sessions topic, the feature is now fully migrated to metrics ([#3271](https://github.com/getsentry/relay/pull/3271))
+- Stop producing to sessions topic, the feature is now fully migrated to metrics. ([#3271](https://github.com/getsentry/relay/pull/3271))
 
 ## 24.3.0

From 9fe7a2fb2675a3d11808d02c3e50c0cca8212679 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Fri, 22 Mar 2024 12:47:03 -0700
Subject: [PATCH 11/13] lint

---
 relay-server/src/services/store.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index e5a266d156..b11fe38de1 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -4,7 +4,7 @@
 use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::error::Error;
-use std::sync::{Arc, OnceLock};
+use std::sync::Arc;
 use std::time::Instant;

From acaefcc71492167e8682157ed43dce8934b461bb Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Mon, 25 Mar 2024 10:19:57 -0700
Subject: [PATCH 12/13] remove sessions test

---
 tests/integration/test_session.py | 113 ------------------------------
 1 file changed, 113 deletions(-)

diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
index 1fbd9cdd12..ef745e3de7 100644
--- a/tests/integration/test_session.py
+++ b/tests/integration/test_session.py
@@ -39,96 +39,6 @@ def test_sessions(mini_sentry, relay_chain):
     assert session == session_payload
 
 
-def test_session_with_processing(mini_sentry, relay_with_processing, sessions_consumer):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-
-    project_id = 42
-    mini_sentry.add_full_project_config(project_id)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "did": "foobarbaz",
-            "seq": 42,
-            "init": True,
-            "timestamp": timestamp.isoformat(),
-            "started": started.isoformat(),
-            "duration": 1947.49,
-            "status": "exited",
-            "errors": 0,
-            "attrs": {
-                "release": "sentry-test@1.0.0",
-                "environment": "production",
-            },
-        },
-    )
-
-    sessions_consumer.assert_empty()
-
-
-def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consumer):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started1 = timestamp - timedelta(hours=1)
-    started2 = started1 - timedelta(hours=1)
-
-    project_id = 42
-    mini_sentry.add_full_project_config(project_id)
-    relay.send_session_aggregates(
-        project_id,
-        {
-            "aggregates": [
-                {
-                    "started": started1.isoformat(),
-                    "did": "foobarbaz",
-                    "exited": 2,
-                    "errored": 3,
-                },
-                {
-                    "started": started2.isoformat(),
-                    "abnormal": 1,
-                },
-            ],
-            "attrs": {
-                "release": "sentry-test@1.0.0",
-                "environment": "production",
-            },
-        },
-    )
-
-    sessions_consumer.assert_empty()
-
-
-def test_session_with_custom_retention(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    project_id = 42
-    project_config = mini_sentry.add_full_project_config(project_id)
-    project_config["config"]["eventRetention"] = 17
-
-    timestamp = datetime.now(tz=timezone.utc)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "timestamp": timestamp.isoformat(),
-            "started": timestamp.isoformat(),
-            "attrs": {"release": "sentry-test@1.0.0"},
-        },
-    )
-
-    sessions_consumer.assert_empty()
-
-
 def test_session_age_discard(mini_sentry, relay_with_processing, sessions_consumer):
@@ -304,29 +214,6 @@ def test_session_disabled(mini_sentry, relay_with_processing, sessions_consumer)
     sessions_consumer.assert_empty()
 
 
-def test_session_auto_ip(mini_sentry, relay_with_processing, sessions_consumer):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    project_id = 42
-    project_config = mini_sentry.add_full_project_config(project_id)
-    project_config["config"]["eventRetention"] = 17
-
-    timestamp = datetime.now(tz=timezone.utc)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "timestamp": timestamp.isoformat(),
-            "started": timestamp.isoformat(),
-            "attrs": {"release": "sentry-test@1.0.0", "ip_address": "{{auto}}"},
-        },
-    )
-
-    # Can't test ip_address since it's not posted to Kafka. Just test that it is accepted.
-    sessions_consumer.assert_empty()
-
-
 def test_session_invalid_release(mini_sentry, relay_with_processing, sessions_consumer):

From 682f4f7c9121835aa2eaf7176706761cdea56ca8 Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Mon, 25 Mar 2024 10:22:41 -0700
Subject: [PATCH 13/13] .

---
 tests/integration/test_session.py | 27 ---------------------------
 1 file changed, 27 deletions(-)

diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
index ef745e3de7..3334793791 100644
--- a/tests/integration/test_session.py
+++ b/tests/integration/test_session.py
@@ -97,33 +97,6 @@ def test_session_age_discard_aggregates(
     sessions_consumer.assert_empty()
 
 
-def test_session_force_errors_on_crash(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-
-    project_id = 42
-    mini_sentry.add_full_project_config(project_id)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "did": "foobarbaz",
-            "seq": 42,
-            "init": True,
-            "timestamp": timestamp.isoformat(),
-            "started": started.isoformat(),
-            "status": "crashed",
-            "attrs": {"release": "sentry-test@1.0.0", "environment": "production"},
-        },
-    )
-    sessions_consumer.assert_empty()
-
-
 def test_session_release_required(
     mini_sentry, relay_with_processing, sessions_consumer
 ):