[chore] rework memorylimiter test to avoid flaky tests (#9733)
Peeling this set of changes from #9584 as a separate PR.

These changes reduce test flakiness on ARM64, which seems to fail on some of the
resource locking used in these tests.
atoulme committed Mar 19, 2024
1 parent 7f13812 commit 2a6a3f9
Showing 2 changed files with 26 additions and 19 deletions.
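
The heart of the mock_exporter.go change below is a move from ad-hoc atomic.AddInt64/LoadInt64/StoreInt64 calls on plain int64 fields to the typed atomic.Bool and atomic.Int64 wrappers from sync/atomic (Go 1.19+). A minimal, illustrative sketch of that pattern — the names here are made up, not taken from the PR:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter is a toy stand-in for MockExporter's bookkeeping fields.
type counter struct {
	destAvailable atomic.Bool  // was: int64 flag driven by StoreInt64/LoadInt64
	delivered     atomic.Int64 // was: int64 counter driven by AddInt64/LoadInt64
}

func main() {
	var c counter
	c.destAvailable.Store(true)

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if c.destAvailable.Load() { // atomic read, no "== 1" convention needed
				c.delivered.Add(1) // atomic increment
			}
		}()
	}
	wg.Wait()
	fmt.Println(c.delivered.Load()) // prints 8
}

The typed wrappers cannot be read or written non-atomically by accident, the kind of mistake that tends to surface only under the weaker memory ordering of ARM64.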
39 changes: 22 additions & 17 deletions processor/memorylimiterprocessor/internal/mock_exporter.go
@@ -8,14 +8,15 @@ import (
 	"sync/atomic"
 
 	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/pdata/plog"
 )
 
 type MockExporter struct {
-	destAvailable     int64
-	acceptedLogCount  int64
-	deliveredLogCount int64
-	Logs              []plog.Logs
+	destAvailable     atomic.Bool
+	acceptedLogCount  atomic.Int64
+	deliveredLogCount atomic.Int64
+	Logs              consumertest.LogsSink
 }
 
 var _ consumer.Logs = (*MockExporter)(nil)
@@ -24,44 +25,48 @@ func (e *MockExporter) Capabilities() consumer.Capabilities {
 	return consumer.Capabilities{}
 }
 
-func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error {
-	atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount()))
+func (e *MockExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
+	e.acceptedLogCount.Add(int64(ld.LogRecordCount()))
 
-	if atomic.LoadInt64(&e.destAvailable) == 1 {
+	if e.destAvailable.Load() {
 		// Destination is available, immediately deliver.
-		atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount()))
+		e.deliveredLogCount.Add(int64(ld.LogRecordCount()))
 	} else {
 		// Destination is not available. Queue the logs in the exporter.
-		e.Logs = append(e.Logs, ld)
+		return e.Logs.ConsumeLogs(ctx, ld)
 	}
 	return nil
 }
 
 func (e *MockExporter) SetDestAvailable(available bool) {
 	if available {
 		// Pretend we delivered all queued accepted logs.
-		atomic.AddInt64(&e.deliveredLogCount, atomic.LoadInt64(&e.acceptedLogCount))
+		e.deliveredLogCount.Add(int64(e.Logs.LogRecordCount()))
 
 		// Get rid of the delivered logs so that memory can be collected.
-		e.Logs = nil
+		e.Logs.Reset()
 
 		// Now mark destination available so that subsequent ConsumeLogs
 		// don't queue the logs anymore.
-		atomic.StoreInt64(&e.destAvailable, 1)
-
+		e.destAvailable.Store(true)
 	} else {
-		atomic.StoreInt64(&e.destAvailable, 0)
+		e.destAvailable.Store(false)
 	}
 }
 
 func (e *MockExporter) AcceptedLogCount() int {
-	return int(atomic.LoadInt64(&e.acceptedLogCount))
+	return int(e.acceptedLogCount.Load())
 }
 
 func (e *MockExporter) DeliveredLogCount() int {
-	return int(atomic.LoadInt64(&e.deliveredLogCount))
+	return int(e.deliveredLogCount.Load())
 }
 
 func NewMockExporter() *MockExporter {
-	return &MockExporter{}
+	return &MockExporter{
+		destAvailable:     atomic.Bool{},
+		acceptedLogCount:  atomic.Int64{},
+		deliveredLogCount: atomic.Int64{},
+		Logs:              consumertest.LogsSink{},
+	}
 }
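
For reference, consumertest.LogsSink — which replaces the hand-rolled []plog.Logs queue above — is the collector's in-memory logs consumer for tests; the diff relies on its ConsumeLogs, LogRecordCount and Reset methods. A small usage sketch; the surrounding program is illustrative, only those three method names come from the diff:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pdata/plog"
)

func main() {
	var sink consumertest.LogsSink

	// Build a plog.Logs payload with a single log record.
	ld := plog.NewLogs()
	ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

	// The sink stores whatever it consumes and keeps a running record count.
	_ = sink.ConsumeLogs(context.Background(), ld)
	fmt.Println(sink.LogRecordCount()) // 1

	// Reset drops the stored logs so the memory can be reclaimed.
	sink.Reset()
	fmt.Println(sink.LogRecordCount()) // 0
}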
6 changes: 4 additions & 2 deletions processor/memorylimiterprocessor/memorylimiter_test.go
@@ -99,8 +99,10 @@ func TestNoDataLoss(t *testing.T) {
 
 	// And eventually the exporter must confirm that it delivered exact number of produced logs.
 	require.Eventually(t, func() bool {
-		return receiver.ProduceCount == exporter.DeliveredLogCount()
-	}, 5*time.Second, 1*time.Millisecond)
+		d := exporter.DeliveredLogCount()
+		t.Logf("received: %d, expected: %d\n", d, receiver.ProduceCount)
+		return receiver.ProduceCount == d
+	}, 5*time.Second, 100*time.Millisecond)
 
 	// Double check that the number of logs accepted by exporter matches the number of produced by receiver.
 	assert.Equal(t, receiver.ProduceCount, exporter.AcceptedLogCount())
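
The test-side change keeps the same 5-second deadline but polls every 100ms instead of every 1ms and logs each observation, so a flake on a slow runner leaves evidence in the test output. A sketch of that require.Eventually pattern in isolation — the helper name and signature are illustrative, not from the PR:

package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// waitForDelivery polls delivered() until it matches expected, failing the
// test if that does not happen within 5 seconds.
func waitForDelivery(t *testing.T, expected int, delivered func() int) {
	require.Eventually(t, func() bool {
		d := delivered()
		t.Logf("delivered: %d, expected: %d", d, expected)
		return d == expected
	}, 5*time.Second, 100*time.Millisecond)
}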
