Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Prevent flushing a batch with less than one request
Browse files Browse the repository at this point in the history
Previously, when a new set of requests came in, we often
flushed a very small first batch because we flush as soon
as any data becomes available in Peek. Since small batches
are ineffecient we fix this by waiting till the entire first
request is batched to flush (with a timeout).
  • Loading branch information
cevian committed Nov 3, 2022
1 parent 9ecdcb5 commit 0efce9a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 12 deletions.
5 changes: 4 additions & 1 deletion pkg/pgmodel/ingestor/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,10 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
maxt int64
rows = dataTS.Rows
workFinished = new(sync.WaitGroup)
batched = new(sync.WaitGroup)
)
workFinished.Add(len(rows))
batched.Add(len(rows))
// we only allocate enough space for a single error message here as we only
// report one error back upstream. The inserter should not block on this
// channel, but only insert if it's empty, anything else can deadlock.
Expand All @@ -241,7 +243,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
maxt = ts
}
}
p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan}
p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan}
}
span.SetAttributes(attribute.Int64("num_rows", int64(numRows)))
span.SetAttributes(attribute.Int("num_metrics", len(rows)))
Expand Down Expand Up @@ -339,6 +341,7 @@ type insertDataRequest struct {
spanCtx trace.SpanContext
metric string
finished *sync.WaitGroup
batched *sync.WaitGroup
data []model.Insertable
errChan chan error
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/pgmodel/ingestor/metric_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
}
metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
reservation.Update(reservationQ, t, len(req.data))
req.batched.Done()
addSpan.End()
}
//This channel in synchronous (no buffering). This provides backpressure
Expand All @@ -225,7 +226,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
t = time.Time{}
}
metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
reservation = reservationQ.Add(copySender, t)
reservation = reservationQ.Add(copySender, req.batched, t)
}

pending := NewPendingBuffer()
Expand Down
4 changes: 3 additions & 1 deletion pkg/pgmodel/ingestor/metric_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,16 @@ func TestSendBatches(t *testing.T) {
return l
}
var workFinished sync.WaitGroup
var batched sync.WaitGroup
batched.Add(1)
errChan := make(chan error, 1)
data := []model.Insertable{
model.NewPromSamples(makeSeries(1), make([]prompb.Sample, 1)),
model.NewPromSamples(makeSeries(2), make([]prompb.Sample, 1)),
model.NewPromSamples(makeSeries(3), make([]prompb.Sample, 1)),
}
spanCtx := psctx.WithStartTime(context.Background(), time.Now().Add(-time.Hour))
firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, errChan: errChan}
firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, batched: &batched, errChan: errChan}
reservationQ := NewReservationQueue()
go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ)
resos := make([]readRequest, 0, 1)
Expand Down
42 changes: 33 additions & 9 deletions pkg/pgmodel/ingestor/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ import (
)

type reservation struct {
copySender <-chan copyRequest
index int
copySender <-chan copyRequest
firstRequestBatched *sync.WaitGroup
index int

lock sync.Mutex
startTime time.Time

items int64
}

func newReservation(cs <-chan copyRequest, startTime time.Time) *reservation {
return &reservation{cs, -1, sync.Mutex{}, startTime, 1}
func newReservation(cs <-chan copyRequest, startTime time.Time, batched *sync.WaitGroup) *reservation {
return &reservation{cs, batched, -1, sync.Mutex{}, startTime, 1}
}

func (res *reservation) Update(rq *ReservationQueue, t time.Time, num_insertables int) {
Expand Down Expand Up @@ -110,8 +111,8 @@ func NewReservationQueue() *ReservationQueue {
return res
}

func (rq *ReservationQueue) Add(cs <-chan copyRequest, startTime time.Time) Reservation {
si := newReservation(cs, startTime)
func (rq *ReservationQueue) Add(cs <-chan copyRequest, batched *sync.WaitGroup, startTime time.Time) Reservation {
si := newReservation(cs, startTime, batched)

rq.lock.Lock()
defer rq.lock.Unlock()
Expand Down Expand Up @@ -146,19 +147,42 @@ func (rq *ReservationQueue) Close() {
// Peek gives the first startTime as well as if the queue is not closed.
// It blocks until there is an element in the queue or it has been closed.
func (rq *ReservationQueue) Peek() (time.Time, bool) {
reservation, waited, ok := rq.peek()
if !ok {
return time.Time{}, false
}
if waited {
/* If this is the first reservation in the queue, wait for the entire request to be batched with a timeout.
* (timeout is really a safety measure to prevent deadlocks if some metric batcher is full, which is unlikely)*/
waitch := make(chan struct{})
go func() {
reservation.firstRequestBatched.Wait()
close(waitch)
}()
select {
case <-waitch:
case <-time.After(250 * time.Millisecond):
}
}
return reservation.GetStartTime(), ok
}

func (rq *ReservationQueue) peek() (*reservation, bool, bool) {
rq.lock.Lock()
defer rq.lock.Unlock()
waited := false
for !rq.closed && rq.q.Len() == 0 {
waited = true
rq.cond.Wait()
}

if rq.q.Len() > 0 {
first := (*rq.q)[0]
return first.GetStartTime(), true
firstReservation := (*rq.q)[0]
return firstReservation, waited, true
}

//must be closed
return time.Time{}, false
return nil, false, false
}

// PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty.
Expand Down

0 comments on commit 0efce9a

Please sign in to comment.