diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..47102e5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,7 @@ +Copyright 2017 Unikorp + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b7ef8ef --- /dev/null +++ b/README.md @@ -0,0 +1,92 @@ +WorkMQ +====== + +A message queue system written in Go. + +This is a message queue implementation written in Go. +It allows to declare some queues, number of workers and processors that will process data sent in these queues. + +Initially used for a Golang workshop study case, we've decided to put it open-source. + +![Schema](doc/schema.jpg) + +# Installation + +First, install it: + +```go +$ go get -u github.com/unikorp/workmq +``` + +Then, import it in your application code: + +```go +import ( + "github.com/unikorp/workmq" +) +``` + +# Configuration + +Queues and workers configuration is managed using a `config.json` file in the root directory. + +Here is an example JSON with 2 queues, listening on UDP port and exposing the given HTTP port: + +```json +{ + "ports": { + "udp": ":10001", + "http": ":8080" + }, + "queues": { + "queue.1s": { + "processor": "processor.logger.1s", + "num_workers": 150 + }, + "queue.2s": { + "processor": "processor.logger.2s", + "num_workers": 200 + } + } +} +``` + +Here, we have 2 queues: +* `queue.1s` that will be processed by registered processor `processor.logger.1s` and will use 150 workers (goroutines), +* `queue.2s` that will be processed by registered processor `processor.logger.2s` and will use 200 workers (goroutines). + +# Usage + +Here is a code example that initializes WorkMQ, registers a processor and start handling messages: + +```go +package main + +import ( + "fmt" + "time" + + "github.com/unikorp/workmq" +) + +func main() { + app := workmq.Init() + + app.AddProcessor("processor.logger.1s", func(worker *workmq.Worker, message workmq.Message) { + time.Sleep(time.Second * 1) + + fmt.Printf("Worker #%d (queue: \"%s\") manages message %s\n", worker.ID, worker.Queue, message.Body) + }) + + app.Handle() +} + +``` + +# Send data + +You can send message data over UDP by sending a JSON string with the following structure: + +``` +{ "queue": "queue.1s", "body": "" } +``` \ No newline at end of file diff --git a/config.go b/config.go new file mode 100644 index 0000000..045fc0d --- /dev/null +++ b/config.go @@ -0,0 +1,40 @@ +package workmq + +import ( + "encoding/json" + "fmt" + "os" +) + +// QueueConfig is the "queues" configuration section type definition. +type QueueConfig struct { + Processor string `json:"processor"` + NumWorkers int `json:"num_workers"` +} + +// PortsConfig is the "port" configuration section type definition. +type PortsConfig struct { + UDP string `json:"udp"` + HTTP string `json:"http"` +} + +// Config is the configuration type definition. +type Config struct { + Ports PortsConfig `json:"ports"` + Queues map[string]QueueConfig `json:"queues"` +} + +// GetConfig returns the configuration object that can be used anywhere in application. +func GetConfig() Config { + file, _ := os.Open("./config.json") + decoder := json.NewDecoder(file) + + config := Config{} + err := decoder.Decode(&config) + + if err != nil { + fmt.Println("An error occurs on configuration loading:", err) + } + + return config +} diff --git a/doc/schema.jpg b/doc/schema.jpg new file mode 100644 index 0000000..95980a0 Binary files /dev/null and b/doc/schema.jpg differ diff --git a/example/config.json b/example/config.json new file mode 100644 index 0000000..4da529a --- /dev/null +++ b/example/config.json @@ -0,0 +1,16 @@ +{ + "ports": { + "udp": ":10001", + "http": ":8080" + }, + "queues": { + "queue.1s": { + "processor": "processor.logger.1s", + "num_workers": 150 + }, + "queue.2s": { + "processor": "processor.logger.2s", + "num_workers": 200 + } + } +} \ No newline at end of file diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..d34fb07 --- /dev/null +++ b/example/main.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "time" + + "github.com/unikorp/workmq" +) + +func main() { + app := workmq.Init() + + app.AddProcessor("processor.logger.1s", func(worker *workmq.Worker, message workmq.Message) { + time.Sleep(time.Second * 1) + + fmt.Printf("Worker #%d (queue: \"%s\") manages message %s\n", worker.ID, worker.Queue, message.Body) + }) + + app.AddProcessor("processor.logger.2s", func(worker *workmq.Worker, message workmq.Message) { + time.Sleep(time.Second * 2) + + fmt.Printf("Worker #%d (queue: \"%s\") manages message %s\n", worker.ID, worker.Queue, message.Body) + }) + + app.Handle() +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..fc428eb --- /dev/null +++ b/main.go @@ -0,0 +1,120 @@ +package workmq + +import ( + "fmt" + "log" + "net" + "net/http" + "sort" + "sync" + "time" + + "github.com/paulbellamy/ratecounter" +) + +// RateCounters containers counters +type RateCounters map[string]*ratecounter.RateCounter + +// Workmq type +type Workmq struct { + Config Config + Queues map[string]chan Message + Processors map[string]Processor + Counters RateCounters + Workers []Worker + Wg sync.WaitGroup +} + +// Init initializes processor part +func Init() *Workmq { + config := GetConfig() + processors := make(map[string]Processor) + queues := make(map[string]chan Message) + + counters := RateCounters{ + "sent": ratecounter.NewRateCounter(1 * time.Second), + } + + return &Workmq{ + Config: config, + Queues: queues, + Processors: processors, + Counters: counters, + } +} + +// Handle handles the configuration and runs workers +func (w *Workmq) Handle() { + for queue, data := range w.Config.Queues { + w.Queues[queue] = make(chan Message, 100000) + w.Counters[queue] = ratecounter.NewRateCounter(1 * time.Second) + + for i := 1; i <= data.NumWorkers; i++ { + processor, err := w.GetProcessor(data.Processor) + + if err != nil { + panic("Unable to find processor: " + data.Processor) + } + + current := NewWorker(i, queue, processor, w.Queues[queue], w.Counters[queue]) + w.Workers = append(w.Workers, current) + + go current.Process() + } + } + + w.Wg.Add(2) + + go w.ListenUDP() + go w.ListenHTTP() + + w.Wg.Wait() +} + +// ListenUDP creates a UDP server that listens for new messages +func (w *Workmq) ListenUDP() { + defer w.Wg.Done() + + address, _ := net.ResolveUDPAddr("udp", w.Config.Ports.UDP) + connection, _ := net.ListenUDP("udp", address) + + defer connection.Close() + + buf := make([]byte, 1024) + + for { + n, _, _ := connection.ReadFromUDP(buf) + w.Counters["sent"].Incr(1) + + message := TransformStringToMessage(buf[0:n]) + w.Queues[message.Queue] <- message + } +} + +// ListenHTTP creates a HTTP server to expose statistics information +func (w *Workmq) ListenHTTP() { + defer w.Wg.Done() + + http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + fmt.Fprintln(writer, fmt.Sprintf("Sent rate: %d/s", w.Counters["sent"].Rate())) + + var keys []string + for key := range w.Queues { + keys = append(keys, key) + } + + sort.Strings(keys) + + for _, key := range keys { + fmt.Fprintln(writer, fmt.Sprintf("\n-> %s (%d workers):", key, w.Config.Queues[key].NumWorkers)) + fmt.Fprintln(writer, fmt.Sprintf(" Acknowledge: %d/s", w.Counters[key].Rate())) + fmt.Fprintln(writer, fmt.Sprintf(" Messages: %d", len(w.Queues[key]))) + } + }) + + err := http.ListenAndServe(w.Config.Ports.HTTP, nil) + + if err != nil { + log.Fatal("ListenAndServe error: ", err) + } +} diff --git a/message.go b/message.go new file mode 100644 index 0000000..e0540ce --- /dev/null +++ b/message.go @@ -0,0 +1,24 @@ +package workmq + +import ( + "encoding/json" + "fmt" +) + +// Message struct +type Message struct { + Queue string `json:"queue"` + Body string `json:"body"` +} + +// TransformStringToMessage transforms a string value to a Message struct +func TransformStringToMessage(value []byte) Message { + message := Message{} + err := json.Unmarshal(value, &message) + + if err != nil { + fmt.Println("Unable to transform string to Message struct:", err) + } + + return message +} diff --git a/processor.go b/processor.go new file mode 100644 index 0000000..65631e1 --- /dev/null +++ b/processor.go @@ -0,0 +1,25 @@ +package workmq + +import "fmt" + +// Processor type +type Processor func(worker *Worker, message Message) + +// AddProcessor adds a processor into the processors list +func (w *Workmq) AddProcessor(name string, processor Processor) { + w.Processors[name] = processor +} + +// GetProcessor retrieves a processor from its name +func (w *Workmq) GetProcessor(name string) (Processor, error) { + if _, ok := w.Processors[name]; !ok { + return nil, fmt.Errorf("Unable to find processor '%s'", name) + } + + return w.Processors[name], nil +} + +// RemoveProcessor removes a processor from its name +func (w *Workmq) RemoveProcessor(name string) { + delete(w.Processors, name) +} diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..3a9e024 --- /dev/null +++ b/worker.go @@ -0,0 +1,31 @@ +package workmq + +import ( + "fmt" + + "github.com/paulbellamy/ratecounter" +) + +// Worker struct +type Worker struct { + ID int + Queue string + Message <-chan Message + Processor Processor + Counter *ratecounter.RateCounter +} + +// NewWorker creates a new Worker instance +func NewWorker(id int, queue string, processor Processor, message <-chan Message, counter *ratecounter.RateCounter) Worker { + return Worker{ID: id, Queue: queue, Processor: processor, Message: message, Counter: counter} +} + +// Process listens for a processor on the worker. +func (w *Worker) Process() { + fmt.Printf("-> Worker %d ready to process queue \"%s\"...\n", w.ID, w.Queue) + + for message := range w.Message { + w.Counter.Incr(1) + w.Processor(w, message) + } +}