New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Remove sessions #3271
feat: Remove sessions #3271
Changes from 2 commits
24d3f89
0fb63c8
b8ef1c5
7054dba
55fede5
8999575
db37ddb
73950f1
4211357
aa9c851
400f7e8
f8b44d5
cd99fdc
16c5062
9fe7a2f
acaefcc
682f4f7
cf8d5fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,8 +35,6 @@ pub enum KafkaTopic { | |
Outcomes, | ||
/// Override for billing critical outcomes. | ||
OutcomesBilling, | ||
/// Session health updates. | ||
Sessions, | ||
/// Any metric that is extracted from sessions. | ||
MetricsSessions, | ||
/// Generic metrics topic, excluding sessions (release health). | ||
|
@@ -62,13 +60,12 @@ impl KafkaTopic { | |
/// It will have to be adjusted if the new variants are added. | ||
pub fn iter() -> std::slice::Iter<'static, Self> { | ||
use KafkaTopic::*; | ||
static TOPICS: [KafkaTopic; 15] = [ | ||
static TOPICS: [KafkaTopic; 14] = [ | ||
Events, | ||
Attachments, | ||
Transactions, | ||
Outcomes, | ||
OutcomesBilling, | ||
Sessions, | ||
MetricsSessions, | ||
MetricsGeneric, | ||
Profiles, | ||
|
@@ -97,8 +94,6 @@ pub struct TopicAssignments { | |
pub outcomes: TopicAssignment, | ||
/// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`. | ||
pub outcomes_billing: Option<TopicAssignment>, | ||
/// Session health topic name. | ||
pub sessions: TopicAssignment, | ||
/// Default topic name for all aggregate metrics. Specialized topics for session-based and | ||
/// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each. | ||
pub metrics: TopicAssignment, | ||
|
@@ -133,7 +128,6 @@ impl TopicAssignments { | |
KafkaTopic::Transactions => &self.transactions, | ||
KafkaTopic::Outcomes => &self.outcomes, | ||
KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes), | ||
KafkaTopic::Sessions => &self.sessions, | ||
KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics), | ||
KafkaTopic::MetricsGeneric => &self.metrics_generic, | ||
KafkaTopic::Profiles => &self.profiles, | ||
|
@@ -155,7 +149,6 @@ impl Default for TopicAssignments { | |
transactions: "ingest-transactions".to_owned().into(), | ||
outcomes: "outcomes".to_owned().into(), | ||
outcomes_billing: None, | ||
sessions: "ingest-sessions".to_owned().into(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the ops repo still have configuration for this? We don't want Relay to crash because of an unknown topic parameter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. https://github.com/getsentry/ops/blob/master/k8s/clusters/us/default.yaml#L3256-L3257. Though is this comment actually backwards and it needs to be removed from ops first? |
||
metrics: "ingest-metrics".to_owned().into(), | ||
metrics_sessions: None, | ||
metrics_generic: "ingest-performance-metrics".to_owned().into(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,14 +8,11 @@ use std::sync::Arc; | |
use std::time::Instant; | ||
|
||
use bytes::Bytes; | ||
use once_cell::sync::OnceCell; | ||
use relay_base_schema::data_category::DataCategory; | ||
use relay_base_schema::project::ProjectId; | ||
use relay_common::time::{instant_to_date_time, UnixTimestamp}; | ||
use relay_config::Config; | ||
use relay_event_schema::protocol::{ | ||
self, EventId, SessionAggregates, SessionStatus, SessionUpdate, VALID_PLATFORMS, | ||
}; | ||
use relay_event_schema::protocol::{EventId, SessionStatus, VALID_PLATFORMS}; | ||
|
||
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message}; | ||
use relay_metrics::{ | ||
|
@@ -37,9 +34,6 @@ use crate::services::processor::Processed; | |
use crate::statsd::RelayCounters; | ||
use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope}; | ||
|
||
/// The maximum number of individual session updates generated for each aggregate item. | ||
const MAX_EXPLODED_SESSIONS: usize = 100; | ||
|
||
/// Fallback name used for attachment items without a `filename` header. | ||
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment"; | ||
|
||
|
@@ -53,15 +47,6 @@ pub enum StoreError { | |
NoEventId, | ||
} | ||
|
||
fn make_distinct_id(s: &str) -> Uuid { | ||
static NAMESPACE: OnceCell<Uuid> = OnceCell::new(); | ||
let namespace = | ||
NAMESPACE.get_or_init(|| Uuid::new_v5(&Uuid::NAMESPACE_URL, b"https://sentry.io/#did")); | ||
|
||
s.parse() | ||
.unwrap_or_else(|_| Uuid::new_v5(namespace, s.as_bytes())) | ||
} | ||
|
||
struct Producer { | ||
client: KafkaClient, | ||
} | ||
|
@@ -242,13 +227,7 @@ impl StoreService { | |
)?; | ||
} | ||
ItemType::Session | ItemType::Sessions => { | ||
self.produce_sessions( | ||
scoping.organization_id, | ||
scoping.project_id, | ||
retention, | ||
client, | ||
item, | ||
)?; | ||
// Do nothing | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a catch-all at the bottom of this match, so we can just remove this branch. |
||
} | ||
ItemType::MetricBuckets => self.produce_metrics( | ||
scoping.organization_id, | ||
|
@@ -614,158 +593,6 @@ impl StoreService { | |
self.produce(KafkaTopic::Attachments, organization_id, message) | ||
} | ||
|
||
fn produce_sessions( | ||
&self, | ||
org_id: u64, | ||
project_id: ProjectId, | ||
event_retention: u16, | ||
client: Option<&str>, | ||
item: &Item, | ||
) -> Result<(), StoreError> { | ||
match item.ty() { | ||
ItemType::Session => { | ||
let mut session = match SessionUpdate::parse(&item.payload()) { | ||
Ok(session) => session, | ||
Err(error) => { | ||
relay_log::error!( | ||
error = &error as &dyn std::error::Error, | ||
"failed to store session" | ||
); | ||
return Ok(()); | ||
} | ||
}; | ||
|
||
if session.status == SessionStatus::Errored { | ||
// Individual updates should never have the status `errored` | ||
session.status = SessionStatus::Exited; | ||
} | ||
self.produce_session_update(org_id, project_id, event_retention, client, session) | ||
} | ||
ItemType::Sessions => { | ||
let aggregates = match SessionAggregates::parse(&item.payload()) { | ||
Ok(aggregates) => aggregates, | ||
Err(_) => return Ok(()), | ||
}; | ||
|
||
self.produce_sessions_from_aggregate( | ||
org_id, | ||
project_id, | ||
event_retention, | ||
client, | ||
aggregates, | ||
) | ||
} | ||
_ => Ok(()), | ||
} | ||
} | ||
|
||
fn produce_sessions_from_aggregate( | ||
&self, | ||
org_id: u64, | ||
project_id: ProjectId, | ||
event_retention: u16, | ||
client: Option<&str>, | ||
aggregates: SessionAggregates, | ||
) -> Result<(), StoreError> { | ||
let SessionAggregates { | ||
aggregates, | ||
attributes, | ||
} = aggregates; | ||
let message = SessionKafkaMessage { | ||
org_id, | ||
project_id, | ||
session_id: Uuid::nil(), | ||
distinct_id: Uuid::nil(), | ||
quantity: 1, | ||
seq: 0, | ||
received: protocol::datetime_to_timestamp(chrono::Utc::now()), | ||
started: 0f64, | ||
duration: None, | ||
errors: 0, | ||
release: attributes.release, | ||
environment: attributes.environment, | ||
sdk: client.map(str::to_owned), | ||
retention_days: event_retention, | ||
status: SessionStatus::Exited, | ||
}; | ||
|
||
if aggregates.len() > MAX_EXPLODED_SESSIONS { | ||
relay_log::warn!("aggregated session items exceed threshold"); | ||
} | ||
|
||
for item in aggregates.into_iter().take(MAX_EXPLODED_SESSIONS) { | ||
let mut message = message.clone(); | ||
message.started = protocol::datetime_to_timestamp(item.started); | ||
message.distinct_id = item | ||
.distinct_id | ||
.as_deref() | ||
.map(make_distinct_id) | ||
.unwrap_or_default(); | ||
|
||
if item.exited > 0 { | ||
message.errors = 0; | ||
message.quantity = item.exited; | ||
self.send_session_message(org_id, message.clone())?; | ||
} | ||
if item.errored > 0 { | ||
message.errors = 1; | ||
message.status = SessionStatus::Errored; | ||
message.quantity = item.errored; | ||
self.send_session_message(org_id, message.clone())?; | ||
} | ||
if item.abnormal > 0 { | ||
message.errors = 1; | ||
message.status = SessionStatus::Abnormal; | ||
message.quantity = item.abnormal; | ||
self.send_session_message(org_id, message.clone())?; | ||
} | ||
if item.crashed > 0 { | ||
message.errors = 1; | ||
message.status = SessionStatus::Crashed; | ||
message.quantity = item.crashed; | ||
self.send_session_message(org_id, message)?; | ||
} | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn produce_session_update( | ||
&self, | ||
org_id: u64, | ||
project_id: ProjectId, | ||
event_retention: u16, | ||
client: Option<&str>, | ||
session: SessionUpdate, | ||
) -> Result<(), StoreError> { | ||
self.send_session_message( | ||
org_id, | ||
SessionKafkaMessage { | ||
org_id, | ||
project_id, | ||
session_id: session.session_id, | ||
distinct_id: session | ||
.distinct_id | ||
.as_deref() | ||
.map(make_distinct_id) | ||
.unwrap_or_default(), | ||
quantity: 1, | ||
seq: if session.init { 0 } else { session.sequence }, | ||
received: protocol::datetime_to_timestamp(session.timestamp), | ||
started: protocol::datetime_to_timestamp(session.started), | ||
duration: session.duration, | ||
status: session.status.clone(), | ||
errors: session.errors.clamp( | ||
(session.status == SessionStatus::Crashed) as _, | ||
u16::MAX.into(), | ||
) as _, | ||
release: session.attributes.release, | ||
environment: session.attributes.environment, | ||
sdk: client.map(str::to_owned), | ||
retention_days: event_retention, | ||
}, | ||
) | ||
} | ||
|
||
fn send_metric_message( | ||
&self, | ||
namespace: MetricNamespace, | ||
|
@@ -834,19 +661,6 @@ impl StoreService { | |
Ok(()) | ||
} | ||
|
||
fn send_session_message( | ||
&self, | ||
organization_id: u64, | ||
message: SessionKafkaMessage, | ||
) -> Result<(), StoreError> { | ||
self.produce( | ||
KafkaTopic::Sessions, | ||
organization_id, | ||
KafkaMessage::Session(message), | ||
)?; | ||
Ok(()) | ||
} | ||
|
||
fn produce_profile( | ||
&self, | ||
organization_id: u64, | ||
|
@@ -1628,7 +1442,6 @@ enum KafkaMessage<'a> { | |
Attachment(AttachmentKafkaMessage), | ||
AttachmentChunk(AttachmentChunkKafkaMessage), | ||
UserReport(UserReportKafkaMessage), | ||
Session(SessionKafkaMessage), | ||
Metric { | ||
#[serde(skip)] | ||
headers: BTreeMap<String, String>, | ||
|
@@ -1651,7 +1464,6 @@ impl Message for KafkaMessage<'_> { | |
KafkaMessage::Attachment(_) => "attachment", | ||
KafkaMessage::AttachmentChunk(_) => "attachment_chunk", | ||
KafkaMessage::UserReport(_) => "user_report", | ||
KafkaMessage::Session(_) => "session", | ||
KafkaMessage::Metric { .. } => "metric", | ||
KafkaMessage::Profile(_) => "profile", | ||
KafkaMessage::ReplayEvent(_) => "replay_event", | ||
|
@@ -1679,8 +1491,7 @@ impl Message for KafkaMessage<'_> { | |
Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil), | ||
|
||
// Random partitioning | ||
Self::Session(_) | ||
| Self::Profile(_) | ||
Self::Profile(_) | ||
| Self::ReplayRecordingNotChunked(_) | ||
| Self::Span(_) | ||
| Self::MetricsSummary(_) | ||
|
@@ -1718,9 +1529,6 @@ impl Message for KafkaMessage<'_> { | |
/// Serializes the message into its binary format. | ||
fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> { | ||
match self { | ||
KafkaMessage::Session(message) => serde_json::to_vec(message) | ||
.map(Cow::Owned) | ||
.map_err(ClientError::InvalidJson), | ||
KafkaMessage::Metric { message, .. } => serde_json::to_vec(message) | ||
.map(Cow::Owned) | ||
.map_err(ClientError::InvalidJson), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I would just remove this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done