Skip to content

Commit

Permalink
fixes data racing problems (fixes #11)
Browse files Browse the repository at this point in the history
  • Loading branch information
ga0 committed Aug 6, 2018
1 parent 5e67cc2 commit 6443c27
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 19 deletions.
2 changes: 1 addition & 1 deletion ng.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func initEventHandlers() {
if *bindingPort != 0 {
addr := fmt.Sprintf(":%d", *bindingPort)
ngserver := NewNGServer(addr, *saveEvent)
go ngserver.Serve()
ngserver.Serve()
handlers = append(handlers, ngserver)
}

Expand Down
15 changes: 8 additions & 7 deletions ngnet/httpstreamfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package ngnet

import (
"sync"
"sync/atomic"

"github.com/google/gopacket"
"github.com/google/gopacket/tcpassembly"
)

// HTTPStreamFactory implements StreamFactory interface for tcpassembly
type HTTPStreamFactory struct {
runningStream *uint
runningStream *int32
wg *sync.WaitGroup
seq *uint
uniStreams *map[streamKey]*httpStreamPair
Expand All @@ -25,7 +26,7 @@ func NewHTTPStreamFactory(out chan<- interface{}) HTTPStreamFactory {
f.uniStreams = new(map[streamKey]*httpStreamPair)
*f.uniStreams = make(map[streamKey]*httpStreamPair)
f.eventChan = out
f.runningStream = new(uint)
f.runningStream = new(int32)
return f
}

Expand All @@ -35,16 +36,15 @@ func (f HTTPStreamFactory) Wait() {
}

// RunningStreamCount get the running stream count
func (f *HTTPStreamFactory) RunningStreamCount() uint {
return *f.runningStream
func (f *HTTPStreamFactory) RunningStreamCount() int32 {
return atomic.LoadInt32(f.runningStream)
}

func (f *HTTPStreamFactory) runStreamPair(streamPair *httpStreamPair) {
f.wg.Add(1)
*f.runningStream++
atomic.AddInt32(f.runningStream, 1)

defer f.wg.Done()
defer func() { *f.runningStream-- }()
defer func() { atomic.AddInt32(f.runningStream, -1) }()
streamPair.run()
}

Expand All @@ -68,6 +68,7 @@ func (f HTTPStreamFactory) New(netFlow, tcpFlow gopacket.Flow) (ret tcpassembly.
streamPair.upStream = &s
(*f.uniStreams)[key] = streamPair
*f.seq++
f.wg.Add(1)
go f.runStreamPair(streamPair)
ret = s
}
Expand Down
37 changes: 26 additions & 11 deletions ngserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,31 +66,41 @@ func NewNGClient(ws *websocket.Conn, server *NGServer) *NGClient {

// NGServer is a http server which push captured HTTPEvent to the front end
type NGServer struct {
addr string
staticFileDir string
connectedClient map[*websocket.Conn]*NGClient
eventBuffer []interface{}
saveEvent bool
wg sync.WaitGroup
addr string
staticFileDir string
connectedClient map[*websocket.Conn]*NGClient
connectedClientMutex *sync.Mutex
eventBuffer []interface{}
saveEvent bool
wg sync.WaitGroup
}

func (s *NGServer) websocketHandler(ws *websocket.Conn) {
c := NewNGClient(ws, s)

s.connectedClientMutex.Lock()
s.connectedClient[ws] = c
s.connectedClientMutex.Unlock()

go c.transmitEvents()
c.recvAndProcessCommand()
c.close()

s.connectedClientMutex.Lock()
delete(s.connectedClient, ws)
s.connectedClientMutex.Unlock()
}

// PushEvent dispatches the event received from ngnet to all clients connected with websocket.
func (s *NGServer) PushEvent(e interface{}) {
if s.saveEvent {
s.eventBuffer = append(s.eventBuffer, e)
}
s.connectedClientMutex.Lock()
for _, c := range s.connectedClient {
c.eventChan <- e
}
s.connectedClientMutex.Unlock()
}

// Wait waits for serving
Expand Down Expand Up @@ -126,6 +136,14 @@ func (s *NGServer) handleStaticFile(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(c))
}

func (s *NGServer) listenAndServe() {
defer s.wg.Done()
err := http.ListenAndServe(s.addr, nil)
if err != nil {
log.Fatalln(err)
}
}

// Serve the web page
func (s *NGServer) Serve() {
http.Handle("/data", websocket.Handler(s.websocketHandler))
Expand All @@ -142,18 +160,15 @@ func (s *NGServer) Serve() {
http.HandleFunc("/", s.handleStaticFile)
}
s.wg.Add(1)
defer s.wg.Done()
err = http.ListenAndServe(s.addr, nil)
if err != nil {
log.Fatalln(err)
}
go s.listenAndServe()
}

// NewNGServer creates NGServer
func NewNGServer(addr string, saveEvent bool) *NGServer {
s := new(NGServer)
s.addr = addr
s.connectedClient = make(map[*websocket.Conn]*NGClient)
s.connectedClientMutex = &sync.Mutex{}
s.saveEvent = saveEvent
return s
}

0 comments on commit 6443c27

Please sign in to comment.