diff --git a/.chloggen/hec_receiver_ack.yaml b/.chloggen/hec_receiver_ack.yaml new file mode 100644 index 0000000000000..41125cecf741d --- /dev/null +++ b/.chloggen/hec_receiver_ack.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: 'splunkhecreceiver' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "adding support for ack in the splunkhecreceiver" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26376] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index f9c2dcd9304b9..d338b1d3ebe82 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -483,6 +483,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.98.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/asapauthextension v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/awsproxy v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.98.0 // indirect @@ -1150,6 +1151,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/enco replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ../../extension/ackextension + replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/grafanacloudconnector => ../../connector/grafanacloudconnector replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension => ../../extension/sumologicextension diff --git a/cmd/oteltestbedcol/builder-config.yaml b/cmd/oteltestbedcol/builder-config.yaml index fb66929e58a31..a4fab717726c0 100644 --- a/cmd/oteltestbedcol/builder-config.yaml +++ b/cmd/oteltestbedcol/builder-config.yaml @@ -99,3 +99,4 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil - github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37 - github.com/outcaste-io/ristretto v0.2.0 => github.com/outcaste-io/ristretto v0.2.1 + - github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ../../extension/ackextension diff --git a/cmd/oteltestbedcol/go.mod b/cmd/oteltestbedcol/go.mod index d2d0253f8a3d8..2a212a6114abf 100644 --- a/cmd/oteltestbedcol/go.mod +++ b/cmd/oteltestbedcol/go.mod @@ -166,6 +166,7 @@ require ( github.com/mostynb/go-grpc-compression v1.2.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.98.0 // indirect @@ -391,3 +392,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37 replace github.com/outcaste-io/ristretto v0.2.0 => github.com/outcaste-io/ristretto v0.2.1 + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ../../extension/ackextension diff --git a/exporter/elasticsearchexporter/integrationtest/go.mod b/exporter/elasticsearchexporter/integrationtest/go.mod index 113c7af5d9c0d..3e0191d1bed5d 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.mod +++ b/exporter/elasticsearchexporter/integrationtest/go.mod @@ -255,3 +255,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmr replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry => ../../../pkg/resourcetotelemetry replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter => ../../prometheusremotewriteexporter + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ../../../extension/ackextension diff --git a/go.mod b/go.mod index 74be0c801d215..406da485a2f13 100644 --- a/go.mod +++ b/go.mod @@ -489,6 +489,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/nginxinc/nginx-prometheus-exporter v0.11.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.98.0 // indirect @@ -1153,3 +1154,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlqu replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ./extension/encoding replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension => ./extension/encoding/otlpencodingextension + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ./extension/ackextension diff --git a/internal/splunk/common.go b/internal/splunk/common.go index 7792b73381da6..a9ad5088ebb22 100644 --- a/internal/splunk/common.go +++ b/internal/splunk/common.go @@ -23,11 +23,14 @@ const ( DefaultSeverityTextLabel = "otel.log.severity.text" DefaultSeverityNumberLabel = "otel.log.severity.number" HECTokenHeader = "Splunk" - HecTokenLabel = "com.splunk.hec.access_token" // #nosec + HTTPSplunkChannelHeader = "X-Splunk-Request-Channel" + + HecTokenLabel = "com.splunk.hec.access_token" // #nosec // HecEventMetricType is the type of HEC event. Set to metric, as per https://docs.splunk.com/Documentation/Splunk/8.0.3/Metrics/GetMetricsInOther. HecEventMetricType = "metric" DefaultRawPath = "/services/collector/raw" DefaultHealthPath = "/services/collector/health" + DefaultAckPath = "/services/collector/ack" ) // AccessTokenPassthroughConfig configures passing through access tokens. @@ -112,3 +115,7 @@ type HecToOtelAttrs struct { // Host indicates the mapping of the host field to a specific unified model attribute. Host string `mapstructure:"host"` } + +type AckRequest struct { + Acks []uint64 `json:"acks"` +} diff --git a/receiver/splunkhecreceiver/README.md b/receiver/splunkhecreceiver/README.md index dccfb30c88e29..d04a318547a72 100644 --- a/receiver/splunkhecreceiver/README.md +++ b/receiver/splunkhecreceiver/README.md @@ -51,6 +51,10 @@ The following settings are optional: * `hec_metadata_to_otel_attrs/sourcetype` (default = 'com.splunk.sourcetype'): Specifies the mapping of the sourcetype field to a specific unified model attribute. * `hec_metadata_to_otel_attrs/index` (default = 'com.splunk.index'): Specifies the mapping of the index field to a specific unified model attribute. * `hec_metadata_to_otel_attrs/host` (default = 'host.name'): Specifies the mapping of the host field to a specific unified model attribute. +* `ack` (no default): defines the ackextension to use for acknowledging events + * `extension` (no default): Specifies the ack extension ID the receiver should use. If left blank, ack is disabled. + * `path` (default = '/services/collector/ack'): The path the ack extension will listen on for ack requests, if the extension is enabled. + Example: ```yaml @@ -67,6 +71,8 @@ receivers: sourcetype: "mysourcetype" index: "myindex" host: "myhost" + ack: + extension: ack/in_memory ``` The full list of settings exposed for this receiver are documented [here](./config.go) diff --git a/receiver/splunkhecreceiver/config.go b/receiver/splunkhecreceiver/config.go index 095151aa62887..8d6de7259aae5 100644 --- a/receiver/splunkhecreceiver/config.go +++ b/receiver/splunkhecreceiver/config.go @@ -4,6 +4,7 @@ package splunkhecreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver" import ( + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" @@ -21,6 +22,9 @@ type Config struct { confighttp.ServerConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct splunk.AccessTokenPassthroughConfig `mapstructure:",squash"` + + Ack `mapstructure:"ack"` + // RawPath for raw data collection, default is '/services/collector/raw' RawPath string `mapstructure:"raw_path"` // Splitting defines the splitting strategy used by the receiver when ingesting raw events. Can be set to "line" or "none". Default is "line". @@ -30,3 +34,11 @@ type Config struct { // HecToOtelAttrs creates a mapping from HEC metadata to attributes. HecToOtelAttrs splunk.HecToOtelAttrs `mapstructure:"hec_metadata_to_otel_attrs"` } + +// Ack defines configuration for the ACK functionality of the HEC receiver +type Ack struct { + // Extension defines the extension to use for acking of events. Without specifying an extension, the ACK endpoint won't be exposed + Extension *component.ID `mapstructure:"extension"` + // Path for Ack API, default is '/services/collector/ack'. Ignored if Extension is not provided. + Path string `mapstructure:"path"` +} diff --git a/receiver/splunkhecreceiver/config_test.go b/receiver/splunkhecreceiver/config_test.go index dfb3f59eb90bc..2da02b3bb90c6 100644 --- a/receiver/splunkhecreceiver/config_test.go +++ b/receiver/splunkhecreceiver/config_test.go @@ -44,6 +44,7 @@ func TestLoadConfig(t *testing.T) { RawPath: "/foo", Splitting: SplittingStrategyLine, HealthPath: "/bar", + Ack: Ack{Path: "/services/collector/ack"}, HecToOtelAttrs: splunk.HecToOtelAttrs{ Source: "file.name", SourceType: "foobar", @@ -70,6 +71,7 @@ func TestLoadConfig(t *testing.T) { RawPath: "/services/collector/raw", Splitting: SplittingStrategyLine, HealthPath: "/services/collector/health", + Ack: Ack{Path: "/services/collector/ack"}, HecToOtelAttrs: splunk.HecToOtelAttrs{ Source: "com.splunk.source", SourceType: "com.splunk.sourcetype", diff --git a/receiver/splunkhecreceiver/factory.go b/receiver/splunkhecreceiver/factory.go index 7bce66a2d0bb5..f2a0ff92707b5 100644 --- a/receiver/splunkhecreceiver/factory.go +++ b/receiver/splunkhecreceiver/factory.go @@ -49,7 +49,11 @@ func createDefaultConfig() component.Config { }, RawPath: splunk.DefaultRawPath, HealthPath: splunk.DefaultHealthPath, - Splitting: SplittingStrategyLine, + Ack: Ack{ + Extension: nil, + Path: splunk.DefaultAckPath, + }, + Splitting: SplittingStrategyLine, } } diff --git a/receiver/splunkhecreceiver/go.mod b/receiver/splunkhecreceiver/go.mod index 49c8e1195645a..7eff3e1d78e1c 100644 --- a/receiver/splunkhecreceiver/go.mod +++ b/receiver/splunkhecreceiver/go.mod @@ -3,9 +3,11 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunk go 1.21 require ( + github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 github.com/json-iterator/go v1.1.12 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter v0.98.0 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension v0.98.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.98.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.98.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.98.0 @@ -38,8 +40,8 @@ require ( github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.6.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/klauspost/compress v1.17.8 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect @@ -106,3 +108,5 @@ retract ( replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ../../extension/ackextension diff --git a/receiver/splunkhecreceiver/go.sum b/receiver/splunkhecreceiver/go.sum index 28d0a41b2acc0..ba95de1a9bd9c 100644 --- a/receiver/splunkhecreceiver/go.sum +++ b/receiver/splunkhecreceiver/go.sum @@ -84,6 +84,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/receiver/splunkhecreceiver/receiver.go b/receiver/splunkhecreceiver/receiver.go index d7ef07fb82174..9a1d7d8248274 100644 --- a/receiver/splunkhecreceiver/receiver.go +++ b/receiver/splunkhecreceiver/receiver.go @@ -6,16 +6,17 @@ package splunkhecreceiver // import "github.com/open-telemetry/opentelemetry-col import ( "compress/gzip" "context" + "encoding/json" "errors" "fmt" "io" - "net" "net/http" "strconv" "strings" "sync" "time" + "github.com/google/uuid" "github.com/gorilla/mux" jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/component" @@ -25,6 +26,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver/internal/metadata" ) @@ -32,9 +34,11 @@ import ( const ( defaultServerTimeout = 20 * time.Second + ackResponse = `{"acks": %s}` responseOK = `{"text": "Success", "code": 0}` + responseOKWithAckID = `{"text": "Success", "code": 0, "ackId": %d}` responseHecHealthy = `{"text": "HEC is healthy", "code": 17}` - responseInvalidMethod = `"Only \"POST\" method is supported"` + responseInvalidMethodPostOnly = `"Only \"POST\" method is supported"` responseInvalidEncoding = `"\"Content-Encoding\" must be \"gzip\" or empty"` responseInvalidDataFormat = `{"text":"Invalid data format","code":6}` responseErrEventRequired = `{"text":"Event field is required","code":12}` @@ -45,6 +49,8 @@ const ( responseErrUnsupportedMetricEvent = `"Unsupported metric event"` responseErrUnsupportedLogEvent = `"Unsupported log event"` responseErrHandlingIndexedFields = `{"text":"Error in handling indexed fields","code":15,"invalid-event-number":%d}` + responseErrDataChannelMissing = `{"text": "Data channel is missing","code":10}` + responseErrInvalidDataChannel = `{"text": "Invalid data channel", "code": 11}` responseNoData = `{"text":"No data","code":5}` // Centralizing some HTTP and related string constants. gzipEncoding = "gzip" @@ -59,19 +65,21 @@ var ( errEmptyEndpoint = errors.New("empty endpoint") errInvalidMethod = errors.New("invalid http method") errInvalidEncoding = errors.New("invalid encoding") - - okRespBody = []byte(responseOK) - eventRequiredRespBody = []byte(responseErrEventRequired) - eventBlankRespBody = []byte(responseErrEventBlank) - invalidEncodingRespBody = []byte(responseInvalidEncoding) - invalidFormatRespBody = []byte(responseInvalidDataFormat) - invalidMethodRespBody = []byte(responseInvalidMethod) - errGzipReaderRespBody = []byte(responseErrGzipReader) - errUnmarshalBodyRespBody = []byte(responseErrUnmarshalBody) - errInternalServerError = []byte(responseErrInternalServerError) - errUnsupportedMetricEvent = []byte(responseErrUnsupportedMetricEvent) - errUnsupportedLogEvent = []byte(responseErrUnsupportedLogEvent) - noDataRespBody = []byte(responseNoData) + errExtensionMissing = errors.New("ack extension not found") + + okRespBody = []byte(responseOK) + eventRequiredRespBody = []byte(responseErrEventRequired) + eventBlankRespBody = []byte(responseErrEventBlank) + requiredDataChannelHeader = []byte(responseErrDataChannelMissing) + invalidEncodingRespBody = []byte(responseInvalidEncoding) + invalidFormatRespBody = []byte(responseInvalidDataFormat) + invalidMethodRespBodyPostOnly = []byte(responseInvalidMethodPostOnly) + errGzipReaderRespBody = []byte(responseErrGzipReader) + errUnmarshalBodyRespBody = []byte(responseErrUnmarshalBody) + errInternalServerError = []byte(responseErrInternalServerError) + errUnsupportedMetricEvent = []byte(responseErrUnsupportedMetricEvent) + errUnsupportedLogEvent = []byte(responseErrUnsupportedLogEvent) + noDataRespBody = []byte(responseNoData) ) // splunkReceiver implements the receiver.Metrics for Splunk HEC metric protocol. @@ -84,6 +92,7 @@ type splunkReceiver struct { shutdownWG sync.WaitGroup obsrecv *receiverhelper.ObsReport gzipReaderPool *sync.Pool + ackExt ackextension.AckExtension } var _ receiver.Metrics = (*splunkReceiver)(nil) @@ -187,20 +196,28 @@ func (r *splunkReceiver) Start(ctx context.Context, host component.Host) error { return nil } - var ln net.Listener - // set up the listener - ln, err := r.config.ServerConfig.ToListener(ctx) - if err != nil { - return fmt.Errorf("failed to bind to address %s: %w", r.config.Endpoint, err) + mx := mux.NewRouter() + // set up the ack API handler if the ack extension is present + if r.config.Ack.Extension != nil { + if ext, found := host.GetExtensions()[*r.config.Ack.Extension]; found { + r.ackExt = ext.(ackextension.AckExtension) + mx.NewRoute().Path(r.config.Ack.Path).HandlerFunc(r.handleAck) + } else { + return fmt.Errorf("specified ack extension with id %q could not be found", *r.config.Ack.Extension) + } } - mx := mux.NewRouter() mx.NewRoute().Path(r.config.HealthPath).HandlerFunc(r.handleHealthReq) mx.NewRoute().Path(r.config.HealthPath + "/1.0").HandlerFunc(r.handleHealthReq).Methods("GET") if r.logsConsumer != nil { mx.NewRoute().Path(r.config.RawPath).HandlerFunc(r.handleRawReq) } mx.NewRoute().HandlerFunc(r.handleReq) + // set up the listener + ln, err := r.config.ServerConfig.ToListener(ctx) + if err != nil { + return fmt.Errorf("failed to bind to address %s: %w", r.config.Endpoint, err) + } r.server, err = r.config.ServerConfig.ToServer(ctx, host, r.settings.TelemetrySettings, mx) if err != nil { @@ -231,20 +248,74 @@ func (r *splunkReceiver) Shutdown(context.Context) error { return err } -func (r *splunkReceiver) writeSuccessResponse(ctx context.Context, resp http.ResponseWriter, eventCount int) { +func (r *splunkReceiver) processSuccessResponseWithAck(ctx context.Context, resp http.ResponseWriter, eventCount int, channelID string) { + if r.ackExt == nil { + panic("writing response with ack when ack extension is not configured") + } + + ackID := r.ackExt.ProcessEvent(channelID) + r.ackExt.Ack(channelID, ackID) + r.processSuccessResponse(ctx, resp, eventCount, []byte(fmt.Sprintf(responseOKWithAckID, ackID))) +} + +func (r *splunkReceiver) processSuccessResponse(ctx context.Context, resp http.ResponseWriter, eventCount int, bodyContent []byte) { resp.Header().Set(httpContentTypeHeader, httpJSONTypeHeader) resp.WriteHeader(http.StatusOK) - if _, err := resp.Write(okRespBody); err != nil { + if _, err := resp.Write(bodyContent); err != nil { r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, eventCount, err) } } +func (r *splunkReceiver) handleAck(resp http.ResponseWriter, req *http.Request) { + ctx := req.Context() + if req.Method != http.MethodPost { + r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, 0, errInvalidMethod) + return + } + + // shouldn't run into this case since we only enable this handler IF ackExt exists. But we have this check just in case + if r.ackExt == nil { + r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, 0, errExtensionMissing) + return + } + + var channelID string + var extracted bool + if channelID, extracted = r.extractChannelHeader(req); extracted { + if channelErr := r.validateChannelHeader(channelID); channelErr != nil { + r.failRequest(ctx, resp, http.StatusBadRequest, []byte(channelErr.Error()), 0, channelErr) + return + } + } else { + r.failRequest(ctx, resp, http.StatusBadRequest, requiredDataChannelHeader, 0, nil) + return + } + + dec := json.NewDecoder(req.Body) + var ackRequest splunk.AckRequest + + err := dec.Decode(&ackRequest) + if err != nil { + r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, 0, err) + return + } + + if len(ackRequest.Acks) == 0 { + r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, 0, errors.New("request body must include at least one ackID to be queried")) + return + } + + queriedAcks := r.ackExt.QueryAcks(channelID, ackRequest.Acks) + ackString, _ := json.Marshal(queriedAcks) + r.processSuccessResponse(ctx, resp, 0, []byte(fmt.Sprintf(ackResponse, ackString))) +} + func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Request) { ctx := req.Context() ctx = r.obsrecv.StartLogsOp(ctx) if req.Method != http.MethodPost { - r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, 0, errInvalidMethod) + r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, 0, errInvalidMethod) return } @@ -254,6 +325,15 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques return } + var channelID string + var extracted bool + if channelID, extracted = r.extractChannelHeader(req); extracted { + if channelErr := r.validateChannelHeader(channelID); channelErr != nil { + r.failRequest(ctx, resp, http.StatusBadRequest, []byte(channelErr.Error()), 0, channelErr) + return + } + } + if req.ContentLength == 0 { r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, nil) r.failRequest(ctx, resp, http.StatusBadRequest, noDataRespBody, 0, nil) @@ -302,11 +382,38 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques if consumerErr != nil { r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, consumerErr) } else { - r.writeSuccessResponse(ctx, resp, ld.LogRecordCount()) + if len(channelID) > 0 && r.ackExt != nil { + r.processSuccessResponseWithAck(ctx, resp, ld.LogRecordCount(), channelID) + } else { + r.processSuccessResponse(ctx, resp, ld.LogRecordCount(), okRespBody) + } r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), slLen, nil) } } +func (r *splunkReceiver) extractChannelHeader(req *http.Request) (string, bool) { + if headers, ok := req.Header[splunk.HTTPSplunkChannelHeader]; ok { + return headers[0], true + } + + return "", false +} + +func (r *splunkReceiver) validateChannelHeader(channelID string) error { + if len(channelID) == 0 { + return errors.New(responseErrDataChannelMissing) + } + + // channel id must be a valid uuid + // https://docs.splunk.com/Documentation/Splunk/9.2.1/Data/AboutHECIDXAck#:~:text=close%20the%20file.-,About%20channels%20and%20sending%20data,-Sending%20events%20to + _, err := uuid.Parse(channelID) + if err != nil { + return errors.New(responseErrInvalidDataChannel) + } + + return nil +} + func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) { ctx := req.Context() if r.logsConsumer != nil { @@ -317,7 +424,7 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) } if req.Method != http.MethodPost { - r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, 0, errInvalidMethod) + r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, 0, errInvalidMethod) return } @@ -327,6 +434,14 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) return } + channelID, extracted := r.extractChannelHeader(req) + if extracted { + if channelErr := r.validateChannelHeader(channelID); channelErr != nil { + r.failRequest(ctx, resp, http.StatusBadRequest, []byte(channelErr.Error()), 0, channelErr) + return + } + } + bodyReader := req.Body if encoding == gzipEncoding { reader := r.gzipReaderPool.Get().(*gzip.Reader) @@ -401,7 +516,6 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(events), decodeErr) return } - } if r.metricsConsumer != nil && len(metricEvents) > 0 { md, _ := splunkHecToMetricsData(r.settings.Logger, metricEvents, resourceCustomizer, r.config) @@ -414,7 +528,11 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) } } - r.writeSuccessResponse(ctx, resp, len(events)+len(metricEvents)) + if len(channelID) > 0 && r.ackExt != nil { + r.processSuccessResponseWithAck(ctx, resp, len(events)+len(metricEvents), channelID) + } else { + r.processSuccessResponse(ctx, resp, len(events)+len(metricEvents), okRespBody) + } } func (r *splunkReceiver) createResourceCustomizer(req *http.Request) func(resource pcommon.Resource) { diff --git a/receiver/splunkhecreceiver/receiver_test.go b/receiver/splunkhecreceiver/receiver_test.go index 23ea3894686b0..77146db2c79c8 100644 --- a/receiver/splunkhecreceiver/receiver_test.go +++ b/receiver/splunkhecreceiver/receiver_test.go @@ -44,6 +44,13 @@ func assertHecSuccessResponse(t *testing.T, resp *http.Response, body any) { assert.Equal(t, map[string]any{"code": float64(0), "text": "Success"}, body) } +func assertHecSuccessResponseWithAckID(t *testing.T, resp *http.Response, body any, ackID uint64) { + status := resp.StatusCode + assert.Equal(t, http.StatusOK, status) + assert.Equal(t, httpJSONTypeHeader, resp.Header.Get(httpContentTypeHeader)) + assert.Equal(t, map[string]any{"code": float64(0), "text": "Success", "ackId": float64(ackID)}, body) +} + func Test_splunkhecreceiver_NewLogsReceiver(t *testing.T) { defaultConfig := createDefaultConfig().(*Config) emptyEndpointConfig := createDefaultConfig().(*Config) @@ -903,6 +910,12 @@ func buildSplunkHecMsg(time float64, dimensions uint) *splunk.Event { return ev } +func buildSplunkHecAckMsg(acks []uint64) *splunk.AckRequest { + return &splunk.AckRequest{ + Acks: acks, + } +} + type badReqBody struct{} var _ io.ReadCloser = (*badReqBody)(nil) @@ -1084,6 +1097,7 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) { assert.NoError(t, err) r := rcv.(*splunkReceiver) + assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) defer func() { assert.NoError(t, r.Shutdown(context.Background())) @@ -1105,6 +1119,478 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) { }) } } +func Test_splunkhecReceiver_Start(t *testing.T) { + tests := []struct { + name string + getConfig func() *Config + errorExpected bool + }{ + { + name: "no_ack_extension_configured", + getConfig: func() *Config { + return createDefaultConfig().(*Config) + }, + errorExpected: false, + }, + { + name: "ack_extension_does_not_exist", + getConfig: func() *Config { + config := createDefaultConfig().(*Config) + config.Ack.Extension = &component.ID{} + return config + }, + errorExpected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := new(consumertest.LogsSink) + rcv, err := newLogsReceiver(receivertest.NewNopCreateSettings(), *tt.getConfig(), sink) + assert.NoError(t, err) + + r := rcv.(*splunkReceiver) + if tt.errorExpected { + assert.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) + } else { + assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + } + assert.NoError(t, r.Shutdown(context.Background())) + }) + } +} +func Test_splunkhecReceiver_handleAck(t *testing.T) { + t.Parallel() + config := createDefaultConfig().(*Config) + config.Endpoint = "localhost:0" // Actually not creating the endpoint + config.Ack.Path = "/ack" + id := component.MustNewID("ack_extension") + config.Ack.Extension = &id + + tests := []struct { + name string + req *http.Request + setupMockAckExtension func() component.Component + assertResponse func(t *testing.T, resp *http.Response, body any) + }{ + { + name: "incorrect_method", + req: httptest.NewRequest("PUT", "http://localhost/ack", nil), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, "Only \"POST\" method is supported", body) + }, + }, + { + name: "no_channel_header", + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost/ack", nil) + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"code": float64(10), "text": "Data channel is missing"}, body) + }, + }, + { + name: "invalid_channel_header", + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost/ack", nil) + req.Header.Set("X-Splunk-Request-Channel", "invalid-id") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"text": "Invalid data channel", "code": float64(11)}, body) + }, + }, + { + name: "empty_request_body", + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost/ack", nil) + req.Header.Set("X-Splunk-Request-Channel", "fbd3036f-0f1c-4e98-b71c-d4cd61213f90") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"text": "Invalid data format", "code": float64(6)}, body) + }, + }, + { + name: "empty_ack_in_request_body", + req: func() *http.Request { + msgBytes, err := json.Marshal(buildSplunkHecAckMsg([]uint64{})) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/ack", bytes.NewReader(msgBytes)) + req.Header.Set("X-Splunk-Request-Channel", "fbd3036f-0f1c-4e98-b71c-d4cd61213f90") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"text": "Invalid data format", "code": float64(6)}, body) + }, + }, + { + name: "invalid_request_body", + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost/ack", bytes.NewReader([]byte(`hi there`))) + req.Header.Set("X-Splunk-Request-Channel", "fbd3036f-0f1c-4e98-b71c-d4cd61213f90") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"text": "Invalid data format", "code": float64(6)}, body) + }, + }, + { + name: "happy_path", + req: func() *http.Request { + msgBytes, err := json.Marshal(buildSplunkHecAckMsg([]uint64{1, 2, 3})) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/ack", bytes.NewReader(msgBytes)) + req.Header.Set("X-Splunk-Request-Channel", "fbd3036f-0f1c-4e98-b71c-d4cd61213f90") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{ + queryAcks: func(_ string, _ []uint64) map[uint64]bool { + return map[uint64]bool{ + 1: true, + 2: false, + 3: true, + } + }, + } + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusOK, status) + assert.Equal(t, map[string]any{"acks": map[string]any{ + "1": true, + "2": false, + "3": true, + }}, body) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := new(consumertest.LogsSink) + rcv, err := newLogsReceiver(receivertest.NewNopCreateSettings(), *config, sink) + assert.NoError(t, err) + + mockHost := mockHost{extensions: map[component.ID]component.Component{ + id: tt.setupMockAckExtension(), + }} + + r := rcv.(*splunkReceiver) + assert.NoError(t, r.Start(context.Background(), mockHost)) + defer func() { + assert.NoError(t, r.Shutdown(context.Background())) + }() + w := httptest.NewRecorder() + r.handleAck(w, tt.req) + + resp := w.Result() + respBytes, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + defer resp.Body.Close() + + var body any + if len(respBytes) > 0 { + assert.NoError(t, json.Unmarshal(respBytes, &body)) + } + + tt.assertResponse(t, resp, body) + }) + } +} + +func Test_splunkhecReceiver_handleRawReq_WithAck(t *testing.T) { + t.Parallel() + config := createDefaultConfig().(*Config) + config.Endpoint = "localhost:0" // Actually not creating the endpoint + config.RawPath = "/foo" + id := component.MustNewID("ack_extension") + config.Ack.Extension = &id + currentTime := float64(time.Now().UnixNano()) / 1e6 + splunkMsg := buildSplunkHecMsg(currentTime, 3) + + tests := []struct { + name string + req *http.Request + setupMockAckExtension func() component.Component + assertResponse func(t *testing.T, resp *http.Response, body any) + }{ + { + name: "no_channel_header", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes)) + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + assertHecSuccessResponse(t, resp, body) + }, + }, + { + name: "empty_channel_header", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes)) + req.Header.Set("X-Splunk-Request-Channel", "") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"code": float64(10), "text": "Data channel is missing"}, body) + }, + }, + { + name: "invalid_channel_header", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes)) + req.Header.Set("X-Splunk-Request-Channel", "invalid-id") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"text": "Invalid data channel", "code": float64(11)}, body) + }, + }, + { + name: "happy_path", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes)) + req.Header.Set("X-Splunk-Request-Channel", "fbd3036f-0f1c-4e98-b71c-d4cd61213f90") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{ + processEvent: func(_ string) (ackID uint64) { + return uint64(1) + }, + ack: func(_ string, _ uint64) {}, + } + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + assertHecSuccessResponseWithAckID(t, resp, body, 1) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := new(consumertest.LogsSink) + rcv, err := newLogsReceiver(receivertest.NewNopCreateSettings(), *config, sink) + assert.NoError(t, err) + + mh := mockHost{extensions: map[component.ID]component.Component{ + id: tt.setupMockAckExtension(), + }} + + r := rcv.(*splunkReceiver) + assert.NoError(t, r.Start(context.Background(), mh)) + defer func() { + assert.NoError(t, r.Shutdown(context.Background())) + }() + w := httptest.NewRecorder() + r.handleRawReq(w, tt.req) + + resp := w.Result() + respBytes, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + defer resp.Body.Close() + + var body any + if len(respBytes) > 0 { + assert.NoError(t, json.Unmarshal(respBytes, &body)) + } + + tt.assertResponse(t, resp, body) + }) + } +} + +func Test_splunkhecReceiver_handleReq_WithAck(t *testing.T) { + config := createDefaultConfig().(*Config) + config.Endpoint = "localhost:0" // Actually not creating the endpoint + id := component.MustNewID("ack_extension") + config.Ack.Extension = &id + currentTime := float64(time.Now().UnixNano()) / 1e6 + splunkMsg := buildSplunkHecMsg(currentTime, 3) + + tests := []struct { + name string + req *http.Request + assertResponse func(t *testing.T, resp *http.Response, body any) + assertSink func(t *testing.T, sink *consumertest.LogsSink) + setupMockAckExtension func() component.Component + }{ + { + name: "no_channel_header", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes)) + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + assertHecSuccessResponse(t, resp, body) + }, + assertSink: func(t *testing.T, sink *consumertest.LogsSink) { + assert.Equal(t, 1, len(sink.AllLogs())) + }, + }, + { + name: "empty_channel_header", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes)) + req.Header.Set("X-Splunk-Request-Channel", "") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"code": float64(10), "text": "Data channel is missing"}, body) + }, + assertSink: func(t *testing.T, sink *consumertest.LogsSink) { + assert.Equal(t, 0, len(sink.AllLogs())) + }, + }, + { + name: "invalid_channel_header", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes)) + req.Header.Set("X-Splunk-Request-Channel", "invalid-id") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{} + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + status := resp.StatusCode + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, map[string]any{"text": "Invalid data channel", "code": float64(11)}, body) + }, + assertSink: func(t *testing.T, sink *consumertest.LogsSink) { + assert.Equal(t, 0, len(sink.AllLogs())) + }, + }, + { + name: "msg_accepted", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes)) + req.Header.Set("X-Splunk-Request-Channel", "fbd3036f-0f1c-4e98-b71c-d4cd61213f90") + return req + }(), + setupMockAckExtension: func() component.Component { + return &mockAckExtension{ + processEvent: func(_ string) (ackID uint64) { + return uint64(1) + }, + ack: func(_ string, _ uint64) { + }, + } + }, + assertResponse: func(t *testing.T, resp *http.Response, body any) { + assertHecSuccessResponseWithAckID(t, resp, body, 1) + }, + assertSink: func(t *testing.T, sink *consumertest.LogsSink) { + assert.Equal(t, 1, len(sink.AllLogs())) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := new(consumertest.LogsSink) + rcv, err := newLogsReceiver(receivertest.NewNopCreateSettings(), *config, sink) + assert.NoError(t, err) + + r := rcv.(*splunkReceiver) + w := httptest.NewRecorder() + + mh := mockHost{extensions: map[component.ID]component.Component{ + id: tt.setupMockAckExtension(), + }} + + assert.NoError(t, r.Start(context.Background(), mh)) + defer func() { + assert.NoError(t, r.Shutdown(context.Background())) + }() + r.handleReq(w, tt.req) + + resp := w.Result() + defer resp.Body.Close() + respBytes, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + + var body any + fmt.Println(string(respBytes)) + assert.NoError(t, json.Unmarshal(respBytes, &body)) + + tt.assertResponse(t, resp, body) + if tt.assertSink != nil { + tt.assertSink(t, sink) + } + }) + } +} func Test_splunkhecreceiver_handleHealthPath(t *testing.T) { config := createDefaultConfig().(*Config) @@ -1422,3 +1908,38 @@ func Test_splunkhecReceiver_healthCheck_success(t *testing.T) { }) } } + +type mockHost struct { + component.Host + extensions map[component.ID]component.Component +} + +func (h mockHost) GetExtensions() map[component.ID]component.Component { + return h.extensions +} + +type mockAckExtension struct { + queryAcks func(partitionID string, ackIDs []uint64) map[uint64]bool + ack func(partitionID string, ackID uint64) + processEvent func(partitionID string) (ackID uint64) +} + +func (ae *mockAckExtension) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (ae *mockAckExtension) Shutdown(_ context.Context) error { + return nil +} + +func (ae *mockAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool { + return ae.queryAcks(partitionID, ackIDs) +} + +func (ae *mockAckExtension) Ack(partitionID string, ackID uint64) { + ae.ack(partitionID, ackID) +} + +func (ae *mockAckExtension) ProcessEvent(partitionID string) (ackID uint64) { + return ae.processEvent(partitionID) +} diff --git a/testbed/go.mod b/testbed/go.mod index 51c965ba34e0e..763998f5c1141 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -144,6 +144,7 @@ require ( github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/nomad/api v0.0.0-20240306004928-3e7191ccb702 // indirect github.com/hashicorp/serf v0.10.1 // indirect github.com/hetznercloud/hcloud-go/v2 v2.6.0 // indirect @@ -178,6 +179,7 @@ require ( github.com/mostynb/go-grpc-compression v1.2.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.98.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.98.0 // indirect @@ -362,3 +364,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ../extension/ackextension diff --git a/testbed/go.sum b/testbed/go.sum index c3ca64467b3a8..d481cafeacc58 100644 --- a/testbed/go.sum +++ b/testbed/go.sum @@ -356,6 +356,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=