Skip to content

Commit

Permalink
Merge pull request #9 from berezovskyi-oleksandr/feature/configurable…
Browse files Browse the repository at this point in the history
…-batch-size

Make BatchSize configurable
  • Loading branch information
shyimo committed Aug 7, 2022
2 parents d316da7 + afc00a3 commit 704de47
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -11,6 +11,7 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS.
| PluginTagAttribute | attribute name of the message tag | no |
| QueueMessageGroupId | the group id required for fifo queues | fifo-only |
| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no |
| BatchSize | set amount of messages to be sent in a batch request | yes |

```conf
[SERVICE]
Expand All @@ -32,6 +33,7 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS.
Match *
QueueUrl http://aws-sqs-url.com
QueueRegion eu-central-1
BatchSize 10
```

## Installation
Expand Down
17 changes: 14 additions & 3 deletions out_sqs.go
Expand Up @@ -4,9 +4,10 @@ import (
"C"
"errors"
"fmt"
"os"
"strconv"
"time"
"unsafe"
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
Expand All @@ -24,7 +25,7 @@ import (
// integer representation for this plugin log level
// 0 - debug
// 1 - info
// 2 - error
// 2 - error
var sqsOutLogLevel int

// MessageCounter is used for count the current SQS Batch messages
Expand All @@ -39,6 +40,7 @@ type sqsConfig struct {
mySQS *sqs.SQS
pluginTagAttribute string
proxyURL string
batchSize int
}

//export FLBPluginRegister
Expand All @@ -54,12 +56,14 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId")
pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute")
proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl")
batchSizeString := output.FLBPluginConfigKey(plugin, "BatchSize")

writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL))
writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion))
writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID))
writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute))
writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL))
writeInfoLog(fmt.Sprintf("BatchSize is: %s", batchSizeString))

if queueURL == "" {
writeErrorLog(errors.New("QueueUrl configuration key is mandatory"))
Expand All @@ -78,6 +82,12 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
}
}

batchSize, err := strconv.Atoi(batchSizeString)
if err != nil || (0 > batchSize && batchSize > 10) {
writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10"))
return output.FLB_ERROR
}

writeInfoLog("retrieving aws credentials from environment variables")
awsCredentials := credentials.NewEnvCredentials()
var myAWSSession *session.Session
Expand Down Expand Up @@ -126,6 +136,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
queueMessageGroupID: queueMessageGroupID,
mySQS: sqs.New(myAWSSession),
pluginTagAttribute: pluginTagAttribute,
batchSize: batchSize,
})

return output.FLB_OK
Expand Down Expand Up @@ -212,7 +223,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int

SqsRecords = append(SqsRecords, sqsRecord)

if MessageCounter == 10 {
if MessageCounter == sqsConf.batchSize {
err := sendBatchToSqs(sqsConf, SqsRecords)

SqsRecords = nil
Expand Down

0 comments on commit 704de47

Please sign in to comment.