
[extension/healthcheckv2] Add event aggregation logic #32695

Open
wants to merge 5 commits into main

Conversation

@mwear (Member) commented Apr 25, 2024

Description:
This PR is the second in a series to decompose #30673 into more manageable pieces for review.

Aggregator

This PR introduces an aggregator data structure and event aggregation logic for status events. The extension implements the StatusWatcher optional interface, which the collector will call with a component.StatusEvent for each change in component status. These events will be aggregated by an aggregation function, and stored in the aggregator.

The aggregator is a recursive data structure. At the top it contains the overall status of the collector. At the next level, it contains the statuses for each pipeline, and at the level below that, it contains the statuses for each component in a pipeline. Each node in the data structure is an aggregation of the status events in the level below: the overall collector status is the aggregation of the pipeline statuses, and the pipeline statuses are the aggregations of the component statuses. The data structure allows you to query the status of the collector overall, or of individual pipelines by name. There is also a pub/sub mechanism for streaming aggregated statuses.
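
For illustration, the status tree has roughly the following shape (a sketch only; the field names are illustrative of the structure rather than the exact fields in this PR):

// One node in the status tree. The root node's children are pipelines;
// a pipeline node's children are its components. Each node carries the
// aggregated status event for its level.
type AggregateStatus struct {
	Event                                          // aggregated status event at this level
	ComponentStatusMap map[string]*AggregateStatus // statuses one level down, keyed by ID
}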

Aggregation Function

The purpose of the aggregator is to aggregate events so that the most relevant status event bubbles to the top. This allows us to get the status of the collector overall, or of a pipeline, through a simple lookup. An aggregation function determines the priority of events and how they should be aggregated. In many cases the result will be an existing status event; in some cases a new event will be synthesized. To match the behavior of the existing healthcheck extension, lifecycle events (e.g. starting, stopping) are prioritized over runtime events. Next, error statuses are prioritized, with PermanentError treated as higher priority than RecoverableError, but this can vary based on user-provided configuration: if PermanentErrors are ignored by configuration but RecoverableErrors are included, then RecoverableErrors take priority over PermanentErrors.
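
A rough sketch of that precedence (illustrative only: PriorityRecoverable is an assumed name for one of the configured ErrorPriority values, the statuses come from collector core's component package, and the ordering within the lifecycle tier is simplified):

// statusPrecedence returns statuses from highest to lowest priority:
// lifecycle statuses first, then error statuses ordered per configuration,
// then OK. Assumes "go.opentelemetry.io/collector/component".
func statusPrecedence(priority ErrorPriority) []component.Status {
	errs := []component.Status{component.StatusPermanentError, component.StatusRecoverableError}
	if priority == PriorityRecoverable { // assumed constant: recoverable errors outrank permanent
		errs[0], errs[1] = errs[1], errs[0]
	}
	order := []component.Status{
		component.StatusFatalError,
		component.StatusStopped,
		component.StatusStopping,
		component.StatusStarting,
	}
	order = append(order, errs...)
	return append(order, component.StatusOK)
}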

The StatusWatcher interface receives immutable events of type component.StatusEvent. Since we sometimes need to synthesize new events during aggregation, an Event interface was introduced so that the aggregator can work with component.StatusEvent instances as well as events synthesized by the status package.
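
Concretely, such an interface presumably just mirrors the accessors on component.StatusEvent, along the lines of this sketch:

// Satisfied by *component.StatusEvent and by events synthesized in the
// status package during aggregation.
type Event interface {
	Status() component.Status
	Err() error
	Timestamp() time.Time
}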

It's worth mentioning that there is existing status event aggregation logic in collector core, but it did not meet the needs of this extension. It does not prioritize lifecycle events over error events, and it always prioritizes permanent errors over recoverable errors. By prioritizing lifecycle events over error events we can return a 503 when restarting a collector, rather than a 500 when a collector is in a final state such as PermanentError. This is necessary to match the behavior of the existing extension. Since users have the option to include or ignore recoverable and permanent errors, we need the ability to prioritize them accordingly. We can discuss what the fate of the aggregation code in core should be.

Examples
Below are examples of overall collector and pipeline status that are based on the aggregator data structure. The rendering of the examples will come in a later PR. You can also look at the parent PR to see how all of this fits together. Note that the pipeline status example is a subtree of the overall collector status.

Overall collector status:

{
    "start_time": "2024-01-18T17:27:12.570394-08:00",
    "healthy": true,
    "status": "StatusRecoverableError",
    "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
    "status_time": "2024-01-18T17:27:32.572301-08:00",
    "components": {
        "extensions": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.570428-08:00",
            "components": {
                "extension:healthcheckv2": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.570428-08:00"
                }
            }
        },
        "pipeline:metrics/grpc": {
            "healthy": true,
            "status": "StatusRecoverableError",
            "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
            "status_time": "2024-01-18T17:27:32.572301-08:00",
            "components": {
                "exporter:otlp/staging": {
                    "healthy": true,
                    "status": "StatusRecoverableError",
                    "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
                    "status_time": "2024-01-18T17:27:32.572301-08:00"
                },
                "processor:batch": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571132-08:00"
                },
                "receiver:otlp": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571576-08:00"
                }
            }
        },
        "pipeline:traces/http": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.571625-08:00",
            "components": {
                "exporter:otlphttp/staging": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571615-08:00"
                },
                "processor:batch": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571621-08:00"
                },
                "receiver:otlp": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571625-08:00"
                }
            }
        }
    }
}

Status for pipeline traces/http:

{
    "start_time": "2024-01-18T17:27:12.570394-08:00",
    "healthy": true,
    "status": "StatusOK",
    "status_time": "2024-01-18T17:27:12.571625-08:00",
    "components": {
        "exporter:otlphttp/staging": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.571615-08:00"
        },
        "processor:batch": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.571621-08:00"
        },
        "receiver:otlp": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.571625-08:00"
        }
    }
}

Link to tracking Issue: #26661

Testing: Units / manual

Documentation: Comments, etc

		return errPriorityFunc(seen)
	}

	return func(st *AggregateStatus) Event {
Member:

I might be alone on this, but I find this a bit hard to read given some of the indirections: we are returning an inlined func, which calls a func var, which calls another func var. They are all simple to understand individually so it's not a huge deal, but I wonder if we could do something for readability here?

mwear (Member Author):

I replaced the two inline funcs you mentioned with a conditional. I can probably take this one step further and make the aggregationFunc a method on the Aggregator. Part of me prefers this logic to stand on its own, since it's somewhat complicated, but I'm open to whatever makes it the most readable / understandable / maintainable etc.

// events vs RecoverableError events. Lifecycle events will have the timestamp
// of the most recent event and error events will have the timestamp of the
// first occurrence.
func newAggregationFunc(priority ErrorPriority) aggregationFunc {
Member:

is this priority going to be fixed for the lifecycle of the component?

mwear (Member Author):

Yes, this is actually fixed for the aggregator as a whole (e.g. all components) for the lifetime of the collector process.

		matchingEvent = ev
	}
case ev.Timestamp().After(matchingEvent.Timestamp()):
	matchingEvent = ev
Member:

This got me: matchingEvent is happening later, and if it's not an error, you are getting the latest occurrence of the event, not earliest. Why? Can you add a comment or a test?

mwear (Member Author):

This was partially described in the comment above newAggregationFunc, but I expanded it, added some inline comments, and a test.

We are interested in the earliest error event for two reasons: causality and recoverable errors. If a component error cascades to other components, the earliest event is likely to be the cause. We expect recoverable errors to recover within their recovery duration, and we don't want later recoverable errors to shadow earlier ones in the aggregate status. For the non-error case, we use the latest event, as it represents the last time a successful status was reported. tl;dr: for an error we're interested in when it started; otherwise we want to know the latest successful status.
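
A sketch of that rule (illustrative; pickEvent is a hypothetical helper, not the function in this PR):

// pickEvent decides which of two events aggregating to the same status
// should be kept: the earliest occurrence for errors (when the problem
// started), the latest occurrence otherwise (the most recent successful
// report).
func pickEvent(current, candidate Event) Event {
	if candidate.Err() != nil {
		if candidate.Timestamp().Before(current.Timestamp()) {
			return candidate // earliest error wins: likely the root cause
		}
		return current
	}
	if candidate.Timestamp().After(current.Timestamp()) {
		return candidate // latest success wins: most recent healthy report
	}
	return current
}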

// Aggregator records individual status events for components and aggregates statuses for the
// pipelines they belong to and the collector overall.
type Aggregator struct {
	mu sync.RWMutex
Member:

To confirm: this is intended to guard both aggregateStatus and subscriptions, so that no concurrent updates to subscriptions and aggregateStatus can happen, right?

mwear (Member Author):

Correct
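
In other words, both fields sit behind the same lock. A rough sketch (field types here are assumptions for illustration, not necessarily the exact ones in the PR):

type Aggregator struct {
	// mu guards both aggregateStatus and subscriptions so that status
	// updates and subscription changes never interleave.
	mu              sync.RWMutex
	aggregateStatus *AggregateStatus
	subscriptions   map[string][]chan *AggregateStatus // illustrative type
}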

@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
Member:

wasn't this auto-generated?

@mwear (Member Author) commented May 9, 2024:

It wasn't. I deleted it and reran make generate to double-check, and it only seems to generate a package_test for the top-level component directory.
