forked from open-telemetry/opentelemetry-collector-contrib
/
connector.go
158 lines (138 loc) · 5.68 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector"
import (
"context"
"fmt"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/timing"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
)
// traceToMetricConnector is the schema for connector
type traceToMetricConnector struct {
metricsConsumer consumer.Metrics // the next component in the pipeline to ingest metrics after connector
logger *zap.Logger
// agent specifies the agent used to ingest traces and output APM Stats.
// It is implemented by the traceagent structure; replaced in tests.
agent datadog.Ingester
// translator specifies the translator used to transform APM Stats Payloads
// from the agent to OTLP Metrics.
translator *metrics.Translator
// in specifies the channel through which the agent will output Stats Payloads
// resulting from ingested traces.
in chan *pb.StatsPayload
// exit specifies the exit channel, which will be closed upon shutdown.
exit chan struct{}
// isStarted tracks whether Start() has been called.
isStarted bool
}
var _ component.Component = (*traceToMetricConnector)(nil) // testing that the connectorImp properly implements the type Component interface
// function to create a new connector
func newTraceToMetricConnector(set component.TelemetrySettings, cfg component.Config, metricsConsumer consumer.Metrics, metricsClient statsd.ClientInterface, timingReporter timing.Reporter) (*traceToMetricConnector, error) {
set.Logger.Info("Building datadog connector for traces to metrics")
in := make(chan *pb.StatsPayload, 100)
set.MeterProvider = noop.NewMeterProvider() // disable metrics for the connector
attributesTranslator, err := attributes.NewTranslator(set)
if err != nil {
return nil, fmt.Errorf("failed to create attributes translator: %w", err)
}
trans, err := metrics.NewTranslator(set, attributesTranslator)
if err != nil {
return nil, fmt.Errorf("failed to create metrics translator: %w", err)
}
ctx := context.Background()
return &traceToMetricConnector{
logger: set.Logger,
agent: datadog.NewAgentWithConfig(ctx, getTraceAgentCfg(cfg.(*Config).Traces, attributesTranslator), in, metricsClient, timingReporter),
translator: trans,
in: in,
metricsConsumer: metricsConsumer,
exit: make(chan struct{}),
}, nil
}
func getTraceAgentCfg(cfg TracesConfig, attributesTranslator *attributes.Translator) *traceconfig.AgentConfig {
acfg := traceconfig.New()
acfg.OTLPReceiver.AttributesTranslator = attributesTranslator
acfg.OTLPReceiver.SpanNameRemappings = cfg.SpanNameRemappings
acfg.OTLPReceiver.SpanNameAsResourceName = cfg.SpanNameAsResourceName
acfg.Ignore["resource"] = cfg.IgnoreResources
acfg.ComputeStatsBySpanKind = cfg.ComputeStatsBySpanKind
acfg.PeerTagsAggregation = cfg.PeerTagsAggregation
acfg.PeerTags = cfg.PeerTags
if v := cfg.TraceBuffer; v > 0 {
acfg.TraceBuffer = v
}
return acfg
}
// Start implements the component.Component interface.
func (c *traceToMetricConnector) Start(_ context.Context, _ component.Host) error {
c.logger.Info("Starting datadogconnector")
c.agent.Start()
go c.run()
c.isStarted = true
return nil
}
// Shutdown implements the component.Component interface.
func (c *traceToMetricConnector) Shutdown(context.Context) error {
if !c.isStarted {
// Note: it is not necessary to manually close c.exit, c.in and c.agent.(*datadog.TraceAgent).exit channels as these are unused.
c.logger.Info("Requested shutdown, but not started, ignoring.")
return nil
}
c.logger.Info("Shutting down datadog connector")
c.logger.Info("Stopping datadog agent")
// stop the agent and wait for the run loop to exit
c.agent.Stop()
c.exit <- struct{}{} // signal exit
<-c.exit // wait for close
return nil
}
// Capabilities implements the consumer interface.
// tells use whether the component(connector) will mutate the data passed into it. if set to true the connector does modify the data
func (c *traceToMetricConnector) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (c *traceToMetricConnector) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
c.agent.Ingest(ctx, traces)
return nil
}
// run awaits incoming stats resulting from the agent's ingestion, converts them
// to metrics and flushes them using the configured metrics exporter.
func (c *traceToMetricConnector) run() {
defer close(c.exit)
for {
select {
case stats := <-c.in:
if len(stats.Stats) == 0 {
continue
}
var mx pmetric.Metrics
var err error
c.logger.Debug("Received stats payload", zap.Any("stats", stats))
mx, err = c.translator.StatsToMetrics(stats)
if err != nil {
c.logger.Error("Failed to convert stats to metrics", zap.Error(err))
continue
}
// APM stats as metrics
ctx := context.TODO()
// send metrics to the consumer or next component in pipeline
if err := c.metricsConsumer.ConsumeMetrics(ctx, mx); err != nil {
c.logger.Error("Failed ConsumeMetrics", zap.Error(err))
return
}
case <-c.exit:
return
}
}
}