Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[healthcheckextensionv2] Introduce health check extension based on component status reporting #30673

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8fc1b26
Introduce health check extension based on component status reporting
mwear Jan 19, 2024
3a19a5a
Refactor aggregator
mwear Jan 25, 2024
dd628d1
Make gotidy
mwear Jan 25, 2024
7eb6512
Update extension/healthcheckextensionv2/config.go
mwear Jan 25, 2024
8c0c3f4
Apply suggestions from code review
mwear Jan 26, 2024
18fe9a7
Implement suggestions from review
mwear Jan 26, 2024
449544e
Add Scope and Detail types to Aggregator
mwear Jan 26, 2024
04fbd5f
Use pipeline status for availability during start/shutdown
mwear Jan 30, 2024
0d21264
Multiple health strategies for http
mwear Feb 7, 2024
6085882
Multiple health strategies via different aggregation funcs
mwear Feb 9, 2024
c8f9d56
Simplify HTTP strategies
mwear Feb 21, 2024
99afda4
gRPC support for component health options
mwear Feb 22, 2024
4075e13
Fusion
mwear Feb 29, 2024
c773103
Update deps / rename settings -> config
mwear Mar 1, 2024
8adc96c
Readme fusion
mwear Mar 2, 2024
fce6ae1
Update extension/healthcheckextensionv2/README.md
mwear Mar 5, 2024
2c5670b
Address code review feedback
mwear Mar 5, 2024
2d23a47
Address more feedback; additional cleanup
mwear Mar 5, 2024
34a00f6
Post rebase fixes
mwear Mar 7, 2024
cde6507
Post rebase fixes
mwear Mar 28, 2024
e52f4d1
Rename healthcheckextensionv2 -> healthcheckv2extension
mwear Mar 29, 2024
17edc87
Update metadata
mwear Mar 29, 2024
ac459be
Post rebase updates
mwear Apr 15, 2024
04cf05b
Enable goleak
mwear Apr 18, 2024
1a45be5
Update collector module version
mwear Apr 18, 2024
47d2100
Update go.mod
mwear Apr 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/healthcheck-v2.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'new_component'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: healthcheckv2extension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Health Check Extension V2 is meant to be a replacement for the current Health Check Extension. It is based off of component status reporting and provides HTTP and gRPC services health check services.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 3 additions & 3 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Expand Up @@ -285,10 +285,10 @@ testbed/mockdatasenders/mockdatadogagentexporter/ @open-telemetry/collect
# List of distribution maintainers for OpenTelemetry Collector Contrib
#
#####################################################
reports/distributions/core.yaml @open-telemetry/collector-contrib-approvers
reports/distributions/contrib.yaml @open-telemetry/collector-contrib-approvers
reports/distributions/core.yaml @open-telemetry/collector-contrib-approvers
reports/distributions/contrib.yaml @open-telemetry/collector-contrib-approvers


## UNMAINTAINED components

exporter/skywalkingexporter/ @open-telemetry/collector-contrib-approvers
exporter/skywalkingexporter/ @open-telemetry/collector-contrib-approvers
178 changes: 170 additions & 8 deletions extension/healthcheckv2extension/extension.go
Expand Up @@ -7,36 +7,198 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/grpc"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
)

type eventSourcePair struct {
source *component.InstanceID
event *component.StatusEvent
}

type healthCheckExtension struct {
config Config
telemetry component.TelemetrySettings
config Config
telemetry component.TelemetrySettings
aggregator *status.Aggregator
subcomponents []component.Component
eventCh chan *eventSourcePair
readyCh chan struct{}
}

var _ component.Component = (*healthCheckExtension)(nil)
var _ extension.ConfigWatcher = (*healthCheckExtension)(nil)
var _ extension.PipelineWatcher = (*healthCheckExtension)(nil)

func newExtension(
_ context.Context,
ctx context.Context,
config Config,
set extension.CreateSettings,
) *healthCheckExtension {
return &healthCheckExtension{
config: config,
telemetry: set.TelemetrySettings,
var comps []component.Component

errPriority := status.PriorityPermanent
if config.ComponentHealthConfig != nil &&
config.ComponentHealthConfig.IncludeRecoverable &&
!config.ComponentHealthConfig.IncludePermanent {
errPriority = status.PriorityRecoverable
}

aggregator := status.NewAggregator(errPriority)

if config.UseV2 && config.GRPCConfig != nil {
grpcServer := grpc.NewServer(
config.GRPCConfig,
config.ComponentHealthConfig,
set.TelemetrySettings,
aggregator,
)
comps = append(comps, grpcServer)
}

if !config.UseV2 || config.UseV2 && config.HTTPConfig != nil {
httpServer := http.NewServer(
config.HTTPConfig,
config.LegacyConfig,
config.ComponentHealthConfig,
set.TelemetrySettings,
aggregator,
)
comps = append(comps, httpServer)
}

hc := &healthCheckExtension{
config: config,
subcomponents: comps,
telemetry: set.TelemetrySettings,
aggregator: aggregator,
eventCh: make(chan *eventSourcePair),
readyCh: make(chan struct{}),
}

// Start processing events in the background so that our status watcher doesn't
// block others before the extension starts.
go hc.eventLoop(ctx)

return hc
}

// Start implements the component.Component interface.
func (hc *healthCheckExtension) Start(context.Context, component.Host) error {
func (hc *healthCheckExtension) Start(ctx context.Context, host component.Host) error {
hc.telemetry.Logger.Debug("Starting health check extension V2", zap.Any("config", hc.config))

for _, comp := range hc.subcomponents {
if err := comp.Start(ctx, host); err != nil {
return err
}
}

return nil
}

// Shutdown implements the component.Component interface.
func (hc *healthCheckExtension) Shutdown(context.Context) error {
func (hc *healthCheckExtension) Shutdown(ctx context.Context) error {
// Preemptively send the stopped event, so it can be exported before shutdown
hc.telemetry.ReportStatus(component.NewStatusEvent(component.StatusStopped))

close(hc.eventCh)
hc.aggregator.Close()

var err error
for _, comp := range hc.subcomponents {
err = multierr.Append(err, comp.Shutdown(ctx))
}

return err
}

// ComponentStatusChanged implements the extension.StatusWatcher interface.
func (hc *healthCheckExtension) ComponentStatusChanged(
source *component.InstanceID,
event *component.StatusEvent,
) {
// There can be late arriving events after shutdown. We need to close
// the event channel so that this function doesn't block and we release all
// goroutines, but attempting to write to a closed channel will panic; log
// and recover.
defer func() {
if r := recover(); r != nil {
hc.telemetry.Logger.Info(
"discarding event received after shutdown",
zap.Any("source", source),
zap.Any("event", event),
)
}
}()
hc.eventCh <- &eventSourcePair{source: source, event: event}
}

// NotifyConfig implements the extension.ConfigWatcher interface.
func (hc *healthCheckExtension) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
var err error
for _, comp := range hc.subcomponents {
if cw, ok := comp.(extension.ConfigWatcher); ok {
err = multierr.Append(err, cw.NotifyConfig(ctx, conf))
}
}
return err
}

// Ready implements the extension.PipelineWatcher interface.
func (hc *healthCheckExtension) Ready() error {
close(hc.readyCh)
return nil
}

// NotReady implements the extension.PipelineWatcher interface.
func (hc *healthCheckExtension) NotReady() error {
return nil
}

func (hc *healthCheckExtension) eventLoop(ctx context.Context) {
// Record events with component.StatusStarting, but queue other events until
// PipelineWatcher.Ready is called. This prevents aggregate statuses from
// flapping between StatusStarting and StatusOK as components are started
// individually by the service.
var eventQueue []*eventSourcePair

for loop := true; loop; {
select {
case esp, ok := <-hc.eventCh:
if !ok {
return
}
if esp.event.Status() != component.StatusStarting {
eventQueue = append(eventQueue, esp)
continue
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-hc.readyCh:
for _, esp := range eventQueue {
hc.aggregator.RecordStatus(esp.source, esp.event)
}
eventQueue = nil
loop = false
case <-ctx.Done():
return
}
}

// After PipelineWatcher.Ready, record statuses as they are received.
for {
select {
case esp, ok := <-hc.eventCh:
if !ok {
return
}
hc.aggregator.RecordStatus(esp.source, esp.event)
case <-ctx.Done():
return
}
}
}
134 changes: 134 additions & 0 deletions extension/healthcheckv2extension/extension_test.go
@@ -0,0 +1,134 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package healthcheckv2extension

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/extension/extensiontest"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/testhelpers"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
)

func TestComponentStatus(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.HTTPConfig.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.UseV2 = true
ext := newExtension(context.Background(), *cfg, extensiontest.NewNopCreateSettings())

// Status before Start will be StatusNone
st, ok := ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, st.Status(), component.StatusNone)

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

traces := testhelpers.NewPipelineMetadata("traces")

// StatusStarting will be sent immediately.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStarting))
}

// StatusOK will be queued until the PipelineWatcher Ready method is called.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusOK))
}

// Note the use of assert.Eventually here and throughout this test is because
// status events are processed asynchronously in the background.
assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusStarting
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.Ready())

assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusOK
}, time.Second, 10*time.Millisecond)

// StatusStopping will be sent immediately.
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopping))
}

assert.Eventually(t, func() bool {
st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
return st.Status() == component.StatusStopping
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.NotReady())
require.NoError(t, ext.Shutdown(context.Background()))

// Events sent after shutdown will be discarded
for _, id := range traces.InstanceIDs() {
ext.ComponentStatusChanged(id, component.NewStatusEvent(component.StatusStopped))
}

st, ok = ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, component.StatusStopping, st.Status())
}

func TestNotifyConfig(t *testing.T) {
confMap, err := confmaptest.LoadConf(
filepath.Join("internal", "http", "testdata", "config.yaml"),
)
require.NoError(t, err)
confJSON, err := os.ReadFile(
filepath.Clean(filepath.Join("internal", "http", "testdata", "config.json")),
)
require.NoError(t, err)

endpoint := testutil.GetAvailableLocalAddress(t)

cfg := createDefaultConfig().(*Config)
cfg.UseV2 = true
cfg.HTTPConfig.Endpoint = endpoint
cfg.HTTPConfig.Config.Enabled = true
cfg.HTTPConfig.Config.Path = "/config"

ext := newExtension(context.Background(), *cfg, extensiontest.NewNopCreateSettings())

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })

client := &http.Client{}
url := fmt.Sprintf("http://%s/config", endpoint)

var resp *http.Response

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)

require.NoError(t, ext.NotifyConfig(context.Background(), confMap))

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, confJSON, body)
}