Skip to content

Commit

Permalink
[receiver/syslog] Fix issue where static resource and attributes were…
Browse files Browse the repository at this point in the history
… ignored (#32010)

Fixes #31849
  • Loading branch information
djaglowski committed Mar 27, 2024
1 parent 91f8f22 commit 4f9eab6
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 8 deletions.
27 changes: 27 additions & 0 deletions .chloggen/syslog-metadata-fix.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: syslogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where static resource and attributes were ignored

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31849]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
5 changes: 4 additions & 1 deletion pkg/stanza/operator/input/syslog/syslog.go
Expand Up @@ -62,6 +62,8 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {

if c.TCP != nil {
tcpInputCfg := tcp.NewConfigWithID(inputBase.ID() + "_internal_tcp")
tcpInputCfg.InputConfig.AttributerConfig = c.InputConfig.AttributerConfig
tcpInputCfg.InputConfig.IdentifierConfig = c.InputConfig.IdentifierConfig
tcpInputCfg.BaseConfig = *c.TCP
if syslogParserCfg.EnableOctetCounting {
tcpInputCfg.SplitFuncBuilder = OctetSplitFuncBuilder
Expand All @@ -85,8 +87,9 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

if c.UDP != nil {

udpInputCfg := udp.NewConfigWithID(inputBase.ID() + "_internal_udp")
udpInputCfg.InputConfig.AttributerConfig = c.InputConfig.AttributerConfig
udpInputCfg.InputConfig.IdentifierConfig = c.InputConfig.IdentifierConfig
udpInputCfg.BaseConfig = *c.UDP

// Octet counting and Non-Transparent-Framing are invalid for UDP connections
Expand Down
83 changes: 76 additions & 7 deletions pkg/stanza/operator/input/syslog/syslog_test.go
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/syslog"
Expand All @@ -22,6 +23,7 @@ import (
)

var (
ts = time.Now()
basicConfig = func() *syslog.Config {
cfg := syslog.NewConfigWithID("test_syslog_parser")
return cfg
Expand Down Expand Up @@ -63,10 +65,39 @@ var (
},
ValidForTCP: true,
}
WithMetadata = syslog.Case{
Name: "RFC3164",
Config: func() *syslog.Config {
cfg := basicConfig()
cfg.Protocol = syslog.RFC3164
return cfg
}(),
Input: &entry.Entry{
Body: fmt.Sprintf("<34>%s 1.2.3.4 apache_server: test message", ts.Format("Jan _2 15:04:05")),
},
Expect: &entry.Entry{
Timestamp: time.Date(ts.Year(), ts.Month(), ts.Day(), ts.Hour(), ts.Minute(), ts.Second(), 0, time.UTC),
Severity: entry.Error2,
SeverityText: "crit",
Resource: map[string]any{
"service.name": "apache_server",
},
Attributes: map[string]any{
"foo": "bar",
"appname": "apache_server",
"facility": 4,
"hostname": "1.2.3.4",
"message": "test message",
"priority": 34,
},
Body: fmt.Sprintf("<34>%s 1.2.3.4 apache_server: test message", ts.Format("Jan _2 15:04:05")),
},
ValidForTCP: true,
ValidForUDP: true,
}
)

func TestInput(t *testing.T) {

cases, err := syslog.CreateCases(basicConfig)
require.NoError(t, err)
cases = append(cases, OctetCase)
Expand All @@ -75,19 +106,37 @@ func TestInput(t *testing.T) {
cfg := tc.Config.BaseConfig
if tc.ValidForTCP {
t.Run(fmt.Sprintf("TCP-%s", tc.Name), func(t *testing.T) {
InputTest(t, NewConfigWithTCP(&cfg), tc)
InputTest(t, tc, NewConfigWithTCP(&cfg), nil, nil)
})
}

if tc.ValidForUDP {
t.Run(fmt.Sprintf("UDP-%s", tc.Name), func(t *testing.T) {
InputTest(t, NewConfigWithUDP(&cfg), tc)
InputTest(t, tc, NewConfigWithUDP(&cfg), nil, nil)
})
}
}

withMetadataCfg := WithMetadata.Config.BaseConfig
t.Run("TCPWithMetadata", func(t *testing.T) {
cfg := NewConfigWithTCP(&withMetadataCfg)
cfg.IdentifierConfig = helper.NewIdentifierConfig()
cfg.IdentifierConfig.Resource["service.name"] = helper.ExprStringConfig("apache_server")
cfg.AttributerConfig = helper.NewAttributerConfig()
cfg.AttributerConfig.Attributes["foo"] = helper.ExprStringConfig("bar")
InputTest(t, WithMetadata, cfg, map[string]any{"service.name": "apache_server"}, map[string]any{"foo": "bar"})
})

t.Run("UDPWithMetadata", func(t *testing.T) {
cfg := NewConfigWithUDP(&withMetadataCfg)
cfg.IdentifierConfig = helper.NewIdentifierConfig()
cfg.IdentifierConfig.Resource["service.name"] = helper.ExprStringConfig("apache_server")
cfg.AttributerConfig = helper.NewAttributerConfig()
cfg.AttributerConfig.Attributes["foo"] = helper.ExprStringConfig("bar")
InputTest(t, WithMetadata, cfg, map[string]any{"service.name": "apache_server"}, map[string]any{"foo": "bar"})
})
}

func InputTest(t *testing.T, cfg *Config, tc syslog.Case) {
func InputTest(t *testing.T, tc syslog.Case, cfg *Config, rsrc map[string]any, attr map[string]any) {
op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)

Expand Down Expand Up @@ -126,8 +175,28 @@ func InputTest(t *testing.T, cfg *Config, tc syslog.Case) {
// close pipeline to avoid data race
ots := time.Now()
e.ObservedTimestamp = ots
tc.Expect.ObservedTimestamp = ots
require.Equal(t, tc.Expect, e)

expect := tc.Expect
expect.ObservedTimestamp = ots
if rsrc != nil {
if expect.Resource == nil {
expect.Resource = rsrc
} else {
for k, v := range rsrc {
expect.Resource[k] = v
}
}
}
if attr != nil {
if expect.Attributes == nil {
expect.Attributes = attr
} else {
for k, v := range attr {
expect.Attributes[k] = v
}
}
}
require.Equal(t, expect, e)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for entry to be processed")
}
Expand Down

0 comments on commit 4f9eab6

Please sign in to comment.