Skip to content

Commit

Permalink
work on TestNoDataLoss
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Feb 24, 2024
1 parent 5fa8d69 commit 4393d45
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
39 changes: 22 additions & 17 deletions processor/memorylimiterprocessor/internal/mock_exporter.go
Expand Up @@ -8,14 +8,15 @@ import (
"sync/atomic"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
)

type MockExporter struct {
destAvailable int64
acceptedLogCount int64
deliveredLogCount int64
Logs []plog.Logs
destAvailable atomic.Bool
acceptedLogCount atomic.Int64
deliveredLogCount atomic.Int64
Logs consumertest.LogsSink
}

var _ consumer.Logs = (*MockExporter)(nil)
Expand All @@ -24,44 +25,48 @@ func (e *MockExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error {
atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount()))
func (e *MockExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
e.acceptedLogCount.Add(int64(ld.LogRecordCount()))

if atomic.LoadInt64(&e.destAvailable) == 1 {
if e.destAvailable.Load() {
// Destination is available, immediately deliver.
atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount()))
e.deliveredLogCount.Add(int64(ld.LogRecordCount()))
} else {
// Destination is not available. Queue the logs in the exporter.
e.Logs = append(e.Logs, ld)
return e.Logs.ConsumeLogs(ctx, ld)
}
return nil
}

func (e *MockExporter) SetDestAvailable(available bool) {
if available {
// Pretend we delivered all queued accepted logs.
atomic.AddInt64(&e.deliveredLogCount, atomic.LoadInt64(&e.acceptedLogCount))
e.deliveredLogCount.Add(int64(e.Logs.LogRecordCount()))

// Get rid of the delivered logs so that memory can be collected.
e.Logs = nil
e.Logs.Reset()

// Now mark destination available so that subsequent ConsumeLogs
// don't queue the logs anymore.
atomic.StoreInt64(&e.destAvailable, 1)

e.destAvailable.Store(true)
} else {
atomic.StoreInt64(&e.destAvailable, 0)
e.destAvailable.Store(false)
}
}

func (e *MockExporter) AcceptedLogCount() int {
return int(atomic.LoadInt64(&e.acceptedLogCount))
return int(e.acceptedLogCount.Load())
}

func (e *MockExporter) DeliveredLogCount() int {
return int(atomic.LoadInt64(&e.deliveredLogCount))
return int(e.deliveredLogCount.Load())
}

func NewMockExporter() *MockExporter {
return &MockExporter{}
return &MockExporter{
destAvailable: atomic.Bool{},
acceptedLogCount: atomic.Int64{},
deliveredLogCount: atomic.Int64{},
Logs: consumertest.LogsSink{},
}
}
2 changes: 1 addition & 1 deletion processor/memorylimiterprocessor/memorylimiter_test.go
Expand Up @@ -100,7 +100,7 @@ func TestNoDataLoss(t *testing.T) {
// And eventually the exporter must confirm that it delivered exact number of produced logs.
require.Eventually(t, func() bool {
return receiver.ProduceCount == exporter.DeliveredLogCount()
}, 5*time.Second, 1*time.Millisecond)
}, 5*time.Second, 100*time.Millisecond)

// Double check that the number of logs accepted by exporter matches the number of produced by receiver.
assert.Equal(t, receiver.ProduceCount, exporter.AcceptedLogCount())
Expand Down

0 comments on commit 4393d45

Please sign in to comment.