feat: Remove sessions #3271

Merged: 18 commits, Mar 25, 2024
Changes from 2 commits
7 changes: 2 additions & 5 deletions relay-dynamic-config/src/metrics.rs
@@ -78,9 +78,6 @@ pub struct SessionMetricsConfig {
///
/// Version `0` (default) disables extraction.
version: u16,

/// Drop sessions after successfully extracting metrics.
drop: bool,
}

impl SessionMetricsConfig {
@@ -99,9 +96,9 @@ impl SessionMetricsConfig {
self.version >= EXTRACT_ABNORMAL_MECHANISM_VERSION
}

/// Returns `true` if the session should be dropped after extracting metrics.
/// Session is dropped after extracting metrics.
pub fn should_drop(&self) -> bool {
self.drop
true
Member:

Personally I would just remove this method.

Member Author:

done

}
}

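For context on the thread above, here is a minimal sketch of what `SessionMetricsConfig` could look like once `should_drop` is removed as suggested. It assumes only the fields visible in this diff; the `is_enabled` helper is an illustrative assumption, not code from the PR:

```rust
// Sketch only, based on the fields shown in the diff above. After this PR,
// sessions are always dropped once metrics are extracted, so both the
// `drop` flag and the hard-coded `should_drop()` method disappear.
#[derive(Debug, Clone, Copy, Default)]
pub struct SessionMetricsConfig {
    /// Version of session metrics extraction.
    ///
    /// Version `0` (default) disables extraction.
    version: u16,
}

impl SessionMetricsConfig {
    /// Illustrative helper (an assumption, not from the diff): extraction
    /// is active for any non-zero version.
    pub fn is_enabled(&self) -> bool {
        self.version > 0
    }
}
```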
9 changes: 1 addition & 8 deletions relay-kafka/src/config.rs
@@ -35,8 +35,6 @@ pub enum KafkaTopic {
Outcomes,
/// Override for billing critical outcomes.
OutcomesBilling,
/// Session health updates.
Sessions,
/// Any metric that is extracted from sessions.
MetricsSessions,
/// Generic metrics topic, excluding sessions (release health).
@@ -62,13 +60,12 @@ impl KafkaTopic {
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 15] = [
static TOPICS: [KafkaTopic; 14] = [
Events,
Attachments,
Transactions,
Outcomes,
OutcomesBilling,
Sessions,
MetricsSessions,
MetricsGeneric,
Profiles,
@@ -97,8 +94,6 @@ pub struct TopicAssignments {
pub outcomes: TopicAssignment,
/// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`.
pub outcomes_billing: Option<TopicAssignment>,
/// Session health topic name.
pub sessions: TopicAssignment,
/// Default topic name for all aggregate metrics. Specialized topics for session-based and
/// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each.
pub metrics: TopicAssignment,
@@ -133,7 +128,6 @@ impl TopicAssignments {
KafkaTopic::Transactions => &self.transactions,
KafkaTopic::Outcomes => &self.outcomes,
KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes),
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics),
KafkaTopic::MetricsGeneric => &self.metrics_generic,
KafkaTopic::Profiles => &self.profiles,
@@ -155,7 +149,6 @@ impl Default for TopicAssignments {
transactions: "ingest-transactions".to_owned().into(),
outcomes: "outcomes".to_owned().into(),
outcomes_billing: None,
sessions: "ingest-sessions".to_owned().into(),
Member:

Does the ops repo still have configuration for this? We don't want Relay to crash because of an unknown topic parameter.

Member Author:

Yes: https://github.com/getsentry/ops/blob/master/k8s/clusters/us/default.yaml#L3256-L3257. Or is this backwards, and it needs to be removed from ops first?

metrics: "ingest-metrics".to_owned().into(),
metrics_sessions: None,
metrics_generic: "ingest-performance-metrics".to_owned().into(),
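To make the crash concern above concrete: whether a leftover `sessions:` key in the ops config breaks startup depends on how the topic struct is deserialized. The following is a self-contained sketch (assuming the `serde` and `serde_yaml` crates), not Relay's actual config code; the struct and field names are stand-ins:

```rust
// Sketch of the failure mode discussed above, not Relay's config code.
// With serde's default behavior, unknown keys are ignored; with
// `deny_unknown_fields`, a stale `sessions:` entry fails deserialization.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct StrictTopics {
    outcomes: String,
    metrics: String,
}

#[derive(Debug, Deserialize)]
struct LenientTopics {
    outcomes: String,
    metrics: String,
}

fn main() {
    let yaml = "outcomes: outcomes\nmetrics: ingest-metrics\nsessions: ingest-sessions\n";

    // Rejected: `sessions` is no longer a known field.
    assert!(serde_yaml::from_str::<StrictTopics>(yaml).is_err());

    // Accepted: the stale key is silently ignored.
    assert!(serde_yaml::from_str::<LenientTopics>(yaml).is_ok());
}
```

Which behavior Relay's `TopicAssignments` actually has is not visible in this diff, hence the question above.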
198 changes: 3 additions & 195 deletions relay-server/src/services/store.rs
@@ -8,14 +8,11 @@ use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use once_cell::sync::OnceCell;
use relay_base_schema::data_category::DataCategory;
use relay_base_schema::project::ProjectId;
use relay_common::time::{instant_to_date_time, UnixTimestamp};
use relay_config::Config;
use relay_event_schema::protocol::{
self, EventId, SessionAggregates, SessionStatus, SessionUpdate, VALID_PLATFORMS,
};
use relay_event_schema::protocol::{EventId, SessionStatus, VALID_PLATFORMS};

use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
use relay_metrics::{
@@ -37,9 +34,6 @@ use crate::services::processor::Processed;
use crate::statsd::RelayCounters;
use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope};

/// The maximum number of individual session updates generated for each aggregate item.
const MAX_EXPLODED_SESSIONS: usize = 100;

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

@@ -53,15 +47,6 @@ pub enum StoreError {
NoEventId,
}

fn make_distinct_id(s: &str) -> Uuid {
static NAMESPACE: OnceCell<Uuid> = OnceCell::new();
let namespace =
NAMESPACE.get_or_init(|| Uuid::new_v5(&Uuid::NAMESPACE_URL, b"https://sentry.io/#did"));

s.parse()
.unwrap_or_else(|_| Uuid::new_v5(namespace, s.as_bytes()))
}

struct Producer {
client: KafkaClient,
}
@@ -242,13 +227,7 @@ impl StoreService {
)?;
}
ItemType::Session | ItemType::Sessions => {
self.produce_sessions(
scoping.organization_id,
scoping.project_id,
retention,
client,
item,
)?;
// Do nothing
Member:

There's a catch-all at the bottom of this match, so we can just remove this branch.

}
ItemType::MetricBuckets => self.produce_metrics(
scoping.organization_id,
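The catch-all point above is plain `match` mechanics; a tiny illustrative sketch (made-up variants, not Relay's real `ItemType`):

```rust
// Illustrative only: once a branch's body is empty, the wildcard arm at
// the bottom produces the same behavior, so the branch can be deleted.
#[allow(dead_code)]
enum ItemType {
    Event,
    Session,
    Sessions,
}

fn produce(item: ItemType) {
    match item {
        ItemType::Event => println!("forward to Kafka"),
        // No explicit `Session`/`Sessions` arm is needed anymore: the
        // catch-all below silently drops everything unhandled.
        _ => {}
    }
}

fn main() {
    produce(ItemType::Event);
    produce(ItemType::Session); // falls through to `_` and is dropped
}
```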
@@ -614,158 +593,6 @@ impl StoreService {
self.produce(KafkaTopic::Attachments, organization_id, message)
}

fn produce_sessions(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
item: &Item,
) -> Result<(), StoreError> {
match item.ty() {
ItemType::Session => {
let mut session = match SessionUpdate::parse(&item.payload()) {
Ok(session) => session,
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to store session"
);
return Ok(());
}
};

if session.status == SessionStatus::Errored {
// Individual updates should never have the status `errored`
session.status = SessionStatus::Exited;
}
self.produce_session_update(org_id, project_id, event_retention, client, session)
}
ItemType::Sessions => {
let aggregates = match SessionAggregates::parse(&item.payload()) {
Ok(aggregates) => aggregates,
Err(_) => return Ok(()),
};

self.produce_sessions_from_aggregate(
org_id,
project_id,
event_retention,
client,
aggregates,
)
}
_ => Ok(()),
}
}

fn produce_sessions_from_aggregate(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
aggregates: SessionAggregates,
) -> Result<(), StoreError> {
let SessionAggregates {
aggregates,
attributes,
} = aggregates;
let message = SessionKafkaMessage {
org_id,
project_id,
session_id: Uuid::nil(),
distinct_id: Uuid::nil(),
quantity: 1,
seq: 0,
received: protocol::datetime_to_timestamp(chrono::Utc::now()),
started: 0f64,
duration: None,
errors: 0,
release: attributes.release,
environment: attributes.environment,
sdk: client.map(str::to_owned),
retention_days: event_retention,
status: SessionStatus::Exited,
};

if aggregates.len() > MAX_EXPLODED_SESSIONS {
relay_log::warn!("aggregated session items exceed threshold");
}

for item in aggregates.into_iter().take(MAX_EXPLODED_SESSIONS) {
let mut message = message.clone();
message.started = protocol::datetime_to_timestamp(item.started);
message.distinct_id = item
.distinct_id
.as_deref()
.map(make_distinct_id)
.unwrap_or_default();

if item.exited > 0 {
message.errors = 0;
message.quantity = item.exited;
self.send_session_message(org_id, message.clone())?;
}
if item.errored > 0 {
message.errors = 1;
message.status = SessionStatus::Errored;
message.quantity = item.errored;
self.send_session_message(org_id, message.clone())?;
}
if item.abnormal > 0 {
message.errors = 1;
message.status = SessionStatus::Abnormal;
message.quantity = item.abnormal;
self.send_session_message(org_id, message.clone())?;
}
if item.crashed > 0 {
message.errors = 1;
message.status = SessionStatus::Crashed;
message.quantity = item.crashed;
self.send_session_message(org_id, message)?;
}
}
Ok(())
}

fn produce_session_update(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
session: SessionUpdate,
) -> Result<(), StoreError> {
self.send_session_message(
org_id,
SessionKafkaMessage {
org_id,
project_id,
session_id: session.session_id,
distinct_id: session
.distinct_id
.as_deref()
.map(make_distinct_id)
.unwrap_or_default(),
quantity: 1,
seq: if session.init { 0 } else { session.sequence },
received: protocol::datetime_to_timestamp(session.timestamp),
started: protocol::datetime_to_timestamp(session.started),
duration: session.duration,
status: session.status.clone(),
errors: session.errors.clamp(
(session.status == SessionStatus::Crashed) as _,
u16::MAX.into(),
) as _,
release: session.attributes.release,
environment: session.attributes.environment,
sdk: client.map(str::to_owned),
retention_days: event_retention,
},
)
}

fn send_metric_message(
&self,
namespace: MetricNamespace,
@@ -834,19 +661,6 @@ impl StoreService {
Ok(())
}

fn send_session_message(
&self,
organization_id: u64,
message: SessionKafkaMessage,
) -> Result<(), StoreError> {
self.produce(
KafkaTopic::Sessions,
organization_id,
KafkaMessage::Session(message),
)?;
Ok(())
}

fn produce_profile(
&self,
organization_id: u64,
@@ -1628,7 +1442,6 @@ enum KafkaMessage<'a> {
Attachment(AttachmentKafkaMessage),
AttachmentChunk(AttachmentChunkKafkaMessage),
UserReport(UserReportKafkaMessage),
Session(SessionKafkaMessage),
Metric {
#[serde(skip)]
headers: BTreeMap<String, String>,
@@ -1651,7 +1464,6 @@ impl Message for KafkaMessage<'_> {
KafkaMessage::Attachment(_) => "attachment",
KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
KafkaMessage::UserReport(_) => "user_report",
KafkaMessage::Session(_) => "session",
KafkaMessage::Metric { .. } => "metric",
KafkaMessage::Profile(_) => "profile",
KafkaMessage::ReplayEvent(_) => "replay_event",
@@ -1679,8 +1491,7 @@ impl Message for KafkaMessage<'_> {
Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil),

// Random partitioning
Self::Session(_)
| Self::Profile(_)
Self::Profile(_)
| Self::ReplayRecordingNotChunked(_)
| Self::Span(_)
| Self::MetricsSummary(_)
@@ -1718,9 +1529,6 @@ impl Message for KafkaMessage<'_> {
/// Serializes the message into its binary format.
fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> {
match self {
KafkaMessage::Session(message) => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),
KafkaMessage::Metric { message, .. } => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),