From 7deeff7f6b91010bb829a6f8c808b6b09cf0aa8d Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Fri, 15 Mar 2024 00:01:47 -0700 Subject: [PATCH] [exporter/file] add encoding extension support --- .chloggen/encoding_fileexporter.yaml | 27 ++++ exporter/fileexporter/README.md | 1 + exporter/fileexporter/config.go | 4 + exporter/fileexporter/encoding_test.go | 120 ++++++++++++++++++ exporter/fileexporter/file_exporter.go | 9 +- exporter/fileexporter/go.mod | 8 +- exporter/fileexporter/go.sum | 10 +- .../fileexporter/grouping_file_exporter.go | 8 +- exporter/fileexporter/marshaller.go | 34 ++++- 9 files changed, 210 insertions(+), 11 deletions(-) create mode 100644 .chloggen/encoding_fileexporter.yaml create mode 100644 exporter/fileexporter/encoding_test.go diff --git a/.chloggen/encoding_fileexporter.yaml b/.chloggen/encoding_fileexporter.yaml new file mode 100644 index 0000000000000..53229897abcf5 --- /dev/null +++ b/.chloggen/encoding_fileexporter.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: fileexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adopt the encoding extension with the file exporter. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31774] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/fileexporter/README.md b/exporter/fileexporter/README.md index ce011b725c67e..986343386155c 100644 --- a/exporter/fileexporter/README.md +++ b/exporter/fileexporter/README.md @@ -47,6 +47,7 @@ The following settings are optional: - localtime : [default: false (use UTC)] whether or not the timestamps in backup files is formatted according to the host's local time. - `format`[default: json]: define the data format of encoded telemetry data. The setting can be overridden with `proto`. +- `encoding`[default: none]: if specified, uses an encoding extension to encode telemetry data. Overrides `format`. - `append`[default: `false`] defines whether append to the file (`true`) or truncate (`false`). If `append: true` is set then setting `rotation` or `compression` is currently not supported. - `compression`[no default]: the compression algorithm used when exporting telemetry data to file. Supported compression algorithms:`zstd` - `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats. diff --git a/exporter/fileexporter/config.go b/exporter/fileexporter/config.go index 8508b6b01d9e2..bdd05dd278b54 100644 --- a/exporter/fileexporter/config.go +++ b/exporter/fileexporter/config.go @@ -40,6 +40,10 @@ type Config struct { // - proto: OTLP binary protobuf bytes. FormatType string `mapstructure:"format"` + // Encoding defines the encoding of the telemetry data. + // If specified, it overrides `FormatType` and applies an encoding extension. + Encoding *component.ID `mapstructure:"encoding"` + // Compression Codec used to export telemetry data // Supported compression algorithms:`zstd` Compression string `mapstructure:"compression"` diff --git a/exporter/fileexporter/encoding_test.go b/exporter/fileexporter/encoding_test.go new file mode 100644 index 0000000000000..fbcf354158791 --- /dev/null +++ b/exporter/fileexporter/encoding_test.go @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileexporter + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension" +) + +type hostWithEncoding struct { + encodings map[component.ID]component.Component +} + +func (h hostWithEncoding) GetFactory(_ component.Kind, _ component.Type) component.Factory { + panic("unsupported") +} + +func (h hostWithEncoding) GetExtensions() map[component.ID]component.Component { + return h.encodings +} + +func (h hostWithEncoding) GetExporters() map[component.DataType]map[component.ID]component.Component { + panic("unsupported") +} + +func TestEncoding(t *testing.T) { + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + cfg.Path = filepath.Join(t.TempDir(), "encoding.txt") + id := component.MustNewID("otlpjson") + cfg.Encoding = &id + + ef := otlpencodingextension.NewFactory() + efCfg := ef.CreateDefaultConfig().(*otlpencodingextension.Config) + efCfg.Protocol = "otlp_json" + ext, err := ef.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), efCfg) + require.NoError(t, err) + require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost())) + + me, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + te, err := f.CreateTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + le, err := f.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + host := hostWithEncoding{ + map[component.ID]component.Component{id: ext}, + } + require.NoError(t, me.Start(context.Background(), host)) + require.NoError(t, te.Start(context.Background(), host)) + require.NoError(t, le.Start(context.Background(), host)) + t.Cleanup(func() { + + }) + + require.NoError(t, me.ConsumeMetrics(context.Background(), generateMetrics())) + require.NoError(t, te.ConsumeTraces(context.Background(), generateTraces())) + require.NoError(t, le.ConsumeLogs(context.Background(), generateLogs())) + + require.NoError(t, me.Shutdown(context.Background())) + require.NoError(t, te.Shutdown(context.Background())) + require.NoError(t, le.Shutdown(context.Background())) + + b, err := os.ReadFile(cfg.Path) + require.NoError(t, err) + require.Contains(t, string(b), `{"resourceMetrics":`) + require.Contains(t, string(b), `{"resourceSpans":`) + require.Contains(t, string(b), `{"resourceLogs":`) +} + +func generateLogs() plog.Logs { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("resource", "R1") + l := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + l.Body().SetStr("test log message") + l.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return logs +} + +func generateMetrics() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("resource", "R1") + m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + m.SetName("test_metric") + dp := m.SetEmptyGauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("test_attr", "value_1") + dp.SetIntValue(123) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return metrics +} + +func generateTraces() ptrace.Traces { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("resource", "R1") + span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.Attributes().PutStr("test_attr", "value_1") + span.SetName("test_span") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second))) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return traces +} diff --git a/exporter/fileexporter/file_exporter.go b/exporter/fileexporter/file_exporter.go index e3b6a2f6b095b..c7a090f28c6ce 100644 --- a/exporter/fileexporter/file_exporter.go +++ b/exporter/fileexporter/file_exporter.go @@ -44,11 +44,14 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error { } // Start starts the flush timer if set. -func (e *fileExporter) Start(_ context.Context, _ component.Host) error { - e.marshaller = newMarshaller(e.conf) +func (e *fileExporter) Start(_ context.Context, host component.Host) error { + var err error + e.marshaller, err = newMarshaller(e.conf, host) + if err != nil { + return err + } export := buildExportFunc(e.conf) - var err error e.writer, err = newFileWriter(e.conf.Path, e.conf.Append, e.conf.Rotation, e.conf.FlushInterval, export) if err != nil { return err diff --git a/exporter/fileexporter/go.mod b/exporter/fileexporter/go.mod index 04ff79bdeb903..be2265c61e3e7 100644 --- a/exporter/fileexporter/go.mod +++ b/exporter/fileexporter/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/klauspost/compress v1.17.7 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.96.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.96.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.96.0 github.com/stretchr/testify v1.9.0 @@ -24,7 +25,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect @@ -38,7 +39,8 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.96.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.48.0 // indirect @@ -77,3 +79,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension \ No newline at end of file diff --git a/exporter/fileexporter/go.sum b/exporter/fileexporter/go.sum index dad2fce4cc431..cd68b6de244f7 100644 --- a/exporter/fileexporter/go.sum +++ b/exporter/fileexporter/go.sum @@ -5,8 +5,9 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -50,8 +51,13 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.96.0 h1:6UrPsjMRW8EjRkQcBZvocJDu4do3NYIRM4qRn/sqOCI= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.96.0/go.mod h1:x8U2mHdcPjEbFOas82bVFepHL2NjHjWLhCU0WC2kDBo= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.96.0 h1:Y4cOPsElxWLB3HEVu0zIhJyIRm2xr1kYCfmLbo3LCco= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.96.0/go.mod h1:FR5D5ZhMGiGQgf9ZDteHzpxz0S7DNaYXvJaeyhdemn8= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= diff --git a/exporter/fileexporter/grouping_file_exporter.go b/exporter/fileexporter/grouping_file_exporter.go index 10a7385b05fb5..329c618785777 100644 --- a/exporter/fileexporter/grouping_file_exporter.go +++ b/exporter/fileexporter/grouping_file_exporter.go @@ -244,8 +244,12 @@ func group[T any](e *groupingFileExporter, groups map[string][]T, resource pcomm } // Start initializes and starts the exporter. -func (e *groupingFileExporter) Start(context.Context, component.Host) error { - e.marshaller = newMarshaller(e.conf) +func (e *groupingFileExporter) Start(_ context.Context, host component.Host) error { + var err error + e.marshaller, err = newMarshaller(e.conf, host) + if err != nil { + return err + } export := buildExportFunc(e.conf) pathParts := strings.Split(e.conf.Path, "*") diff --git a/exporter/fileexporter/marshaller.go b/exporter/fileexporter/marshaller.go index d45302d86aea1..9090640d8be7f 100644 --- a/exporter/fileexporter/marshaller.go +++ b/exporter/fileexporter/marshaller.go @@ -4,6 +4,10 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" import ( + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -34,7 +38,24 @@ type marshaller struct { formatType string } -func newMarshaller(conf *Config) *marshaller { +func newMarshaller(conf *Config, host component.Host) (*marshaller, error) { + if conf.Encoding != nil { + encoding := host.GetExtensions()[*conf.Encoding] + if encoding == nil { + return nil, fmt.Errorf("unknown encoding %q", conf.Encoding) + } + // cast with ok to avoid panics. + tm, _ := encoding.(ptrace.Marshaler) + pm, _ := encoding.(pmetric.Marshaler) + lm, _ := encoding.(plog.Marshaler) + return &marshaller{ + tracesMarshaler: tm, + metricsMarshaler: pm, + logsMarshaler: lm, + compression: conf.Compression, + compressor: buildCompressor(conf.Compression), + }, nil + } return &marshaller{ formatType: conf.FormatType, tracesMarshaler: tracesMarshalers[conf.FormatType], @@ -42,10 +63,13 @@ func newMarshaller(conf *Config) *marshaller { logsMarshaler: logsMarshalers[conf.FormatType], compression: conf.Compression, compressor: buildCompressor(conf.Compression), - } + }, nil } func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) { + if m.metricsMarshaler == nil { + return nil, errors.New("traces are not supported by encoding") + } buf, err := m.tracesMarshaler.MarshalTraces(td) if err != nil { return nil, err @@ -55,6 +79,9 @@ func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) { } func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) { + if m.metricsMarshaler == nil { + return nil, errors.New("metrics are not supported by encoding") + } buf, err := m.metricsMarshaler.MarshalMetrics(md) if err != nil { return nil, err @@ -64,6 +91,9 @@ func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) { } func (m *marshaller) marshalLogs(ld plog.Logs) ([]byte, error) { + if m.metricsMarshaler == nil { + return nil, errors.New("logs are not supported by encoding") + } buf, err := m.logsMarshaler.MarshalLogs(ld) if err != nil { return nil, err