From 69b1bf27d90983c17d6bbea2d2779a1cdabf4b02 Mon Sep 17 00:00:00 2001 From: Alain Lefebvre Date: Tue, 5 Sep 2017 11:24:45 -0400 Subject: [PATCH] Version 0.2.0 --- .gitignore | 1 + CHANGELOG.md | 5 +++ Dockerfile | 16 +++---- README.md | 1 + _samples/generator.py | 8 ++++ _samples/logger.lua | 4 +- _samples/logger.py | 6 +-- config/config.go | 2 + protolog/loglistener.go | 94 ++++++++++++++++++++++++++++++++--------- protologbeat-docker.yml | 2 + protologbeat.full.yml | 5 ++- protologbeat_test.go | 81 +++++++++++++++++++++++++++++++++++ 12 files changed, 189 insertions(+), 36 deletions(-) create mode 100644 _samples/generator.py create mode 100644 protologbeat_test.go diff --git a/.gitignore b/.gitignore index a886fd12..47e41e11 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /protologbeat.test protologbeat.local.yml *.pyc +/logs diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c9a2081..d8570b2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,9 @@ +### Version 0.2.0 +- Added support for receiving GELF format messages by setting `enable_gelf: true` +- Fixed issue where (more commonly when getting high volumes of messages) the `processMessage` goroutine might have it's byte buffer modified before it actually executes with the original payload/message. (Related to original PR [#8](https://github.com/hartfordfive/protologbeat/pull/8), credit to [vcostet](https://github.com/vcostet)) + + ### Version 0.1.1 - Added Dockerfile and seperate `protologbeat-docker.yml` config file to be used by docker image - Updated default `protologbeat.yml` to have bare-minimum config values diff --git a/Dockerfile b/Dockerfile index 2aadbc96..1aa7cea7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,15 +13,18 @@ RUN set -ex ;\ # Install dependencies apk --no-cache add gettext libc6-compat curl ;\ # Hotfix for libc compat - ln -s /lib /lib64 - -RUN cd /tmp ;\ + ln -s /lib /lib64 ;\ + cd /tmp ;\ mkdir -p /opt/protologbeat/conf ;\ mkdir -p /opt/protologbeat/ssl ;\ curl -L https://github.com/hartfordfive/protologbeat/releases/download/${VERSION}/protologbeat-${VERSION}-linux-x86_64.tar.gz --output protologbeat-${VERSION}-linux-x86_64.tar.gz ;\ tar -xvzf protologbeat-${VERSION}-linux-x86_64.tar.gz ;\ mv /tmp/protologbeat-${VERSION}-linux-x86_64 /opt/protologbeat/protologbeat ;\ - rm -rf protologbeat-${VERSION}-linux-x86_64 && rm protologbeat-${VERSION}-linux-x86_64.tar.gz + rm -rf protologbeat-${VERSION}-linux-x86_64 && rm protologbeat-${VERSION}-linux-x86_64.tar.gz ;\ + # Fix permissions + chown -R protologbeat:protologbeat /opt/protologbeat ;\ + chmod 750 /opt/protologbeat ;\ + chmod 700 /opt/protologbeat/ssl ENV PATH=/opt/protologbeat:$PATH @@ -29,11 +32,6 @@ COPY protologbeat-docker.yml /opt/protologbeat/conf/protologbeat.yml COPY protologbeat.template-es2x.json /opt/protologbeat COPY protologbeat.template.json /opt/protologbeat -# Fix permissions -RUN chown -R protologbeat:protologbeat /opt/protologbeat ;\ - chmod 750 /opt/protologbeat ;\ - chmod 700 /opt/protologbeat/ssl - WORKDIR /opt/protologbeat USER protologbeat diff --git a/README.md b/README.md index 5120a102..a48cf3f6 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Ensure that this folder is at the following location: - `protolog.merge_fields_to_root` : When **json_mode** enabled, wether to merge parsed fields to the root level. (Default = false) - `protologbeat.default_es_log_type`: Elasticsearch type to assign to an event if one isn't specified (Default: protologbeat) - `protologbeat.enable_syslog_format_only` : Boolean value indicating if only syslog messages should be accepted. (Default = false) +- `protologbeat.enable_gelf` : Boolean value indiciating if process should in mode to only accept [GELF formated messages](http://docs.graylog.org/en/2.2/pages/gelf.html) - `protologbeat.enable_json_validation` : Boolean value indicating if JSON schema validation should be applied for `json` format messages (Default = false) - `protologbeat.validate_all_json_types` : When json_mode enabled, indicates if ALL types must have a schema specified. Log entries with types that have no schema will not be published. (Default = false) - `protologbeat.json_schema` : A hash consisting of the Elasticsearch type as the key, and the absolute local schema file path as the value. diff --git a/_samples/generator.py b/_samples/generator.py new file mode 100644 index 00000000..4e3586f4 --- /dev/null +++ b/_samples/generator.py @@ -0,0 +1,8 @@ +from logger import Logger +import time, random + +l = Logger('127.0.0.1', 6000, 'udp', 'json') + +for i in range(10000): + l.send_message({'message': 'This is JSON encoded message #{}'.format(i), 'type': 'generator_test', 'id': int(i), 'log_level': 'INFO'}) + time.sleep(random.uniform(0.0001, 0.0010)) \ No newline at end of file diff --git a/_samples/logger.lua b/_samples/logger.lua index b8ed7a2b..cb12a5ce 100644 --- a/_samples/logger.lua +++ b/_samples/logger.lua @@ -34,8 +34,8 @@ function Logger:sendMsg(msg) end -- Start logger client to send plain-text formated message to protologbeat listening on UDP host/port -logger = Logger.init('127.0.0.1', 6000, "udp", "plain") -logger:sendMsg('This is a sample message sent from the Lua logger.') +--logger = Logger.init('127.0.0.1', 6000, "udp", "plain") +--logger:sendMsg('This is a sample message sent from the Lua logger.') -- Start logger client to send json formated message to protologbeat listening on TCP host/port --logger = Logger.init('127.0.0.1', 6000, "tcp", "json") diff --git a/_samples/logger.py b/_samples/logger.py index f54db5ba..571b4f3e 100644 --- a/_samples/logger.py +++ b/_samples/logger.py @@ -37,9 +37,9 @@ def send_message(self, msg): # Initializing udp connection and sending a plaintext message -l = Logger('127.0.0.1', 6000) -l.enable_debug() -l.send_message('This is a sample plaintext message to be sent via udp') +#l = Logger('127.0.0.1', 6000) +#l.enable_debug() +#l.send_message('This is a sample plaintext message to be sent via udp') # Initializing tcp connection and sending a json-encoded message #l = Logger('127.0.0.1', 6000, 'tcp', 'json') diff --git a/config/config.go b/config/config.go index 5276c0d3..9672a721 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,7 @@ type Config struct { Protocol string `config:"protocol"` MaxMsgSize int `config:"max_msg_size"` JsonMode bool `config:"json_mode"` + EnableGelf bool `config:"enable_gelf"` DefaultEsLogType string `config:"default_es_log_type"` MergeFieldsToRoot bool `config:"merge_fields_to_root"` EnableSyslogFormatOnly bool `config:"enable_syslog_format_only"` @@ -28,6 +29,7 @@ var DefaultConfig = Config{ Protocol: "udp", MaxMsgSize: 4096, JsonMode: false, + EnableGelf: false, DefaultEsLogType: "protologbeat", MergeFieldsToRoot: false, EnableSyslogFormatOnly: false, diff --git a/protolog/loglistener.go b/protolog/loglistener.go index deead53e..cb55f257 100644 --- a/protolog/loglistener.go +++ b/protolog/loglistener.go @@ -3,12 +3,14 @@ package protolog import ( "fmt" "net" + "strconv" "strings" "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/Graylog2/go-gelf/gelf" "github.com/hartfordfive/protologbeat/config" "github.com/pquerna/ffjson/ffjson" "github.com/xeipuuv/gojsonschema" @@ -25,7 +27,7 @@ func NewLogListener(cfg config.Config) *LogListener { ll := &LogListener{ config: cfg, } - if ll.config.EnableJsonValidation { + if !ll.config.EnableGelf && ll.config.EnableJsonValidation { ll.jsonSchema = map[string]gojsonschema.JSONLoader{} for name, path := range ll.config.JsonSchema { logp.Info("Loading JSON schema %s from %s", name, path) @@ -46,6 +48,8 @@ func (ll *LogListener) Start(logEntriesRecieved chan common.MapStr, logEntriesEr if ll.config.Protocol == "tcp" { ll.startTCP(ll.config.Protocol, address) + } else if ll.config.EnableGelf { + ll.startGELF(address) } else { ll.startUDP(ll.config.Protocol, address) } @@ -72,19 +76,19 @@ func (ll *LogListener) startTCP(proto string, address string) { continue } - go func() { - buffer := make([]byte, ll.config.MaxMsgSize) - length, err := conn.Read(buffer) - if err != nil { - e, ok := err.(net.Error) - if ok && e.Timeout() { - logp.Err("Timeout reading from socket: %v", err) - ll.logEntriesError <- true - return - } + buffer := make([]byte, ll.config.MaxMsgSize) + + length, err := conn.Read(buffer) + if err != nil { + e, ok := err.(net.Error) + if ok && e.Timeout() { + logp.Err("Timeout reading from socket: %v", err) + ll.logEntriesError <- true + return } - go ll.processMessage(buffer, length) - }() + } + go ll.processMessage(strings.TrimSpace(string(buffer[:length]))) + } } @@ -99,16 +103,40 @@ func (ll *LogListener) startUDP(proto string, address string) { defer l.Close() logp.Info("Now listening for logs via %s on %s", ll.config.Protocol, address) - buffer := make([]byte, ll.config.MaxMsgSize) for { + buffer := make([]byte, ll.config.MaxMsgSize) length, _, err := l.ReadFrom(buffer) if err != nil { logp.Err("Error reading from buffer: %v", err.Error()) continue } - go ll.processMessage(buffer, length) + if length == 0 { + return + } + go ll.processMessage(strings.TrimSpace(string(buffer[:length]))) + } +} + +func (ll *LogListener) startGELF(address string) { + + gr, err := gelf.NewReader(address) + if err != nil { + logp.Err("Error starting GELF listener on %s: %v", address, err.Error()) + ll.logEntriesError <- true + } + + logp.Info("Listening for GELF encoded messages on %s...", address) + + for { + msg, err := gr.ReadMessage() + if err != nil { + logp.Err("Could not read GELF message: %v", err) + } else { + go ll.processGelfMessage(msg) + } } + } func (ll *LogListener) Shutdown() { @@ -116,13 +144,8 @@ func (ll *LogListener) Shutdown() { close(ll.logEntriesRecieved) } -func (ll *LogListener) processMessage(buffer []byte, length int) { - - if length == 0 { - return - } +func (ll *LogListener) processMessage(logData string) { - logData := strings.TrimSpace(string(buffer[:length])) if logData == "" { logp.Err("Event is empty") return @@ -203,3 +226,32 @@ PreSend: ll.logEntriesRecieved <- event } + +func (ll *LogListener) processGelfMessage(msg *gelf.Message) { + + event := common.MapStr{} + event["gelf"] = map[string]interface{}{"version": msg.Version} + event["host"] = msg.Host + event["type"] = ll.config.DefaultEsLogType + event["short_message"] = msg.Short + event["full_message"] = msg.Full + + // 1 ms = 1000000 ns + if msg.TimeUnix == 0 { + event["@timestamp"] = common.Time(time.Now()) + } else { + millisec := msg.TimeUnix - float64(int64(msg.TimeUnix)) + ms := fmt.Sprintf("%.4f", millisec) + msf, err := strconv.ParseFloat(ms, 64) + if err != nil { + event["@timestamp"] = common.Time(time.Now()) + } else { + event["@timestamp"] = common.Time(time.Unix(int64(msg.TimeUnix), int64(msf)*1000000)) + } + } + + event["level"] = msg.Level + event["facility"] = msg.Facility + ll.logEntriesRecieved <- event + +} diff --git a/protologbeat-docker.yml b/protologbeat-docker.yml index 2b8cc275..681d291f 100644 --- a/protologbeat-docker.yml +++ b/protologbeat-docker.yml @@ -7,6 +7,8 @@ protologbeat: port: ${PORT:6000} protocol: ${PROTOCOL:udp} max_msg_size: ${MAX_MSG_SIZE:4096} + enable_gelf: ${ENABLE_GELF:false} + enable_syslog_format_only: ${ENABLE_SYSLOG:false} default_es_log_type: ${DEFAULT_ES_TYPE:protologbeat} diff --git a/protologbeat.full.yml b/protologbeat.full.yml index eac755a8..7dbd2c11 100644 --- a/protologbeat.full.yml +++ b/protologbeat.full.yml @@ -25,8 +25,11 @@ protologbeat: #merge_fields_to_root: true # Set process to only act as a syslog message reciever - #enable_syslog_format_only: true + #enable_syslog_format_only: false + # Set process to receive only GELF formated messages + #enable_gelf: false + # Enable json validation with schemas #enable_json_validation: false diff --git a/protologbeat_test.go b/protologbeat_test.go new file mode 100644 index 00000000..3f3154fc --- /dev/null +++ b/protologbeat_test.go @@ -0,0 +1,81 @@ +package main + +import ( + "testing" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/hartfordfive/protologbeat/config" + "github.com/hartfordfive/protologbeat/protolog" + + "github.com/Graylog2/go-gelf/gelf" + "github.com/stretchr/testify/assert" +) + +func TestGreylogReceive(t *testing.T) { + + var logEntriesRecieved chan common.MapStr + var logEntriesErrors chan bool + + logEntriesRecieved = make(chan common.MapStr, 1) + logEntriesErrors = make(chan bool, 1) + + ll := protolog.NewLogListener(config.Config{EnableGelf: true, Port: 12000, DefaultEsLogType: "graylog"}) + + go func(logs chan common.MapStr, errs chan bool) { + ll.Start(logs, errs) + }(logEntriesRecieved, logEntriesErrors) + + var event common.MapStr + + gw, err := gelf.NewWriter("127.0.0.1:12000") + if err != nil { + t.Errorf("NewWriter: %s", err) + return + } + gw.CompressionType = gelf.CompressGzip + + expectedVersion := "1.1" + expectedHost := "localhost" + expectedShort := "This is a test message for protologbeat" + expectedFull := "This is the full message expected for the test of gelf input." + expectedTs := float64(time.Now().Unix()) + expectedLevel := int32(6) + expectedFacility := "local6" + exepectedType := "graylog" + + if err := gw.WriteMessage(&gelf.Message{ + Version: expectedVersion, + Host: expectedHost, + Short: expectedShort, + Full: expectedFull, + TimeUnix: expectedTs, + Level: expectedLevel, + Facility: expectedFacility, + Extra: map[string]interface{}{"type": exepectedType}, + }); err != nil { + t.Errorf("Could not write message to GELF listener: %v", err) + return + } + + for { + select { + case <-logEntriesErrors: + t.Errorf("Error receiving GELF format message") + return + case event = <-logEntriesRecieved: + if _, ok := event["@timestamp"]; !ok { + t.Errorf("Message missing timestamp field!: %v", event) + return + } + assert.Equal(t, event["gelf"].(map[string]interface{})["version"], expectedVersion, "Version should be the same") + assert.Equal(t, event["host"], expectedHost, "Host should be the same") + assert.Equal(t, event["short_message"], expectedShort, "Short message should be the same") + assert.Equal(t, event["full_message"], expectedFull, "Host should be the same") + assert.Equal(t, event["level"], expectedLevel, "Host should be the same") + assert.Equal(t, event["facility"], expectedFacility, "Host should be the same") + assert.Equal(t, event["type"], exepectedType, "Host should be the same") + return + } + } +}