-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
connector.go
240 lines (210 loc) · 8.17 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector"
import (
"context"
"fmt"
"sync"
"time"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/timing"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"github.com/patrickmn/go-cache"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.17.0"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
)
// traceToMetricConnector is the connector state: it hands incoming traces to
// an agent, receives the resulting APM Stats Payloads on a channel, translates
// them to OTLP metrics and forwards those to the next metrics consumer.
type traceToMetricConnector struct {
	// metricsConsumer is the next component in the pipeline; it receives the
	// metrics produced by the connector.
	metricsConsumer consumer.Metrics
	// logger is the logger taken from the telemetry settings at construction.
	logger *zap.Logger

	// agent is the agent that ingests traces and outputs APM Stats.
	// It is implemented by the traceagent structure; replaced in tests.
	agent datadog.Ingester

	// translator transforms APM Stats Payloads coming from the agent into
	// OTLP Metrics.
	translator *metrics.Translator

	// resourceAttrs holds the container tag names derived from the configured
	// ResourceAttributesAsContainerTags; only its keys are read by the
	// connector when looking up tags on a resource.
	resourceAttrs map[string]string
	// containerTagCache maps a container ID to a *sync.Map whose keys are
	// "name:value" tag strings collected for that container.
	containerTagCache *cache.Cache

	// in is the channel through which the agent outputs Stats Payloads
	// resulting from ingested traces.
	in chan *pb.StatsPayload

	// exit is the exit channel: Shutdown sends on it to stop the run loop,
	// and the run loop closes it when it returns.
	exit chan struct{}

	// isStarted tracks whether Start() has been called.
	isStarted bool
}
// Compile-time assertion that *traceToMetricConnector implements the
// component.Component interface.
var _ component.Component = (*traceToMetricConnector)(nil)

var (
	// cacheExpiration is the time after which a container tag cache entry will
	// expire and be removed from the cache.
	cacheExpiration = 5 * time.Minute

	// cacheCleanupInterval is the cleanup interval handed to the container tag
	// cache when it is created.
	cacheCleanupInterval = time.Minute
)
// newTraceToMetricConnector builds a traceToMetricConnector.
//
// It creates the attributes and metrics translators from set (after replacing
// set's meter provider with a no-op one), derives the container tag names from
// the Traces.ResourceAttributesAsContainerTags configuration, and creates the
// agent that will write Stats Payloads to the connector's input channel.
//
// cfg must be a *Config; the type assertion panics otherwise. An error is
// returned when either translator cannot be created.
func newTraceToMetricConnector(set component.TelemetrySettings, cfg component.Config, metricsConsumer consumer.Metrics, metricsClient statsd.ClientInterface, timingReporter timing.Reporter) (*traceToMetricConnector, error) {
	set.Logger.Info("Building datadog connector for traces to metrics")

	// Buffered channel on which the agent publishes Stats Payloads.
	statsCh := make(chan *pb.StatsPayload, 100)

	// disable metrics for the connector
	set.MeterProvider = noop.NewMeterProvider()

	attrTranslator, err := attributes.NewTranslator(set)
	if err != nil {
		return nil, fmt.Errorf("failed to create attributes translator: %w", err)
	}

	metricsTranslator, err := metrics.NewTranslator(set, attrTranslator)
	if err != nil {
		return nil, fmt.Errorf("failed to create metrics translator: %w", err)
	}

	tracesCfg := cfg.(*Config).Traces

	// Turn the configured resource attribute names into a set-like map before
	// mapping them to container tag names.
	wanted := make(map[string]string, len(tracesCfg.ResourceAttributesAsContainerTags))
	for _, name := range tracesCfg.ResourceAttributesAsContainerTags {
		wanted[name] = ""
	}
	containerTags := attributes.ContainerTagFromAttributes(wanted)

	agent := datadog.NewAgentWithConfig(
		context.Background(),
		getTraceAgentCfg(tracesCfg, attrTranslator),
		statsCh,
		metricsClient,
		timingReporter,
	)

	conn := &traceToMetricConnector{
		logger:            set.Logger,
		agent:             agent,
		translator:        metricsTranslator,
		in:                statsCh,
		metricsConsumer:   metricsConsumer,
		resourceAttrs:     containerTags,
		containerTagCache: cache.New(cacheExpiration, cacheCleanupInterval),
		exit:              make(chan struct{}),
	}
	return conn, nil
}
// getTraceAgentCfg returns a new trace agent configuration populated from the
// connector's traces configuration.
//
// The OTLP receiver settings, ignored resources, stats computation options and
// peer tag options are copied from cfg. When any resource attributes are
// configured as container tags, the "enable_cid_stats" feature is turned on
// and "disable_cid_stats" is removed. TraceBuffer is only overridden when cfg
// carries a positive value.
func getTraceAgentCfg(cfg TracesConfig, attributesTranslator *attributes.Translator) *traceconfig.AgentConfig {
	agentCfg := traceconfig.New()

	// OTLP receiver behavior.
	agentCfg.OTLPReceiver.AttributesTranslator = attributesTranslator
	agentCfg.OTLPReceiver.SpanNameRemappings = cfg.SpanNameRemappings
	agentCfg.OTLPReceiver.SpanNameAsResourceName = cfg.SpanNameAsResourceName

	// Filtering and stats computation.
	agentCfg.Ignore["resource"] = cfg.IgnoreResources
	agentCfg.ComputeStatsBySpanKind = cfg.ComputeStatsBySpanKind
	agentCfg.PeerTagsAggregation = cfg.PeerTagsAggregation
	agentCfg.PeerTags = cfg.PeerTags

	if len(cfg.ResourceAttributesAsContainerTags) > 0 {
		delete(agentCfg.Features, "disable_cid_stats")
		agentCfg.Features["enable_cid_stats"] = struct{}{}
	}

	if cfg.TraceBuffer > 0 {
		agentCfg.TraceBuffer = cfg.TraceBuffer
	}

	return agentCfg
}
// Start implements the component.Component interface.
//
// It starts the agent, launches the run loop in its own goroutine and records
// that the connector has been started. It always returns nil.
func (c *traceToMetricConnector) Start(context.Context, component.Host) error {
	c.logger.Info("Starting datadogconnector")

	c.agent.Start()
	// The run loop drains c.in until it is told to stop through c.exit.
	go c.run()

	c.isStarted = true
	return nil
}
// Shutdown implements the component.Component interface.
//
// If the connector was never started, or has already been shut down, the call
// is a no-op. Otherwise it stops the agent, asks the run loop to exit and
// waits until the run loop has closed c.exit. It always returns nil.
func (c *traceToMetricConnector) Shutdown(context.Context) error {
	if !c.isStarted {
		// Note: it is not necessary to manually close c.exit, c.in and c.agent.(*datadog.TraceAgent).exit channels as these are unused.
		c.logger.Info("Requested shutdown, but not started, ignoring.")
		return nil
	}
	c.logger.Info("Shutting down datadog connector")
	c.logger.Info("Stopping datadog agent")
	// stop the agent and wait for the run loop to exit
	c.agent.Stop()
	c.exit <- struct{}{} // signal exit
	<-c.exit             // wait for close
	// Mark the connector as stopped: the run loop closes c.exit when it
	// returns, so a repeated Shutdown must take the early-return path above
	// rather than send on the closed channel (which would panic).
	c.isStarted = false
	return nil
}
// Capabilities implements the consumer interface.
// It tells the pipeline whether the connector mutates the data passed into
// it; MutatesData is false, i.e. the connector reports that it does not.
func (c *traceToMetricConnector) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{
		MutatesData: false,
	}
}
// populateContainerTagsCache records, per container ID, the container tags
// found on the resources of the given traces.
//
// Resources without a container ID attribute are skipped. For every tag name
// in c.resourceAttrs that is present among the resource's container tags, a
// "name:value" string is added to the set cached under the container ID. A
// new set is created and cached with the default expiration when the
// container ID has no entry yet.
func (c *traceToMetricConnector) populateContainerTagsCache(traces ptrace.Traces) {
	resourceSpans := traces.ResourceSpans()
	for idx := 0; idx < resourceSpans.Len(); idx++ {
		resAttrs := resourceSpans.At(idx).Resource().Attributes()

		cid, found := resAttrs.Get(semconv.AttributeContainerID)
		if !found {
			continue
		}
		cacheKey := cid.AsString()

		containerTags := attributes.ContainerTagsFromResourceAttributes(resAttrs)
		for name := range c.resourceAttrs {
			value, present := containerTags[name]
			if !present {
				continue
			}
			tag := fmt.Sprintf("%s:%s", name, value)

			cached, hit := c.containerTagCache.Get(cacheKey)
			if !hit {
				// First tag seen for this container: start a fresh set.
				tagSet := &sync.Map{}
				tagSet.Store(tag, struct{}{})
				c.containerTagCache.Set(cacheKey, tagSet, cache.DefaultExpiration)
				continue
			}

			// Adds the tag only when it is not in the set already.
			cached.(*sync.Map).LoadOrStore(tag, struct{}{})
		}
	}
}
// ConsumeTraces receives a batch of traces: it first updates the container
// tag cache from the traces' resources and then hands the traces to the agent
// for ingestion. It always returns nil.
func (c *traceToMetricConnector) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
	// Cache container tags before ingestion so they are available when the
	// resulting stats are enriched in the run loop.
	c.populateContainerTagsCache(traces)

	c.agent.Ingest(ctx, traces)

	return nil
}
// enrichStatsPayload replaces the tags of every entry in stats.Stats that has
// a cached container tag set with the union of its own tags and the cached
// ones.
//
// Entries with an empty ContainerID, or whose ContainerID has no cache entry,
// are left untouched. The entry's existing tags are stored into the cached
// set itself, so they stay in that set for later lookups of the same
// container ID. The resulting tag order follows sync.Map.Range and is not
// sorted.
func (c *traceToMetricConnector) enrichStatsPayload(stats *pb.StatsPayload) {
	for _, clientStats := range stats.Stats {
		if clientStats.ContainerID == "" {
			continue
		}

		cached, found := c.containerTagCache.Get(clientStats.ContainerID)
		if !found {
			continue
		}
		tagSet := cached.(*sync.Map)

		// Fold the payload's own tags into the set to de-duplicate them
		// against the cached container tags.
		for _, existing := range clientStats.Tags {
			tagSet.Store(existing, struct{}{})
		}

		merged := make([]string, 0)
		tagSet.Range(func(key, _ any) bool {
			merged = append(merged, key.(string))
			return true
		})
		clientStats.Tags = merged
	}
}
// run awaits incoming stats resulting from the agent's ingestion, converts them
// to metrics and passes them to the next metrics consumer in the pipeline.
//
// The loop ends only when a value is received on c.exit; c.exit is closed as
// run returns, which is what Shutdown waits for. Payloads with no stats are
// skipped. Translation and consumer errors are logged and the payload is
// dropped, but the loop keeps running.
func (c *traceToMetricConnector) run() {
	defer close(c.exit)
	for {
		select {
		case stats := <-c.in:
			if len(stats.Stats) == 0 {
				continue
			}
			// Enrich the stats with container tags
			if len(c.resourceAttrs) > 0 {
				c.enrichStatsPayload(stats)
			}
			c.logger.Debug("Received stats payload", zap.Any("stats", stats))

			// APM stats as metrics
			mx, err := c.translator.StatsToMetrics(stats)
			if err != nil {
				c.logger.Error("Failed to convert stats to metrics", zap.Error(err))
				continue
			}

			// send metrics to the consumer or next component in pipeline
			if err := c.metricsConsumer.ConsumeMetrics(context.TODO(), mx); err != nil {
				c.logger.Error("Failed ConsumeMetrics", zap.Error(err))
				// Keep the loop alive: returning here would close c.exit,
				// permanently stop draining c.in, and make the send on c.exit
				// in Shutdown panic.
				continue
			}
		case <-c.exit:
			return
		}
	}
}