Skip to content

Commit

Permalink
added support for rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
dmportella committed Jan 25, 2017
1 parent 973cff0 commit 4aa64ed
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 21 deletions.
17 changes: 14 additions & 3 deletions DOCKERHUB.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ A simple docker event beat server that will distribute docker events to plugins/
# Supported tags and respective `Dockerfile` links

* [`0.0.1`, (scratch/dockerfile)](https://github.com/dmportella/docker-beat/blob/0.0.1/dockerfile)
* [`0.0.2`, `latest` (scratch/dockerfile)](https://github.com/dmportella/docker-beat/blob/0.0.2/dockerfile)
* [`0.0.2`, (scratch/dockerfile)](https://github.com/dmportella/docker-beat/blob/0.0.2/dockerfile)
* [`0.0.3`, `latest` (scratch/dockerfile)](https://github.com/dmportella/docker-beat/blob/0.0.3/dockerfile)

## Running in Docker

Expand All @@ -22,15 +23,25 @@ The docker container supports Docker API Socket as a volume (not recommended) or
See below the list of available arguments.

```
docker-beat - Version: 0.0.2 Branch: master Revision: a798731. OSArch: linux/amd64.
docker-beat - Version: 0.0.3 Branch: master Revision: 973cff0. OSArch: linux/amd64.
Daniel Portella (c) 2016
Usage of ./bin/docker-beat:
-consumer string
Consumer to use: Webhook, Rabbitmq, etc. (default "console")
Consumer to use: webhook, rabbitmq, etc. (default "console")
-docker-endpoint string
The Url or unix socket address for the Docker Remote API. (default "unix:///var/run/docker.sock")
-help
Prints the help information.
-rabbitmq-endpoint string
rabbitmq: The URL that events will be published too. (default "amqp://guest:guest@localhost:5672/")
-rabbitmq-exchange string
rabbitmq: The exchange docker-beat will publish messages too. (default "docker-beat")
-rabbitmq-exchange-type string
rabbitmq: The exchange type that docker-beat will create/connect too. (direct|fanout|topic|x-custom) (default "fanout")
-rabbitmq-reliable
rabbitmq: The ensures messages published are confirmed.
-rabbitmq-routing-key string
rabbitmq: The routing key for messages published to the exchange. (default: docker-event (default "docker-event")
-verbose
Redirect trace information to the standard out.
-webhook-endpoint string
Expand Down
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,25 @@ The docker container supports Docker API Socket as a volume (not recommended) or
Should be simple to run the application locally it differs just slightly between OS.

```
docker-beat - Version: 0.0.2 Branch: master Revision: a798731. OSArch: linux/amd64.
docker-beat - Version: 0.0.3 Branch: master Revision: 973cff0. OSArch: linux/amd64.
Daniel Portella (c) 2016
Usage of ./bin/docker-beat:
-consumer string
Consumer to use: Webhook, Rabbitmq, etc. (default "console")
Consumer to use: webhook, rabbitmq, etc. (default "console")
-docker-endpoint string
The Url or unix socket address for the Docker Remote API. (default "unix:///var/run/docker.sock")
-help
Prints the help information.
-rabbitmq-endpoint string
rabbitmq: The URL that events will be published too. (default "amqp://guest:guest@localhost:5672/")
-rabbitmq-exchange string
rabbitmq: The exchange docker-beat will publish messages too. (default "docker-beat")
-rabbitmq-exchange-type string
rabbitmq: The exchange type that docker-beat will create/connect too. (direct|fanout|topic|x-custom) (default "fanout")
-rabbitmq-reliable
rabbitmq: The ensures messages published are confirmed.
-rabbitmq-routing-key string
rabbitmq: The routing key for messages published to the exchange. (default: docker-event (default "docker-event")
-verbose
Redirect trace information to the standard out.
-webhook-endpoint string
Expand Down
142 changes: 140 additions & 2 deletions builtin/consumers/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,155 @@
package rabbitmq

import (
"encoding/json"
"flag"
"fmt"
"github.com/dmportella/docker-beat/logging"
"github.com/dmportella/docker-beat/plugin"
_ "github.com/streadway/amqp" // Not currently used
"github.com/streadway/amqp"
)

var (
rabbitmqEndpoint string
rabbitmqExchange string
rabbitmqExchangeType string
rabbitmqRoutingKey string
rabbitmqReliable bool
)

const (
defaultRabbitmqEndpoint = "amqp://guest:guest@localhost:5672/"
rabbitmqEndpointUsage = "rabbitmq: The URL that events will be published too."

defaultRabbitmqExchange = "docker-beat"
rabbitmqExchangeUsage = "rabbitmq: The exchange docker-beat will publish messages too."

defaultRabbitmqExchangeType = "fanout"
rabbitmqExchangeTypeUsage = "rabbitmq: The exchange type that docker-beat will create/connect too. (direct|fanout|topic|x-custom)"

defaultRabbitmqRoutingKey = "docker-event"
rabbitmqRoutingKeyUsage = "rabbitmq: The routing key for messages published to the exchange. (default: docker-event"

defaultRabbitmqReliable = false
rabbitmqReliableUsage = "rabbitmq: The ensures messages published are confirmed."
)

type consumer struct {
Debug bool
Indent bool
endpoint string
exchange string
exchangeType string
routingKey string
reliable bool
connection *amqp.Connection
}

func (consumer *consumer) OnEvent(event plugin.DockerEvent) {
var data []byte

if consumer.Indent {
data, _ = json.MarshalIndent(event, "", " ")
} else {
data, _ = json.Marshal(event)
}

err := consumer.publish(data)
if err != nil {
logging.Error.Printf("Error publishing event '%s'\n", err)
}
}

func init() {
// do something here
flag.StringVar(&rabbitmqEndpoint, "rabbitmq-endpoint", defaultRabbitmqEndpoint, rabbitmqEndpointUsage)
flag.StringVar(&rabbitmqExchange, "rabbitmq-exchange", defaultRabbitmqExchange, rabbitmqExchangeUsage)
flag.StringVar(&rabbitmqExchangeType, "rabbitmq-exchange-type", defaultRabbitmqExchangeType, rabbitmqExchangeTypeUsage)
flag.StringVar(&rabbitmqRoutingKey, "rabbitmq-routing-key", defaultRabbitmqRoutingKey, rabbitmqRoutingKeyUsage)
flag.BoolVar(&rabbitmqReliable, "rabbitmq-reliable", defaultRabbitmqReliable, rabbitmqReliableUsage)

consumer := &consumer{
endpoint: rabbitmqEndpoint,
exchange: rabbitmqExchange,
exchangeType: rabbitmqExchangeType,
routingKey: rabbitmqRoutingKey,
reliable: rabbitmqReliable,
}

plugin.RegisterConsumer("rabbitmq", consumer)
}

func (consumer *consumer) publish(body []byte) error {
if consumer.connection == nil {
consumer.endpoint = rabbitmqEndpoint
consumer.exchange = rabbitmqExchange
consumer.exchangeType = rabbitmqExchangeType
consumer.routingKey = rabbitmqRoutingKey
consumer.reliable = rabbitmqReliable

connection, err := amqp.Dial(consumer.endpoint)
if err != nil {
return fmt.Errorf("Dial: %s", err)
}

consumer.connection = connection
}

channel, err := consumer.connection.Channel()
if err != nil {
return fmt.Errorf("Channel: %s", err)
}

if err := channel.ExchangeDeclare(
consumer.exchange, // name
consumer.exchangeType, // type
true, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
); err != nil {
return fmt.Errorf("Exchange Declare: %s", err)
}

// Reliable publisher confirms require confirm.select support from the
// connection.
if consumer.reliable {
if err := channel.Confirm(false); err != nil {
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
}

confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))

defer confirmOne(confirms)
}

if err = channel.Publish(
consumer.exchange, // publish to an exchange
consumer.routingKey, // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "application/json",
ContentEncoding: "utf-8",
Body: body,
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
// a bunch of application/implementation-specific fields
},
); err != nil {
return fmt.Errorf("Exchange Publish: %s", err)
}

return nil
}

func confirmOne(confirms <-chan amqp.Confirmation) {
logging.Info.Printf("Waiting for confirmation of one publishing.")

if confirmed := <-confirms; confirmed.Ack {
logging.Info.Printf("Confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
logging.Error.Printf("Failed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
}
4 changes: 1 addition & 3 deletions builtin/consumers/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type consumer struct {
}

func (consumer *consumer) OnEvent(event plugin.DockerEvent) {

var data []byte

if webHookIndent {
Expand All @@ -53,14 +52,13 @@ func (consumer *consumer) OnEvent(event plugin.DockerEvent) {
}

func init() {
// do something here
flag.StringVar(&webHookEnpoint, "webhook-endpoint", defaultWebHookEndpoint, webHookEnpointUsage)
flag.BoolVar(&webHookIndent, "webhook-indent", defaultwebHookIndent, webHookIndentUsage)
flag.BoolVar(&webhookSkipSSLVerify, "webhook-skip-ssl-verify", defaultSkipSSLVerifyIndent, skipSSLVerifyUsage)

consumer := &consumer{}

plugin.RegisterConsumer(consumer)
plugin.RegisterConsumer("webhook", consumer)
}

func (consumer *consumer) request(method string, url string, b []byte) (response []byte, err error) {
Expand Down
7 changes: 6 additions & 1 deletion docker-beat_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ func (dockerbeat *dockerBeat) dockerEventListener() {

if dockerbeat.consumer != "console" {
eventWrapper := plugin.DockerEvent{event}
go plugin.GetConsumer().OnEvent(eventWrapper)
consumer := plugin.GetConsumer(dockerbeat.consumer)
if consumer != nil {
go consumer.OnEvent(eventWrapper)
} else {
logging.Error.Printf("Consumer '%s' is not available.", dockerbeat.consumer)
}
}
}
}
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ func init() {
ConsumerUsage = "Consumer to use: Webhook, Rabbitmq, etc."
)

flag.StringVar(&DockerEndpoint, "docker-endpoint", defaultDockerEndpoint, dockerEndpointUsage)
flag.StringVar(&Consumer, "consumer", defaultConsumer, ConsumerUsage)

const (
defaultHelp = false
helpUsage = "Prints the help information."
defaultVerbose = false
verboseUsage = "Redirect trace information to the standard out."
)

flag.StringVar(&DockerEndpoint, "docker-endpoint", defaultDockerEndpoint, dockerEndpointUsage)
flag.StringVar(&Consumer, "consumer", defaultConsumer, ConsumerUsage)
flag.BoolVar(&Verbose, "verbose", defaultVerbose, verboseUsage)
flag.BoolVar(&Help, "help", defaultHelp, helpUsage)

flag.Parse()

flag.Usage = func() {
Expand Down
18 changes: 11 additions & 7 deletions plugin/registery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ type EventConsumer interface {
}

var (
consumers = []EventConsumer{}
consumers map[string]EventConsumer
)

// RegisterConsumer adds the consumer to the registry.
func RegisterConsumer(consumer EventConsumer) {
consumers = append(consumers, consumer)
func init() {
consumers = make(map[string]EventConsumer)
}

// GetConsumer temp method for testing
func GetConsumer() (consumer EventConsumer) {
return consumers[0]
// RegisterConsumer Adds a consumer to the registry.
func RegisterConsumer(name string, consumer EventConsumer) {
consumers[name] = consumer
}

// GetConsumer Returns the selected consumer
func GetConsumer(name string) (consumer EventConsumer) {
return consumers[name]
}

0 comments on commit 4aa64ed

Please sign in to comment.