Skip to content

Commit

Permalink
Merge pull request #49 from CapillarySoftware/add_nano_msg
Browse files Browse the repository at this point in the history
add nanomsg support
  • Loading branch information
vrecan committed Aug 16, 2014
2 parents b2d412f + 6202eb5 commit 18b1430
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 7 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ language: go
before_install:
- sudo apt-get install libzmq3-dev
install:
- wget http://download.nanomsg.org/nanomsg-0.4-beta.tar.gz
- tar -xzvf nanomsg-0.4-beta.tar.gz
- cd nanomsg-0.4-beta
- ./configure && make && sudo make install && cd .. && sudo ldconfig
- go get github.com/tools/godep
- go get code.google.com/p/go.tools/cmd/cover
- go get github.com/onsi/ginkgo/ginkgo
Expand Down
4 changes: 4 additions & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
"ImportPath": "github.com/dustin/go-humanize",
"Rev": "3cd90eb3d932037b141d4daff9a012e1aee7e765"
},
{
"ImportPath": "github.com/op/go-nanomsg",
"Rev": "56d628e7c641959b9dc713a83e21dc178aee25bd"
},
{
"ImportPath": "github.com/pebbe/zmq3",
"Rev": "5828282ee2f245c309aa6f03a287f5713d4cde7b"
Expand Down
19 changes: 12 additions & 7 deletions goiostat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/CapillarySoftware/goiostat/diskStat"
"github.com/CapillarySoftware/goiostat/ioStatTransform"
"github.com/CapillarySoftware/goiostat/logOutput"
"github.com/CapillarySoftware/goiostat/nanoMsgOutput"
"github.com/CapillarySoftware/goiostat/outputInterface"
. "github.com/CapillarySoftware/goiostat/protocols"
"github.com/CapillarySoftware/goiostat/statsOutput"
Expand All @@ -24,8 +25,8 @@ Go version of iostat, pull stats from proc and optionally log or send to a zeroM
*/

var interval = flag.Int("interval", 5, "Interval that stats should be reported.")
var outputType = flag.String("output", "stdout", "output should be one of the following types (stdout,zmq)")
var zmqUrl = flag.String("zmqUrl", "tcp://localhost:5400", "ZmqUrl valid formats (tcp://localhost:[port], ipc:///location/file.ipc)")
var outputType = flag.String("output", "stdout", "output should be one of the following types (stdout,zmq,nano)")
var queueUrl = flag.String("queueUrl", "tcp://localhost:5400", "queueUrl valid formats (tcp://localhost:[port], ipc:///location/file.ipc)")
var protocolType = flag.String("protocol", "", "Valid protocol types are (protobuffers, json")

const linuxDiskStats = "/proc/diskstats"
Expand All @@ -34,7 +35,7 @@ func main() {
defer log.Flush()
logger, err := log.LoggerFromConfigAsFile("seelog.xml")

if err != nil {
if nil != err {
log.Warn("Failed to load config", err)
}
log.ReplaceLogger(logger)
Expand Down Expand Up @@ -66,13 +67,15 @@ func main() {

switch *outputType {
case "zmq":
zmq := &zmqOutput.ZmqOutput{Proto: proto}
zmq.Connect(*zmqUrl)
defer zmq.Close()
output = zmq
output, err = zmqOutput.NewZmqOutput(queueUrl, proto)
case "nano":
output, err = nanoMsgOutput.NewNanoMsgOutput(queueUrl, proto)
default:
output = &logOutput.LogOutput{proto}
}
if nil != err {
log.Error("Failed to setup output ", err)
}

go ioStatTransform.TransformStat(statsTransformChannel, statsOutputChannel)

Expand All @@ -87,11 +90,13 @@ func main() {
close(statsOutputChannel)
}

//Read stats from proc and report stats
func readAndSendStats(statsTransformChannel chan *diskStat.DiskStat) {

file, err := os.Open(linuxDiskStats)
if nil != err {
log.Error(err)
return
}
defer file.Close()

Expand Down
122 changes: 122 additions & 0 deletions nanoMsgOutput/nanoMsgOutput.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package nanoMsgOutput

//Nanomsg output Package that allows you to send stats over nanomsg.

import (
"code.google.com/p/goprotobuf/proto"
"encoding/json"
"errors"
. "github.com/CapillarySoftware/goiostat/diskStat"
. "github.com/CapillarySoftware/goiostat/protoStat"
. "github.com/CapillarySoftware/goiostat/protocols"
log "github.com/cihub/seelog"
nano "github.com/op/go-nanomsg"
"time"
)

type NanoMsgOutput struct {
Proto Protocol
socket *nano.PushSocket
err error
}

func NewNanoMsgOutput(url *string, proto Protocol) (nano *NanoMsgOutput, err error) {
nano = &NanoMsgOutput{Proto: proto}
nano.Connect(*url)
return
}

//Method to connect to queue
func (this *NanoMsgOutput) Connect(url string) (err error) {

this.socket, err = nano.NewPushSocket()
if nil != err {
return
}
_, err = this.socket.Connect(url)
this.socket.SetSendTimeout(500 * time.Millisecond)
return
}

//Send byte data over queue
func (this *NanoMsgOutput) send(data *[]byte) (r int, err error) {
// fmt.Println(*z)
if nil == this.socket {
err = errors.New("Nil Socket, can't send")
return
}
r, err = this.socket.Send(*data, 0)
if nil != err {
log.Error(err)
}
return
}

//Close the socket
func (this *NanoMsgOutput) Close() {

if nil != this.socket {
this.socket.Close()
}
}

//Send stats by given format
func (this *NanoMsgOutput) SendStats(eStat *ExtendedIoStats) (err error) {
switch this.Proto {
case PProtoBuffers:
{
err = this.SendProtoBuffers(eStat)
}
case PJson:
{
err = this.SendJson(eStat)
}

default:
{
err = errors.New("zmqOutput doesn't support the type given... ")
return
}
}
return
}

//Send stats in protobuffer format
func (this *NanoMsgOutput) SendProtoBuffers(eStat *ExtendedIoStats) (err error) {
if nil == this.socket {
err = errors.New("Nil socket, call zmqOutput.Connect() before trying to send stats")
return
}
var (
stats *ProtoStats
)

stats, err = GetProtoStats(eStat)
if nil != err {
return //return the error
}
for _, stat := range stats.Stats {
data, mErr := proto.Marshal(stat)
if nil != mErr {
err = mErr
return
}
_, err = this.send(&data)
}
return
}

//Send stats in json format
func (this *NanoMsgOutput) SendJson(eStat *ExtendedIoStats) (err error) {
if nil == this.socket {
err = errors.New("Nil socket, call zmqOutput.Connect() before trying to send stats")
return
}

data, err := json.Marshal(&eStat)
if nil != err {
return
}
_, err = this.send(&data)
return
}
1 change: 1 addition & 0 deletions protoStat/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func GetProtoStats(eStat *ExtendedIoStats) (stats *ProtoStats, err error) {
var buf bytes.Buffer
kind := f.Kind().String()
buf.WriteString("Invalid type: ")

buf.WriteString(kind)
buf.WriteString(" given")
errors.New(buf.String())
Expand Down
6 changes: 6 additions & 0 deletions zmqOutput/zmqOutput.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ type ZmqOutput struct {
err error
}

func NewZmqOutput(url *string, proto Protocol) (zmq *ZmqOutput, err error) {
zmq = &ZmqOutput{Proto: proto}
zmq.Connect(*url)
return
}

func (z *ZmqOutput) Connect(url string) {
z.sendSocket, z.err = zmq.NewSocket(zmq.PUSH)
z.sendSocket.Connect(url)
Expand Down

0 comments on commit 18b1430

Please sign in to comment.