Skip to content
This repository has been archived by the owner on Jan 26, 2022. It is now read-only.

Commit

Permalink
Merge pull request #76 from ekanite/dont_share_parsers
Browse files Browse the repository at this point in the history
Don't share parsers
  • Loading branch information
otoolep committed Jun 17, 2017
2 parents 4d5a200 + 7a1b525 commit ef961e4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 1.2.1 (June 17th 2017)
- [PR #76](https://github.com/ekanite/ekanite/pull/76): Don't share parsers. Fixes [issue #75](https://github.com/ekanite/ekanite/issues/75).

## 1.2.0 (January 3rd 2017)
- [PR #63](https://github.com/ekanite/ekanite/pull/63): Add search duration to HTTP query service output.
- Cyclomatic complexity improvements.
Expand Down
32 changes: 22 additions & 10 deletions input/collector.go
Expand Up @@ -34,31 +34,33 @@ type Collector interface {
// TCPCollector represents a network collector that accepts and handler TCP connections.
type TCPCollector struct {
iface string
parser *Parser
format string

addr net.Addr
tlsConfig *tls.Config
}

// UDPCollector represents a network collector that accepts UDP packets.
type UDPCollector struct {
format string
addr *net.UDPAddr
parser *Parser
}

// NewCollector returns a network collector of the specified type, that will bind
// to the given inteface on Start(). If config is non-nil, a secure Collector will
// be returned. Secure Collectors require the protocol be TCP.
func NewCollector(proto, iface, format string, tlsConfig *tls.Config) (Collector, error) {
parser, err := NewParser(format)
// Verify that a parser can be instantiated. The actual parser that is used will
// be created by the connection handler.
_, err := NewParser(format)
if err != nil {
return nil, err
}

if strings.ToLower(proto) == "tcp" {
return &TCPCollector{
iface: iface,
parser: parser,
format: format,
tlsConfig: tlsConfig,
}, nil
} else if strings.ToLower(proto) == "udp" {
Expand All @@ -67,7 +69,7 @@ func NewCollector(proto, iface, format string, tlsConfig *tls.Config) (Collector
return nil, err
}

return &UDPCollector{addr: addr, parser: parser}, nil
return &UDPCollector{addr: addr, format: format}, nil
}
return nil, fmt.Errorf("unsupport collector protocol")
}
Expand Down Expand Up @@ -111,6 +113,11 @@ func (s *TCPCollector) handleConnection(conn net.Conn, c chan<- *Event) {
conn.Close()
}()

parser, err := NewParser(s.format)
if err != nil {
panic(fmt.Sprintf("failed to create TCP connection parser:%s", err.Error()))
}

delimiter := NewSyslogDelimiter(msgBufSize)
reader := bufio.NewReader(conn)
var log string
Expand Down Expand Up @@ -139,10 +146,10 @@ func (s *TCPCollector) handleConnection(conn net.Conn, c chan<- *Event) {
// Log line available?
if match {
stats.Add("tcpEventsRx", 1)
if s.parser.Parse(bytes.NewBufferString(log).Bytes()) {
if parser.Parse(bytes.NewBufferString(log).Bytes()) {
c <- &Event{
Text: string(s.parser.Raw),
Parsed: s.parser.Result,
Text: string(parser.Raw),
Parsed: parser.Result,
ReceptionTime: time.Now().UTC(),
Sequence: atomic.AddInt64(&sequenceNumber, 1),
SourceIP: conn.RemoteAddr().String(),
Expand All @@ -164,6 +171,11 @@ func (s *UDPCollector) Start(c chan<- *Event) error {
return err
}

parser, err := NewParser(s.format)
if err != nil {
panic(fmt.Sprintf("failed to create UDP parser:%s", err.Error()))
}

go func() {
buf := make([]byte, msgBufSize)
for {
Expand All @@ -173,10 +185,10 @@ func (s *UDPCollector) Start(c chan<- *Event) error {
continue
}
log := strings.Trim(string(buf[:n]), "\r\n")
if s.parser.Parse(bytes.NewBufferString(log).Bytes()) {
if parser.Parse(bytes.NewBufferString(log).Bytes()) {
c <- &Event{
Text: log,
Parsed: s.parser.Result,
Parsed: parser.Result,
ReceptionTime: time.Now().UTC(),
Sequence: atomic.AddInt64(&sequenceNumber, 1),
SourceIP: addr.String(),
Expand Down

0 comments on commit ef961e4

Please sign in to comment.