diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 916b27430d7..b6ac1adf19f 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -137,11 +137,18 @@ jobs: unittest-matrix: strategy: matrix: - runner: [ubuntu-latest, actuated-arm64-8cpu-16gb] + runner: [ubuntu-latest, actuated-arm64-4cpu-8gb] go-version: ["~1.22", "~1.21.5"] # 1.20 needs quotes otherwise it's interpreted as 1.2 runs-on: ${{ matrix.runner }} needs: [setup-environment] steps: + - name: Set up arkade + uses: alexellis/setup-arkade@master + - name: Install vmmeter + run: | + sudo -E arkade oci install ghcr.io/openfaasltd/vmmeter:latest --path /usr/local/bin/ + - name: Run vmmeter + uses: self-actuated/vmmeter-action@master - name: Checkout Repo uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - name: Setup Go @@ -164,7 +171,7 @@ jobs: key: unittest-${{ runner.os }}-${{ matrix.runner }}-go-build-${{ matrix.go-version }}-${{ hashFiles('**/go.sum') }} - name: Run Unit Tests run: | - make -j8 gotest + make -j4 gotest unittest: if: always() runs-on: ubuntu-latest diff --git a/processor/memorylimiterprocessor/internal/mock_exporter.go b/processor/memorylimiterprocessor/internal/mock_exporter.go index 1a55c9299e9..fee7503dd26 100644 --- a/processor/memorylimiterprocessor/internal/mock_exporter.go +++ b/processor/memorylimiterprocessor/internal/mock_exporter.go @@ -8,14 +8,15 @@ import ( "sync/atomic" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" ) type MockExporter struct { - destAvailable int64 - acceptedLogCount int64 - deliveredLogCount int64 - Logs []plog.Logs + destAvailable atomic.Bool + acceptedLogCount atomic.Int64 + deliveredLogCount atomic.Int64 + Logs consumertest.LogsSink } var _ consumer.Logs = (*MockExporter)(nil) @@ -24,15 +25,15 @@ func (e *MockExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{} } -func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error { - atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount())) +func (e *MockExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + e.acceptedLogCount.Add(int64(ld.LogRecordCount())) - if atomic.LoadInt64(&e.destAvailable) == 1 { + if e.destAvailable.Load() { // Destination is available, immediately deliver. - atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount())) + e.deliveredLogCount.Add(int64(ld.LogRecordCount())) } else { // Destination is not available. Queue the logs in the exporter. - e.Logs = append(e.Logs, ld) + return e.Logs.ConsumeLogs(ctx, ld) } return nil } @@ -40,28 +41,32 @@ func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error { func (e *MockExporter) SetDestAvailable(available bool) { if available { // Pretend we delivered all queued accepted logs. - atomic.AddInt64(&e.deliveredLogCount, atomic.LoadInt64(&e.acceptedLogCount)) + e.deliveredLogCount.Add(int64(e.Logs.LogRecordCount())) // Get rid of the delivered logs so that memory can be collected. - e.Logs = nil + e.Logs.Reset() // Now mark destination available so that subsequent ConsumeLogs // don't queue the logs anymore. - atomic.StoreInt64(&e.destAvailable, 1) - + e.destAvailable.Store(true) } else { - atomic.StoreInt64(&e.destAvailable, 0) + e.destAvailable.Store(false) } } func (e *MockExporter) AcceptedLogCount() int { - return int(atomic.LoadInt64(&e.acceptedLogCount)) + return int(e.acceptedLogCount.Load()) } func (e *MockExporter) DeliveredLogCount() int { - return int(atomic.LoadInt64(&e.deliveredLogCount)) + return int(e.deliveredLogCount.Load()) } func NewMockExporter() *MockExporter { - return &MockExporter{} + return &MockExporter{ + destAvailable: atomic.Bool{}, + acceptedLogCount: atomic.Int64{}, + deliveredLogCount: atomic.Int64{}, + Logs: consumertest.LogsSink{}, + } } diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 626bf877d7c..9a8bee3e059 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -5,6 +5,7 @@ package memorylimiterprocessor import ( "context" + "fmt" "runtime" "testing" "time" @@ -99,8 +100,10 @@ func TestNoDataLoss(t *testing.T) { // And eventually the exporter must confirm that it delivered exact number of produced logs. require.Eventually(t, func() bool { - return receiver.ProduceCount == exporter.DeliveredLogCount() - }, 5*time.Second, 1*time.Millisecond) + d := exporter.DeliveredLogCount() + fmt.Printf("received: %d, expected: %d\n", d, receiver.ProduceCount) + return receiver.ProduceCount == d + }, 5*time.Second, 100*time.Millisecond) // Double check that the number of logs accepted by exporter matches the number of produced by receiver. assert.Equal(t, receiver.ProduceCount, exporter.AcceptedLogCount())