Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/file] add encoding extension support #31774

Merged
merged 3 commits into from Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/encoding_fileexporter.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fileexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adopt the encoding extension with the file exporter.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31774]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 4 additions & 0 deletions cmd/configschema/go.mod
Expand Up @@ -1151,3 +1151,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/stor
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage => ../../extension/storage/dbstorage

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding
1 change: 1 addition & 0 deletions exporter/fileexporter/README.md
Expand Up @@ -47,6 +47,7 @@ The following settings are optional:
- localtime : [default: false (use UTC)] whether or not the timestamps in backup files is formatted according to the host's local time.

- `format`[default: json]: define the data format of encoded telemetry data. The setting can be overridden with `proto`.
- `encoding`[default: none]: if specified, uses an encoding extension to encode telemetry data. Overrides `format`.
- `append`[default: `false`] defines whether append to the file (`true`) or truncate (`false`). If `append: true` is set then setting `rotation` or `compression` is currently not supported.
- `compression`[no default]: the compression algorithm used when exporting telemetry data to file. Supported compression algorithms:`zstd`
- `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats.
Expand Down
4 changes: 4 additions & 0 deletions exporter/fileexporter/config.go
Expand Up @@ -40,6 +40,10 @@ type Config struct {
// - proto: OTLP binary protobuf bytes.
FormatType string `mapstructure:"format"`

// Encoding defines the encoding of the telemetry data.
// If specified, it overrides `FormatType` and applies an encoding extension.
Encoding *component.ID `mapstructure:"encoding"`

// Compression Codec used to export telemetry data
// Supported compression algorithms:`zstd`
Compression string `mapstructure:"compression"`
Expand Down
120 changes: 120 additions & 0 deletions exporter/fileexporter/encoding_test.go
@@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileexporter

import (
"context"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension"
)

type hostWithEncoding struct {
encodings map[component.ID]component.Component
}

func (h hostWithEncoding) GetFactory(_ component.Kind, _ component.Type) component.Factory {
panic("unsupported")
}

func (h hostWithEncoding) GetExtensions() map[component.ID]component.Component {
return h.encodings
}

func (h hostWithEncoding) GetExporters() map[component.DataType]map[component.ID]component.Component {
panic("unsupported")
}

func TestEncoding(t *testing.T) {
f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.Path = filepath.Join(t.TempDir(), "encoding.txt")
id := component.MustNewID("otlpjson")
cfg.Encoding = &id

ef := otlpencodingextension.NewFactory()
efCfg := ef.CreateDefaultConfig().(*otlpencodingextension.Config)
efCfg.Protocol = "otlp_json"
ext, err := ef.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), efCfg)
require.NoError(t, err)
require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

me, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
te, err := f.CreateTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
le, err := f.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
host := hostWithEncoding{
map[component.ID]component.Component{id: ext},
}
require.NoError(t, me.Start(context.Background(), host))
require.NoError(t, te.Start(context.Background(), host))
require.NoError(t, le.Start(context.Background(), host))
t.Cleanup(func() {

})

require.NoError(t, me.ConsumeMetrics(context.Background(), generateMetrics()))
require.NoError(t, te.ConsumeTraces(context.Background(), generateTraces()))
require.NoError(t, le.ConsumeLogs(context.Background(), generateLogs()))

require.NoError(t, me.Shutdown(context.Background()))
require.NoError(t, te.Shutdown(context.Background()))
require.NoError(t, le.Shutdown(context.Background()))

b, err := os.ReadFile(cfg.Path)
require.NoError(t, err)
require.Contains(t, string(b), `{"resourceMetrics":`)
require.Contains(t, string(b), `{"resourceSpans":`)
require.Contains(t, string(b), `{"resourceLogs":`)
}

func generateLogs() plog.Logs {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("resource", "R1")
l := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
l.Body().SetStr("test log message")
l.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
return logs
}

func generateMetrics() pmetric.Metrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr("resource", "R1")
m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
m.SetName("test_metric")
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.Attributes().PutStr("test_attr", "value_1")
dp.SetIntValue(123)
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
return metrics
}

func generateTraces() ptrace.Traces {
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
rs.Resource().Attributes().PutStr("resource", "R1")
span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.Attributes().PutStr("test_attr", "value_1")
span.SetName("test_span")
span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second)))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now()))
return traces
}
9 changes: 6 additions & 3 deletions exporter/fileexporter/file_exporter.go
Expand Up @@ -44,11 +44,14 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
}

// Start starts the flush timer if set.
func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
e.marshaller = newMarshaller(e.conf)
func (e *fileExporter) Start(_ context.Context, host component.Host) error {
var err error
e.marshaller, err = newMarshaller(e.conf, host)
if err != nil {
return err
}
export := buildExportFunc(e.conf)

var err error
e.writer, err = newFileWriter(e.conf.Path, e.conf.Append, e.conf.Rotation, e.conf.FlushInterval, export)
if err != nil {
return err
Expand Down
14 changes: 10 additions & 4 deletions exporter/fileexporter/go.mod
Expand Up @@ -5,13 +5,15 @@ go 1.21
require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/klauspost/compress v1.17.7
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.96.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/collector/confmap v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/collector/consumer v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/collector/exporter v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/collector/extension v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/collector/pdata v1.3.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
Expand All @@ -24,7 +26,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
Expand All @@ -39,15 +41,15 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.96.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.opentelemetry.io/collector v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240306115632-b2693620eff6 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
go.opentelemetry.io/collector/extension v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
go.opentelemetry.io/collector/receiver v0.96.1-0.20240315172937-3b5aee0c7a16 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
Expand Down Expand Up @@ -78,3 +80,7 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding
10 changes: 6 additions & 4 deletions exporter/fileexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions exporter/fileexporter/grouping_file_exporter.go
Expand Up @@ -244,8 +244,12 @@ func group[T any](e *groupingFileExporter, groups map[string][]T, resource pcomm
}

// Start initializes and starts the exporter.
func (e *groupingFileExporter) Start(context.Context, component.Host) error {
e.marshaller = newMarshaller(e.conf)
func (e *groupingFileExporter) Start(_ context.Context, host component.Host) error {
var err error
e.marshaller, err = newMarshaller(e.conf, host)
if err != nil {
return err
}
export := buildExportFunc(e.conf)

pathParts := strings.Split(e.conf.Path, "*")
Expand Down
34 changes: 32 additions & 2 deletions exporter/fileexporter/marshaller.go
Expand Up @@ -4,6 +4,10 @@
package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -34,18 +38,38 @@ type marshaller struct {
formatType string
}

func newMarshaller(conf *Config) *marshaller {
func newMarshaller(conf *Config, host component.Host) (*marshaller, error) {
if conf.Encoding != nil {
encoding := host.GetExtensions()[*conf.Encoding]
if encoding == nil {
return nil, fmt.Errorf("unknown encoding %q", conf.Encoding)
}
// cast with ok to avoid panics.
tm, _ := encoding.(ptrace.Marshaler)
pm, _ := encoding.(pmetric.Marshaler)
lm, _ := encoding.(plog.Marshaler)
return &marshaller{
tracesMarshaler: tm,
metricsMarshaler: pm,
logsMarshaler: lm,
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
}, nil
}
return &marshaller{
formatType: conf.FormatType,
tracesMarshaler: tracesMarshalers[conf.FormatType],
metricsMarshaler: metricsMarshalers[conf.FormatType],
logsMarshaler: logsMarshalers[conf.FormatType],
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
}
}, nil
}

func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) {
if m.metricsMarshaler == nil {
return nil, errors.New("traces are not supported by encoding")
}
buf, err := m.tracesMarshaler.MarshalTraces(td)
if err != nil {
return nil, err
Expand All @@ -55,6 +79,9 @@ func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) {
}

func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) {
if m.metricsMarshaler == nil {
return nil, errors.New("metrics are not supported by encoding")
}
buf, err := m.metricsMarshaler.MarshalMetrics(md)
if err != nil {
return nil, err
Expand All @@ -64,6 +91,9 @@ func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) {
}

func (m *marshaller) marshalLogs(ld plog.Logs) ([]byte, error) {
if m.metricsMarshaler == nil {
return nil, errors.New("logs are not supported by encoding")
}
buf, err := m.logsMarshaler.MarshalLogs(ld)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Expand Up @@ -1152,3 +1152,7 @@ replace (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery => ./internal/sqlquery

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ./extension/encoding

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ./extension/encoding/otlpencodingextension