Skip to content

Commit

Permalink
Pushing to @github
Browse files Browse the repository at this point in the history
  • Loading branch information
eko committed Jul 20, 2017
0 parents commit 6aa65e6
Show file tree
Hide file tree
Showing 10 changed files with 381 additions and 0 deletions.
7 changes: 7 additions & 0 deletions LICENSE
@@ -0,0 +1,7 @@
Copyright 2017 Unikorp

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
92 changes: 92 additions & 0 deletions README.md
@@ -0,0 +1,92 @@
WorkMQ
======

A message queue system written in Go.

This is a message queue implementation written in Go.
It allows to declare some queues, number of workers and processors that will process data sent in these queues.

Initially used for a Golang workshop study case, we've decided to put it open-source.

![Schema](doc/schema.jpg)

# Installation

First, install it:

```go
$ go get -u github.com/unikorp/workmq
```

Then, import it in your application code:

```go
import (
"github.com/unikorp/workmq"
)
```

# Configuration

Queues and workers configuration is managed using a `config.json` file in the root directory.

Here is an example JSON with 2 queues, listening on UDP port and exposing the given HTTP port:

```json
{
"ports": {
"udp": ":10001",
"http": ":8080"
},
"queues": {
"queue.1s": {
"processor": "processor.logger.1s",
"num_workers": 150
},
"queue.2s": {
"processor": "processor.logger.2s",
"num_workers": 200
}
}
}
```

Here, we have 2 queues:
* `queue.1s` that will be processed by registered processor `processor.logger.1s` and will use 150 workers (goroutines),
* `queue.2s` that will be processed by registered processor `processor.logger.2s` and will use 200 workers (goroutines).

# Usage

Here is a code example that initializes WorkMQ, registers a processor and start handling messages:

```go
package main

import (
"fmt"
"time"

"github.com/unikorp/workmq"
)

func main() {
app := workmq.Init()

app.AddProcessor("processor.logger.1s", func(worker *workmq.Worker, message workmq.Message) {
time.Sleep(time.Second * 1)

fmt.Printf("Worker #%d (queue: \"%s\") manages message %s\n", worker.ID, worker.Queue, message.Body)
})

app.Handle()
}

```

# Send data

You can send message data over UDP by sending a JSON string with the following structure:

```
{ "queue": "queue.1s", "body": "<your data>" }
```
40 changes: 40 additions & 0 deletions config.go
@@ -0,0 +1,40 @@
package workmq

import (
"encoding/json"
"fmt"
"os"
)

// QueueConfig is the "queues" configuration section type definition.
type QueueConfig struct {
Processor string `json:"processor"`
NumWorkers int `json:"num_workers"`
}

// PortsConfig is the "port" configuration section type definition.
type PortsConfig struct {
UDP string `json:"udp"`
HTTP string `json:"http"`
}

// Config is the configuration type definition.
type Config struct {
Ports PortsConfig `json:"ports"`
Queues map[string]QueueConfig `json:"queues"`
}

// GetConfig returns the configuration object that can be used anywhere in application.
func GetConfig() Config {
file, _ := os.Open("./config.json")
decoder := json.NewDecoder(file)

config := Config{}
err := decoder.Decode(&config)

if err != nil {
fmt.Println("An error occurs on configuration loading:", err)
}

return config
}
Binary file added doc/schema.jpg
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
16 changes: 16 additions & 0 deletions example/config.json
@@ -0,0 +1,16 @@
{
"ports": {
"udp": ":10001",
"http": ":8080"
},
"queues": {
"queue.1s": {
"processor": "processor.logger.1s",
"num_workers": 150
},
"queue.2s": {
"processor": "processor.logger.2s",
"num_workers": 200
}
}
}
26 changes: 26 additions & 0 deletions example/main.go
@@ -0,0 +1,26 @@
package main

import (
"fmt"
"time"

"github.com/unikorp/workmq"
)

func main() {
app := workmq.Init()

app.AddProcessor("processor.logger.1s", func(worker *workmq.Worker, message workmq.Message) {
time.Sleep(time.Second * 1)

fmt.Printf("Worker #%d (queue: \"%s\") manages message %s\n", worker.ID, worker.Queue, message.Body)
})

app.AddProcessor("processor.logger.2s", func(worker *workmq.Worker, message workmq.Message) {
time.Sleep(time.Second * 2)

fmt.Printf("Worker #%d (queue: \"%s\") manages message %s\n", worker.ID, worker.Queue, message.Body)
})

app.Handle()
}
120 changes: 120 additions & 0 deletions main.go
@@ -0,0 +1,120 @@
package workmq

import (
"fmt"
"log"
"net"
"net/http"
"sort"
"sync"
"time"

"github.com/paulbellamy/ratecounter"
)

// RateCounters containers counters
type RateCounters map[string]*ratecounter.RateCounter

// Workmq type
type Workmq struct {
Config Config
Queues map[string]chan Message
Processors map[string]Processor
Counters RateCounters
Workers []Worker
Wg sync.WaitGroup
}

// Init initializes processor part
func Init() *Workmq {
config := GetConfig()
processors := make(map[string]Processor)
queues := make(map[string]chan Message)

counters := RateCounters{
"sent": ratecounter.NewRateCounter(1 * time.Second),
}

return &Workmq{
Config: config,
Queues: queues,
Processors: processors,
Counters: counters,
}
}

// Handle handles the configuration and runs workers
func (w *Workmq) Handle() {
for queue, data := range w.Config.Queues {
w.Queues[queue] = make(chan Message, 100000)
w.Counters[queue] = ratecounter.NewRateCounter(1 * time.Second)

for i := 1; i <= data.NumWorkers; i++ {
processor, err := w.GetProcessor(data.Processor)

if err != nil {
panic("Unable to find processor: " + data.Processor)
}

current := NewWorker(i, queue, processor, w.Queues[queue], w.Counters[queue])
w.Workers = append(w.Workers, current)

go current.Process()
}
}

w.Wg.Add(2)

go w.ListenUDP()
go w.ListenHTTP()

w.Wg.Wait()
}

// ListenUDP creates a UDP server that listens for new messages
func (w *Workmq) ListenUDP() {
defer w.Wg.Done()

address, _ := net.ResolveUDPAddr("udp", w.Config.Ports.UDP)
connection, _ := net.ListenUDP("udp", address)

defer connection.Close()

buf := make([]byte, 1024)

for {
n, _, _ := connection.ReadFromUDP(buf)
w.Counters["sent"].Incr(1)

message := TransformStringToMessage(buf[0:n])
w.Queues[message.Queue] <- message
}
}

// ListenHTTP creates a HTTP server to expose statistics information
func (w *Workmq) ListenHTTP() {
defer w.Wg.Done()

http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
fmt.Fprintln(writer, fmt.Sprintf("Sent rate: %d/s", w.Counters["sent"].Rate()))

var keys []string
for key := range w.Queues {
keys = append(keys, key)
}

sort.Strings(keys)

for _, key := range keys {
fmt.Fprintln(writer, fmt.Sprintf("\n-> %s (%d workers):", key, w.Config.Queues[key].NumWorkers))
fmt.Fprintln(writer, fmt.Sprintf(" Acknowledge: %d/s", w.Counters[key].Rate()))
fmt.Fprintln(writer, fmt.Sprintf(" Messages: %d", len(w.Queues[key])))
}
})

err := http.ListenAndServe(w.Config.Ports.HTTP, nil)

if err != nil {
log.Fatal("ListenAndServe error: ", err)
}
}
24 changes: 24 additions & 0 deletions message.go
@@ -0,0 +1,24 @@
package workmq

import (
"encoding/json"
"fmt"
)

// Message struct
type Message struct {
Queue string `json:"queue"`
Body string `json:"body"`
}

// TransformStringToMessage transforms a string value to a Message struct
func TransformStringToMessage(value []byte) Message {
message := Message{}
err := json.Unmarshal(value, &message)

if err != nil {
fmt.Println("Unable to transform string to Message struct:", err)
}

return message
}
25 changes: 25 additions & 0 deletions processor.go
@@ -0,0 +1,25 @@
package workmq

import "fmt"

// Processor type
type Processor func(worker *Worker, message Message)

// AddProcessor adds a processor into the processors list
func (w *Workmq) AddProcessor(name string, processor Processor) {
w.Processors[name] = processor
}

// GetProcessor retrieves a processor from its name
func (w *Workmq) GetProcessor(name string) (Processor, error) {
if _, ok := w.Processors[name]; !ok {
return nil, fmt.Errorf("Unable to find processor '%s'", name)
}

return w.Processors[name], nil
}

// RemoveProcessor removes a processor from its name
func (w *Workmq) RemoveProcessor(name string) {
delete(w.Processors, name)
}
31 changes: 31 additions & 0 deletions worker.go
@@ -0,0 +1,31 @@
package workmq

import (
"fmt"

"github.com/paulbellamy/ratecounter"
)

// Worker struct
type Worker struct {
ID int
Queue string
Message <-chan Message
Processor Processor
Counter *ratecounter.RateCounter
}

// NewWorker creates a new Worker instance
func NewWorker(id int, queue string, processor Processor, message <-chan Message, counter *ratecounter.RateCounter) Worker {
return Worker{ID: id, Queue: queue, Processor: processor, Message: message, Counter: counter}
}

// Process listens for a processor on the worker.
func (w *Worker) Process() {
fmt.Printf("-> Worker %d ready to process queue \"%s\"...\n", w.ID, w.Queue)

for message := range w.Message {
w.Counter.Incr(1)
w.Processor(w, message)
}
}

0 comments on commit 6aa65e6

Please sign in to comment.