Skip to content

Commit

Permalink
minor improvement in rabbit mq executor
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-kripakov-m10 committed Mar 19, 2024
1 parent 203e31d commit ee97ccf
Show file tree
Hide file tree
Showing 5 changed files with 1,276 additions and 67 deletions.
39 changes: 35 additions & 4 deletions builtin/bins/dkron-executor-rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
### Create a job to send rabbitmq text Message
### RabbitMQ Executor

#### Executor Configuration

The options names are inherited from the [RabbitMQ Publishers](https://www.rabbitmq.com/docs/publishers)

| Option | Description | Default |
|-----------------------|-------------------------------|------------|
| url | RabbitMQ connection string | - |
| queue.name | Queue name to send message to | - |
| queue.create | Create queue if not exists | false |
| queue.durable | Durable queue | false |
| queue.auto_delete | Auto delete queue | false |
| queue.exclusive | Exclusive queue | false |
| message.content_type | Message content type | text/plain |
| message.delivery_mode | Message delivery mode | 0 |
| message.messageId | Message id | "" |
| message.body | Message body | - |
| message.base64Body | Base64 encoded message body | - |

#### Example

```shell
curl localhost:8080/v1/jobs -XPOST -d '{
"name": "job1",
"schedule": "@every 10s",
Expand All @@ -15,8 +37,17 @@ curl localhost:8080/v1/jobs -XPOST -d '{
"concurrency": "allow",
"executor": "rabbitmq",
"executor_config": {
"url": "amqp://guest:guest@localhost:5672/",
"text": "hello world!",
"queue": "test"
"url": "amqp://guest:guest@localhost:5672/",
"queue.name": "test",
"queue.create": "true",
"queue.durable": "true",
"queue.auto_delete": "false",
"queue.exclusive": "false",
"message.content_type": "application/json",
"message.delivery_mode": "2",
"message.messageId": "4373732772",
"message.body": "{\"key\":\"value\"}"
}
}'
```

136 changes: 99 additions & 37 deletions builtin/bins/dkron-executor-rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"encoding/base64"
"errors"
"strconv"

dkplugin "github.com/distribworks/dkron/v4/plugin"
dktypes "github.com/distribworks/dkron/v4/plugin/types"
Expand All @@ -17,9 +18,16 @@ type RabbitMQ struct {
// "executor": "rabbitmq",
//
// "executor_config": {
// "url": "amqp://guest:guest@localhost:5672/", // rabbitmq server url
// "text": "hello world!", // or "base64" to send bytes as rabbitmq message
// "queue": "test", //
// "url": "amqp://guest:guest@localhost:5672/",
// "queue.name": "test",
// "queue.create": "true",
// "queue.durable": "true",
// "queue.auto_delete": "false",
// "queue.exclusive": "false",
// "message.content_type": "application/json",
// "message.delivery_mode": "2",
// "message.messageId": "4373732772",
// "message.body": "{\"key\":\"value\"}"
// }
func (r *RabbitMQ) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) {
out, err := r.ExecuteImpl(args, cb)
Expand All @@ -32,64 +40,118 @@ func (r *RabbitMQ) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelpe

// ExecuteImpl do rabbitmq publish
func (r *RabbitMQ) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) ([]byte, error) {
// validate config
cfg := args.Config
if cfg == nil {
return nil, errors.New("RabbitMQ config is empty")
}

url := cfg["url"]
if url == "" {
return nil, errors.New("RabbitMQ url is empty")
}

if args.Config["url"] == "" {
return nil, errors.New("url is empty")
queueName := cfg["queue.name"]
if queueName == "" {
return nil, errors.New("RabbitMQ queue name is empty")
}

if args.Config["queue"] == "" {
return nil, errors.New("queue is empty")
if cfg["message.body"] != "" && args.Config["message.base64"] != "" {
return nil, errors.New("RabbitMQ message.body and message.base64 are both set")
}

// broker := "amqp://guest:guest@localhost:5672/"
// establish connection
broker := args.Config["url"]
conn, err := amqp.Dial(broker)
if err != nil {
return nil, err
}
defer conn.Close()

defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
// DO NOTHING
}
}(conn)
ch, err := conn.Channel()
if err != nil {
return nil, err
}
defer ch.Close()

queue := args.Config["queue"]
q, err := ch.QueueDeclare(
queue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
// DO NOTHING
}
}(ch)

// create queue if necessary
if err := createQueueIfNecessary(args.Config, queueName, ch); err != nil {
return nil, err
}

// publish message
if err = publish(cfg, ch); err != nil {
return nil, err
}
return nil, nil
}

func createQueueIfNecessary(cfg map[string]string, queue string, ch *amqp.Channel) error {
if val, ok := cfg["queue.create"]; !ok || (ok && val == "false") {
return nil
}

durable, _ := strconv.ParseBool(cfg["queue.durable"])
autoDelete, _ := strconv.ParseBool(cfg["queue.auto_delete"])
exclusive, _ := strconv.ParseBool(cfg["queue.exclusive"])

_, err := ch.QueueDeclare(
queue,
durable,
autoDelete,
exclusive,
false,
nil,
)

return err
}

func publish(cfg map[string]string, ch *amqp.Channel) error {
var body []byte
b64, ok := args.Config["base64"]
b64, ok := cfg["message.base64Body"]
if ok {
decoded, err := base64.StdEncoding.DecodeString(b64)
if err != nil {
return nil, err
return err
}
body = decoded
} else {
text := args.Config["text"]
body = []byte(text)
stringBody := cfg["message.body"]
body = []byte(stringBody)
}
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: body,
})

contentType := cfg["message.content_type"]
if contentType == "" {
contentType = "text/plain"
}
messageId := cfg["message.messageId"]
rawDeliveryMode := cfg["message.delivery_mode"]
if rawDeliveryMode == "" {
rawDeliveryMode = "0"
}
deliveryMode, err := strconv.ParseUint(rawDeliveryMode, 10, 8)
if err != nil {
return nil, err
return err
}
return nil, nil
return ch.Publish(
"", // exchange
cfg["queue.name"], // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: contentType,
Body: body,
MessageId: messageId,
DeliveryMode: uint8(deliveryMode),
})
}
89 changes: 88 additions & 1 deletion builtin/bins/dkron-executor-rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@ package main

import (
"fmt"
dktypes "github.com/distribworks/dkron/v4/plugin/types"
"github.com/streadway/amqp"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/rabbitmq"
"golang.org/x/net/context"
"testing"
)

dktypes "github.com/distribworks/dkron/v4/plugin/types"
const (
username = "guest"
password = "guest"
)

func TestPublishExecute(t *testing.T) {
Expand All @@ -24,3 +33,81 @@ func TestPublishExecute(t *testing.T) {
t.Fatal(err)
}
}

func TestExecute(t *testing.T) {
// arrange
ctx := context.Background()

rabbitmqContainer, err := rabbitmq.RunContainer(ctx,
testcontainers.WithImage("rabbitmq:3.12.11-management-alpine"),
rabbitmq.WithAdminUsername(username),
rabbitmq.WithAdminPassword(password),
)
require.NoError(t, err)

defer func() {
if err := rabbitmqContainer.Terminate(ctx); err != nil {
t.Fatal(err)
}
}()

port, err := rabbitmqContainer.MappedPort(ctx, "5672")
require.NoError(t, err)
pa := &dktypes.ExecuteRequest{
JobName: "testJob",
Config: map[string]string{
"url": fmt.Sprintf("amqp://%s:%s@localhost:%s", username, password, port.Port()),
"message.body": "{\"hello\":11}",
"queue.name": "test",
"queue.create": "true",
"queue.auto_delete": "false",
"queue.durable": "true",
"queue.exclusive": "false",
"message.content_type": "application/json",
"message.delivery_mode": "2",
"message.messageId": "4373732772",
},
}
// act
rmq := &RabbitMQ{}
output, err := rmq.Execute(pa, nil)

// assert
require.NoError(t, err)
require.NotNil(t, output)
// consume and assert the message
conn, err := amqp.Dial(pa.Config["url"])
require.NoError(t, err)
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
// DO NOTHING
}
}(conn)

ch, err := conn.Channel()
require.NoError(t, err)
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
// DO NOTHING
}
}(ch)

consume, err := ch.Consume(
"test",
"",
true,
false,
false,
false,
nil,
)
require.NoError(t, err)
msg := <-consume
require.Equal(t, "{\"hello\":11}", string(msg.Body))
require.EqualValuesf(t, "test", msg.RoutingKey, "expected %s, got %s", "test", msg.RoutingKey)
require.EqualValuesf(t, "4373732772", msg.MessageId, "expected %s, got %s", "4373732772", msg.MessageId)
require.EqualValuesf(t, "application/json", msg.ContentType, "expected %s, got %s", "application/json", msg.ContentType)
require.EqualValuesf(t, 0x2, msg.DeliveryMode, "expected %d, got %d", 0x2, 0x2)
}
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ module github.com/distribworks/dkron/v4
require (
cloud.google.com/go/pubsub v1.36.0
github.com/DataDog/datadog-go v4.8.3+incompatible // indirect
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/Shopify/sarama v1.38.1
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2
github.com/armon/go-metrics v0.4.1
github.com/aws/aws-sdk-go v1.42.18 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/devopsfaith/krakend-usage v1.4.0
github.com/fluent/fluent-logger-golang v1.9.0
github.com/fullstorydev/grpcurl v1.8.9
Expand Down Expand Up @@ -47,6 +44,8 @@ require (
github.com/streadway/amqp v1.1.0
github.com/stretchr/testify v1.8.4
github.com/tencentcloud/tencentcloud-sdk-go v3.0.83+incompatible // indirect
github.com/testcontainers/testcontainers-go v0.28.0
github.com/testcontainers/testcontainers-go/modules/rabbitmq v0.28.0
github.com/tidwall/buntdb v1.2.7
github.com/ugorji/go v1.2.7 // indirect
golang.org/x/net v0.20.0
Expand Down

0 comments on commit ee97ccf

Please sign in to comment.