Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor improvement in RabbitMQ executor #1500

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 35 additions & 4 deletions builtin/bins/dkron-executor-rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
### Create a job to send rabbitmq text Message
### RabbitMQ Executor

#### Executor Configuration

The options names are inherited from the [RabbitMQ Publishers](https://www.rabbitmq.com/docs/publishers)

| Option | Required | Description | Default |
|-----------------------|----------|-------------------------------|------------|
| url | yes | RabbitMQ connection string | - |
| queue.name | yes | Queue name to send message to | - |
| queue.create | no | Create queue if not exists | false |
| queue.durable | no | Durable queue | false |
| queue.auto_delete | no | Auto delete queue | false |
| queue.exclusive | no | Exclusive queue | false |
| message.content_type | no | Message content type | text/plain |
| message.delivery_mode | no | Message delivery mode | 0 |
| message.messageId | no | Message id | "" |
| message.body | yes | Message body | - |
| message.base64Body | yes | Base64 encoded message body | - |

#### Example

```shell
curl localhost:8080/v1/jobs -XPOST -d '{
"name": "job1",
"schedule": "@every 10s",
Expand All @@ -15,8 +37,17 @@ curl localhost:8080/v1/jobs -XPOST -d '{
"concurrency": "allow",
"executor": "rabbitmq",
"executor_config": {
"url": "amqp://guest:guest@localhost:5672/",
"text": "hello world!",
"queue": "test"
"url": "amqp://guest:guest@localhost:5672/",
"queue.name": "test",
"queue.create": "true",
"queue.durable": "true",
"queue.auto_delete": "false",
"queue.exclusive": "false",
"message.content_type": "application/json",
"message.delivery_mode": "2",
"message.messageId": "4373732772",
"message.body": "{\"key\":\"value\"}"
}
}'
```

139 changes: 100 additions & 39 deletions builtin/bins/dkron-executor-rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"encoding/base64"
"errors"
"strconv"

dkplugin "github.com/distribworks/dkron/v4/plugin"
dktypes "github.com/distribworks/dkron/v4/types"
Expand All @@ -17,9 +18,16 @@ type RabbitMQ struct {
// "executor": "rabbitmq",
//
// "executor_config": {
// "url": "amqp://guest:guest@localhost:5672/", // rabbitmq server url
// "text": "hello world!", // or "base64" to send bytes as rabbitmq message
// "queue": "test", //
// "url": "amqp://guest:guest@localhost:5672/",
// "queue.name": "test",
// "queue.create": "true",
// "queue.durable": "true",
// "queue.auto_delete": "false",
// "queue.exclusive": "false",
// "message.content_type": "application/json",
// "message.delivery_mode": "2",
// "message.messageId": "4373732772",
// "message.body": "{\"key\":\"value\"}"
// }
func (r *RabbitMQ) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) {
out, err := r.ExecuteImpl(args, cb)
Expand All @@ -32,64 +40,117 @@ func (r *RabbitMQ) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelpe

// ExecuteImpl do rabbitmq publish
func (r *RabbitMQ) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) ([]byte, error) {
// validate config
cfg := args.Config
if cfg == nil {
return nil, errors.New("RabbitMQ config is empty")
}

url := cfg["url"]
if url == "" {
return nil, errors.New("RabbitMQ url is empty")
}

if args.Config["url"] == "" {
return nil, errors.New("url is empty")
queueName := cfg["queue.name"]
if queueName == "" {
return nil, errors.New("RabbitMQ queue name is empty")
}

if args.Config["queue"] == "" {
return nil, errors.New("queue is empty")
if cfg["message.body"] != "" && args.Config["message.base64"] != "" {
return nil, errors.New("RabbitMQ message.body and message.base64 are both set")
}

// broker := "amqp://guest:guest@localhost:5672/"
broker := args.Config["url"]
conn, err := amqp.Dial(broker)
// establish connection
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
defer conn.Close()

defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
// DO NOTHING
}
}(conn)
ch, err := conn.Channel()
if err != nil {
return nil, err
}
defer ch.Close()

queue := args.Config["queue"]
q, err := ch.QueueDeclare(
queue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
// DO NOTHING
}
}(ch)

// create queue if necessary
if err := createQueueIfNecessary(args.Config, queueName, ch); err != nil {
return nil, err
}

// publish message
if err = publish(cfg, ch); err != nil {
return nil, err
}
return nil, nil
}

func createQueueIfNecessary(cfg map[string]string, queue string, ch *amqp.Channel) error {
if val, ok := cfg["queue.create"]; !ok || (ok && val == "false") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this query rabbit in order to check if the queue already exists?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to check if the queue exists here. If the user chooses not to create the queue via the plugin, we expect him to create it manually.
If the user doesn't declare the queue, message publishing will fail with error in logs or a failed execution in UI.

return nil
}

durable, _ := strconv.ParseBool(cfg["queue.durable"])
autoDelete, _ := strconv.ParseBool(cfg["queue.auto_delete"])
exclusive, _ := strconv.ParseBool(cfg["queue.exclusive"])

_, err := ch.QueueDeclare(
queue,
durable,
autoDelete,
exclusive,
false,
nil,
)

return err
}

func publish(cfg map[string]string, ch *amqp.Channel) error {
var body []byte
b64, ok := args.Config["base64"]
b64, ok := cfg["message.base64Body"]
if ok {
decoded, err := base64.StdEncoding.DecodeString(b64)
if err != nil {
return nil, err
return err
}
body = decoded
} else {
text := args.Config["text"]
body = []byte(text)
stringBody := cfg["message.body"]
body = []byte(stringBody)
}
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: body,
})

contentType := cfg["message.content_type"]
if contentType == "" {
contentType = "text/plain"
}
messageId := cfg["message.messageId"]
rawDeliveryMode := cfg["message.delivery_mode"]
if rawDeliveryMode == "" {
rawDeliveryMode = "0"
}
deliveryMode, err := strconv.ParseUint(rawDeliveryMode, 10, 8)
if err != nil {
return nil, err
return err
}
return nil, nil
return ch.Publish(
"", // exchange
cfg["queue.name"], // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: contentType,
Body: body,
MessageId: messageId,
DeliveryMode: uint8(deliveryMode),
})
}
4 changes: 2 additions & 2 deletions dkron/server_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func TestAddServer(t *testing.T) {

// assert
servers := lookup.Servers()
expectedServers := []*ServerParts{server1, server2}
require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers)
require.Containsf(t, servers, server1, "Expected %v to contain %+v", servers, server1)
require.Containsf(t, servers, server2, "Expected %v to contain %+v", servers, server2)

got, err := lookup.ServerAddr(raft.ServerID(id1))
require.NoErrorf(t, err, "Unexpected error: %v", err)
Expand Down