diff --git a/.chloggen/encoding_fileexporter.yaml b/.chloggen/encoding_fileexporter.yaml new file mode 100644 index 0000000000000..53229897abcf5 --- /dev/null +++ b/.chloggen/encoding_fileexporter.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: fileexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adopt the encoding extension with the file exporter. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31774] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index b3c155922e9b1..5d6b87966462a 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -1151,3 +1151,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/stor replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage => ../../extension/storage/dbstorage replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding diff --git a/exporter/fileexporter/README.md b/exporter/fileexporter/README.md index ce011b725c67e..986343386155c 100644 --- a/exporter/fileexporter/README.md +++ b/exporter/fileexporter/README.md @@ -47,6 +47,7 @@ The following settings are optional: - localtime : [default: false (use UTC)] whether or not the timestamps in backup files is formatted according to the host's local time. - `format`[default: json]: define the data format of encoded telemetry data. The setting can be overridden with `proto`. +- `encoding`[default: none]: if specified, uses an encoding extension to encode telemetry data. Overrides `format`. - `append`[default: `false`] defines whether append to the file (`true`) or truncate (`false`). If `append: true` is set then setting `rotation` or `compression` is currently not supported. - `compression`[no default]: the compression algorithm used when exporting telemetry data to file. Supported compression algorithms:`zstd` - `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats. diff --git a/exporter/fileexporter/config.go b/exporter/fileexporter/config.go index 8508b6b01d9e2..bdd05dd278b54 100644 --- a/exporter/fileexporter/config.go +++ b/exporter/fileexporter/config.go @@ -40,6 +40,10 @@ type Config struct { // - proto: OTLP binary protobuf bytes. FormatType string `mapstructure:"format"` + // Encoding defines the encoding of the telemetry data. + // If specified, it overrides `FormatType` and applies an encoding extension. + Encoding *component.ID `mapstructure:"encoding"` + // Compression Codec used to export telemetry data // Supported compression algorithms:`zstd` Compression string `mapstructure:"compression"` diff --git a/exporter/fileexporter/encoding_test.go b/exporter/fileexporter/encoding_test.go new file mode 100644 index 0000000000000..fbcf354158791 --- /dev/null +++ b/exporter/fileexporter/encoding_test.go @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileexporter + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension" +) + +type hostWithEncoding struct { + encodings map[component.ID]component.Component +} + +func (h hostWithEncoding) GetFactory(_ component.Kind, _ component.Type) component.Factory { + panic("unsupported") +} + +func (h hostWithEncoding) GetExtensions() map[component.ID]component.Component { + return h.encodings +} + +func (h hostWithEncoding) GetExporters() map[component.DataType]map[component.ID]component.Component { + panic("unsupported") +} + +func TestEncoding(t *testing.T) { + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + cfg.Path = filepath.Join(t.TempDir(), "encoding.txt") + id := component.MustNewID("otlpjson") + cfg.Encoding = &id + + ef := otlpencodingextension.NewFactory() + efCfg := ef.CreateDefaultConfig().(*otlpencodingextension.Config) + efCfg.Protocol = "otlp_json" + ext, err := ef.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), efCfg) + require.NoError(t, err) + require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost())) + + me, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + te, err := f.CreateTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + le, err := f.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + host := hostWithEncoding{ + map[component.ID]component.Component{id: ext}, + } + require.NoError(t, me.Start(context.Background(), host)) + require.NoError(t, te.Start(context.Background(), host)) + require.NoError(t, le.Start(context.Background(), host)) + t.Cleanup(func() { + + }) + + require.NoError(t, me.ConsumeMetrics(context.Background(), generateMetrics())) + require.NoError(t, te.ConsumeTraces(context.Background(), generateTraces())) + require.NoError(t, le.ConsumeLogs(context.Background(), generateLogs())) + + require.NoError(t, me.Shutdown(context.Background())) + require.NoError(t, te.Shutdown(context.Background())) + require.NoError(t, le.Shutdown(context.Background())) + + b, err := os.ReadFile(cfg.Path) + require.NoError(t, err) + require.Contains(t, string(b), `{"resourceMetrics":`) + require.Contains(t, string(b), `{"resourceSpans":`) + require.Contains(t, string(b), `{"resourceLogs":`) +} + +func generateLogs() plog.Logs { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("resource", "R1") + l := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + l.Body().SetStr("test log message") + l.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return logs +} + +func generateMetrics() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("resource", "R1") + m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + m.SetName("test_metric") + dp := m.SetEmptyGauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("test_attr", "value_1") + dp.SetIntValue(123) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return metrics +} + +func generateTraces() ptrace.Traces { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("resource", "R1") + span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.Attributes().PutStr("test_attr", "value_1") + span.SetName("test_span") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second))) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return traces +} diff --git a/exporter/fileexporter/file_exporter.go b/exporter/fileexporter/file_exporter.go index e3b6a2f6b095b..c7a090f28c6ce 100644 --- a/exporter/fileexporter/file_exporter.go +++ b/exporter/fileexporter/file_exporter.go @@ -44,11 +44,14 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error { } // Start starts the flush timer if set. -func (e *fileExporter) Start(_ context.Context, _ component.Host) error { - e.marshaller = newMarshaller(e.conf) +func (e *fileExporter) Start(_ context.Context, host component.Host) error { + var err error + e.marshaller, err = newMarshaller(e.conf, host) + if err != nil { + return err + } export := buildExportFunc(e.conf) - var err error e.writer, err = newFileWriter(e.conf.Path, e.conf.Append, e.conf.Rotation, e.conf.FlushInterval, export) if err != nil { return err diff --git a/exporter/fileexporter/go.mod b/exporter/fileexporter/go.mod index 42e89cafe3752..9311f978264ad 100644 --- a/exporter/fileexporter/go.mod +++ b/exporter/fileexporter/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/klauspost/compress v1.17.7 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.96.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.96.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.96.0 github.com/stretchr/testify v1.9.0 @@ -12,6 +13,7 @@ require ( go.opentelemetry.io/collector/confmap v0.96.1-0.20240315172937-3b5aee0c7a16 go.opentelemetry.io/collector/consumer v0.96.1-0.20240315172937-3b5aee0c7a16 go.opentelemetry.io/collector/exporter v0.96.1-0.20240315172937-3b5aee0c7a16 + go.opentelemetry.io/collector/extension v0.96.1-0.20240315172937-3b5aee0c7a16 go.opentelemetry.io/collector/pdata v1.3.1-0.20240315172937-3b5aee0c7a16 go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 @@ -24,7 +26,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect @@ -39,15 +41,15 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.96.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect go.opentelemetry.io/collector v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect - go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect + go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240306115632-b2693620eff6 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect - go.opentelemetry.io/collector/extension v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect go.opentelemetry.io/collector/receiver v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect @@ -78,3 +80,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding diff --git a/exporter/fileexporter/go.sum b/exporter/fileexporter/go.sum index fa912963d0d66..0221e549ded47 100644 --- a/exporter/fileexporter/go.sum +++ b/exporter/fileexporter/go.sum @@ -5,8 +5,9 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -52,8 +53,9 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= @@ -74,8 +76,8 @@ go.opentelemetry.io/collector v0.96.1-0.20240315172937-3b5aee0c7a16 h1:4pMthIh6E go.opentelemetry.io/collector v0.96.1-0.20240315172937-3b5aee0c7a16/go.mod h1:PFDUr160wBjUPqqVIvpJ0G9JXM8ux+qZkC+oZRB8gnA= go.opentelemetry.io/collector/component v0.96.1-0.20240315172937-3b5aee0c7a16 h1:Is9uHOav+UViEFSyTl/I7Vk2zymZTSw9c6iBVn4/fRI= go.opentelemetry.io/collector/component v0.96.1-0.20240315172937-3b5aee0c7a16/go.mod h1:0evn//YPgN/5VmbbD4JS0yH3ikWxwROQN1MKEOM/U3M= -go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240315172937-3b5aee0c7a16 h1:W4/bDJVoVseNnZ415rR2nGjO90F645cKgYhM2m2jR/g= -go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240315172937-3b5aee0c7a16/go.mod h1:s7A6ZGxK8bxqidFzwbr2pITzbsB2qf+aeHEDQDcanV8= +go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240306115632-b2693620eff6 h1:BzuuN5Oo7knT4areFJxslVWfSpXAgtovd2KzxcVIjUQ= +go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240306115632-b2693620eff6/go.mod h1:s7A6ZGxK8bxqidFzwbr2pITzbsB2qf+aeHEDQDcanV8= go.opentelemetry.io/collector/config/configtelemetry v0.96.1-0.20240315172937-3b5aee0c7a16 h1:4Vi88ksIeP0NseJgnqFPvGOBwCXh4Ary6+NbF1Gi3OM= go.opentelemetry.io/collector/config/configtelemetry v0.96.1-0.20240315172937-3b5aee0c7a16/go.mod h1:YV5PaOdtnU1xRomPcYqoHmyCr48tnaAREeGO96EZw8o= go.opentelemetry.io/collector/confmap v0.96.1-0.20240315172937-3b5aee0c7a16 h1:as8mEhxxXrdtz4cNZyCJFtfORWeEVVDnFjhE9XNEwAA= diff --git a/exporter/fileexporter/grouping_file_exporter.go b/exporter/fileexporter/grouping_file_exporter.go index 10a7385b05fb5..329c618785777 100644 --- a/exporter/fileexporter/grouping_file_exporter.go +++ b/exporter/fileexporter/grouping_file_exporter.go @@ -244,8 +244,12 @@ func group[T any](e *groupingFileExporter, groups map[string][]T, resource pcomm } // Start initializes and starts the exporter. -func (e *groupingFileExporter) Start(context.Context, component.Host) error { - e.marshaller = newMarshaller(e.conf) +func (e *groupingFileExporter) Start(_ context.Context, host component.Host) error { + var err error + e.marshaller, err = newMarshaller(e.conf, host) + if err != nil { + return err + } export := buildExportFunc(e.conf) pathParts := strings.Split(e.conf.Path, "*") diff --git a/exporter/fileexporter/marshaller.go b/exporter/fileexporter/marshaller.go index d45302d86aea1..9090640d8be7f 100644 --- a/exporter/fileexporter/marshaller.go +++ b/exporter/fileexporter/marshaller.go @@ -4,6 +4,10 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" import ( + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -34,7 +38,24 @@ type marshaller struct { formatType string } -func newMarshaller(conf *Config) *marshaller { +func newMarshaller(conf *Config, host component.Host) (*marshaller, error) { + if conf.Encoding != nil { + encoding := host.GetExtensions()[*conf.Encoding] + if encoding == nil { + return nil, fmt.Errorf("unknown encoding %q", conf.Encoding) + } + // cast with ok to avoid panics. + tm, _ := encoding.(ptrace.Marshaler) + pm, _ := encoding.(pmetric.Marshaler) + lm, _ := encoding.(plog.Marshaler) + return &marshaller{ + tracesMarshaler: tm, + metricsMarshaler: pm, + logsMarshaler: lm, + compression: conf.Compression, + compressor: buildCompressor(conf.Compression), + }, nil + } return &marshaller{ formatType: conf.FormatType, tracesMarshaler: tracesMarshalers[conf.FormatType], @@ -42,10 +63,13 @@ func newMarshaller(conf *Config) *marshaller { logsMarshaler: logsMarshalers[conf.FormatType], compression: conf.Compression, compressor: buildCompressor(conf.Compression), - } + }, nil } func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) { + if m.metricsMarshaler == nil { + return nil, errors.New("traces are not supported by encoding") + } buf, err := m.tracesMarshaler.MarshalTraces(td) if err != nil { return nil, err @@ -55,6 +79,9 @@ func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) { } func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) { + if m.metricsMarshaler == nil { + return nil, errors.New("metrics are not supported by encoding") + } buf, err := m.metricsMarshaler.MarshalMetrics(md) if err != nil { return nil, err @@ -64,6 +91,9 @@ func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) { } func (m *marshaller) marshalLogs(ld plog.Logs) ([]byte, error) { + if m.metricsMarshaler == nil { + return nil, errors.New("logs are not supported by encoding") + } buf, err := m.logsMarshaler.MarshalLogs(ld) if err != nil { return nil, err diff --git a/go.mod b/go.mod index ac3e3bd6156ec..35410be25477c 100644 --- a/go.mod +++ b/go.mod @@ -1152,3 +1152,7 @@ replace ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery => ./internal/sqlquery + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ./extension/encoding + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ./extension/encoding/otlpencodingextension