Skip to content

Commit

Permalink
[datadog/connector] Create a simplified Trace to Trace connector (ope…
Browse files Browse the repository at this point in the history
…n-telemetry#31026)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

Datadog Connector is creating two instances of Trace Agent, one in the
trace-to-metrics pipeline and another in the traces-to-traces pipeline.
The PR separates the trace-to-trace connector, simplifying the logic,
this avoid un-necessary serialization.

**Link to tracking Issue:** <Issue number if applicable>

open-telemetry#30828

open-telemetry#30487

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>
  • Loading branch information
dineshg13 authored and anthoai97 committed Feb 12, 2024
1 parent a6df44d commit 16445b3
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 62 deletions.
22 changes: 22 additions & 0 deletions .chloggen/dinesh.gurumurthy_fix-connector.yaml
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadog/connector
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Create a separate connector in the Datadog connector for the trace-to-metrics and trace-to-trace pipelines. It should reduce the number of conversions we do and help with Datadog connector performance.
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30828]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Simplify datadog/connector with two separate connectors in trace-to-metrics pipeline and trace-to-trace pipeline.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
36 changes: 15 additions & 21 deletions connector/datadogconnector/connector.go
Expand Up @@ -21,10 +21,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
)

// connectorImp is the schema for connector
type connectorImp struct {
// traceToMetricConnector is the schema for connector
type traceToMetricConnector struct {
metricsConsumer consumer.Metrics // the next component in the pipeline to ingest metrics after connector
tracesConsumer consumer.Traces // the next component in the pipeline to ingest traces after connector
logger *zap.Logger

// agent specifies the agent used to ingest traces and output APM Stats.
Expand All @@ -43,12 +42,11 @@ type connectorImp struct {
exit chan struct{}
}

var _ component.Component = (*connectorImp)(nil) // testing that the connectorImp properly implements the type Component interface
var _ component.Component = (*traceToMetricConnector)(nil) // testing that the connectorImp properly implements the type Component interface

// function to create a new connector
func newConnector(set component.TelemetrySettings, cfg component.Config, metricsConsumer consumer.Metrics, tracesConsumer consumer.Traces) (*connectorImp, error) {
set.Logger.Info("Building datadog connector")

func newTraceToMetricConnector(set component.TelemetrySettings, cfg component.Config, metricsConsumer consumer.Metrics) (*traceToMetricConnector, error) {
set.Logger.Info("Building datadog connector for traces to metrics")
in := make(chan *pb.StatsPayload, 100)
set.MeterProvider = noop.NewMeterProvider() // disable metrics for the connector
attributesTranslator, err := attributes.NewTranslator(set)
Expand All @@ -61,13 +59,12 @@ func newConnector(set component.TelemetrySettings, cfg component.Config, metrics
}

ctx := context.Background()
return &connectorImp{
return &traceToMetricConnector{
logger: set.Logger,
agent: datadog.NewAgentWithConfig(ctx, getTraceAgentCfg(cfg.(*Config).Traces), in),
translator: trans,
in: in,
metricsConsumer: metricsConsumer,
tracesConsumer: tracesConsumer,
exit: make(chan struct{}),
}, nil
}
Expand All @@ -86,18 +83,18 @@ func getTraceAgentCfg(cfg TracesConfig) *traceconfig.AgentConfig {
}

// Start implements the component.Component interface.
func (c *connectorImp) Start(_ context.Context, _ component.Host) error {
func (c *traceToMetricConnector) Start(_ context.Context, _ component.Host) error {
c.logger.Info("Starting datadogconnector")
c.agent.Start()
if c.metricsConsumer != nil {
go c.run()
}
go c.run()
return nil
}

// Shutdown implements the component.Component interface.
func (c *connectorImp) Shutdown(context.Context) error {
func (c *traceToMetricConnector) Shutdown(context.Context) error {
c.logger.Info("Shutting down datadog connector")
c.logger.Info("Stopping datadog agent")
// stop the agent and wait for the run loop to exit
c.agent.Stop()
c.exit <- struct{}{} // signal exit
<-c.exit // wait for close
Expand All @@ -106,21 +103,18 @@ func (c *connectorImp) Shutdown(context.Context) error {

// Capabilities implements the consumer interface.
// tells use whether the component(connector) will mutate the data passed into it. if set to true the connector does modify the data
func (c *connectorImp) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true} // ConsumeTraces puts a new attribute _dd.stats_computed
func (c *traceToMetricConnector) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (c *connectorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
func (c *traceToMetricConnector) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
c.agent.Ingest(ctx, traces)
if c.tracesConsumer != nil {
return c.tracesConsumer.ConsumeTraces(ctx, traces)
}
return nil
}

// run awaits incoming stats resulting from the agent's ingestion, converts them
// to metrics and flushes them using the configured metrics exporter.
func (c *connectorImp) run() {
func (c *traceToMetricConnector) run() {
defer close(c.exit)
for {
select {
Expand Down
14 changes: 10 additions & 4 deletions connector/datadogconnector/connector_test.go
Expand Up @@ -13,11 +13,10 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
)

var _ component.Component = (*connectorImp)(nil) // testing that the connectorImp properly implements the type Component interface
var _ component.Component = (*traceToMetricConnector)(nil) // testing that the connectorImp properly implements the type Component interface

// create test to create a connector, check that basic code compiles
func TestNewConnector(t *testing.T) {

factory := NewFactory()

creationParams := connectortest.NewNopCreateSettings()
Expand All @@ -26,12 +25,19 @@ func TestNewConnector(t *testing.T) {
traceToMetricsConnector, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, consumertest.NewNop())
assert.NoError(t, err)

_, ok := traceToMetricsConnector.(*connectorImp)
_, ok := traceToMetricsConnector.(*traceToMetricConnector)
assert.True(t, ok) // checks if the created connector implements the connectorImp struct
}

func TestTraceToTraceConnector(t *testing.T) {
factory := NewFactory()

creationParams := connectortest.NewNopCreateSettings()
cfg := factory.CreateDefaultConfig().(*Config)

traceToTracesConnector, err := factory.CreateTracesToTraces(context.Background(), creationParams, cfg, consumertest.NewNop())
assert.NoError(t, err)

_, ok = traceToTracesConnector.(*connectorImp)
_, ok := traceToTracesConnector.(*traceToTraceConnector)
assert.True(t, ok) // checks if the created connector implements the connectorImp struct
}
10 changes: 3 additions & 7 deletions connector/datadogconnector/factory.go
Expand Up @@ -37,17 +37,13 @@ func createDefaultConfig() component.Config {
// defines the consumer type of the connector
// we want to consume traces and export metrics therefore define nextConsumer as metrics, consumer is the next component in the pipeline
func createTracesToMetricsConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
c, err := newConnector(params.TelemetrySettings, cfg, nextConsumer, nil)
c, err := newTraceToMetricConnector(params.TelemetrySettings, cfg, nextConsumer)
if err != nil {
return nil, err
}
return c, nil
}

func createTracesToTracesConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (connector.Traces, error) {
c, err := newConnector(params.TelemetrySettings, cfg, nil, nextConsumer)
if err != nil {
return nil, err
}
return c, nil
func createTracesToTracesConnector(_ context.Context, params connector.CreateSettings, _ component.Config, nextConsumer consumer.Traces) (connector.Traces, error) {
return newTraceToTraceConnector(params.Logger, nextConsumer), nil
}
58 changes: 58 additions & 0 deletions connector/datadogconnector/traces_connector.go
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

// keyStatsComputed specifies the resource attribute key which indicates if stats have been
// computed for the resource spans.
const keyStatsComputed = "_dd.stats_computed"

type traceToTraceConnector struct {
logger *zap.Logger
tracesConsumer consumer.Traces // the next component in the pipeline to ingest traces after connector
}

func newTraceToTraceConnector(logger *zap.Logger, nextConsumer consumer.Traces) *traceToTraceConnector {
logger.Info("Building datadog connector for trace to trace")
return &traceToTraceConnector{
logger: logger,
tracesConsumer: nextConsumer,
}
}

// Start implements the component interface.
func (c *traceToTraceConnector) Start(_ context.Context, _ component.Host) error {
return nil
}

// Shutdown implements the component interface.
func (c *traceToTraceConnector) Shutdown(_ context.Context) error {
return nil
}

// Capabilities implements the consumer interface.
// tells use whether the component(connector) will mutate the data passed into it. if set to true the connector does modify the data
func (c *traceToTraceConnector) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true} // ConsumeTraces puts a new attribute _dd.stats_computed
}

// ConsumeTraces implements the consumer interface.
func (c *traceToTraceConnector) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
for i := 0; i < traces.ResourceSpans().Len(); i++ {
rs := traces.ResourceSpans().At(i)
// Stats will be computed for p. Mark the original resource spans to ensure that they don't
// get computed twice in case these spans pass through here again.
rs.Resource().Attributes().PutBool(keyStatsComputed, true)

}
return c.tracesConsumer.ConsumeTraces(ctx, traces)
}
7 changes: 0 additions & 7 deletions internal/datadog/agent.go
Expand Up @@ -21,10 +21,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

// keyStatsComputed specifies the resource attribute key which indicates if stats have been
// computed for the resource spans.
const keyStatsComputed = "_dd.stats_computed"

// TraceAgent specifies a minimal trace agent instance that is able to process traces and output stats.
type TraceAgent struct {
*agent.Agent
Expand Down Expand Up @@ -148,9 +144,6 @@ func (p *TraceAgent) Ingest(ctx context.Context, traces ptrace.Traces) {
// ...the call transforms the OTLP Spans into a Datadog payload and sends the result
// down the p.pchan channel

// Stats will be computed for p. Mark the original resource spans to ensure that they don't
// get computed twice in case these spans pass through here again.
rspans.Resource().Attributes().PutBool(keyStatsComputed, true)
}
}

Expand Down
23 changes: 0 additions & 23 deletions internal/datadog/agent_test.go
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/testutil"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestTraceAgentConfig(t *testing.T) {
Expand Down Expand Up @@ -79,26 +78,4 @@ loop:
// of groups
require.Len(t, stats.Stats[0].Stats[0].Stats, traces.SpanCount())
require.Len(t, a.TraceWriter.In, 0) // the trace writer channel should've been drained

// Check that the payload is labeled
val, ok := traces.ResourceSpans().At(0).Resource().Attributes().Get(keyStatsComputed)
require.True(t, ok)
require.Equal(t, pcommon.ValueTypeBool, val.Type())
require.True(t, val.Bool())

// Ingest again
a.Ingest(ctx, traces)
timeout = time.After(500 * time.Millisecond)
loop2:
for {
select {
case stats = <-out:
if len(stats.Stats) != 0 {
t.Fatal("got payload when none was expected")
}
case <-timeout:
// We got no stats (expected), thus we end the test
break loop2
}
}
}

0 comments on commit 16445b3

Please sign in to comment.