[extension/healthcheckv2] Add event aggregation logic #32695
extension/healthcheckv2extension/internal/status/aggregation.go
return errPriorityFunc(seen)
}

return func(st *AggregateStatus) Event {
I might be alone on this, but I find this a bit hard to read given some of the indirections: we are returning an inlined func, which calls a func var, which calls another func var. They are all simple to understand individually so it's not a huge deal, but I wonder if we could do something for readability here?
I replaced the two inline funcs you mentioned with a conditional. I can probably take this one step further and make the aggregationFunc a method on the Aggregator. Part of me prefers this logic to stand on its own, since it's somewhat complicated, but I'm open to whatever makes it the most readable / understandable / maintainable etc.
// events vs RecoverableError events. Lifecycle events will have the timestamp
// of the most recent event and error events will have the timestamp of the
// first occurrence.
func newAggregationFunc(priority ErrorPriority) aggregationFunc {
Is this priority going to be fixed for the lifecycle of the component?
Yes, this is actually fixed for the aggregator as a whole (i.e. all components) for the lifetime of the collector process.
matchingEvent = ev
}
case ev.Timestamp().After(matchingEvent.Timestamp()):
matchingEvent = ev
This got me: matchingEvent is happening later, and if it's not an error, you are getting the latest occurrence of the event, not earliest. Why? Can you add a comment or a test?
This was partially described in the comment above newAggregationFunc, but I expanded it, added some inline comments, and a test.
We are interested in the earliest error event for two reasons: causality and recoverable errors. In the event that a component error cascades to other components, the earliest is likely to be the cause. We expect recoverable errors to recover within their recovery duration and we don't want later recoverable errors to shadow earlier ones in the aggregate status. For the non-error case, we use the latest event as it represents the last time a successful status was reported. tl;dr for an error, we're interested in when it started, otherwise we want to know the latest successful status.
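The selection rule described in this reply can be sketched as follows. This is a minimal illustration, not the code from the PR: the Status and Event types, pickMatching, and sampleEvents are hypothetical stand-ins for component.Status, the aggregator's events, and its matching loop.

```go
package main

import (
	"fmt"
	"time"
)

// Status is a simplified, hypothetical stand-in for component.Status.
type Status int

const (
	StatusOK Status = iota
	StatusRecoverableError
)

// Event is a hypothetical, minimal stand-in for an aggregated status event.
type Event struct {
	Status    Status
	Timestamp time.Time
}

// pickMatching returns the event with the given status that should represent
// the aggregate: the earliest occurrence for errors, the latest otherwise.
// The boolean is false when no event has the requested status.
func pickMatching(events []Event, status Status, isError bool) (Event, bool) {
	var match Event
	found := false
	for _, ev := range events {
		if ev.Status != status {
			continue
		}
		switch {
		case !found:
			match, found = ev, true
		case isError:
			// Errors: keep the first occurrence, the likely root cause.
			if ev.Timestamp.Before(match.Timestamp) {
				match = ev
			}
		case ev.Timestamp.After(match.Timestamp):
			// Non-errors: keep the most recent successful report.
			match = ev
		}
	}
	return match, found
}

// sampleEvents returns a small, made-up event list and its base time.
func sampleEvents() ([]Event, time.Time) {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	return []Event{
		{Status: StatusRecoverableError, Timestamp: t0.Add(2 * time.Second)},
		{Status: StatusOK, Timestamp: t0.Add(1 * time.Second)},
		{Status: StatusRecoverableError, Timestamp: t0.Add(1 * time.Second)},
		{Status: StatusOK, Timestamp: t0.Add(3 * time.Second)},
	}, t0
}

func main() {
	events, t0 := sampleEvents()
	errEv, _ := pickMatching(events, StatusRecoverableError, true)
	okEv, _ := pickMatching(events, StatusOK, false)
	fmt.Println("error offset:", errEv.Timestamp.Sub(t0))
	fmt.Println("ok offset:", okEv.Timestamp.Sub(t0))
}
```

With the sample data, the error case resolves to the event one second after the base time and the non-error case to the event three seconds after it.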
// Aggregator records individual status events for components and aggregates statuses for the
// pipelines they belong to and the collector overall.
type Aggregator struct {
mu sync.RWMutex
To confirm: this is intended to guard both aggregateStatus and subscriptions, so that no concurrent updates to subscriptions and aggregateStatus can happen, right?
Correct
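For readers following this thread, here is a rough sketch of one mutex guarding both fields. Only the names Aggregator, mu, aggregateStatus, and subscriptions come from the discussion. The string-typed status, the channel-based subscriptions, the method names, and the non-blocking send are assumptions made for illustration and may not match the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// Aggregator here is a simplified illustration: a single RWMutex guards both
// the aggregate status and the list of subscriptions.
type Aggregator struct {
	mu              sync.RWMutex
	aggregateStatus string        // stand-in for the real aggregate status tree
	subscriptions   []chan string // stand-in for the real subscription type
}

// Record updates the status and notifies subscribers under the write lock,
// so status and subscriptions can never be modified concurrently.
func (a *Aggregator) Record(status string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.aggregateStatus = status
	for _, sub := range a.subscriptions {
		// Non-blocking send (an assumption of this sketch): a full channel
		// drops the update rather than stalling the writer.
		select {
		case sub <- status:
		default:
		}
	}
}

// Status reads the current aggregate status under the read lock.
func (a *Aggregator) Status() string {
	a.mu.RLock()
	defer a.mu.RUnlock()
	return a.aggregateStatus
}

// Subscribe registers a new subscriber under the write lock.
func (a *Aggregator) Subscribe() <-chan string {
	a.mu.Lock()
	defer a.mu.Unlock()
	ch := make(chan string, 1)
	a.subscriptions = append(a.subscriptions, ch)
	return ch
}

func main() {
	a := &Aggregator{}
	updates := a.Subscribe()
	a.Record("StatusOK")
	fmt.Println("current:", a.Status())
	fmt.Println("streamed:", <-updates)
}
```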
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
Wasn't this auto-generated?
It wasn't. I deleted it and reran make generate to double check and it only seems to generate a package_test for the top level component directory.
Description:
This PR is the second in a series to decompose #30673 into more manageable pieces for review.
Aggregator
This PR introduces an aggregator data structure and event aggregation logic for status events. The extension implements the StatusWatcher optional interface, which the collector will call with a component.StatusEvent for each change in component status. These events will be aggregated by an aggregation function and stored in the aggregator.
The aggregator is a recursive data structure. At the top it contains the overall status of the collector. At the next level, it contains the statuses for each pipeline, and at the level below that, it contains the statuses for each component in a pipeline. Each node in the data structure is an aggregation of the status events in the level below. The overall collector status is the aggregation of the pipeline statuses, and at the next level, the pipeline statuses are the aggregations of the component statuses. The data structure allows you to query the status of the collector overall, or for individual pipelines by name. There is also a pub/sub mechanism used for streaming aggregated statuses.
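To make the shape of this tree concrete, here is a minimal sketch under simplifying assumptions. The Children field, the "highest value wins" rule, the helper names, and all pipeline and component names other than traces/http are hypothetical. The PR uses a configurable aggregation function rather than this placeholder rule.

```go
package main

import "fmt"

// Status is a simplified, hypothetical stand-in for component.Status.
type Status int

const (
	StatusOK Status = iota
	StatusRecoverableError
	StatusPermanentError
)

// AggregateStatus is one node of the tree: the collector at the root,
// pipelines below it, and components at the leaves.
type AggregateStatus struct {
	Status   Status
	Children map[string]*AggregateStatus // hypothetical field name
}

// aggregate recomputes each node's status bottom-up from its children.
// As a placeholder rule, the highest-valued status wins.
func aggregate(node *AggregateStatus) Status {
	if len(node.Children) == 0 {
		return node.Status
	}
	worst := StatusOK
	for _, child := range node.Children {
		if s := aggregate(child); s > worst {
			worst = s
		}
	}
	node.Status = worst
	return worst
}

// pipelineStatus looks up the aggregated status of a pipeline by name.
func pipelineStatus(root *AggregateStatus, name string) (Status, bool) {
	p, ok := root.Children[name]
	if !ok {
		return StatusOK, false
	}
	return p.Status, true
}

// exampleTree builds a made-up collector with two pipelines.
func exampleTree() *AggregateStatus {
	leaf := func(s Status) *AggregateStatus { return &AggregateStatus{Status: s} }
	return &AggregateStatus{Children: map[string]*AggregateStatus{
		"traces/http": {Children: map[string]*AggregateStatus{
			"receiver:otlp":     leaf(StatusOK),
			"exporter:otlphttp": leaf(StatusRecoverableError),
		}},
		"metrics/grpc": {Children: map[string]*AggregateStatus{
			"receiver:otlp": leaf(StatusOK),
			"exporter:otlp": leaf(StatusOK),
		}},
	}}
}

func main() {
	root := exampleTree()
	fmt.Println("collector:", aggregate(root))
	s, _ := pipelineStatus(root, "metrics/grpc")
	fmt.Println("metrics/grpc:", s)
}
```

Once the tree has been aggregated, both the overall status and a per-pipeline status are plain lookups, which is the property the description relies on.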
Aggregation Function
The purpose of the aggregator is to aggregate events so that the most relevant status event bubbles to the top. This allows us to get the status of the collector overall or a pipeline through a simple lookup. There is an aggregation function that determines the priority of events and how they should be aggregated. In many cases, the result will be an existing status event. In some cases a new event will be synthesized. In order to match the behavior of the existing healthcheck extension, lifecycle events (e.g. starting, stopping, etc.) are prioritized over runtime events. Next, error statuses are prioritized with PermanentErrors as higher priority than RecoverableErrors, but this can vary based on user-provided configuration. If PermanentErrors are ignored by configuration, but RecoverableErrors are included, then RecoverableErrors will take priority over PermanentErrors.
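The ordering described here can be sketched as a small ranking function. This is an illustration only: ErrorPriority appears in the PR, but the constant names, the reduced Status set, the numeric tiers, and the tie-breaking (first seen wins) are assumptions of this sketch.

```go
package main

import "fmt"

// Status is a reduced, hypothetical stand-in for component.Status.
type Status int

const (
	StatusOK Status = iota
	StatusStarting
	StatusRecoverableError
	StatusPermanentError
	StatusStopping
	StatusStopped
)

func (s Status) String() string {
	return [...]string{"OK", "Starting", "RecoverableError",
		"PermanentError", "Stopping", "Stopped"}[s]
}

// ErrorPriority selects which error kind wins when both are present.
// The constant names are assumptions made for this sketch.
type ErrorPriority int

const (
	PriorityPermanent ErrorPriority = iota
	PriorityRecoverable
)

// rank orders statuses into tiers: lifecycle events first, then the
// preferred error kind, then the other error kind, then everything else.
func rank(s Status, p ErrorPriority) int {
	switch s {
	case StatusStarting, StatusStopping, StatusStopped:
		return 3
	case StatusPermanentError:
		if p == PriorityPermanent {
			return 2
		}
		return 1
	case StatusRecoverableError:
		if p == PriorityRecoverable {
			return 2
		}
		return 1
	default:
		return 0
	}
}

// aggregateStatuses returns the highest-ranked status; on ties the first
// status seen is kept.
func aggregateStatuses(statuses []Status, p ErrorPriority) Status {
	result := StatusOK
	for _, s := range statuses {
		if rank(s, p) > rank(result, p) {
			result = s
		}
	}
	return result
}

func main() {
	runtime := []Status{StatusOK, StatusRecoverableError, StatusPermanentError}
	fmt.Println(aggregateStatuses(runtime, PriorityPermanent))
	fmt.Println(aggregateStatuses(runtime, PriorityRecoverable))
	fmt.Println(aggregateStatuses(append(runtime, StatusStarting), PriorityPermanent))
}
```

The priority is passed in once, which fits the point made in review that it is fixed for the lifetime of the collector process.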
The StatusWatcher interface receives immutable events of type component.StatusEvent. Since we sometimes need to synthesize new events during aggregation, an Event interface was introduced so that the aggregator can use component.StatusEvent instances or instances of events synthesized by the status package.
It's worth mentioning that there is existing status event aggregation logic in collector core, but it did not meet the needs of this extension. It does not prioritize lifecycle events over error events, and it will always prioritize permanent errors over recoverable ones. By prioritizing lifecycle events over error events, we can return a 503 when restarting a collector rather than a 500 when a collector is in a final state, such as PermanentError. This is necessary to match the behavior of the existing extension. Since users have the option to include or ignore recoverable and permanent errors, we need the ability to prioritize them accordingly. We can discuss what the fate of the aggregation code in core should be.
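The role of the Event interface mentioned above can be illustrated with a short sketch. The method set shown (Status, Err, Timestamp) is an assumption meant to mirror the accessors of component.StatusEvent. The concrete types, the synthesize helper, and the sample data are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Status is a simplified, hypothetical stand-in for component.Status.
type Status int

const (
	StatusOK Status = iota
	StatusRecoverableError
)

// Event is the common view the aggregator needs, whether an event came from
// the collector or was synthesized during aggregation.
type Event interface {
	Status() Status
	Err() error
	Timestamp() time.Time
}

// upstreamEvent stands in for an immutable event delivered by the collector.
type upstreamEvent struct {
	status Status
	err    error
	ts     time.Time
}

func (e *upstreamEvent) Status() Status       { return e.status }
func (e *upstreamEvent) Err() error           { return e.err }
func (e *upstreamEvent) Timestamp() time.Time { return e.ts }

// synthesizedEvent is created by the aggregator itself.
type synthesizedEvent struct {
	status Status
	ts     time.Time
}

func (e *synthesizedEvent) Status() Status       { return e.status }
func (e *synthesizedEvent) Err() error           { return nil }
func (e *synthesizedEvent) Timestamp() time.Time { return e.ts }

// synthesize builds a new event with the given status, reusing the
// timestamp of an existing event.
func synthesize(status Status, from Event) Event {
	return &synthesizedEvent{status: status, ts: from.Timestamp()}
}

// sampleUpstream returns a made-up collector event.
func sampleUpstream() Event {
	return &upstreamEvent{
		status: StatusRecoverableError,
		err:    errors.New("connection refused"),
		ts:     time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
	}
}

func main() {
	up := sampleUpstream()
	syn := synthesize(StatusOK, up)
	for _, ev := range []Event{up, syn} {
		fmt.Println(ev.Status(), ev.Err(), ev.Timestamp())
	}
}
```

Because both kinds of event satisfy the same interface, aggregation code can store and compare them without knowing where they originated.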
Examples
Below are examples of overall collector and pipeline status that are based on the aggregator data structure. The rendering of the examples will come in a later PR. You can also look at the parent PR to see how all of this fits together. Note that the pipeline status example is a subtree of the overall collector status.
Overall collector status:
Status for pipeline traces/http:
Link to tracking Issue: #26661
Testing: Units / manual
Documentation: Comments, etc