Skip to content

Commit

Permalink
[exporter/file] add encoding extension support
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Mar 15, 2024
1 parent 0d9b1b0 commit 859349b
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 11 deletions.
27 changes: 27 additions & 0 deletions .chloggen/encoding_fileexporter.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fileexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adopt the encoding extension with the file exporter.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31774]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions exporter/fileexporter/README.md
Expand Up @@ -47,6 +47,7 @@ The following settings are optional:
- localtime : [default: false (use UTC)] whether or not the timestamps in backup files is formatted according to the host's local time.

- `format`[default: json]: define the data format of encoded telemetry data. The setting can be overridden with `proto`.
- `encoding`[default: none]: if specified, uses an encoding extension to encode telemetry data. Overrides `format`.
- `append`[default: `false`] defines whether append to the file (`true`) or truncate (`false`). If `append: true` is set then setting `rotation` or `compression` is currently not supported.
- `compression`[no default]: the compression algorithm used when exporting telemetry data to file. Supported compression algorithms:`zstd`
- `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats.
Expand Down
4 changes: 4 additions & 0 deletions exporter/fileexporter/config.go
Expand Up @@ -40,6 +40,10 @@ type Config struct {
// - proto: OTLP binary protobuf bytes.
FormatType string `mapstructure:"format"`

// Encoding defines the encoding of the telemetry data.
// If specified, it overrides `FormatType` and applies an encoding extension.
Encoding *component.ID `mapstructure:"encoding"`

// Compression Codec used to export telemetry data
// Supported compression algorithms:`zstd`
Compression string `mapstructure:"compression"`
Expand Down
120 changes: 120 additions & 0 deletions exporter/fileexporter/encoding_test.go
@@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileexporter

import (
"context"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension"
)

type hostWithEncoding struct {
encodings map[component.ID]component.Component
}

func (h hostWithEncoding) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
panic("unsupported")
}

func (h hostWithEncoding) GetExtensions() map[component.ID]component.Component {
return h.encodings
}

func (h hostWithEncoding) GetExporters() map[component.DataType]map[component.ID]component.Component {
panic("unsupported")
}

func TestEncoding(t *testing.T) {
f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.Path = filepath.Join(t.TempDir(), "encoding.txt")
id := component.MustNewID("otlpjson")
cfg.Encoding = &id

ef := otlpencodingextension.NewFactory()
efCfg := ef.CreateDefaultConfig().(*otlpencodingextension.Config)
efCfg.Protocol = "otlp_json"
ext, err := ef.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), efCfg)
require.NoError(t, err)
require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

me, err := f.CreateMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
te, err := f.CreateTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
le, err := f.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg)
require.NoError(t, err)
host := hostWithEncoding{
map[component.ID]component.Component{id: ext},
}
require.NoError(t, me.Start(context.Background(), host))
require.NoError(t, te.Start(context.Background(), host))
require.NoError(t, le.Start(context.Background(), host))
t.Cleanup(func() {

})

require.NoError(t, me.ConsumeMetrics(context.Background(), generateMetrics()))
require.NoError(t, te.ConsumeTraces(context.Background(), generateTraces()))
require.NoError(t, le.ConsumeLogs(context.Background(), generateLogs()))

require.NoError(t, me.Shutdown(context.Background()))
require.NoError(t, te.Shutdown(context.Background()))
require.NoError(t, le.Shutdown(context.Background()))

b, err := os.ReadFile(cfg.Path)
require.NoError(t, err)
require.Contains(t, string(b), `{"resourceMetrics":`)
require.Contains(t, string(b), `{"resourceSpans":`)
require.Contains(t, string(b), `{"resourceLogs":`)
}

func generateLogs() plog.Logs {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("resource", "R1")
l := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
l.Body().SetStr("test log message")
l.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
return logs
}

func generateMetrics() pmetric.Metrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr("resource", "R1")
m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
m.SetName("test_metric")
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.Attributes().PutStr("test_attr", "value_1")
dp.SetIntValue(123)
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
return metrics
}

func generateTraces() ptrace.Traces {
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
rs.Resource().Attributes().PutStr("resource", "R1")
span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.Attributes().PutStr("test_attr", "value_1")
span.SetName("test_span")
span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second)))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now()))
return traces
}
9 changes: 6 additions & 3 deletions exporter/fileexporter/file_exporter.go
Expand Up @@ -44,11 +44,14 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
}

// Start starts the flush timer if set.
func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
e.marshaller = newMarshaller(e.conf)
func (e *fileExporter) Start(_ context.Context, host component.Host) error {
var err error
e.marshaller, err = newMarshaller(e.conf, host)
if err != nil {
return err
}
export := buildExportFunc(e.conf)

var err error
e.writer, err = newFileWriter(e.conf.Path, e.conf.Append, e.conf.Rotation, e.conf.FlushInterval, export)
if err != nil {
return err
Expand Down
8 changes: 6 additions & 2 deletions exporter/fileexporter/go.mod
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/klauspost/compress v1.17.7
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.96.0
github.com/stretchr/testify v1.9.0
Expand All @@ -24,7 +25,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
Expand All @@ -38,7 +39,8 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.96.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
Expand Down Expand Up @@ -77,3 +79,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ../../extension/encoding/otlpencodingextension
10 changes: 8 additions & 2 deletions exporter/fileexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions exporter/fileexporter/grouping_file_exporter.go
Expand Up @@ -244,8 +244,12 @@ func group[T any](e *groupingFileExporter, groups map[string][]T, resource pcomm
}

// Start initializes and starts the exporter.
func (e *groupingFileExporter) Start(context.Context, component.Host) error {
e.marshaller = newMarshaller(e.conf)
func (e *groupingFileExporter) Start(_ context.Context, host component.Host) error {
var err error
e.marshaller, err = newMarshaller(e.conf, host)
if err != nil {
return err
}
export := buildExportFunc(e.conf)

pathParts := strings.Split(e.conf.Path, "*")
Expand Down
34 changes: 32 additions & 2 deletions exporter/fileexporter/marshaller.go
Expand Up @@ -4,6 +4,10 @@
package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -34,18 +38,38 @@ type marshaller struct {
formatType string
}

func newMarshaller(conf *Config) *marshaller {
func newMarshaller(conf *Config, host component.Host) (*marshaller, error) {
if conf.Encoding != nil {
encoding := host.GetExtensions()[*conf.Encoding]
if encoding == nil {
return nil, fmt.Errorf("unknown encoding %q", conf.Encoding)
}
// cast with ok to avoid panics.
tm, _ := encoding.(ptrace.Marshaler)
pm, _ := encoding.(pmetric.Marshaler)
lm, _ := encoding.(plog.Marshaler)
return &marshaller{
tracesMarshaler: tm,
metricsMarshaler: pm,
logsMarshaler: lm,
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
}, nil
}
return &marshaller{
formatType: conf.FormatType,
tracesMarshaler: tracesMarshalers[conf.FormatType],
metricsMarshaler: metricsMarshalers[conf.FormatType],
logsMarshaler: logsMarshalers[conf.FormatType],
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
}
}, nil
}

func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) {
if m.metricsMarshaler == nil {
return nil, errors.New("traces are not supported by encoding")
}
buf, err := m.tracesMarshaler.MarshalTraces(td)
if err != nil {
return nil, err
Expand All @@ -55,6 +79,9 @@ func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) {
}

func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) {
if m.metricsMarshaler == nil {
return nil, errors.New("metrics are not supported by encoding")
}
buf, err := m.metricsMarshaler.MarshalMetrics(md)
if err != nil {
return nil, err
Expand All @@ -64,6 +91,9 @@ func (m *marshaller) marshalMetrics(md pmetric.Metrics) ([]byte, error) {
}

func (m *marshaller) marshalLogs(ld plog.Logs) ([]byte, error) {
if m.metricsMarshaler == nil {
return nil, errors.New("logs are not supported by encoding")
}
buf, err := m.logsMarshaler.MarshalLogs(ld)
if err != nil {
return nil, err
Expand Down

0 comments on commit 859349b

Please sign in to comment.