Skip to content

Commit

Permalink
feat: Implements graceful shutdown on WorkerService.Run method (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson committed Mar 5, 2021
1 parent 7bbec35 commit c951fe3
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 38 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,2 +1,2 @@
# postmand
Simple webhook delivery system powered by Golang and PostgreSQL
Simple webhook delivery system powered by Golang and PostgreSQL.
24 changes: 0 additions & 24 deletions cmd/postmand/main.go
Expand Up @@ -3,8 +3,6 @@ package main
import (
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/allisson/go-env"
Expand Down Expand Up @@ -70,29 +68,7 @@ func main() {
deliveryRepository := repository.NewDelivery(db)
pollingInterval := time.Duration(env.GetInt("POSTMAND_POLLING_INTERVAL", 1000)) * time.Millisecond
workerService := service.NewWorker(deliveryRepository, logger, pollingInterval)

// Graceful shutdown
idleConnsClosed := make(chan struct{})
go func() {
sigint := make(chan os.Signal, 1)

// interrupt signal sent from terminal
signal.Notify(sigint, os.Interrupt)
// sigterm signal sent from kubernetes
signal.Notify(sigint, syscall.SIGTERM)

<-sigint

// We received an interrupt signal, shut down.
workerService.Shutdown(c.Context)
close(idleConnsClosed)
}()

logger.Info("worker-started")
workerService.Run(c.Context)

<-idleConnsClosed

return nil
},
},
Expand Down
9 changes: 4 additions & 5 deletions http/server.go
Expand Up @@ -9,11 +9,10 @@ import (
"os/signal"
"syscall"

mw "github.com/allisson/postmand/http/middleware"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"go.uber.org/zap"

mw "github.com/allisson/postmand/http/middleware"
)

// NewRouter returns *chi.Mux with base middlewares.
Expand All @@ -27,14 +26,14 @@ func NewRouter(logger *zap.Logger) *chi.Mux {
return r
}

// Server ...
// Server implements a http server.
type Server struct {
mux *chi.Mux
httpPort int
logger *zap.Logger
}

// Run ...
// Run starts a http server.
func (s Server) Run() {
httpServer := &nethttp.Server{Addr: fmt.Sprintf(":%d", s.httpPort), Handler: s.mux}
idleConnsClosed := make(chan struct{})
Expand Down Expand Up @@ -69,7 +68,7 @@ func (s Server) Run() {
<-idleConnsClosed
}

// NewServer ...
// NewServer creates a new Server.
func NewServer(mux *chi.Mux, httpPort int, logger *zap.Logger) *Server {
return &Server{
mux: mux,
Expand Down
30 changes: 28 additions & 2 deletions service/worker.go
Expand Up @@ -2,6 +2,9 @@ package service

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/allisson/postmand"
Expand All @@ -16,8 +19,7 @@ type Worker struct {
isStop bool
}

// Run sending of webhooks until the Shutdown method is called.
func (w *Worker) Run(ctx context.Context) {
func (w *Worker) run(ctx context.Context) {
for {
// Break forloop if isStop is true.
if w.isStop {
Expand Down Expand Up @@ -51,6 +53,30 @@ func (w *Worker) Run(ctx context.Context) {
w.logger.Info("worker-shutdown-completed")
}

// Run sending of webhooks until the Shutdown method is called.
func (w *Worker) Run(ctx context.Context) {
idleConnsClosed := make(chan struct{})
go func() {
sigint := make(chan os.Signal, 1)

// interrupt signal sent from terminal
signal.Notify(sigint, os.Interrupt)
// sigterm signal sent from kubernetes
signal.Notify(sigint, syscall.SIGTERM)

<-sigint

// We received an interrupt signal, shut down.
w.Shutdown(ctx)
close(idleConnsClosed)
}()

w.logger.Info("worker-started")
w.run(ctx)

<-idleConnsClosed
}

// Shutdown stops the forloop in Run method.
func (w *Worker) Shutdown(ctx context.Context) {
w.isStop = true
Expand Down
12 changes: 6 additions & 6 deletions service/worker_test.go
Expand Up @@ -16,7 +16,7 @@ func TestWorker(t *testing.T) {
ctx := context.Background()
pollingInterval := 10 * time.Millisecond

t.Run("Run with dispatch error", func(t *testing.T) {
t.Run("run with dispatch error", func(t *testing.T) {
deliveryRepository := &mocks.DeliveryRepository{}
logger, _ := zap.NewDevelopment()
workerService := NewWorker(deliveryRepository, logger, pollingInterval)
Expand All @@ -26,12 +26,12 @@ func TestWorker(t *testing.T) {
go func() {
workerService.Shutdown(ctx)
}()
workerService.Run(ctx)
workerService.run(ctx)

deliveryRepository.AssertExpectations(t)
})

t.Run("Run with no dispatch", func(t *testing.T) {
t.Run("run with no dispatch", func(t *testing.T) {
deliveryRepository := &mocks.DeliveryRepository{}
logger, _ := zap.NewDevelopment()
workerService := NewWorker(deliveryRepository, logger, pollingInterval)
Expand All @@ -41,12 +41,12 @@ func TestWorker(t *testing.T) {
go func() {
workerService.Shutdown(ctx)
}()
workerService.Run(ctx)
workerService.run(ctx)

deliveryRepository.AssertExpectations(t)
})

t.Run("Run with dispatch", func(t *testing.T) {
t.Run("run with dispatch", func(t *testing.T) {
deliveryRepository := &mocks.DeliveryRepository{}
logger, _ := zap.NewDevelopment()
workerService := NewWorker(deliveryRepository, logger, pollingInterval)
Expand All @@ -56,7 +56,7 @@ func TestWorker(t *testing.T) {
go func() {
workerService.Shutdown(ctx)
}()
workerService.Run(ctx)
workerService.run(ctx)

deliveryRepository.AssertExpectations(t)
})
Expand Down

0 comments on commit c951fe3

Please sign in to comment.