diff --git a/CHANGELOG.md b/CHANGELOG.md
index dbf4eb0d7a..94ed974971 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@
 - Scrub transactions before enforcing quotas. ([#3248](https://github.com/getsentry/relay/pull/3248))
 - Kafka topic config supports default topic names as keys. ([#3282](https://github.com/getsentry/relay/pull/3282))
 - Set all span tags on the transaction span. ([#3310](https://github.com/getsentry/relay/pull/3310))
+- Stop producing to the sessions topic; the feature is now fully migrated to metrics. ([#3271](https://github.com/getsentry/relay/pull/3271))
 
 ## 24.3.0
 
diff --git a/relay-dynamic-config/src/metrics.rs b/relay-dynamic-config/src/metrics.rs
index f566403497..310314f0d4 100644
--- a/relay-dynamic-config/src/metrics.rs
+++ b/relay-dynamic-config/src/metrics.rs
@@ -78,9 +78,6 @@ pub struct SessionMetricsConfig {
     ///
     /// Version `0` (default) disables extraction.
     version: u16,
-
-    /// Drop sessions after successfully extracting metrics.
-    drop: bool,
 }
 
 impl SessionMetricsConfig {
@@ -98,11 +95,6 @@ impl SessionMetricsConfig {
     pub fn should_extract_abnormal_mechanism(&self) -> bool {
         self.version >= EXTRACT_ABNORMAL_MECHANISM_VERSION
     }
-
-    /// Returns `true` if the session should be dropped after extracting metrics.
-    pub fn should_drop(&self) -> bool {
-        self.drop
-    }
 }
 
 /// Configuration for extracting custom measurements from transaction payloads.
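With the `drop` flag gone, extraction is gated purely by `version`. A minimal sketch of the remaining surface — `is_enabled` and the constant's value (`2`) are assumptions for illustration, not shown in the hunk above:

```rust
// Sketch only; the real constant's value is assumed to be 2.
const EXTRACT_ABNORMAL_MECHANISM_VERSION: u16 = 2;

struct SessionMetricsConfig {
    version: u16,
}

impl SessionMetricsConfig {
    /// Version `0` (default) disables extraction.
    fn is_enabled(&self) -> bool {
        self.version > 0
    }

    fn should_extract_abnormal_mechanism(&self) -> bool {
        self.version >= EXTRACT_ABNORMAL_MECHANISM_VERSION
    }
}

fn main() {
    let config = SessionMetricsConfig { version: 1 };
    assert!(config.is_enabled());
    assert!(!config.should_extract_abnormal_mechanism());
}
```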
#[serde(alias = "metrics", alias = "ingest-metrics")] pub metrics_sessions: TopicAssignment, @@ -142,7 +136,6 @@ impl TopicAssignments { KafkaTopic::Transactions => &self.transactions, KafkaTopic::Outcomes => &self.outcomes, KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes), - KafkaTopic::Sessions => &self.sessions, KafkaTopic::MetricsSessions => &self.metrics_sessions, KafkaTopic::MetricsGeneric => &self.metrics_generic, KafkaTopic::Profiles => &self.profiles, @@ -164,7 +157,6 @@ impl Default for TopicAssignments { transactions: "ingest-transactions".to_owned().into(), outcomes: "outcomes".to_owned().into(), outcomes_billing: None, - sessions: "ingest-sessions".to_owned().into(), metrics_sessions: "ingest-metrics".to_owned().into(), metrics_generic: "ingest-performance-metrics".to_owned().into(), profiles: "profiles".to_owned().into(), diff --git a/relay-server/src/services/processor/session.rs b/relay-server/src/services/processor/session.rs index ce764050ec..2a1c6717c7 100644 --- a/relay-server/src/services/processor/session.rs +++ b/relay-server/src/services/processor/session.rs @@ -220,7 +220,7 @@ fn process_session( } // Drop the session if metrics have been extracted in this or a prior Relay - if metrics_config.should_drop() && item.metrics_extracted() { + if item.metrics_extracted() { return false; } @@ -312,7 +312,7 @@ fn process_session_aggregates( } // Drop the aggregate if metrics have been extracted in this or a prior Relay - if metrics_config.should_drop() && item.metrics_extracted() { + if item.metrics_extracted() { return false; } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 16b0306024..adc3772ff0 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -4,7 +4,7 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::error::Error; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use std::time::Instant; use bytes::Bytes; @@ -12,9 +12,7 @@ use relay_base_schema::data_category::DataCategory; use relay_base_schema::project::ProjectId; use relay_common::time::{instant_to_date_time, UnixTimestamp}; use relay_config::Config; -use relay_event_schema::protocol::{ - self, EventId, SessionAggregates, SessionStatus, SessionUpdate, VALID_PLATFORMS, -}; +use relay_event_schema::protocol::{EventId, SessionStatus, VALID_PLATFORMS}; use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message}; use relay_metrics::{ @@ -37,9 +35,6 @@ use crate::services::processor::Processed; use crate::statsd::RelayCounters; use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope}; -/// The maximum number of individual session updates generated for each aggregate item. -const MAX_EXPLODED_SESSIONS: usize = 100; - /// Fallback name used for attachment items without a `filename` header. 
diff --git a/relay-server/src/services/processor/session.rs b/relay-server/src/services/processor/session.rs
index ce764050ec..2a1c6717c7 100644
--- a/relay-server/src/services/processor/session.rs
+++ b/relay-server/src/services/processor/session.rs
@@ -220,7 +220,7 @@ fn process_session(
     }
 
     // Drop the session if metrics have been extracted in this or a prior Relay
-    if metrics_config.should_drop() && item.metrics_extracted() {
+    if item.metrics_extracted() {
         return false;
     }
 
@@ -312,7 +312,7 @@ fn process_session_aggregates(
     }
 
     // Drop the aggregate if metrics have been extracted in this or a prior Relay
-    if metrics_config.should_drop() && item.metrics_extracted() {
+    if item.metrics_extracted() {
         return false;
     }
 
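The behavioral consequence of removing `should_drop()`: the `metrics_extracted` item header alone now decides whether a session or aggregate survives processing. A minimal sketch of the simplified gate, not the real processor:

```rust
/// Once this or any upstream Relay has extracted metrics from a session
/// item, the item is dropped unconditionally; there is no config toggle.
fn keep_session(metrics_extracted: bool) -> bool {
    !metrics_extracted
}

fn main() {
    assert!(keep_session(false)); // raw session, not yet extracted: keep
    assert!(!keep_session(true)); // extracted here or upstream: drop
}
```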
diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index 16b0306024..adc3772ff0 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -4,7 +4,7 @@
 use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::error::Error;
-use std::sync::{Arc, OnceLock};
+use std::sync::Arc;
 use std::time::Instant;
 
 use bytes::Bytes;
@@ -12,9 +12,7 @@ use relay_base_schema::data_category::DataCategory;
 use relay_base_schema::project::ProjectId;
 use relay_common::time::{instant_to_date_time, UnixTimestamp};
 use relay_config::Config;
-use relay_event_schema::protocol::{
-    self, EventId, SessionAggregates, SessionStatus, SessionUpdate, VALID_PLATFORMS,
-};
+use relay_event_schema::protocol::{EventId, SessionStatus, VALID_PLATFORMS};
 use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
 use relay_metrics::{
@@ -37,9 +35,6 @@ use crate::services::processor::Processed;
 use crate::statsd::RelayCounters;
 use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope};
 
-/// The maximum number of individual session updates generated for each aggregate item.
-const MAX_EXPLODED_SESSIONS: usize = 100;
-
 /// Fallback name used for attachment items without a `filename` header.
 const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
@@ -53,15 +48,6 @@ pub enum StoreError {
     NoEventId,
 }
 
-fn make_distinct_id(s: &str) -> Uuid {
-    static NAMESPACE: OnceLock<Uuid> = OnceLock::new();
-    let namespace =
-        NAMESPACE.get_or_init(|| Uuid::new_v5(&Uuid::NAMESPACE_URL, b"https://sentry.io/#did"));
-
-    s.parse()
-        .unwrap_or_else(|_| Uuid::new_v5(namespace, s.as_bytes()))
-}
-
 struct Producer {
     client: KafkaClient,
 }
@@ -244,15 +230,6 @@ impl StoreService {
                         item,
                     )?;
                 }
-                ItemType::Session | ItemType::Sessions => {
-                    self.produce_sessions(
-                        scoping.organization_id,
-                        scoping.project_id,
-                        retention,
-                        client,
-                        item,
-                    )?;
-                }
                 ItemType::Profile => self.produce_profile(
                     scoping.organization_id,
                     scoping.project_id,
@@ -661,158 +638,6 @@ impl StoreService {
         self.produce(KafkaTopic::Attachments, organization_id, message)
     }
 
-    fn produce_sessions(
-        &self,
-        org_id: u64,
-        project_id: ProjectId,
-        event_retention: u16,
-        client: Option<&str>,
-        item: &Item,
-    ) -> Result<(), StoreError> {
-        match item.ty() {
-            ItemType::Session => {
-                let mut session = match SessionUpdate::parse(&item.payload()) {
-                    Ok(session) => session,
-                    Err(error) => {
-                        relay_log::error!(
-                            error = &error as &dyn std::error::Error,
-                            "failed to store session"
-                        );
-                        return Ok(());
-                    }
-                };
-
-                if session.status == SessionStatus::Errored {
-                    // Individual updates should never have the status `errored`
-                    session.status = SessionStatus::Exited;
-                }
-                self.produce_session_update(org_id, project_id, event_retention, client, session)
-            }
-            ItemType::Sessions => {
-                let aggregates = match SessionAggregates::parse(&item.payload()) {
-                    Ok(aggregates) => aggregates,
-                    Err(_) => return Ok(()),
-                };
-
-                self.produce_sessions_from_aggregate(
-                    org_id,
-                    project_id,
-                    event_retention,
-                    client,
-                    aggregates,
-                )
-            }
-            _ => Ok(()),
-        }
-    }
-
-    fn produce_sessions_from_aggregate(
-        &self,
-        org_id: u64,
-        project_id: ProjectId,
-        event_retention: u16,
-        client: Option<&str>,
-        aggregates: SessionAggregates,
-    ) -> Result<(), StoreError> {
-        let SessionAggregates {
-            aggregates,
-            attributes,
-        } = aggregates;
-        let message = SessionKafkaMessage {
-            org_id,
-            project_id,
-            session_id: Uuid::nil(),
-            distinct_id: Uuid::nil(),
-            quantity: 1,
-            seq: 0,
-            received: protocol::datetime_to_timestamp(chrono::Utc::now()),
-            started: 0f64,
-            duration: None,
-            errors: 0,
-            release: attributes.release,
-            environment: attributes.environment,
-            sdk: client.map(str::to_owned),
-            retention_days: event_retention,
-            status: SessionStatus::Exited,
-        };
-
-        if aggregates.len() > MAX_EXPLODED_SESSIONS {
-            relay_log::warn!("aggregated session items exceed threshold");
-        }
-
-        for item in aggregates.into_iter().take(MAX_EXPLODED_SESSIONS) {
-            let mut message = message.clone();
-            message.started = protocol::datetime_to_timestamp(item.started);
-            message.distinct_id = item
-                .distinct_id
-                .as_deref()
-                .map(make_distinct_id)
-                .unwrap_or_default();
-
-            if item.exited > 0 {
-                message.errors = 0;
-                message.quantity = item.exited;
-                self.send_session_message(org_id, message.clone())?;
-            }
-            if item.errored > 0 {
-                message.errors = 1;
-                message.status = SessionStatus::Errored;
-                message.quantity = item.errored;
-                self.send_session_message(org_id, message.clone())?;
-            }
-            if item.abnormal > 0 {
-                message.errors = 1;
-                message.status = SessionStatus::Abnormal;
-                message.quantity = item.abnormal;
-                self.send_session_message(org_id, message.clone())?;
-            }
-            if item.crashed > 0 {
-                message.errors = 1;
-                message.status = SessionStatus::Crashed;
-                message.quantity = item.crashed;
-                self.send_session_message(org_id, message)?;
-            }
-        }
-        Ok(())
-    }
-
-    fn produce_session_update(
-        &self,
-        org_id: u64,
-        project_id: ProjectId,
-        event_retention: u16,
-        client: Option<&str>,
-        session: SessionUpdate,
-    ) -> Result<(), StoreError> {
-        self.send_session_message(
-            org_id,
-            SessionKafkaMessage {
-                org_id,
-                project_id,
-                session_id: session.session_id,
-                distinct_id: session
-                    .distinct_id
-                    .as_deref()
-                    .map(make_distinct_id)
-                    .unwrap_or_default(),
-                quantity: 1,
-                seq: if session.init { 0 } else { session.sequence },
-                received: protocol::datetime_to_timestamp(session.timestamp),
-                started: protocol::datetime_to_timestamp(session.started),
-                duration: session.duration,
-                status: session.status.clone(),
-                errors: session.errors.clamp(
-                    (session.status == SessionStatus::Crashed) as _,
-                    u16::MAX.into(),
-                ) as _,
-                release: session.attributes.release,
-                environment: session.attributes.environment,
-                sdk: client.map(str::to_owned),
-                retention_days: event_retention,
-            },
-        )
-    }
-
     fn send_metric_message(
         &self,
         namespace: MetricNamespace,
@@ -851,19 +676,6 @@ impl StoreService {
         Ok(())
     }
 
-    fn send_session_message(
-        &self,
-        organization_id: u64,
-        message: SessionKafkaMessage,
-    ) -> Result<(), StoreError> {
-        self.produce(
-            KafkaTopic::Sessions,
-            organization_id,
-            KafkaMessage::Session(message),
-        )?;
-        Ok(())
-    }
-
     fn produce_profile(
         &self,
         organization_id: u64,
@@ -1600,7 +1412,6 @@ enum KafkaMessage<'a> {
     Attachment(AttachmentKafkaMessage),
     AttachmentChunk(AttachmentChunkKafkaMessage),
     UserReport(UserReportKafkaMessage),
-    Session(SessionKafkaMessage),
     Metric {
         #[serde(skip)]
         headers: BTreeMap<String, String>,
@@ -1623,7 +1434,6 @@ impl Message for KafkaMessage<'_> {
             KafkaMessage::Attachment(_) => "attachment",
             KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
             KafkaMessage::UserReport(_) => "user_report",
-            KafkaMessage::Session(_) => "session",
             KafkaMessage::Metric { .. } => "metric",
             KafkaMessage::Profile(_) => "profile",
             KafkaMessage::ReplayEvent(_) => "replay_event",
@@ -1651,8 +1461,7 @@ impl Message for KafkaMessage<'_> {
             Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil),
 
             // Random partitioning
-            Self::Session(_)
-            | Self::Profile(_)
+            Self::Profile(_)
             | Self::ReplayRecordingNotChunked(_)
             | Self::Span(_)
             | Self::MetricsSummary(_)
@@ -1690,9 +1499,6 @@ impl Message for KafkaMessage<'_> {
     /// Serializes the message into its binary format.
     fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> {
         match self {
-            KafkaMessage::Session(message) => serde_json::to_vec(message)
-                .map(Cow::Owned)
-                .map_err(ClientError::InvalidJson),
             KafkaMessage::Metric { message, .. } => serde_json::to_vec(message)
                 .map(Cow::Owned)
                 .map_err(ClientError::InvalidJson),
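For reference on the removed `make_distinct_id` helper above: distinct IDs that were not already valid UUIDs were hashed into a UUIDv5 under a fixed namespace, which is why the deleted integration tests further down expect `367e2499-2b45-586d-814f-778b60144e87` for the literal `"foobarbaz"`. A standalone sketch with the `uuid` crate:

```rust
use uuid::Uuid;

fn make_distinct_id(s: &str) -> Uuid {
    // Fixed namespace derived from the URL namespace, as in the deleted code.
    let namespace = Uuid::new_v5(&Uuid::NAMESPACE_URL, b"https://sentry.io/#did");
    // Valid UUIDs pass through; anything else is hashed deterministically.
    s.parse()
        .unwrap_or_else(|_| Uuid::new_v5(&namespace, s.as_bytes()))
}

fn main() {
    assert_eq!(
        make_distinct_id("foobarbaz").to_string(),
        "367e2499-2b45-586d-814f-778b60144e87"
    );
}
```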
diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py
index e336e88453..048886fe59 100644
--- a/tests/integration/test_metrics.py
+++ b/tests/integration/test_metrics.py
@@ -634,121 +634,6 @@ def test_metrics_full(mini_sentry, relay, relay_with_processing, metrics_consume
     metrics_consumer.assert_empty()
 
 
-@pytest.mark.parametrize(
-    "extract_metrics", [True, False], ids=["extract", "don't extract"]
-)
-@pytest.mark.parametrize(
-    "metrics_extracted", [True, False], ids=["extracted", "not extracted"]
-)
-def test_session_metrics_non_processing(
-    mini_sentry, relay, extract_metrics, metrics_extracted
-):
-    """
-    Tests metrics extraction in a non-processing relay.
-
-    If and only if the metrics-extraction feature is enabled and the metrics from the session were not already
-    extracted, the relay should extract the metrics from the session and mark the session item as "metrics extracted".
-    """
-
-    relay = relay(mini_sentry, options=TEST_CONFIG)
-
-    if extract_metrics:
-        # enable metrics extraction for the project
-        extra_config = {"config": {"sessionMetrics": {"version": 1}}}
-    else:
-        extra_config = {}
-
-    project_id = 42
-    mini_sentry.add_basic_project_config(project_id, extra=extra_config)
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-    session_payload = _session_payload(timestamp=timestamp, started=started)
-
-    relay.send_session(
-        project_id,
-        session_payload,
-        item_headers={"metrics_extracted": metrics_extracted},
-    )
-
-    # Get session envelope
-    first_envelope = mini_sentry.captured_events.get(timeout=2)
-
-    try:
-        second_envelope = mini_sentry.captured_events.get(timeout=2)
-    except Exception:
-        second_envelope = None
-
-    assert first_envelope is not None
-    assert len(first_envelope.items) == 1
-    first_item = first_envelope.items[0]
-
-    if extract_metrics and not metrics_extracted:
-        # here we have not yet extracted metrics and metric extraction is enabled,
-        # so we expect two messages: a session message and a metrics message
-        assert second_envelope is not None
-        assert len(second_envelope.items) == 1
-
-        second_item = second_envelope.items[0]
-
-        if first_item.type == "session":
-            session_item = first_item
-            metrics_item = second_item
-        else:
-            session_item = second_item
-            metrics_item = first_item
-
-        # check the metrics item
-        assert metrics_item.type == "metric_buckets"
-
-        session_metrics = json.loads(metrics_item.get_bytes().decode())
-        session_metrics = sorted(session_metrics, key=lambda x: x["name"])
-
-        ts = int(started.timestamp())
-        assert session_metrics == [
-            {
-                "name": "c:sessions/session@none",
-                "tags": {
-                    "sdk": "raven-node/2.6.3",
-                    "environment": "production",
-                    "release": "sentry-test@1.0.0",
-                    "session.status": "init",
-                },
-                "timestamp": ts,
-                "width": 1,
-                "type": "c",
-                "value": 1.0,
-            },
-            {
-                "name": "s:sessions/user@none",
-                "tags": {
-                    "sdk": "raven-node/2.6.3",
-                    "environment": "production",
-                    "release": "sentry-test@1.0.0",
-                },
-                "timestamp": ts,
-                "width": 1,
-                "type": "s",
-                "value": [1617781333],
-            },
-        ]
-    else:
-        # either the metrics are already extracted or metric extraction is disabled;
-        # only the session message should be present
-        assert second_envelope is None
-        session_item = first_item
-
-    assert session_item is not None
-    assert session_item.type == "session"
-
-    # we have marked the item as "metrics extracted" properly:
-    # already extracted metrics should keep the flag, newly extracted metrics should set the flag
-    assert (
-        session_item.headers.get("metrics_extracted", False) is extract_metrics
-        or metrics_extracted
-    )
-
-
 def test_session_metrics_extracted_only_once(
     mini_sentry, relay, relay_with_processing, metrics_consumer
 ):
@@ -1758,91 +1643,6 @@ def test_custom_metrics_disabled(mini_sentry, relay_with_processing, metrics_con
     assert "c:custom/bar@second" not in metrics
 
 
-@pytest.mark.parametrize(
-    "denied_tag", ["sdk", "release"], ids=["remove sdk tag", "remove release tag"]
-)
-@pytest.mark.parametrize(
-    "denied_names", ["*user*", ""], ids=["deny user", "no denied names"]
-)
-def test_block_metrics_and_tags(mini_sentry, relay, denied_names, denied_tag):
-    relay = relay(mini_sentry, options=TEST_CONFIG)
-
-    extra_config = {
-        "config": {
-            "sessionMetrics": {"version": 1},
-            "metrics": {
-                "deniedNames": [denied_names],
-                "deniedTags": [{"name": ["*"], "tags": [denied_tag]}],
-            },
-        }
-    }
-
-    project_id = 42
-    mini_sentry.add_basic_project_config(project_id, extra=extra_config)
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-    session_payload = _session_payload(timestamp=timestamp, started=started)
-
-    relay.send_session(
-        project_id,
-        session_payload,
-    )
-
-    envelope = mini_sentry.captured_events.get(timeout=2)
-    assert len(envelope.items) == 1
-    first_item = envelope.items[0]
-
-    second_envelope = mini_sentry.captured_events.get(timeout=2)
-    assert len(second_envelope.items) == 1
-    second_item = second_envelope.items[0]
-
-    if first_item.type == "session":
-        metrics_item = second_item
-    else:
-        metrics_item = first_item
-
-    assert metrics_item.type == "metric_buckets"
-
-    session_metrics = json.loads(metrics_item.get_bytes().decode())
-    session_metrics = sorted(session_metrics, key=lambda x: x["name"])
-
-    if denied_names == "*user*":
-        assert len(session_metrics) == 1
-        assert session_metrics[0]["name"] == "c:sessions/session@none"
-    elif denied_names == "":
-        assert len(session_metrics) == 2
-        assert session_metrics[0]["name"] == "c:sessions/session@none"
-        assert session_metrics[1]["name"] == "s:sessions/user@none"
-    else:
-        assert False, "add new else-branch if you add another denied name"
-
-    if denied_tag == "sdk":
-        assert session_metrics[0]["tags"] == {
-            "environment": "production",
-            "release": "sentry-test@1.0.0",
-            "session.status": "init",
-        }
-        if denied_names == "":
-            assert session_metrics[1]["tags"] == {
-                "environment": "production",
-                "release": "sentry-test@1.0.0",
-            }
-    elif denied_tag == "release":
-        assert session_metrics[0]["tags"] == {
-            "sdk": "raven-node/2.6.3",
-            "environment": "production",
-            "session.status": "init",
-        }
-        if denied_names == "":
-            assert session_metrics[1]["tags"] == {
-                "sdk": "raven-node/2.6.3",
-                "environment": "production",
-            }
-    else:
-        assert False, "add new else-branch if you add another denied tag"
-
-
 @pytest.mark.parametrize("is_processing_relay", (False, True))
 @pytest.mark.parametrize(
     "global_generic_filters",
diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
index 126ff5be2b..3334793791 100644
--- a/tests/integration/test_session.py
+++ b/tests/integration/test_session.py
@@ -1,8 +1,5 @@
 from datetime import datetime, timedelta, timezone
 import json
-import pytest
-from requests.exceptions import HTTPError
-import uuid
 
 
 def test_sessions(mini_sentry, relay_chain):
@@ -42,253 +39,6 @@ def test_sessions(mini_sentry, relay_chain):
     assert session == session_payload
 
 
-def test_session_with_processing(mini_sentry, relay_with_processing, sessions_consumer):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-
-    project_id = 42
-    mini_sentry.add_full_project_config(project_id)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "did": "foobarbaz",
-            "seq": 42,
-            "init": True,
-            "timestamp": timestamp.isoformat(),
-            "started": started.isoformat(),
-            "duration": 1947.49,
-            "status": "exited",
-            "errors": 0,
-            "attrs": {
-                "release": "sentry-test@1.0.0",
-                "environment": "production",
-            },
-        },
-    )
-
-    session = sessions_consumer.get_session()
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 1,
-        # seq is forced to 0 when init is true
-        "seq": 0,
-        "received": timestamp.timestamp(),
-        "started": started.timestamp(),
-        "duration": 1947.49,
-        "status": "exited",
-        "errors": 0,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
-
-
-def test_session_with_processing_two_events(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-
-    project_id = 42
-    mini_sentry.add_full_project_config(project_id)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "did": "foobarbaz",
-            "seq": 42,
-            "init": True,
-            "timestamp": timestamp.isoformat(),
-            "started": started.isoformat(),
-            "status": "ok",
-            "attrs": {
-                "release": "sentry-test@1.0.0",
-                "environment": "production",
-            },
-        },
-    )
-    session = sessions_consumer.get_session()
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 1,
-        # seq is forced to 0 when init is true
-        "seq": 0,
-        "received": timestamp.timestamp(),
-        "started": started.timestamp(),
-        "duration": None,
-        "status": "ok",
-        "errors": 0,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
-
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "did": "foobarbaz",
-            "seq": 43,
-            "timestamp": timestamp.isoformat(),
-            "started": started.isoformat(),
-            "duration": 1947.49,
-            "status": "exited",
-            "attrs": {
-                "release": "sentry-test@1.0.0",
-                "environment": "production",
-            },
-        },
-    )
-    session = sessions_consumer.get_session()
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 1,
-        "seq": 43,
-        "received": timestamp.timestamp(),
-        "started": started.timestamp(),
-        "duration": 1947.49,
-        "status": "exited",
-        "errors": 0,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
-
-
-def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consumer):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started1 = timestamp - timedelta(hours=1)
-    started2 = started1 - timedelta(hours=1)
-
-    project_id = 42
-    mini_sentry.add_full_project_config(project_id)
-    relay.send_session_aggregates(
-        project_id,
-        {
-            "aggregates": [
-                {
-                    "started": started1.isoformat(),
-                    "did": "foobarbaz",
-                    "exited": 2,
-                    "errored": 3,
-                },
-                {
-                    "started": started2.isoformat(),
-                    "abnormal": 1,
-                },
-            ],
-            "attrs": {
-                "release": "sentry-test@1.0.0",
-                "environment": "production",
-            },
-        },
-    )
-
-    session = sessions_consumer.get_session()
-    del session["received"]
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "00000000-0000-0000-0000-000000000000",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 2,
-        "seq": 0,
-        "started": started1.timestamp(),
-        "duration": None,
-        "status": "exited",
-        "errors": 0,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
-
-    session = sessions_consumer.get_session()
-    del session["received"]
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "00000000-0000-0000-0000-000000000000",
-        "distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
-        "quantity": 3,
-        "seq": 0,
-        "started": started1.timestamp(),
-        "duration": None,
-        "status": "errored",
-        "errors": 1,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
-
-    session = sessions_consumer.get_session()
-    del session["received"]
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
-        "session_id": "00000000-0000-0000-0000-000000000000",
-        "distinct_id": "00000000-0000-0000-0000-000000000000",
-        "quantity": 1,
-        "seq": 0,
-        "started": started2.timestamp(),
-        "duration": None,
-        "status": "abnormal",
-        "errors": 1,
-        "release": "sentry-test@1.0.0",
-        "environment": "production",
-        "retention_days": 90,
-        "sdk": "raven-node/2.6.3",
-    }
-
-
-def test_session_with_custom_retention(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    project_id = 42
-    project_config = mini_sentry.add_full_project_config(project_id)
-    project_config["config"]["eventRetention"] = 17
-
-    timestamp = datetime.now(tz=timezone.utc)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "timestamp": timestamp.isoformat(),
-            "started": timestamp.isoformat(),
-            "attrs": {"release": "sentry-test@1.0.0"},
-        },
-    )
-
-    session = sessions_consumer.get_session()
-    assert session["retention_days"] == 17
-
-
 def test_session_age_discard(mini_sentry, relay_with_processing, sessions_consumer):
     relay = relay_with_processing()
     sessions_consumer = sessions_consumer()
@@ -347,52 +97,6 @@ def test_session_age_discard_aggregates(
     sessions_consumer.assert_empty()
 
 
-def test_session_force_errors_on_crash(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    timestamp = datetime.now(tz=timezone.utc)
-    started = timestamp - timedelta(hours=1)
-
-    project_id = 42
-    mini_sentry.add_full_project_config(project_id)
-    relay.send_session(
-        project_id,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "did": "foobarbaz",
-            "seq": 42,
-            "init": True,
-            "timestamp": timestamp.isoformat(),
-            "started": started.isoformat(),
-            "status": "crashed",
-            "attrs": {"release": "sentry-test@1.0.0", "environment": "production"},
-        },
-    )
-
-    session = sessions_consumer.get_session()
-    assert session == {
-        "org_id": 1,
-        "project_id": project_id,
"session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "distinct_id": "367e2499-2b45-586d-814f-778b60144e87", - "quantity": 1, - # seq is forced to 0 when init is true - "seq": 0, - "received": timestamp.timestamp(), - "started": started.timestamp(), - "duration": None, - "status": "crashed", - "errors": 1, - "release": "sentry-test@1.0.0", - "environment": "production", - "retention_days": 90, - "sdk": "raven-node/2.6.3", - } - - def test_session_release_required( mini_sentry, relay_with_processing, sessions_consumer ): @@ -450,48 +154,6 @@ def test_session_aggregates_release_required( sessions_consumer.assert_empty() -def test_session_quotas(mini_sentry, relay_with_processing, sessions_consumer): - relay = relay_with_processing() - sessions_consumer = sessions_consumer() - - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["eventRetention"] = 17 - project_config["config"]["quotas"] = [ - { - "id": f"test_rate_limiting_{uuid.uuid4().hex}", - "categories": ["session"], - "scope": "key", - "scopeId": str(project_config["publicKeys"][0]["numericId"]), - "window": 3600, - "limit": 5, - "reasonCode": "sessions_exceeded", - } - ] - - timestamp = datetime.now(tz=timezone.utc) - started = timestamp - timedelta(hours=1) - - session = { - "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", - "timestamp": timestamp.isoformat(), - "started": started.isoformat(), - "attrs": {"release": "sentry-test@1.0.0"}, - } - - for i in range(5): - relay.send_session(project_id, session) - sessions_consumer.get_session() - - # Rate limited, but responds with 200 because of deferred processing - relay.send_session(project_id, session) - sessions_consumer.assert_empty() - - with pytest.raises(HTTPError): - relay.send_session(project_id, session) - sessions_consumer.assert_empty() - - def test_session_disabled(mini_sentry, relay_with_processing, sessions_consumer): relay = relay_with_processing() sessions_consumer = sessions_consumer() @@ -525,30 +187,6 @@ def test_session_disabled(mini_sentry, relay_with_processing, sessions_consumer) sessions_consumer.assert_empty() -def test_session_auto_ip(mini_sentry, relay_with_processing, sessions_consumer): - relay = relay_with_processing() - sessions_consumer = sessions_consumer() - - project_id = 42 - project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"]["eventRetention"] = 17 - - timestamp = datetime.now(tz=timezone.utc) - relay.send_session( - project_id, - { - "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", - "timestamp": timestamp.isoformat(), - "started": timestamp.isoformat(), - "attrs": {"release": "sentry-test@1.0.0", "ip_address": "{{auto}}"}, - }, - ) - - # Can't test ip_address since it's not posted to Kafka. Just test that it is accepted. 
-    session = sessions_consumer.get_session()
-    assert session
-
-
 def test_session_invalid_release(mini_sentry, relay_with_processing, sessions_consumer):
     relay = relay_with_processing()
     sessions_consumer = sessions_consumer()
@@ -598,58 +236,3 @@ def test_session_aggregates_invalid_release(
     )
 
     sessions_consumer.assert_empty()
-
-
-def test_session_invalid_environment(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    PROJECT_ID = 42
-    project_config = mini_sentry.add_full_project_config(PROJECT_ID)
-    project_config["config"]["eventRetention"] = 17
-
-    timestamp = datetime.now(tz=timezone.utc)
-    relay.send_session(
-        PROJECT_ID,
-        {
-            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
-            "timestamp": timestamp.isoformat(),
-            "started": timestamp.isoformat(),
-            "attrs": {"release": "sentry-test@1.0.0", "environment": "none"},
-        },
-    )
-
-    session = sessions_consumer.get_session()
-    assert session.get("environment") is None
-
-
-def test_session_aggregates_invalid_environment(
-    mini_sentry, relay_with_processing, sessions_consumer
-):
-    relay = relay_with_processing()
-    sessions_consumer = sessions_consumer()
-
-    project_id = 42
-    project_config = mini_sentry.add_full_project_config(project_id)
-    project_config["config"]["eventRetention"] = 17
-
-    timestamp = datetime.now(tz=timezone.utc)
-    relay.send_session_aggregates(
-        project_id,
-        {
-            "aggregates": [
-                {
-                    "started": timestamp.isoformat(),
-                    "did": "foobarbaz",
-                    "exited": 2,
-                    "errored": 3,
-                },
-            ],
-            "attrs": {"release": "sentry-test@1.0.0", "environment": "."},
-        },
-    )
-
-    session = sessions_consumer.get_session()
-    assert session.get("environment") is None