This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

bug fix: capture pending.Start and pending.IsFull() before the batch is sent to copySender; use the peeked reservation in the ReservationQueue.Peek trace log
cevian committed Dec 13, 2022
1 parent efdf869 commit 4c34323
Showing 2 changed files with 6 additions and 5 deletions.
pkg/pgmodel/ingestor/metric_batcher.go (9 changes: 5 additions & 4 deletions)
@@ -256,7 +256,8 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
  numSeries := pending.batch.CountSeries()
  numSamples, numExemplars := pending.batch.Count()

+ wasFull := pending.IsFull()
+ start := pending.Start
  select {
  //try to batch as much as possible before sending
  case req, ok := <-recvCh:
@@ -265,7 +266,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
  span.AddEvent("Sending last non-empty batch")
  copySender <- copyRequest{pending, info}
  metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
- metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
+ metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(start).Seconds())
  }
  span.AddEvent("Exiting metric batcher batch loop")
  span.SetAttributes(attribute.Int("num_series", numSeries))
@@ -276,8 +277,8 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
  case copySender <- copyRequest{pending, info}:
  metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
  metrics.IngestorFlushInsertables.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSamples + numExemplars))
- metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
- if pending.IsFull() {
+ metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(start).Seconds())
+ if wasFull {
  metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "size"}).Inc()
  } else {
  metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "requested"}).Inc()
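The metric_batcher.go hunks all apply one pattern: everything the metrics code needs from `pending` (its start time and whether it was full) is read before the batch is sent on `copySender`, presumably because the receiving side may reuse or reset the batch once it owns it. Below is a minimal, self-contained sketch of that pattern; the `pendingBatch` type, the `flush` function, and the channel setup are hypothetical stand-ins, not promscale's actual types.

```go
package main

import (
	"fmt"
	"time"
)

// pendingBatch is a hypothetical stand-in for the pending buffer the
// batcher hands off to the copier goroutine.
type pendingBatch struct {
	Start time.Time
	full  bool
}

func (p *pendingBatch) IsFull() bool { return p.full }

// flush records its metrics from values captured before the send: once the
// copier owns the batch it may reset or recycle it, so reading
// pending.Start or pending.IsFull() afterwards could observe stale or racy
// state.
func flush(copySender chan<- *pendingBatch, pending *pendingBatch) {
	wasFull := pending.IsFull()
	start := pending.Start

	copySender <- pending // ownership of the batch transfers here

	fmt.Println("batch duration:", time.Since(start))
	if wasFull {
		fmt.Println("flush reason: size")
	} else {
		fmt.Println("flush reason: requested")
	}
}

func main() {
	copySender := make(chan *pendingBatch, 1)
	flush(copySender, &pendingBatch{Start: time.Now(), full: true})
	<-copySender
}
```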
pkg/pgmodel/ingestor/reservation.go (2 changes: 1 addition & 1 deletion)
@@ -165,7 +165,7 @@ func (rq *ReservationQueue) Peek() (time.Time, bool) {
  case <-waitch:
  case <-time.After(250 * time.Millisecond):
  }
- log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.q.Len(), "waited", waited, "took", time.Since((*rq.q)[0].GetStartTime()))
+ log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.Len(), "waited", waited, "took", time.Since(reservation.GetStartTime()))
  }
  return reservation.GetStartTime(), ok
  }
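The reservation.go change swaps two expressions in Peek's trace log: the queue's own Len() accessor instead of rq.q.Len(), and the already-peeked reservation instead of re-indexing (*rq.q)[0], which, likely because the queue can change while Peek waits, may no longer be (or may not exist as) the element being reported. A rough sketch of that shape, using hypothetical reservation and reservationQueue types rather than promscale's:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// reservation is a hypothetical stand-in for the queued element.
type reservation struct{ start time.Time }

func (r *reservation) GetStartTime() time.Time { return r.start }

// reservationQueue is a minimal, mutex-guarded slice standing in for the
// real heap-backed queue.
type reservationQueue struct {
	mu sync.Mutex
	q  []*reservation
}

// Len is the queue's own accessor, mirroring the switch from rq.q.Len()
// to rq.Len() in the trace log.
func (rq *reservationQueue) Len() int {
	rq.mu.Lock()
	defer rq.mu.Unlock()
	return len(rq.q)
}

// Peek reads the front element once and reuses that value for both the
// trace log and the return value; indexing the underlying slice again
// later could see a different element, or none at all if the queue
// drained in the meantime.
func (rq *reservationQueue) Peek() (time.Time, bool) {
	rq.mu.Lock()
	if len(rq.q) == 0 {
		rq.mu.Unlock()
		return time.Time{}, false
	}
	res := rq.q[0]
	rq.mu.Unlock()

	fmt.Println("peek", "batched_metrics", rq.Len(), "took", time.Since(res.GetStartTime()))
	return res.GetStartTime(), true
}

func main() {
	rq := &reservationQueue{q: []*reservation{{start: time.Now().Add(-time.Second)}}}
	if t, ok := rq.Peek(); ok {
		fmt.Println("front reservation started at", t)
	}
}
```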
