Skip to content

Commit

Permalink
separate input creation from start
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Oct 9, 2018
1 parent fd82a92 commit eeb0c1b
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 57 deletions.
22 changes: 10 additions & 12 deletions cmd/carbon-relay-ng/carbon-relay-ng.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,25 +192,23 @@ func main() {
}

if config.Listen_addr != "" {
plugin, err := input.NewPlain(config.Listen_addr, table)
if err != nil {
log.Error(err.Error())
os.Exit(1)
}
inputs = append(inputs, plugin)
inputs = append(inputs, input.NewPlain(config.Listen_addr, table))
}

if config.Pickle_addr != "" {
plugin, err := input.NewPickle(config.Pickle_addr, table)
inputs = append(inputs, input.NewPickle(config.Pickle_addr, table))
}

if config.Amqp.Amqp_enabled == true {
inputs = append(inputs, input.NewAMQP(config, table, input.AMQPConnector))
}

for _, in := range inputs {
err := in.Start()
if err != nil {
log.Error(err.Error())
os.Exit(1)
}
inputs = append(inputs, plugin)
}

if config.Amqp.Amqp_enabled == true {
inputs = append(inputs, input.StartAMQP(config, table, input.AMQPConnector))
}

if config.Admin_addr != "" {
Expand Down
21 changes: 10 additions & 11 deletions input/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ func (a *Amqp) close() {
a.conn.Close()
}

func StartAMQP(config cfg.Config, dispatcher Dispatcher, connect amqpConnector) *Amqp {
a := NewAMQP(config, dispatcher, connect)
go a.Start()
return a
}

func NewAMQP(config cfg.Config, dispatcher Dispatcher, connect amqpConnector) *Amqp {
uri := amqp.URI{
Scheme: "amqp",
Expand Down Expand Up @@ -100,7 +94,16 @@ func AMQPConnector(a *Amqp) (<-chan amqp.Delivery, error) {
return c, nil
}

func (a *Amqp) Start() {
func (a *Amqp) Name() string {
return "amqp"
}

func (a *Amqp) Start() error {
go a.start()
return nil
}

func (a *Amqp) start() {
b := &backoff.Backoff{
Min: 500 * time.Millisecond,
}
Expand Down Expand Up @@ -141,10 +144,6 @@ func (a *Amqp) Start() {
}
}

func (a *Amqp) Name() string {
return "amqp"
}

func (a *Amqp) Stop() bool {
close(a.shutdown)
a.wg.Wait()
Expand Down
1 change: 1 addition & 0 deletions input/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func SetLogger(l *logging.Logger) {

type Plugin interface {
Name() string
Start() error
Stop() bool
}

Expand Down
46 changes: 24 additions & 22 deletions input/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,42 @@ import (
type Listener struct {
wg sync.WaitGroup
name string
addr string
tcpList *net.TCPListener
udpConn *net.UDPConn
handler Handler
shutdown chan struct{}
}

func NewListener(name string, handler Handler) *Listener {
func NewListener(name, addr string, handler Handler) *Listener {
return &Listener{
name: name,
addr: addr,
handler: handler,
shutdown: make(chan struct{}),
}
}

func (l *Listener) listen(addr string) error {
func (l *Listener) Start() error {
// listeners are set up outside of accept* here so they can interrupt startup
err := l.listenTcp(addr)
err := l.listenTcp()
if err != nil {
return err
}

err = l.listenUdp(addr)
err = l.listenUdp()
if err != nil {
return err
}

l.wg.Add(2)
go l.run(addr, "tcp", l.acceptTcp, l.listenTcp, l.tcpList)
go l.run(addr, "udp", l.consumeUdp, l.listenUdp, l.udpConn)
go l.run("tcp", l.acceptTcp, l.listenTcp, l.tcpList)
go l.run("udp", l.consumeUdp, l.listenUdp, l.udpConn)

return nil
}

func (l *Listener) run(addr, proto string, consume func(string), reconnect func(string) error, listener Closable) {
func (l *Listener) run(proto string, consume func(), reconnect func() error, listener Closable) {
defer l.wg.Done()

backoffCounter := &backoff.Backoff{
Expand All @@ -55,43 +57,43 @@ func (l *Listener) run(addr, proto string, consume func(string), reconnect func(

go func() {
<-l.shutdown
log.Info("shutting down %v/%s, closing socket", addr, proto)
log.Info("shutting down %v/%s, closing socket", l.addr, proto)
listener.Close()
}()

for {
log.Notice("listening on %v/%s", addr, proto)
log.Notice("listening on %v/%s", l.addr, proto)

consume(addr)
consume()

select {
case <-l.shutdown:
return
default:
}
for {
log.Notice("reopening %v/%s", addr, proto)
err := reconnect(addr)
log.Notice("reopening %v/%s", l.addr, proto)
err := reconnect()
if err == nil {
backoffCounter.Reset()
break
}

select {
case <-l.shutdown:
log.Info("shutting down %v/%s, closing socket", addr, proto)
log.Info("shutting down %v/%s, closing socket", l.addr, proto)
return
default:
}
backoffDuration := backoffCounter.Duration()
log.Error("error listening on %v/%s, retrying after %v: %s", addr, proto, backoffDuration, err)
log.Error("error listening on %v/%s, retrying after %v: %s", l.addr, proto, backoffDuration, err)
<-time.After(backoffDuration)
}
}
}

func (l *Listener) listenTcp(addr string) error {
laddr, err := net.ResolveTCPAddr("tcp", addr)
func (l *Listener) listenTcp() error {
laddr, err := net.ResolveTCPAddr("tcp", l.addr)
if err != nil {
return err
}
Expand All @@ -102,15 +104,15 @@ func (l *Listener) listenTcp(addr string) error {
return nil
}

func (l *Listener) acceptTcp(addr string) {
func (l *Listener) acceptTcp() {
for {
c, err := l.tcpList.AcceptTCP()
if err != nil {
select {
case <-l.shutdown:
return
default:
log.Error("error accepting on %v/tcp, closing connection: %s", addr, err)
log.Error("error accepting on %v/tcp, closing connection: %s", l.addr, err)
l.tcpList.Close()
return
}
Expand All @@ -134,8 +136,8 @@ func (l *Listener) acceptTcpConn(c net.Conn) {
l.handler.Handle(c)
}

func (l *Listener) listenUdp(addr string) error {
udp_addr, err := net.ResolveUDPAddr("udp", addr)
func (l *Listener) listenUdp() error {
udp_addr, err := net.ResolveUDPAddr("udp", l.addr)
if err != nil {
return err
}
Expand All @@ -146,7 +148,7 @@ func (l *Listener) listenUdp(addr string) error {
return nil
}

func (l *Listener) consumeUdp(addr string) {
func (l *Listener) consumeUdp() {
buffer := make([]byte, 65535)
for {
// read a packet into buffer
Expand All @@ -156,7 +158,7 @@ func (l *Listener) consumeUdp(addr string) {
case <-l.shutdown:
return
default:
log.Error("error reading packet on %v/udp, closing connection: %s", addr, err)
log.Error("error reading packet on %v/udp, closing connection: %s", l.addr, err)
l.udpConn.Close()
return
}
Expand Down
12 changes: 6 additions & 6 deletions input/listen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func (m *mockHandler) String() string {
func TestTcpUdpShutdown(t *testing.T) {
handler := mockHandler{testing: t}
addr := "localhost:" // choose random ports
listener := NewListener("mock", &handler)
err := listener.listen(addr)
listener := NewListener("mock", addr, &handler)
err := listener.Start()
if err != nil {
t.Fatalf("Error when trying to listen: %s", err)
}
Expand All @@ -61,8 +61,8 @@ func TestTcpUdpShutdown(t *testing.T) {
func TestTcpConnection(t *testing.T) {
handler := mockHandler{testing: t}
addr := "localhost:" // choose random ports
listener := NewListener("mock", &handler)
err := listener.listen(addr)
listener := NewListener("mock", addr, &handler)
err := listener.Start()
if err != nil {
t.Fatalf("Error when listening: %s", err)
}
Expand Down Expand Up @@ -100,8 +100,8 @@ func TestTcpConnection(t *testing.T) {
func TestUdpConnection(t *testing.T) {
handler := mockHandler{testing: t}
addr := "localhost:" // choose random ports
listener := NewListener("mock", &handler)
err := listener.listen(addr)
listener := NewListener("mock", addr, &handler)
err := listener.Start()
if err != nil {
t.Fatalf("Error when listening: %s", err)
}
Expand Down
5 changes: 2 additions & 3 deletions input/pickle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ type Pickle struct {
dispatcher Dispatcher
}

func NewPickle(addr string, dispatcher Dispatcher) (*Listener, error) {
listener := NewListener("pickle", &Pickle{dispatcher})
return listener, listener.listen(addr)
func NewPickle(addr string, dispatcher Dispatcher) *Listener {
return NewListener("pickle", addr, &Pickle{dispatcher})
}

func (p *Pickle) Handle(c io.Reader) {
Expand Down
5 changes: 2 additions & 3 deletions input/plain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ type Plain struct {
dispatcher Dispatcher
}

func NewPlain(addr string, dispatcher Dispatcher) (*Listener, error) {
listener := NewListener("plain", &Plain{dispatcher})
return listener, listener.listen(addr)
func NewPlain(addr string, dispatcher Dispatcher) *Listener {
return NewListener("plain", addr, &Plain{dispatcher})
}

func (p *Plain) Handle(c io.Reader) {
Expand Down

0 comments on commit eeb0c1b

Please sign in to comment.