Skip to content

Commit

Permalink
Version 0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
hartfordfive committed Sep 5, 2017
1 parent e4240d2 commit 69b1bf2
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -7,3 +7,4 @@
/protologbeat.test
protologbeat.local.yml
*.pyc
/logs
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,9 @@

### Version 0.2.0
- Added support for receiving GELF format messages by setting `enable_gelf: true`
- Fixed issue where (more commonly when getting high volumes of messages) the `processMessage` goroutine might have it's byte buffer modified before it actually executes with the original payload/message. (Related to original PR [#8](https://github.com/hartfordfive/protologbeat/pull/8), credit to [vcostet](https://github.com/vcostet))


### Version 0.1.1
- Added Dockerfile and seperate `protologbeat-docker.yml` config file to be used by docker image
- Updated default `protologbeat.yml` to have bare-minimum config values
Expand Down
16 changes: 7 additions & 9 deletions Dockerfile
Expand Up @@ -13,27 +13,25 @@ RUN set -ex ;\
# Install dependencies
apk --no-cache add gettext libc6-compat curl ;\
# Hotfix for libc compat
ln -s /lib /lib64

RUN cd /tmp ;\
ln -s /lib /lib64 ;\
cd /tmp ;\
mkdir -p /opt/protologbeat/conf ;\
mkdir -p /opt/protologbeat/ssl ;\
curl -L https://github.com/hartfordfive/protologbeat/releases/download/${VERSION}/protologbeat-${VERSION}-linux-x86_64.tar.gz --output protologbeat-${VERSION}-linux-x86_64.tar.gz ;\
tar -xvzf protologbeat-${VERSION}-linux-x86_64.tar.gz ;\
mv /tmp/protologbeat-${VERSION}-linux-x86_64 /opt/protologbeat/protologbeat ;\
rm -rf protologbeat-${VERSION}-linux-x86_64 && rm protologbeat-${VERSION}-linux-x86_64.tar.gz
rm -rf protologbeat-${VERSION}-linux-x86_64 && rm protologbeat-${VERSION}-linux-x86_64.tar.gz ;\
# Fix permissions
chown -R protologbeat:protologbeat /opt/protologbeat ;\
chmod 750 /opt/protologbeat ;\
chmod 700 /opt/protologbeat/ssl

ENV PATH=/opt/protologbeat:$PATH

COPY protologbeat-docker.yml /opt/protologbeat/conf/protologbeat.yml
COPY protologbeat.template-es2x.json /opt/protologbeat
COPY protologbeat.template.json /opt/protologbeat

# Fix permissions
RUN chown -R protologbeat:protologbeat /opt/protologbeat ;\
chmod 750 /opt/protologbeat ;\
chmod 700 /opt/protologbeat/ssl

WORKDIR /opt/protologbeat
USER protologbeat

Expand Down
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -19,6 +19,7 @@ Ensure that this folder is at the following location:
- `protolog.merge_fields_to_root` : When **json_mode** enabled, wether to merge parsed fields to the root level. (Default = false)
- `protologbeat.default_es_log_type`: Elasticsearch type to assign to an event if one isn't specified (Default: protologbeat)
- `protologbeat.enable_syslog_format_only` : Boolean value indicating if only syslog messages should be accepted. (Default = false)
- `protologbeat.enable_gelf` : Boolean value indiciating if process should in mode to only accept [GELF formated messages](http://docs.graylog.org/en/2.2/pages/gelf.html)
- `protologbeat.enable_json_validation` : Boolean value indicating if JSON schema validation should be applied for `json` format messages (Default = false)
- `protologbeat.validate_all_json_types` : When json_mode enabled, indicates if ALL types must have a schema specified. Log entries with types that have no schema will not be published. (Default = false)
- `protologbeat.json_schema` : A hash consisting of the Elasticsearch type as the key, and the absolute local schema file path as the value.
Expand Down
8 changes: 8 additions & 0 deletions _samples/generator.py
@@ -0,0 +1,8 @@
from logger import Logger
import time, random

l = Logger('127.0.0.1', 6000, 'udp', 'json')

for i in range(10000):
l.send_message({'message': 'This is JSON encoded message #{}'.format(i), 'type': 'generator_test', 'id': int(i), 'log_level': 'INFO'})
time.sleep(random.uniform(0.0001, 0.0010))
4 changes: 2 additions & 2 deletions _samples/logger.lua
Expand Up @@ -34,8 +34,8 @@ function Logger:sendMsg(msg)
end

-- Start logger client to send plain-text formated message to protologbeat listening on UDP host/port
logger = Logger.init('127.0.0.1', 6000, "udp", "plain")
logger:sendMsg('This is a sample message sent from the Lua logger.')
--logger = Logger.init('127.0.0.1', 6000, "udp", "plain")
--logger:sendMsg('This is a sample message sent from the Lua logger.')

-- Start logger client to send json formated message to protologbeat listening on TCP host/port
--logger = Logger.init('127.0.0.1', 6000, "tcp", "json")
Expand Down
6 changes: 3 additions & 3 deletions _samples/logger.py
Expand Up @@ -37,9 +37,9 @@ def send_message(self, msg):


# Initializing udp connection and sending a plaintext message
l = Logger('127.0.0.1', 6000)
l.enable_debug()
l.send_message('This is a sample plaintext message to be sent via udp')
#l = Logger('127.0.0.1', 6000)
#l.enable_debug()
#l.send_message('This is a sample plaintext message to be sent via udp')

# Initializing tcp connection and sending a json-encoded message
#l = Logger('127.0.0.1', 6000, 'tcp', 'json')
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Expand Up @@ -12,6 +12,7 @@ type Config struct {
Protocol string `config:"protocol"`
MaxMsgSize int `config:"max_msg_size"`
JsonMode bool `config:"json_mode"`
EnableGelf bool `config:"enable_gelf"`
DefaultEsLogType string `config:"default_es_log_type"`
MergeFieldsToRoot bool `config:"merge_fields_to_root"`
EnableSyslogFormatOnly bool `config:"enable_syslog_format_only"`
Expand All @@ -28,6 +29,7 @@ var DefaultConfig = Config{
Protocol: "udp",
MaxMsgSize: 4096,
JsonMode: false,
EnableGelf: false,
DefaultEsLogType: "protologbeat",
MergeFieldsToRoot: false,
EnableSyslogFormatOnly: false,
Expand Down
94 changes: 73 additions & 21 deletions protolog/loglistener.go
Expand Up @@ -3,12 +3,14 @@ package protolog
import (
"fmt"
"net"
"strconv"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"github.com/Graylog2/go-gelf/gelf"
"github.com/hartfordfive/protologbeat/config"
"github.com/pquerna/ffjson/ffjson"
"github.com/xeipuuv/gojsonschema"
Expand All @@ -25,7 +27,7 @@ func NewLogListener(cfg config.Config) *LogListener {
ll := &LogListener{
config: cfg,
}
if ll.config.EnableJsonValidation {
if !ll.config.EnableGelf && ll.config.EnableJsonValidation {
ll.jsonSchema = map[string]gojsonschema.JSONLoader{}
for name, path := range ll.config.JsonSchema {
logp.Info("Loading JSON schema %s from %s", name, path)
Expand All @@ -46,6 +48,8 @@ func (ll *LogListener) Start(logEntriesRecieved chan common.MapStr, logEntriesEr

if ll.config.Protocol == "tcp" {
ll.startTCP(ll.config.Protocol, address)
} else if ll.config.EnableGelf {
ll.startGELF(address)
} else {
ll.startUDP(ll.config.Protocol, address)
}
Expand All @@ -72,19 +76,19 @@ func (ll *LogListener) startTCP(proto string, address string) {
continue
}

go func() {
buffer := make([]byte, ll.config.MaxMsgSize)
length, err := conn.Read(buffer)
if err != nil {
e, ok := err.(net.Error)
if ok && e.Timeout() {
logp.Err("Timeout reading from socket: %v", err)
ll.logEntriesError <- true
return
}
buffer := make([]byte, ll.config.MaxMsgSize)

length, err := conn.Read(buffer)
if err != nil {
e, ok := err.(net.Error)
if ok && e.Timeout() {
logp.Err("Timeout reading from socket: %v", err)
ll.logEntriesError <- true
return
}
go ll.processMessage(buffer, length)
}()
}
go ll.processMessage(strings.TrimSpace(string(buffer[:length])))

}
}

Expand All @@ -99,30 +103,49 @@ func (ll *LogListener) startUDP(proto string, address string) {
defer l.Close()

logp.Info("Now listening for logs via %s on %s", ll.config.Protocol, address)
buffer := make([]byte, ll.config.MaxMsgSize)

for {
buffer := make([]byte, ll.config.MaxMsgSize)
length, _, err := l.ReadFrom(buffer)
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
}
go ll.processMessage(buffer, length)
if length == 0 {
return
}
go ll.processMessage(strings.TrimSpace(string(buffer[:length])))
}
}

func (ll *LogListener) startGELF(address string) {

gr, err := gelf.NewReader(address)
if err != nil {
logp.Err("Error starting GELF listener on %s: %v", address, err.Error())
ll.logEntriesError <- true
}

logp.Info("Listening for GELF encoded messages on %s...", address)

for {
msg, err := gr.ReadMessage()
if err != nil {
logp.Err("Could not read GELF message: %v", err)
} else {
go ll.processGelfMessage(msg)
}
}

}

func (ll *LogListener) Shutdown() {
close(ll.logEntriesError)
close(ll.logEntriesRecieved)
}

func (ll *LogListener) processMessage(buffer []byte, length int) {

if length == 0 {
return
}
func (ll *LogListener) processMessage(logData string) {

logData := strings.TrimSpace(string(buffer[:length]))
if logData == "" {
logp.Err("Event is empty")
return
Expand Down Expand Up @@ -203,3 +226,32 @@ PreSend:

ll.logEntriesRecieved <- event
}

func (ll *LogListener) processGelfMessage(msg *gelf.Message) {

event := common.MapStr{}
event["gelf"] = map[string]interface{}{"version": msg.Version}
event["host"] = msg.Host
event["type"] = ll.config.DefaultEsLogType
event["short_message"] = msg.Short
event["full_message"] = msg.Full

// 1 ms = 1000000 ns
if msg.TimeUnix == 0 {
event["@timestamp"] = common.Time(time.Now())
} else {
millisec := msg.TimeUnix - float64(int64(msg.TimeUnix))
ms := fmt.Sprintf("%.4f", millisec)
msf, err := strconv.ParseFloat(ms, 64)
if err != nil {
event["@timestamp"] = common.Time(time.Now())
} else {
event["@timestamp"] = common.Time(time.Unix(int64(msg.TimeUnix), int64(msf)*1000000))
}
}

event["level"] = msg.Level
event["facility"] = msg.Facility
ll.logEntriesRecieved <- event

}
2 changes: 2 additions & 0 deletions protologbeat-docker.yml
Expand Up @@ -7,6 +7,8 @@ protologbeat:
port: ${PORT:6000}
protocol: ${PROTOCOL:udp}
max_msg_size: ${MAX_MSG_SIZE:4096}
enable_gelf: ${ENABLE_GELF:false}
enable_syslog_format_only: ${ENABLE_SYSLOG:false}
default_es_log_type: ${DEFAULT_ES_TYPE:protologbeat}


Expand Down
5 changes: 4 additions & 1 deletion protologbeat.full.yml
Expand Up @@ -25,8 +25,11 @@ protologbeat:
#merge_fields_to_root: true

# Set process to only act as a syslog message reciever
#enable_syslog_format_only: true
#enable_syslog_format_only: false

# Set process to receive only GELF formated messages
#enable_gelf: false

# Enable json validation with schemas
#enable_json_validation: false

Expand Down
81 changes: 81 additions & 0 deletions protologbeat_test.go
@@ -0,0 +1,81 @@
package main

import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/hartfordfive/protologbeat/config"
"github.com/hartfordfive/protologbeat/protolog"

"github.com/Graylog2/go-gelf/gelf"
"github.com/stretchr/testify/assert"
)

func TestGreylogReceive(t *testing.T) {

var logEntriesRecieved chan common.MapStr
var logEntriesErrors chan bool

logEntriesRecieved = make(chan common.MapStr, 1)
logEntriesErrors = make(chan bool, 1)

ll := protolog.NewLogListener(config.Config{EnableGelf: true, Port: 12000, DefaultEsLogType: "graylog"})

go func(logs chan common.MapStr, errs chan bool) {
ll.Start(logs, errs)
}(logEntriesRecieved, logEntriesErrors)

var event common.MapStr

gw, err := gelf.NewWriter("127.0.0.1:12000")
if err != nil {
t.Errorf("NewWriter: %s", err)
return
}
gw.CompressionType = gelf.CompressGzip

expectedVersion := "1.1"
expectedHost := "localhost"
expectedShort := "This is a test message for protologbeat"
expectedFull := "This is the full message expected for the test of gelf input."
expectedTs := float64(time.Now().Unix())
expectedLevel := int32(6)
expectedFacility := "local6"
exepectedType := "graylog"

if err := gw.WriteMessage(&gelf.Message{
Version: expectedVersion,
Host: expectedHost,
Short: expectedShort,
Full: expectedFull,
TimeUnix: expectedTs,
Level: expectedLevel,
Facility: expectedFacility,
Extra: map[string]interface{}{"type": exepectedType},
}); err != nil {
t.Errorf("Could not write message to GELF listener: %v", err)
return
}

for {
select {
case <-logEntriesErrors:
t.Errorf("Error receiving GELF format message")
return
case event = <-logEntriesRecieved:
if _, ok := event["@timestamp"]; !ok {
t.Errorf("Message missing timestamp field!: %v", event)
return
}
assert.Equal(t, event["gelf"].(map[string]interface{})["version"], expectedVersion, "Version should be the same")
assert.Equal(t, event["host"], expectedHost, "Host should be the same")
assert.Equal(t, event["short_message"], expectedShort, "Short message should be the same")
assert.Equal(t, event["full_message"], expectedFull, "Host should be the same")
assert.Equal(t, event["level"], expectedLevel, "Host should be the same")
assert.Equal(t, event["facility"], expectedFacility, "Host should be the same")
assert.Equal(t, event["type"], exepectedType, "Host should be the same")
return
}
}
}

0 comments on commit 69b1bf2

Please sign in to comment.