Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processor): preserve auth in batch processor #10002

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 14 additions & 10 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var _ consumer.Metrics = (*batchProcessor)(nil)
var _ consumer.Logs = (*batchProcessor)(nil)

// newBatchProcessor returns a new batch processor component.
func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch) (*batchProcessor, error) {
func newBatchProcessor(ctx context.Context, set processor.CreateSettings, cfg *Config, batchFunc func() batch) (*batchProcessor, error) {
// use lower-case, to be consistent with http/2 headers.
mks := make([]string, len(cfg.MetadataKeys))
for i, k := range cfg.MetadataKeys {
Expand All @@ -129,7 +129,9 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
metadataLimit: int(cfg.MetadataCardinalityLimit),
}
if len(bp.metadataKeys) == 0 {
s := bp.newShard(nil)
// Retrieve info from the context and preserve Auth
info := client.FromContext(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context at this level doesn't have auth data, as newBatchProcessor isn't called in response to incoming data, but during the startup of the collector.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood! The idea is to propogate the existing context in all shards created.

The newShard function is the same regardless of whether we want to start at startup with a single shard or start multiple shards as per the metadata. I figured we just need to amend the signature of newShard to give it an existing (if present) AuthData.

Is my understanding fair?

s := bp.newShard(nil, info.Auth)
s.start()
bp.batcher = &singleShardBatcher{batcher: s}
} else {
Expand All @@ -148,8 +150,10 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
}

// newShard gets or creates a batcher corresponding with attrs.
func (bp *batchProcessor) newShard(md map[string][]string) *shard {
func (bp *batchProcessor) newShard(md map[string][]string, auth client.AuthData) *shard {
// Get the clientInfo from the context
exportCtx := client.NewContext(context.Background(), client.Info{
Auth: auth,
Metadata: client.NewMetadata(md),
})
b := &shard{
Expand Down Expand Up @@ -323,7 +327,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
// aset.ToSlice() returns the sorted, deduplicated,
// and name-downcased list of attributes.
var loaded bool
b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md))
b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md, info.Auth))
if !loaded {
// Start the goroutine only if we added the object to the map, otherwise is already started.
b.(*shard).start()
Expand Down Expand Up @@ -357,18 +361,18 @@ func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
}

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
func newBatchTracesProcessor(set processor.CreateSettings, next consumer.Traces, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchTraces(next) })
func newBatchTracesProcessor(ctx context.Context, set processor.CreateSettings, next consumer.Traces, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(ctx, set, cfg, func() batch { return newBatchTraces(next) })
}

// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout
func newBatchMetricsProcessor(set processor.CreateSettings, next consumer.Metrics, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchMetrics(next) })
func newBatchMetricsProcessor(ctx context.Context, set processor.CreateSettings, next consumer.Metrics, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(ctx, set, cfg, func() batch { return newBatchMetrics(next) })
}

// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
func newBatchLogsProcessor(set processor.CreateSettings, next consumer.Logs, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) })
func newBatchLogsProcessor(ctx context.Context, set processor.CreateSettings, next consumer.Logs, cfg *Config) (*batchProcessor, error) {
return newBatchProcessor(ctx, set, cfg, func() batch { return newBatchLogs(next) })
}

type batchTraces struct {
Expand Down
58 changes: 37 additions & 21 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ func TestProcessorLifecycle(t *testing.T) {
}

func TestBatchProcessorSpansDelivered(t *testing.T) {
ctx := context.Background()
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
batcher, err := newBatchTracesProcessor(ctx, creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -121,13 +122,14 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
}

func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
ctx := context.Background()
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.SendBatchMaxSize = 130
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
batcher, err := newBatchTracesProcessor(ctx, creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -169,6 +171,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
}

func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) {
ctx := context.Background()
sizer := &ptrace.ProtoMarshaler{}
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
Expand All @@ -177,7 +180,7 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) {
cfg.Timeout = 500 * time.Millisecond
creationSet := tel.NewProcessorCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
batcher, err := newBatchTracesProcessor(ctx, creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -224,6 +227,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) {
}

func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) {
ctx := context.Background()
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 20
Expand All @@ -233,7 +237,7 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) {
cfg.Timeout = 500 * time.Millisecond
creationSet := tel.NewProcessorCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
batcher, err := newBatchTracesProcessor(ctx, creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -268,6 +272,7 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) {
}

func TestBatchProcessorSentByTimeout(t *testing.T) {
ctx := context.Background()
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 100
Expand All @@ -280,7 +285,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
batcher, err := newBatchTracesProcessor(ctx, creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -319,6 +324,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {
}

func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
ctx := context.Background()
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
Expand All @@ -327,7 +333,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg)
batcher, err := newBatchTracesProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand All @@ -345,6 +351,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
}

func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
ctx := context.Background()
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Expand All @@ -358,7 +365,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchMetricsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -398,6 +405,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) {
}

func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) {
ctx := context.Background()
sizer := &pmetric.ProtoMarshaler{}

// Instantiate the batch processor with low config values to test data
Expand All @@ -415,7 +423,7 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) {

creationSet := tel.NewProcessorCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchMetricsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -472,6 +480,7 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
}

func TestBatchMetricsProcessor_Timeout(t *testing.T) {
ctx := context.Background()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
Expand All @@ -482,7 +491,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchMetricsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -521,6 +530,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {
}

func TestBatchMetricProcessor_Shutdown(t *testing.T) {
ctx := context.Background()
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
Expand All @@ -531,7 +541,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchMetricsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -630,7 +640,7 @@ func runMetricsProcessorBenchmark(b *testing.B, cfg Config) {
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
metricsPerRequest := 1000
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchMetricsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(b, err)
require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))

Expand Down Expand Up @@ -664,6 +674,7 @@ func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pmetric.Metrics) er
}

func TestBatchLogProcessor_ReceivingData(t *testing.T) {
ctx := context.Background()
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Expand All @@ -677,7 +688,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchLogsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -717,6 +728,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
}

func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) {
ctx := context.Background()
sizer := &plog.ProtoMarshaler{}

// Instantiate the batch processor with low config values to test data
Expand All @@ -732,7 +744,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) {

creationSet := tel.NewProcessorCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchLogsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -770,6 +782,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) {
}

func TestBatchLogsProcessor_Timeout(t *testing.T) {
ctx := context.Background()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 100,
Expand All @@ -780,7 +793,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchLogsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -819,6 +832,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {
}

func TestBatchLogProcessor_Shutdown(t *testing.T) {
ctx := context.Background()
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
Expand All @@ -829,7 +843,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) {

creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchLogsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -898,6 +912,7 @@ func (mts *metadataTracesSink) ConsumeTraces(ctx context.Context, td ptrace.Trac
}

func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
bg := context.Background()
sink := &metadataTracesSink{
TracesSink: &consumertest.TracesSink{},
spanCountByToken12: map[string]int{},
Expand All @@ -908,11 +923,10 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
cfg.MetadataKeys = []string{"token1", "token2"}
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
batcher, err := newBatchTracesProcessor(bg, creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

bg := context.Background()
callCtxs := []context.Context{
client.NewContext(bg, client.Info{
Metadata: client.NewMetadata(map[string][]string{
Expand Down Expand Up @@ -994,18 +1008,18 @@ func TestBatchProcessorDuplicateMetadataKeys(t *testing.T) {
}

func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {
bg := context.Background()
const cardLimit = 10

sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
cfg.MetadataKeys = []string{"token"}
cfg.MetadataCardinalityLimit = cardLimit
creationSet := processortest.NewNopCreateSettings()
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
batcher, err := newBatchTracesProcessor(bg, creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

bg := context.Background()
for requestNum := 0; requestNum < cardLimit; requestNum++ {
td := testdata.GenerateTraces(1)
ctx := client.NewContext(bg, client.Info{
Expand Down Expand Up @@ -1033,6 +1047,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) {
}

func TestBatchZeroConfig(t *testing.T) {
ctx := context.Background()
// This is a no-op configuration. No need for a timer, no
// minimum, no maximum, just a pass through.
cfg := Config{}
Expand All @@ -1044,7 +1059,7 @@ func TestBatchZeroConfig(t *testing.T) {
sink := new(consumertest.LogsSink)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchLogsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }()
Expand Down Expand Up @@ -1072,6 +1087,7 @@ func TestBatchZeroConfig(t *testing.T) {
}

func TestBatchSplitOnly(t *testing.T) {
ctx := context.Background()
const maxBatch = 10
const requestCount = 5
const logsPerRequest = 100
Expand All @@ -1085,7 +1101,7 @@ func TestBatchSplitOnly(t *testing.T) {
sink := new(consumertest.LogsSink)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
batcher, err := newBatchLogsProcessor(ctx, creationSet, sink, &cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
defer func() { require.NoError(t, batcher.Shutdown(context.Background())) }()
Expand Down