Skip to content

Commit

Permalink
work on TestNoDataLoss
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Feb 24, 2024
1 parent 5fa8d69 commit c7617b3
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 21 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/build-and-test.yml
Expand Up @@ -137,11 +137,18 @@ jobs:
unittest-matrix:
strategy:
matrix:
runner: [ubuntu-latest, actuated-arm64-8cpu-16gb]
runner: [ubuntu-latest, actuated-arm64-4cpu-4gb]
go-version: ["~1.22", "~1.21.5"] # 1.20 needs quotes otherwise it's interpreted as 1.2
runs-on: ${{ matrix.runner }}
needs: [setup-environment]
steps:
- name: Set up arkade
uses: alexellis/setup-arkade@master
- name: Install vmmeter
run: |
sudo -E arkade oci install ghcr.io/openfaasltd/vmmeter:latest --path /usr/local/bin/
- name: Run vmmeter
uses: self-actuated/vmmeter-action@master
- name: Checkout Repo
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: Setup Go
Expand All @@ -164,7 +171,7 @@ jobs:
key: unittest-${{ runner.os }}-${{ matrix.runner }}-go-build-${{ matrix.go-version }}-${{ hashFiles('**/go.sum') }}
- name: Run Unit Tests
run: |
make -j8 gotest
make -j4 gotest
unittest:
if: always()
runs-on: ubuntu-latest
Expand Down
39 changes: 22 additions & 17 deletions processor/memorylimiterprocessor/internal/mock_exporter.go
Expand Up @@ -8,14 +8,15 @@ import (
"sync/atomic"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
)

type MockExporter struct {
destAvailable int64
acceptedLogCount int64
deliveredLogCount int64
Logs []plog.Logs
destAvailable atomic.Bool
acceptedLogCount atomic.Int64
deliveredLogCount atomic.Int64
Logs consumertest.LogsSink
}

var _ consumer.Logs = (*MockExporter)(nil)
Expand All @@ -24,44 +25,48 @@ func (e *MockExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error {
atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount()))
func (e *MockExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
e.acceptedLogCount.Add(int64(ld.LogRecordCount()))

if atomic.LoadInt64(&e.destAvailable) == 1 {
if e.destAvailable.Load() {
// Destination is available, immediately deliver.
atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount()))
e.deliveredLogCount.Add(int64(ld.LogRecordCount()))
} else {
// Destination is not available. Queue the logs in the exporter.
e.Logs = append(e.Logs, ld)
return e.Logs.ConsumeLogs(ctx, ld)
}
return nil
}

func (e *MockExporter) SetDestAvailable(available bool) {
if available {
// Pretend we delivered all queued accepted logs.
atomic.AddInt64(&e.deliveredLogCount, atomic.LoadInt64(&e.acceptedLogCount))
e.deliveredLogCount.Add(int64(e.Logs.LogRecordCount()))

// Get rid of the delivered logs so that memory can be collected.
e.Logs = nil
e.Logs.Reset()

// Now mark destination available so that subsequent ConsumeLogs
// don't queue the logs anymore.
atomic.StoreInt64(&e.destAvailable, 1)

e.destAvailable.Store(true)
} else {
atomic.StoreInt64(&e.destAvailable, 0)
e.destAvailable.Store(false)
}
}

func (e *MockExporter) AcceptedLogCount() int {
return int(atomic.LoadInt64(&e.acceptedLogCount))
return int(e.acceptedLogCount.Load())
}

func (e *MockExporter) DeliveredLogCount() int {
return int(atomic.LoadInt64(&e.deliveredLogCount))
return int(e.deliveredLogCount.Load())
}

func NewMockExporter() *MockExporter {
return &MockExporter{}
return &MockExporter{
destAvailable: atomic.Bool{},
acceptedLogCount: atomic.Int64{},
deliveredLogCount: atomic.Int64{},
Logs: consumertest.LogsSink{},
}
}
6 changes: 4 additions & 2 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Expand Up @@ -99,8 +99,10 @@ func TestNoDataLoss(t *testing.T) {

// And eventually the exporter must confirm that it delivered exact number of produced logs.
require.Eventually(t, func() bool {
return receiver.ProduceCount == exporter.DeliveredLogCount()
}, 5*time.Second, 1*time.Millisecond)
d := exporter.DeliveredLogCount()
t.Logf("received: %d, expected: %d\n", d, receiver.ProduceCount)
return receiver.ProduceCount == d
}, 5*time.Second, 100*time.Millisecond)

// Double check that the number of logs accepted by exporter matches the number of produced by receiver.
assert.Equal(t, receiver.ProduceCount, exporter.AcceptedLogCount())
Expand Down

0 comments on commit c7617b3

Please sign in to comment.