Skip to content

Commit

Permalink
aws-s3 input: Split S3 poller and SQS reader into explicit input ob…
Browse files Browse the repository at this point in the history
…jects (#39353)

A large cleanup in the `aws-s3` input, reorganizing the file structure and splitting internal APIs into additional helpers.

This change is meant to have no functional effect, it is strictly a cleanup and reorganization in preparation for future changes. The hope is that the new layout makes initialization steps and logical dependencies clearer. The main changes are:

- Make `s3Poller` and `sqsReader` into standalone input objects, `s3PollerInput` and `sqsReaderInput`, that implement the `v2.Input` interface, instead of interleaving the two implementations within the same object.
  * Choose the appropriate input in `(*s3InputManager).Create` based on configuration
  * Move associated internal API out of the shared `input.go` into the new `s3_input.go` and `sqs_input.go`, while leaving `s3.go` and `sqs.go` for auxiliary helpers.
  * Give each input a copy of `config` and `awsConfig`, and remove redundant struct fields that simply shadowed fields already in those configs.
- In `sqsReaderInput`, use a fixed set of worker goroutines and track task allocation via channel-based work requests instead of creating ephemeral workers via the previous custom semaphore implementation (similar to the [recent cloudwatch cleanup](#38953)).
  * Delete `aws.Sem`, since this was its last remaining caller
- Collect the helpers related to approximate message count polling into a helper object, `messageCountMonitor`, so their role in the input is clearer.
- Generally, break larger steps up into smaller helper functions
- Generally, collect initialization dependencies in the same place so the sequencing is clearer.
  • Loading branch information
faec committed May 9, 2024
1 parent 07892f1 commit cc35cce
Show file tree
Hide file tree
Showing 20 changed files with 1,010 additions and 1,009 deletions.
60 changes: 60 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Expand Up @@ -9,6 +9,10 @@ import (
"fmt"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/libbeat/common/cfgtype"
Expand Down Expand Up @@ -222,3 +226,59 @@ func (rc *readerConfig) InitDefaults() {
rc.MaxBytes = 10 * humanize.MiByte
rc.LineTerminator = readfile.AutoLineTerminator
}

func (c config) getBucketName() string {
if c.NonAWSBucketName != "" {
return c.NonAWSBucketName
}
if c.BucketARN != "" {
return getBucketNameFromARN(c.BucketARN)
}
return ""
}

func (c config) getBucketARN() string {
if c.NonAWSBucketName != "" {
return c.NonAWSBucketName
}
if c.BucketARN != "" {
return c.BucketARN
}
return ""
}

// An AWS SDK callback to apply the input configuration's settings to an S3
// options struct.
// Should be provided as a parameter to s3.NewFromConfig.
func (c config) s3ConfigModifier(o *s3.Options) {
if c.NonAWSBucketName != "" {
o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint}
}

if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = c.PathStyle

o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) {
so.MaxAttempts = 5
// Recover quickly when requests start working again
so.NoRetryIncrement = 100
})
}

// An AWS SDK callback to apply the input configuration's settings to an SQS
// options struct.
// Should be provided as a parameter to sqs.NewFromConfig.
func (c config) sqsConfigModifier(o *sqs.Options) {
if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}

func (c config) getFileSelectors() []fileSelectorConfig {
if len(c.FileSelectors) > 0 {
return c.FileSelectors
}
return []fileSelectorConfig{{ReaderConfig: c.ReaderConfig}}
}

0 comments on commit cc35cce

Please sign in to comment.