Skip to content

Commit

Permalink
only skip produce
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Mar 18, 2024
1 parent 1c91340 commit 24d3f89
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 208 deletions.
7 changes: 2 additions & 5 deletions relay-dynamic-config/src/metrics.rs
Expand Up @@ -78,9 +78,6 @@ pub struct SessionMetricsConfig {
///
/// Version `0` (default) disables extraction.
version: u16,

/// Drop sessions after successfully extracting metrics.
drop: bool,
}

impl SessionMetricsConfig {
Expand All @@ -99,9 +96,9 @@ impl SessionMetricsConfig {
self.version >= EXTRACT_ABNORMAL_MECHANISM_VERSION
}

/// Returns `true` if the session should be dropped after extracting metrics.
/// Session is dropped after extracting metrics.
pub fn should_drop(&self) -> bool {
self.drop
true
}
}

Expand Down
9 changes: 1 addition & 8 deletions relay-kafka/src/config.rs
Expand Up @@ -35,8 +35,6 @@ pub enum KafkaTopic {
Outcomes,
/// Override for billing critical outcomes.
OutcomesBilling,
/// Session health updates.
Sessions,
/// Any metric that is extracted from sessions.
MetricsSessions,
/// Generic metrics topic, excluding sessions (release health).
Expand All @@ -62,13 +60,12 @@ impl KafkaTopic {
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 15] = [
static TOPICS: [KafkaTopic; 14] = [
Events,
Attachments,
Transactions,
Outcomes,
OutcomesBilling,
Sessions,
MetricsSessions,
MetricsGeneric,
Profiles,
Expand Down Expand Up @@ -97,8 +94,6 @@ pub struct TopicAssignments {
pub outcomes: TopicAssignment,
/// Outcomes topic name for billing critical outcomes. Defaults to the assignment of `outcomes`.
pub outcomes_billing: Option<TopicAssignment>,
/// Session health topic name.
pub sessions: TopicAssignment,
/// Default topic name for all aggregate metrics. Specialized topics for session-based and
/// generic metrics can be configured via `metrics_sessions` and `metrics_generic` each.
pub metrics: TopicAssignment,
Expand Down Expand Up @@ -133,7 +128,6 @@ impl TopicAssignments {
KafkaTopic::Transactions => &self.transactions,
KafkaTopic::Outcomes => &self.outcomes,
KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes),
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics),
KafkaTopic::MetricsGeneric => &self.metrics_generic,
KafkaTopic::Profiles => &self.profiles,
Expand All @@ -155,7 +149,6 @@ impl Default for TopicAssignments {
transactions: "ingest-transactions".to_owned().into(),
outcomes: "outcomes".to_owned().into(),
outcomes_billing: None,
sessions: "ingest-sessions".to_owned().into(),
metrics: "ingest-metrics".to_owned().into(),
metrics_sessions: None,
metrics_generic: "ingest-performance-metrics".to_owned().into(),
Expand Down
198 changes: 3 additions & 195 deletions relay-server/src/services/store.rs
Expand Up @@ -8,14 +8,11 @@ use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use once_cell::sync::OnceCell;
use relay_base_schema::data_category::DataCategory;
use relay_base_schema::project::ProjectId;
use relay_common::time::{instant_to_date_time, UnixTimestamp};
use relay_config::Config;
use relay_event_schema::protocol::{
self, EventId, SessionAggregates, SessionStatus, SessionUpdate, VALID_PLATFORMS,
};
use relay_event_schema::protocol::{EventId, SessionStatus, VALID_PLATFORMS};

use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
use relay_metrics::{
Expand All @@ -37,9 +34,6 @@ use crate::services::processor::Processed;
use crate::statsd::RelayCounters;
use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope};

/// The maximum number of individual session updates generated for each aggregate item.
const MAX_EXPLODED_SESSIONS: usize = 100;

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

Expand All @@ -53,15 +47,6 @@ pub enum StoreError {
NoEventId,
}

fn make_distinct_id(s: &str) -> Uuid {
static NAMESPACE: OnceCell<Uuid> = OnceCell::new();
let namespace =
NAMESPACE.get_or_init(|| Uuid::new_v5(&Uuid::NAMESPACE_URL, b"https://sentry.io/#did"));

s.parse()
.unwrap_or_else(|_| Uuid::new_v5(namespace, s.as_bytes()))
}

struct Producer {
client: KafkaClient,
}
Expand Down Expand Up @@ -242,13 +227,7 @@ impl StoreService {
)?;
}
ItemType::Session | ItemType::Sessions => {
self.produce_sessions(
scoping.organization_id,
scoping.project_id,
retention,
client,
item,
)?;
// Do nothing
}
ItemType::MetricBuckets => self.produce_metrics(
scoping.organization_id,
Expand Down Expand Up @@ -614,158 +593,6 @@ impl StoreService {
self.produce(KafkaTopic::Attachments, organization_id, message)
}

fn produce_sessions(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
item: &Item,
) -> Result<(), StoreError> {
match item.ty() {
ItemType::Session => {
let mut session = match SessionUpdate::parse(&item.payload()) {
Ok(session) => session,
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to store session"
);
return Ok(());
}
};

if session.status == SessionStatus::Errored {
// Individual updates should never have the status `errored`
session.status = SessionStatus::Exited;
}
self.produce_session_update(org_id, project_id, event_retention, client, session)
}
ItemType::Sessions => {
let aggregates = match SessionAggregates::parse(&item.payload()) {
Ok(aggregates) => aggregates,
Err(_) => return Ok(()),
};

self.produce_sessions_from_aggregate(
org_id,
project_id,
event_retention,
client,
aggregates,
)
}
_ => Ok(()),
}
}

fn produce_sessions_from_aggregate(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
aggregates: SessionAggregates,
) -> Result<(), StoreError> {
let SessionAggregates {
aggregates,
attributes,
} = aggregates;
let message = SessionKafkaMessage {
org_id,
project_id,
session_id: Uuid::nil(),
distinct_id: Uuid::nil(),
quantity: 1,
seq: 0,
received: protocol::datetime_to_timestamp(chrono::Utc::now()),
started: 0f64,
duration: None,
errors: 0,
release: attributes.release,
environment: attributes.environment,
sdk: client.map(str::to_owned),
retention_days: event_retention,
status: SessionStatus::Exited,
};

if aggregates.len() > MAX_EXPLODED_SESSIONS {
relay_log::warn!("aggregated session items exceed threshold");
}

for item in aggregates.into_iter().take(MAX_EXPLODED_SESSIONS) {
let mut message = message.clone();
message.started = protocol::datetime_to_timestamp(item.started);
message.distinct_id = item
.distinct_id
.as_deref()
.map(make_distinct_id)
.unwrap_or_default();

if item.exited > 0 {
message.errors = 0;
message.quantity = item.exited;
self.send_session_message(org_id, message.clone())?;
}
if item.errored > 0 {
message.errors = 1;
message.status = SessionStatus::Errored;
message.quantity = item.errored;
self.send_session_message(org_id, message.clone())?;
}
if item.abnormal > 0 {
message.errors = 1;
message.status = SessionStatus::Abnormal;
message.quantity = item.abnormal;
self.send_session_message(org_id, message.clone())?;
}
if item.crashed > 0 {
message.errors = 1;
message.status = SessionStatus::Crashed;
message.quantity = item.crashed;
self.send_session_message(org_id, message)?;
}
}
Ok(())
}

fn produce_session_update(
&self,
org_id: u64,
project_id: ProjectId,
event_retention: u16,
client: Option<&str>,
session: SessionUpdate,
) -> Result<(), StoreError> {
self.send_session_message(
org_id,
SessionKafkaMessage {
org_id,
project_id,
session_id: session.session_id,
distinct_id: session
.distinct_id
.as_deref()
.map(make_distinct_id)
.unwrap_or_default(),
quantity: 1,
seq: if session.init { 0 } else { session.sequence },
received: protocol::datetime_to_timestamp(session.timestamp),
started: protocol::datetime_to_timestamp(session.started),
duration: session.duration,
status: session.status.clone(),
errors: session.errors.clamp(
(session.status == SessionStatus::Crashed) as _,
u16::MAX.into(),
) as _,
release: session.attributes.release,
environment: session.attributes.environment,
sdk: client.map(str::to_owned),
retention_days: event_retention,
},
)
}

fn send_metric_message(
&self,
namespace: MetricNamespace,
Expand Down Expand Up @@ -834,19 +661,6 @@ impl StoreService {
Ok(())
}

fn send_session_message(
&self,
organization_id: u64,
message: SessionKafkaMessage,
) -> Result<(), StoreError> {
self.produce(
KafkaTopic::Sessions,
organization_id,
KafkaMessage::Session(message),
)?;
Ok(())
}

fn produce_profile(
&self,
organization_id: u64,
Expand Down Expand Up @@ -1628,7 +1442,6 @@ enum KafkaMessage<'a> {
Attachment(AttachmentKafkaMessage),
AttachmentChunk(AttachmentChunkKafkaMessage),
UserReport(UserReportKafkaMessage),
Session(SessionKafkaMessage),
Metric {
#[serde(skip)]
headers: BTreeMap<String, String>,
Expand All @@ -1651,7 +1464,6 @@ impl Message for KafkaMessage<'_> {
KafkaMessage::Attachment(_) => "attachment",
KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
KafkaMessage::UserReport(_) => "user_report",
KafkaMessage::Session(_) => "session",
KafkaMessage::Metric { .. } => "metric",
KafkaMessage::Profile(_) => "profile",
KafkaMessage::ReplayEvent(_) => "replay_event",
Expand Down Expand Up @@ -1679,8 +1491,7 @@ impl Message for KafkaMessage<'_> {
Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil),

// Random partitioning
Self::Session(_)
| Self::Profile(_)
Self::Profile(_)
| Self::ReplayRecordingNotChunked(_)
| Self::Span(_)
| Self::MetricsSummary(_)
Expand Down Expand Up @@ -1718,9 +1529,6 @@ impl Message for KafkaMessage<'_> {
/// Serializes the message into its binary format.
fn serialize(&self) -> Result<Cow<'_, [u8]>, ClientError> {
match self {
KafkaMessage::Session(message) => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),
KafkaMessage::Metric { message, .. } => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),
Expand Down

0 comments on commit 24d3f89

Please sign in to comment.