Skip to content

Commit

Permalink
fix(cogs): Fix cogs measurements for batched metrics (#3266)
Browse files Browse the repository at this point in the history
The cogs were updated in a loop -> only the last iteration was actually
counted.
  • Loading branch information
Dav1dde committed Mar 14, 2024
1 parent 437fe72 commit 1c91340
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 24 deletions.
14 changes: 14 additions & 0 deletions relay-cogs/src/cogs.rs
Expand Up @@ -204,6 +204,20 @@ impl FeatureWeights {
Some((*feature, ratio))
})
}

/// Returns `true` if there are no weights contained.
///
/// # Examples
///
/// ```
/// use relay_cogs::{AppFeature, FeatureWeights};
///
/// assert!(FeatureWeights::none().is_empty());
/// assert!(!FeatureWeights::new(AppFeature::Spans).is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

impl fmt::Debug for FeatureWeights {
Expand Down
58 changes: 34 additions & 24 deletions relay-server/src/services/processor.rs
Expand Up @@ -1747,38 +1747,48 @@ impl EnvelopeProcessorService {
buckets: HashMap<ProjectKey, Vec<Bucket>>,
}

match serde_json::from_slice(&payload) {
Ok(Wrapper { buckets }) => {
for (public_key, mut buckets) in buckets {
for bucket in &mut buckets {
clock_drift_processor.process_timestamp(&mut bucket.timestamp);
if !keep_metadata {
bucket.metadata = BucketMetadata::new();
}
}

MetricStats::new(&buckets).emit(
RelayCounters::ProcessorBatchedMetricsCalls,
RelayCounters::ProcessorBatchedMetricsCount,
RelayCounters::ProcessorBatchedMetricsCost,
);

cogs.update(relay_metrics::cogs::BySize(&buckets));

relay_log::trace!("merging metric buckets into project cache");
self.inner
.project_cache
.send(MergeBuckets::new(public_key, buckets));
}
}
let buckets = match serde_json::from_slice(&payload) {
Ok(Wrapper { buckets }) => buckets,
Err(error) => {
relay_log::debug!(
error = &error as &dyn Error,
"failed to parse batched metrics",
);
metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
return;
}
};

let mut feature_weights = FeatureWeights::none();
for (public_key, mut buckets) in buckets {
if buckets.is_empty() {
continue;
}

for bucket in &mut buckets {
clock_drift_processor.process_timestamp(&mut bucket.timestamp);
if !keep_metadata {
bucket.metadata = BucketMetadata::new();
}
}

MetricStats::new(&buckets).emit(
RelayCounters::ProcessorBatchedMetricsCalls,
RelayCounters::ProcessorBatchedMetricsCount,
RelayCounters::ProcessorBatchedMetricsCost,
);

feature_weights = feature_weights.merge(relay_metrics::cogs::BySize(&buckets).into());

relay_log::trace!("merging metric buckets into project cache");
self.inner
.project_cache
.send(MergeBuckets::new(public_key, buckets));
}

if !feature_weights.is_empty() {
cogs.update(feature_weights);
}
}

fn handle_process_metric_meta(&self, message: ProcessMetricMeta) {
Expand Down

0 comments on commit 1c91340

Please sign in to comment.