Skip to content

Commit

Permalink
work on TestNoDataLoss
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Feb 15, 2024
1 parent 5fa8d69 commit 545acff
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
53 changes: 29 additions & 24 deletions processor/memorylimiterprocessor/internal/mock_exporter.go
Expand Up @@ -8,60 +8,65 @@ import (
"sync/atomic"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
)

type MockExporter struct {
destAvailable int64
acceptedLogCount int64
deliveredLogCount int64
Logs []plog.Logs
type mockExporter struct {
destAvailable atomic.Bool
acceptedLogCount atomic.Int64
deliveredLogCount atomic.Int64
Logs consumertest.LogsSink
}

var _ consumer.Logs = (*MockExporter)(nil)
var _ consumer.Logs = (*mockExporter)(nil)

func (e *MockExporter) Capabilities() consumer.Capabilities {
func (e *mockExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error {
atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount()))
func (e *mockExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
e.acceptedLogCount.Add(int64(ld.LogRecordCount()))

if atomic.LoadInt64(&e.destAvailable) == 1 {
if e.destAvailable.Load() {
// Destination is available, immediately deliver.
atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount()))
e.deliveredLogCount.Add(int64(ld.LogRecordCount()))
} else {
// Destination is not available. Queue the logs in the exporter.
e.Logs = append(e.Logs, ld)
return e.Logs.ConsumeLogs(ctx, ld)
}
return nil
}

func (e *MockExporter) SetDestAvailable(available bool) {
func (e *mockExporter) SetDestAvailable(available bool) {
if available {
// Pretend we delivered all queued accepted logs.
atomic.AddInt64(&e.deliveredLogCount, atomic.LoadInt64(&e.acceptedLogCount))
e.deliveredLogCount.Add(e.acceptedLogCount.Load())

// Get rid of the delivered logs so that memory can be collected.
e.Logs = nil
e.Logs.Reset()

// Now mark destination available so that subsequent ConsumeLogs
// don't queue the logs anymore.
atomic.StoreInt64(&e.destAvailable, 1)

e.destAvailable.Store(true)
} else {
atomic.StoreInt64(&e.destAvailable, 0)
e.destAvailable.Store(false)
}
}

func (e *MockExporter) AcceptedLogCount() int {
return int(atomic.LoadInt64(&e.acceptedLogCount))
func (e *mockExporter) AcceptedLogCount() int {
return int(e.acceptedLogCount.Load())
}

func (e *MockExporter) DeliveredLogCount() int {
return int(atomic.LoadInt64(&e.deliveredLogCount))
func (e *mockExporter) DeliveredLogCount() int {
return int(e.deliveredLogCount.Load())
}

func NewMockExporter() *MockExporter {
return &MockExporter{}
func NewMockExporter() *mockExporter {
return &mockExporter{
destAvailable: atomic.Bool{},
acceptedLogCount: atomic.Int64{},
deliveredLogCount: atomic.Int64{},
Logs: consumertest.LogsSink{},
}
}
5 changes: 5 additions & 0 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Expand Up @@ -26,6 +26,11 @@ import (
"go.opentelemetry.io/collector/processor/processortest"
)

func TestMillion(t *testing.T) {
for i := 0; i < 10000; i++ {
TestNoDataLoss(t)
}
}
func TestNoDataLoss(t *testing.T) {
// Create an exporter.
exporter := internal.NewMockExporter()
Expand Down

0 comments on commit 545acff

Please sign in to comment.