Skip to content

Commit

Permalink
[chore] move kafka exporter to generated lifecycle tests (#30531)
Browse files Browse the repository at this point in the history
Relates to
#27849

---------

Co-authored-by: Antoine Toulme <antoine@lunar-ocean.com>
  • Loading branch information
Frapschen and atoulme committed Mar 14, 2024
1 parent 9db1c15 commit b4965d2
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 29 deletions.
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/factory.go
Expand Up @@ -148,6 +148,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}

Expand Down Expand Up @@ -178,6 +179,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}

Expand Down Expand Up @@ -208,5 +210,6 @@ func (f *kafkaExporterFactory) createLogsExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}
13 changes: 10 additions & 3 deletions exporter/kafkaexporter/factory_test.go
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -124,9 +125,11 @@ func TestCreateMetricExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down Expand Up @@ -199,9 +202,11 @@ func TestCreateLogExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down Expand Up @@ -274,9 +279,11 @@ func TestCreateTraceExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down
101 changes: 101 additions & 0 deletions exporter/kafkaexporter/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 44 additions & 16 deletions exporter/kafkaexporter/kafka_exporter.go
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"

"github.com/IBM/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -23,6 +24,7 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")

// kafkaTracesProducer uses sarama to produce trace messages to Kafka.
type kafkaTracesProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler TracesMarshaler
Expand Down Expand Up @@ -57,11 +59,24 @@ func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces)
}

func (e *kafkaTracesProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
return e.producer.Close()
}

func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
}
e.producer = producer
return nil
}

// kafkaMetricsProducer uses sarama to produce metrics messages to kafka
type kafkaMetricsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler MetricsMarshaler
Expand All @@ -87,11 +102,24 @@ func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.M
}

func (e *kafkaMetricsProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
return e.producer.Close()
}

func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error {
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
}
e.producer = producer
return nil
}

// kafkaLogsProducer uses sarama to produce logs messages to kafka
type kafkaLogsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler LogsMarshaler
Expand All @@ -117,9 +145,21 @@ func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) erro
}

func (e *kafkaLogsProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
return e.producer.Close()
}

func (e *kafkaLogsProducer) start(_ context.Context, _ component.Host) error {
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
}
e.producer = producer
return nil
}

func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
c := sarama.NewConfig()

Expand Down Expand Up @@ -171,13 +211,8 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
}

return &kafkaMetricsProducer{
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand All @@ -196,12 +231,9 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma
keyableMarshaler.Key()
}
}
producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
}

return &kafkaTracesProducer{
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand All @@ -213,13 +245,9 @@ func newLogsExporter(config Config, set exporter.CreateSettings, marshalers map[
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
}

return &kafkaLogsProducer{
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand Down
24 changes: 16 additions & 8 deletions exporter/kafkaexporter/kafka_exporter_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/IBM/sarama/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -26,8 +27,9 @@ import (
func TestNewExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
require.NoError(t, err)
err = texp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Nil(t, texp)
}

func TestNewExporter_err_encoding(t *testing.T) {
Expand All @@ -40,8 +42,9 @@ func TestNewExporter_err_encoding(t *testing.T) {
func TestNewMetricsExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
require.NoError(t, err)
err = mexp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Nil(t, mexp)
}

func TestNewMetricsExporter_err_encoding(t *testing.T) {
Expand All @@ -60,9 +63,10 @@ func TestNewMetricsExporter_err_traces_encoding(t *testing.T) {

func TestNewLogsExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
require.NoError(t, err)
err = lexp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Nil(t, mexp)
}

func TestNewLogsExporter_err_encoding(t *testing.T) {
Expand Down Expand Up @@ -98,17 +102,20 @@ func TestNewExporter_err_auth_type(t *testing.T) {
},
}
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
require.NoError(t, err)
err = texp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
assert.Nil(t, texp)
mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
require.NoError(t, err)
err = mexp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
assert.Nil(t, mexp)
lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
require.NoError(t, err)
err = lexp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
assert.Nil(t, lexp)

}

Expand All @@ -120,9 +127,10 @@ func TestNewExporter_err_compression(t *testing.T) {
},
}
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
require.NoError(t, err)
err = texp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value idk")
assert.Nil(t, texp)
}

func TestTracesPusher(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions exporter/kafkaexporter/metadata.yaml
Expand Up @@ -9,7 +9,6 @@ status:
codeowners:
active: [pavolloffay, MovieStoreGuy]

# TODO: Update the exporter to pass the tests
tests:
config:
skip_lifecycle: true
skip_shutdown: true

0 comments on commit b4965d2

Please sign in to comment.