Skip to content

Commit

Permalink
Enable goleak
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Apr 18, 2024
1 parent 22741ad commit 148d145
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 25 deletions.
8 changes: 8 additions & 0 deletions extension/healthcheckv2extension/doc.go
@@ -0,0 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

// Package healthcheckv2extension implements an extension that enables HTTP
// and or GRPC health checks for a collector.
package healthcheckv2extension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension"
45 changes: 23 additions & 22 deletions extension/healthcheckv2extension/extension.go
Expand Up @@ -29,7 +29,6 @@ type healthCheckExtension struct {
subcomponents []component.Component
eventCh chan *eventSourcePair
readyCh chan struct{}
doneCh chan struct{}
}

var _ component.Component = (*healthCheckExtension)(nil)
Expand Down Expand Up @@ -80,7 +79,6 @@ func newExtension(
aggregator: aggregator,
eventCh: make(chan *eventSourcePair),
readyCh: make(chan struct{}),
doneCh: make(chan struct{}),
}

// Start processing events in the background so that our status watcher doesn't
Expand Down Expand Up @@ -108,7 +106,7 @@ func (hc *healthCheckExtension) Shutdown(ctx context.Context) error {
// Preemptively send the stopped event, so it can be exported before shutdown
hc.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopped))

close(hc.doneCh)
close(hc.eventCh)
hc.aggregator.Close()

var err error
Expand All @@ -124,6 +122,19 @@ func (hc *healthCheckExtension) ComponentStatusChanged(
source *component.InstanceID,
event *component.StatusEvent,
) {
// There can be late arriving events after shutdown. We need to close
// the event channel so that this function doesn't block and we release all
// goroutines, but attempting to write to a closed channel will panic; log
// and recover.
defer func() {
if r := recover(); r != nil {
hc.telemetry.Logger.Info(
"discarding event received after shutdown",
zap.Any("source", source),
zap.Any("event", event),
)
}
}()
hc.eventCh <- &eventSourcePair{source: source, event: event}
}

Expand Down Expand Up @@ -158,7 +169,10 @@ func (hc *healthCheckExtension) eventLoop(ctx context.Context) {

for loop := true; loop; {
select {
case esp := <-hc.eventCh:
case esp, ok := <-hc.eventCh:
if !ok {
return
}
if esp.event.Status() != component.StatusStarting {
eventQueue = append(eventQueue, esp)
continue
Expand All @@ -176,26 +190,13 @@ func (hc *healthCheckExtension) eventLoop(ctx context.Context) {
}

// After PipelineWatcher.Ready, record statuses as they are received.
for loop := true; loop; {
select {
case esp := <-hc.eventCh:
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-hc.doneCh:
loop = false
case <-ctx.Done():
return
}
}

// After shutdown read late arriving events from channel and discard
for {
select {
case esp := <-hc.eventCh:
hc.telemetry.Logger.Info(
"discarding event received after shutdown",
zap.Any("source", esp.source),
zap.Any("event", esp.event),
)
case esp, ok := <-hc.eventCh:
if !ok {
return
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-ctx.Done():
return
}
Expand Down
9 changes: 6 additions & 3 deletions extension/healthcheckv2extension/factory_test.go
Expand Up @@ -54,16 +54,19 @@ func TestCreateDefaultConfig(t *testing.T) {
}, cfg)

assert.NoError(t, componenttest.CheckConfigStruct(cfg))
ext, err := createExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ext, err := createExtension(ctx, extensiontest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)
}

func TestCreateExtension(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = testutil.GetAvailableLocalAddress(t)

ext, err := createExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ext, err := createExtension(ctx, extensiontest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)
}
1 change: 1 addition & 0 deletions extension/healthcheckv2extension/go.mod
Expand Up @@ -16,6 +16,7 @@ require (
go.opentelemetry.io/collector/extension v0.98.1-0.20240412014414-62f589864e3d
go.opentelemetry.io/otel/metric v1.25.0
go.opentelemetry.io/otel/trace v1.25.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.63.2
Expand Down
14 changes: 14 additions & 0 deletions extension/healthcheckv2extension/internal/grpc/package_test.go
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package grpc // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc"

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
14 changes: 14 additions & 0 deletions extension/healthcheckv2extension/internal/http/package_test.go
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http"

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
14 changes: 14 additions & 0 deletions extension/healthcheckv2extension/internal/status/package_test.go
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package status // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
14 changes: 14 additions & 0 deletions extension/healthcheckv2extension/package_test.go
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package healthcheckv2extension

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

0 comments on commit 148d145

Please sign in to comment.