
feat: Remove sessions #3271

Merged · 18 commits · Mar 25, 2024 · Changes from 12 commits

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,8 @@
- Implement volume metric stats. ([#3281](https://github.com/getsentry/relay/pull/3281))
- Scrub transactions before enforcing quotas. ([#3248](https://github.com/getsentry/relay/pull/3248))
- Kafka topic config supports default topic names as keys. ([#3282](https://github.com/getsentry/relay/pull/3282))
- Stop producing to sessions topic, the feature is now fully migrated to metrics ([#3271](https://github.com/getsentry/relay/pull/3271))

Member

Suggested change
- Stop producing to sessions topic, the feature is now fully migrated to metrics ([#3271](https://github.com/getsentry/relay/pull/3271))
- Stop producing to sessions topic, the feature is now fully migrated to metrics. ([#3271](https://github.com/getsentry/relay/pull/3271))


## 24.3.0

8 changes: 0 additions & 8 deletions relay-dynamic-config/src/metrics.rs
@@ -78,9 +78,6 @@ pub struct SessionMetricsConfig {
///
/// Version `0` (default) disables extraction.
version: u16,

/// Drop sessions after successfully extracting metrics.
drop: bool,
}

impl SessionMetricsConfig {
@@ -98,11 +95,6 @@ impl SessionMetricsConfig {
pub fn should_extract_abnormal_mechanism(&self) -> bool {
self.version >= EXTRACT_ABNORMAL_MECHANISM_VERSION
}

/// Returns `true` if the session should be dropped after extracting metrics.
pub fn should_drop(&self) -> bool {
self.drop
}
}

/// Configuration for extracting custom measurements from transaction payloads.
10 changes: 1 addition & 9 deletions relay-kafka/src/config.rs
@@ -35,8 +35,6 @@ pub enum KafkaTopic {
Outcomes,
/// Override for billing critical outcomes.
OutcomesBilling,
/// Session health updates.
Sessions,
/// Any metric that is extracted from sessions.
MetricsSessions,
/// Generic metrics topic, excluding sessions (release health).
@@ -62,13 +60,12 @@ impl KafkaTopic {
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 15] = [
static TOPICS: [KafkaTopic; 14] = [
Events,
Attachments,
Transactions,
Outcomes,
OutcomesBilling,
Sessions,
MetricsSessions,
MetricsGeneric,
Profiles,
@@ -101,9 +98,6 @@ pub struct TopicAssignments {
/// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`.
#[serde(alias = "outcomes-billing")]
pub outcomes_billing: Option<TopicAssignment>,
/// Session health topic name.
#[serde(alias = "ingest-sessions")]
pub sessions: TopicAssignment,
/// Topic name for metrics extracted from sessions, aka release health.
#[serde(alias = "metrics", alias = "ingest-metrics")]
pub metrics_sessions: TopicAssignment,
@@ -142,7 +136,6 @@ impl TopicAssignments {
KafkaTopic::Transactions => &self.transactions,
KafkaTopic::Outcomes => &self.outcomes,
KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes),
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::MetricsSessions => &self.metrics_sessions,
KafkaTopic::MetricsGeneric => &self.metrics_generic,
KafkaTopic::Profiles => &self.profiles,
@@ -164,7 +157,6 @@ impl Default for TopicAssignments {
transactions: "ingest-transactions".to_owned().into(),
outcomes: "outcomes".to_owned().into(),
outcomes_billing: None,
sessions: "ingest-sessions".to_owned().into(),
Member

Does the ops repo still have configuration for this? We don't want Relay to crash because of an unknown topic parameter.

Member Author

Yes: https://github.com/getsentry/ops/blob/master/k8s/clusters/us/default.yaml#L3256-L3257. Though, is this actually backwards? Does it need to be removed from ops first?
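
To illustrate the concern, here is a minimal sketch (not Relay's actual config types; the struct name, field set, and the `deny_unknown_fields` attribute are all assumptions). Whether a stale `sessions:` key in the ops config breaks deserialization depends on whether the config struct rejects unknown fields; if it does not, serde silently ignores the leftover key and Relay keeps running.

```rust
use serde::Deserialize;

// Hypothetical stand-in for `TopicAssignments`; Relay's real struct may
// not set `deny_unknown_fields` at all.
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct TopicAssignmentsSketch {
    outcomes: String,
    metrics_sessions: String,
}

fn main() {
    // Config as it might still exist in ops, with the removed key present.
    let yaml = "
outcomes: outcomes
metrics_sessions: ingest-metrics
sessions: ingest-sessions
";
    // With `deny_unknown_fields`, the stale `sessions` key is a hard error;
    // without it, serde would ignore the key.
    match serde_yaml::from_str::<TopicAssignmentsSketch>(yaml) {
        Ok(cfg) => println!("parsed: {cfg:?}"),
        Err(err) => println!("config rejected: {err}"),
    }
}
```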

metrics_sessions: "ingest-metrics".to_owned().into(),
metrics_generic: "ingest-performance-metrics".to_owned().into(),
profiles: "profiles".to_owned().into(),
4 changes: 2 additions & 2 deletions relay-server/src/services/processor/session.rs
@@ -220,7 +220,7 @@ fn process_session(
}

// Drop the session if metrics have been extracted in this or a prior Relay
if metrics_config.should_drop() && item.metrics_extracted() {
if item.metrics_extracted() {
return false;
}

Expand Down Expand Up @@ -312,7 +312,7 @@ fn process_session_aggregates(
}

// Drop the aggregate if metrics have been extracted in this or a prior Relay
if metrics_config.should_drop() && item.metrics_extracted() {
if item.metrics_extracted() {
return false;
}
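
Taken together, the two hunks above change the drop condition from config-gated to unconditional. A minimal sketch of the before/after semantics (simplified; the `Item` struct and function signatures are stand-ins, not Relay's types):

```rust
// Stand-in for an envelope item carrying the `metrics_extracted` flag.
struct Item {
    metrics_extracted: bool,
}

// Pre-PR: keep the session unless the project config opted into dropping
// AND metrics were already extracted in this or a prior Relay.
fn keep_session_before(drop_enabled: bool, item: &Item) -> bool {
    !(drop_enabled && item.metrics_extracted)
}

// Post-PR: extraction alone decides; the `drop` config flag is gone.
fn keep_session_after(item: &Item) -> bool {
    !item.metrics_extracted
}

fn main() {
    let item = Item { metrics_extracted: true };
    // A session with extracted metrics used to pass through when the
    // project did not opt in; it is now always dropped.
    assert!(keep_session_before(false, &item));
    assert!(!keep_session_after(&item));
    println!("sessions with extracted metrics are now dropped unconditionally");
}
```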

199 changes: 2 additions & 197 deletions relay-server/src/services/store.rs
@@ -8,14 +8,11 @@ use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use once_cell::sync::OnceCell;
use relay_base_schema::data_category::DataCategory;
use relay_base_schema::project::ProjectId;
use relay_common::time::{instant_to_date_time, UnixTimestamp};
use relay_config::Config;
use relay_event_schema::protocol::{
self, EventId, SessionAggregates, SessionStatus, SessionUpdate, VALID_PLATFORMS,
};
use relay_event_schema::protocol::{EventId, SessionStatus, VALID_PLATFORMS};

use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
use relay_metrics::{
@@ -38,9 +35,6 @@ use crate::services::processor::Processed;
use crate::statsd::RelayCounters;
use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope};

/// The maximum number of individual session updates generated for each aggregate item.
const MAX_EXPLODED_SESSIONS: usize = 100;

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

Expand All @@ -54,15 +48,6 @@ pub enum StoreError {
NoEventId,
}

fn make_distinct_id(s: &str) -> Uuid {
static NAMESPACE: OnceCell<Uuid> = OnceCell::new();
let namespace =
NAMESPACE.get_or_init(|| Uuid::new_v5(&Uuid::NAMESPACE_URL, b"https://sentry.io/#did"));

s.parse()
.unwrap_or_else(|_| Uuid::new_v5(namespace, s.as_bytes()))
}

struct Producer {
client: KafkaClient,
}
@@ -245,15 +230,6 @@ impl StoreService {
item,
)?;
}
ItemType::Session | ItemType::Sessions => {
self.produce_sessions(
scoping.organization_id,
scoping.project_id,
retention,
client,
item,
)?;
}
ItemType::Profile => self.produce_profile(
scoping.organization_id,
scoping.project_id,
@@ -662,158 +638,6 @@ impl StoreService {
self.produce(KafkaTopic::Attachments, organization_id, message)
}

fn produce_sessions(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
item: &Item,
) -> Result<(), StoreError> {
match item.ty() {
ItemType::Session => {
let mut session = match SessionUpdate::parse(&item.payload()) {
Ok(session) => session,
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to store session"
);
return Ok(());
}
};

if session.status == SessionStatus::Errored {
// Individual updates should never have the status `errored`
session.status = SessionStatus::Exited;
}
self.produce_session_update(org_id, project_id, event_retention, client, session)
}
ItemType::Sessions => {
let aggregates = match SessionAggregates::parse(&item.payload()) {
Ok(aggregates) => aggregates,
Err(_) => return Ok(()),
};

self.produce_sessions_from_aggregate(
org_id,
project_id,
event_retention,
client,
aggregates,
)
}
_ => Ok(()),
}
}

fn produce_sessions_from_aggregate(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
aggregates: SessionAggregates,
) -> Result<(), StoreError> {
let SessionAggregates {
aggregates,
attributes,
} = aggregates;
let message = SessionKafkaMessage {
org_id,
project_id,
session_id: Uuid::nil(),
distinct_id: Uuid::nil(),
quantity: 1,
seq: 0,
received: protocol::datetime_to_timestamp(chrono::Utc::now()),
started: 0f64,
duration: None,
errors: 0,
release: attributes.release,
environment: attributes.environment,
sdk: client.map(str::to_owned),
retention_days: event_retention,
status: SessionStatus::Exited,
};

if aggregates.len() > MAX_EXPLODED_SESSIONS {
relay_log::warn!("aggregated session items exceed threshold");
}

for item in aggregates.into_iter().take(MAX_EXPLODED_SESSIONS) {
let mut message = message.clone();
message.started = protocol::datetime_to_timestamp(item.started);
message.distinct_id = item
.distinct_id
.as_deref()
.map(make_distinct_id)
.unwrap_or_default();

if item.exited > 0 {
message.errors = 0;
message.quantity = item.exited;
self.send_session_message(org_id, message.clone())?;
}
if item.errored > 0 {
message.errors = 1;
message.status = SessionStatus::Errored;
message.quantity = item.errored;
self.send_session_message(org_id, message.clone())?;
}
if item.abnormal > 0 {
message.errors = 1;
message.status = SessionStatus::Abnormal;
message.quantity = item.abnormal;
self.send_session_message(org_id, message.clone())?;
}
if item.crashed > 0 {
message.errors = 1;
message.status = SessionStatus::Crashed;
message.quantity = item.crashed;
self.send_session_message(org_id, message)?;
}
}
Ok(())
}

fn produce_session_update(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
session: SessionUpdate,
) -> Result<(), StoreError> {
self.send_session_message(
org_id,
SessionKafkaMessage {
org_id,
project_id,
session_id: session.session_id,
distinct_id: session
.distinct_id
.as_deref()
.map(make_distinct_id)
.unwrap_or_default(),
quantity: 1,
seq: if session.init { 0 } else { session.sequence },
received: protocol::datetime_to_timestamp(session.timestamp),
started: protocol::datetime_to_timestamp(session.started),
duration: session.duration,
status: session.status.clone(),
errors: session.errors.clamp(
(session.status == SessionStatus::Crashed) as _,
u16::MAX.into(),
) as _,
release: session.attributes.release,
environment: session.attributes.environment,
sdk: client.map(str::to_owned),
retention_days: event_retention,
},
)
}

fn send_metric_message(
&self,
namespace: MetricNamespace,
Expand Down Expand Up @@ -852,19 +676,6 @@ impl StoreService {
Ok(())
}

fn send_session_message(
&self,
organization_id: u64,
message: SessionKafkaMessage,
) -> Result<(), StoreError> {
self.produce(
KafkaTopic::Sessions,
organization_id,
KafkaMessage::Session(message),
)?;
Ok(())
}

fn produce_profile(
&self,
organization_id: u64,
@@ -1647,7 +1458,6 @@ enum KafkaMessage<'a> {
Attachment(AttachmentKafkaMessage),
AttachmentChunk(AttachmentChunkKafkaMessage),
UserReport(UserReportKafkaMessage),
Session(SessionKafkaMessage),
Metric {
#[serde(skip)]
headers: BTreeMap<String, String>,
@@ -1670,7 +1480,6 @@ impl Message for KafkaMessage<'_> {
KafkaMessage::Attachment(_) => "attachment",
KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
KafkaMessage::UserReport(_) => "user_report",
KafkaMessage::Session(_) => "session",
KafkaMessage::Metric { .. } => "metric",
KafkaMessage::Profile(_) => "profile",
KafkaMessage::ReplayEvent(_) => "replay_event",
@@ -1698,8 +1507,7 @@ impl Message for KafkaMessage<'_> {
Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil),

// Random partitioning
Self::Session(_)
| Self::Profile(_)
Self::Profile(_)
| Self::ReplayRecordingNotChunked(_)
| Self::Span(_)
| Self::MetricsSummary(_)
@@ -1737,9 +1545,6 @@
/// Serializes the message into its binary format.
fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> {
match self {
KafkaMessage::Session(message) => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),
KafkaMessage::Metric { message, .. } => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),