Skip to content

Commit

Permalink
Remove checks of ErrNilNextConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Mar 17, 2024
1 parent 794cd24 commit 9a3ec60
Show file tree
Hide file tree
Showing 21 changed files with 27 additions and 92 deletions.
27 changes: 27 additions & 0 deletions .chloggen/remove_ErrNilNextConsumer_impl.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: all

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove explicit checks in all receivers to check if the next consumer is nil

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31793]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: The nil check is now done by the pipeline builder.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 0 additions & 4 deletions processor/tailsamplingprocessor/processor.go
Expand Up @@ -77,10 +77,6 @@ const (
// newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given
// configuration.
func newTracesProcessor(ctx context.Context, settings component.TelemetrySettings, nextConsumer consumer.Traces, cfg Config) (processor.Traces, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

policyNames := map[string]bool{}
policies := make([]*policy, len(cfg.PolicyCfgs))
for i := range cfg.PolicyCfgs {
Expand Down
3 changes: 0 additions & 3 deletions receiver/awscontainerinsightreceiver/receiver.go
Expand Up @@ -44,9 +44,6 @@ func newAWSContainerInsightReceiver(
settings component.TelemetrySettings,
config *Config,
nextConsumer consumer.Metrics) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

r := &awsContainerInsightReceiver{
settings: settings,
Expand Down
4 changes: 0 additions & 4 deletions receiver/awsecscontainermetricsreceiver/receiver.go
Expand Up @@ -34,10 +34,6 @@ func newAWSECSContainermetrics(
config *Config,
nextConsumer consumer.Metrics,
rest ecsutil.RestClient) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

r := &awsEcsContainerMetricsReceiver{
logger: logger,
nextConsumer: nextConsumer,
Expand Down
4 changes: 0 additions & 4 deletions receiver/awsfirehosereceiver/metrics_receiver.go
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"net/http"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

Expand Down Expand Up @@ -35,9 +34,6 @@ func newMetricsReceiver(
unmarshalers map[string]unmarshaler.MetricsUnmarshaler,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

configuredUnmarshaler := unmarshalers[config.RecordType]
if configuredUnmarshaler == nil {
Expand Down
4 changes: 0 additions & 4 deletions receiver/awsxrayreceiver/receiver.go
Expand Up @@ -42,10 +42,6 @@ func newReceiver(config *Config,
consumer consumer.Traces,
set receiver.CreateSettings) (receiver.Traces, error) {

if consumer == nil {
return nil, component.ErrNilNextConsumer
}

set.Logger.Info("Going to listen on endpoint for X-Ray segments",
zap.String(udppoller.Transport, config.Endpoint))
poller, err := udppoller.New(&udppoller.Config{
Expand Down
4 changes: 0 additions & 4 deletions receiver/carbonreceiver/receiver.go
Expand Up @@ -42,10 +42,6 @@ func newMetricsReceiver(
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {

if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

if config.Endpoint == "" {
return nil, errEmptyEndpoint
}
Expand Down
4 changes: 0 additions & 4 deletions receiver/cloudfoundryreceiver/receiver.go
Expand Up @@ -43,10 +43,6 @@ func newCloudFoundryReceiver(
config Config,
nextConsumer consumer.Metrics) (receiver.Metrics, error) {

if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: settings.ID,
Transport: transport,
Expand Down
3 changes: 0 additions & 3 deletions receiver/collectdreceiver/receiver.go
Expand Up @@ -41,9 +41,6 @@ func newCollectdReceiver(
defaultAttrsPrefix string,
nextConsumer consumer.Metrics,
createSettings receiver.CreateSettings) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

r := &collectdReceiver{
logger: logger,
Expand Down
3 changes: 0 additions & 3 deletions receiver/datadogreceiver/receiver.go
Expand Up @@ -26,9 +26,6 @@ type datadogReceiver struct {
}

func newDataDogReceiver(config *Config, nextConsumer consumer.Traces, params receiver.CreateSettings) (receiver.Traces, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

instance, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{LongLivedCtx: false, ReceiverID: params.ID, Transport: "http", ReceiverCreateSettings: params})
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions receiver/googlecloudpubsubreceiver/factory.go
Expand Up @@ -71,9 +71,6 @@ func (factory *pubsubReceiverFactory) CreateTracesReceiver(
cfg component.Config,
consumer consumer.Traces) (receiver.Traces, error) {

if consumer == nil {
return nil, component.ErrNilNextConsumer
}
err := cfg.(*Config).validateForTrace()
if err != nil {
return nil, err
Expand All @@ -92,9 +89,6 @@ func (factory *pubsubReceiverFactory) CreateMetricsReceiver(
cfg component.Config,
consumer consumer.Metrics) (receiver.Metrics, error) {

if consumer == nil {
return nil, component.ErrNilNextConsumer
}
err := cfg.(*Config).validateForMetric()
if err != nil {
return nil, err
Expand All @@ -113,9 +107,6 @@ func (factory *pubsubReceiverFactory) CreateLogsReceiver(
cfg component.Config,
consumer consumer.Logs) (receiver.Logs, error) {

if consumer == nil {
return nil, component.ErrNilNextConsumer
}
err := cfg.(*Config).validateForLog()
if err != nil {
return nil, err
Expand Down
4 changes: 0 additions & 4 deletions receiver/lokireceiver/loki.go
Expand Up @@ -69,10 +69,6 @@ func newLokiReceiver(conf *Config, nextConsumer consumer.Logs, settings receiver
return nil, err
}

if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

if conf.HTTP != nil {
r.httpMux = http.NewServeMux()
r.httpMux.HandleFunc("/loki/api/v1/push", func(resp http.ResponseWriter, req *http.Request) {
Expand Down
4 changes: 0 additions & 4 deletions receiver/opencensusreceiver/internal/ocmetrics/opencensus.go
Expand Up @@ -12,7 +12,6 @@ import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
ocmetrics "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
Expand All @@ -29,9 +28,6 @@ type Receiver struct {

// New creates a new ocmetrics.Receiver reference.
func New(nextConsumer consumer.Metrics, set receiver.CreateSettings) (*Receiver, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: receiverTransport,
Expand Down
4 changes: 0 additions & 4 deletions receiver/opencensusreceiver/internal/octrace/opencensus.go
Expand Up @@ -11,7 +11,6 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
Expand All @@ -34,9 +33,6 @@ type Receiver struct {

// New creates a new opencensus.Receiver reference.
func New(nextConsumer consumer.Traces, set receiver.CreateSettings) (*Receiver, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Expand Down
9 changes: 0 additions & 9 deletions receiver/otelarrowreceiver/otelarrow.go
Expand Up @@ -171,25 +171,16 @@ func (r *otelArrowReceiver) Shutdown(_ context.Context) error {
}

func (r *otelArrowReceiver) registerTraceConsumer(tc consumer.Traces) error {
if tc == nil {
return component.ErrNilNextConsumer
}
r.tracesReceiver = trace.New(tc, r.obsrepGRPC)
return nil
}

func (r *otelArrowReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
if mc == nil {
return component.ErrNilNextConsumer
}
r.metricsReceiver = metrics.New(mc, r.obsrepGRPC)
return nil
}

func (r *otelArrowReceiver) registerLogsConsumer(lc consumer.Logs) error {
if lc == nil {
return component.ErrNilNextConsumer
}
r.logsReceiver = logs.New(lc, r.obsrepGRPC)
return nil
}
Expand Down
9 changes: 0 additions & 9 deletions receiver/receivercreator/receiver.go
Expand Up @@ -29,9 +29,6 @@ type receiverCreator struct {

// newLogsReceiverCreator creates the receiver_creator with the given parameters.
func newLogsReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Logs) (receiver.Logs, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

r := &receiverCreator{
params: params,
Expand All @@ -43,9 +40,6 @@ func newLogsReceiverCreator(params receiver.CreateSettings, cfg *Config, nextCon

// newMetricsReceiverCreator creates the receiver_creator with the given parameters.
func newMetricsReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

r := &receiverCreator{
params: params,
Expand All @@ -57,9 +51,6 @@ func newMetricsReceiverCreator(params receiver.CreateSettings, cfg *Config, next

// newTracesReceiverCreator creates the receiver_creator with the given parameters.
func newTracesReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Traces) (receiver.Traces, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

r := &receiverCreator{
params: params,
Expand Down
3 changes: 0 additions & 3 deletions receiver/signalfxreceiver/receiver.go
Expand Up @@ -122,9 +122,6 @@ func (r *sfxReceiver) RegisterLogsConsumer(lc consumer.Logs) {
// By convention the consumer of the received data is set when the receiver
// instance is created.
func (r *sfxReceiver) Start(_ context.Context, host component.Host) error {
if r.metricsConsumer == nil && r.logsConsumer == nil {
return component.ErrNilNextConsumer
}

if r.server != nil {
return nil
Expand Down
6 changes: 0 additions & 6 deletions receiver/skywalkingreceiver/skywalking_receiver.go
Expand Up @@ -70,9 +70,6 @@ func newSkywalkingReceiver(

// registerTraceConsumer register a TracesReceiver that receives trace
func (sr *swReceiver) registerTraceConsumer(tc consumer.Traces) error {
if tc == nil {
return component.ErrNilNextConsumer
}
var err error
sr.traceReceiver, err = trace.NewReceiver(tc, sr.settings)
if err != nil {
Expand All @@ -83,9 +80,6 @@ func (sr *swReceiver) registerTraceConsumer(tc consumer.Traces) error {

// registerTraceConsumer register a TracesReceiver that receives trace
func (sr *swReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
if mc == nil {
return component.ErrNilNextConsumer
}
var err error
sr.metricsReceiver, err = metrics.NewReceiver(mc, sr.settings)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions receiver/solacereceiver/receiver.go
Expand Up @@ -40,10 +40,6 @@ type solaceTracesReceiver struct {

// newTracesReceiver creates a new solaceTraceReceiver as a receiver.Traces
func newTracesReceiver(config *Config, set receiver.CreateSettings, nextConsumer consumer.Traces) (receiver.Traces, error) {
if nextConsumer == nil {
set.Logger.Warn("Next consumer in pipeline is null, stopping receiver")
return nil, component.ErrNilNextConsumer
}

factory, err := newAMQPMessagingServiceFactory(config, set.Logger)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions receiver/statsdreceiver/receiver.go
Expand Up @@ -42,9 +42,6 @@ func newReceiver(
config Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

if config.NetAddr.Endpoint == "" {
config.NetAddr.Endpoint = "localhost:8125"
Expand Down
4 changes: 0 additions & 4 deletions receiver/zipkinreceiver/trace_receiver.go
Expand Up @@ -57,10 +57,6 @@ var _ http.Handler = (*zipkinReceiver)(nil)

// newReceiver creates a new zipkinReceiver reference.
func newReceiver(config *Config, nextConsumer consumer.Traces, settings receiver.CreateSettings) (*zipkinReceiver, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}

transports := []string{receiverTransportV1Thrift, receiverTransportV1JSON, receiverTransportV2JSON, receiverTransportV2PROTO}
obsrecvrs := make(map[string]*receiverhelper.ObsReport)
for _, transport := range transports {
Expand Down

0 comments on commit 9a3ec60

Please sign in to comment.