From c951fe3faa119777614d3bac2338c505913c62d8 Mon Sep 17 00:00:00 2001 From: Allisson Azevedo Date: Fri, 5 Mar 2021 11:28:43 -0300 Subject: [PATCH] feat: Implements graceful shutdown on WorkerService.Run method (#12) --- README.md | 2 +- cmd/postmand/main.go | 24 ------------------------ http/server.go | 9 ++++----- service/worker.go | 30 ++++++++++++++++++++++++++++-- service/worker_test.go | 12 ++++++------ 5 files changed, 39 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 63eb5e5..f75a99d 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ # postmand -Simple webhook delivery system powered by Golang and PostgreSQL +Simple webhook delivery system powered by Golang and PostgreSQL. diff --git a/cmd/postmand/main.go b/cmd/postmand/main.go index f496928..6e6211b 100644 --- a/cmd/postmand/main.go +++ b/cmd/postmand/main.go @@ -3,8 +3,6 @@ package main import ( "log" "os" - "os/signal" - "syscall" "time" "github.com/allisson/go-env" @@ -70,29 +68,7 @@ func main() { deliveryRepository := repository.NewDelivery(db) pollingInterval := time.Duration(env.GetInt("POSTMAND_POLLING_INTERVAL", 1000)) * time.Millisecond workerService := service.NewWorker(deliveryRepository, logger, pollingInterval) - - // Graceful shutdown - idleConnsClosed := make(chan struct{}) - go func() { - sigint := make(chan os.Signal, 1) - - // interrupt signal sent from terminal - signal.Notify(sigint, os.Interrupt) - // sigterm signal sent from kubernetes - signal.Notify(sigint, syscall.SIGTERM) - - <-sigint - - // We received an interrupt signal, shut down. - workerService.Shutdown(c.Context) - close(idleConnsClosed) - }() - - logger.Info("worker-started") workerService.Run(c.Context) - - <-idleConnsClosed - return nil }, }, diff --git a/http/server.go b/http/server.go index 3304bfc..f6e976a 100644 --- a/http/server.go +++ b/http/server.go @@ -9,11 +9,10 @@ import ( "os/signal" "syscall" + mw "github.com/allisson/postmand/http/middleware" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "go.uber.org/zap" - - mw "github.com/allisson/postmand/http/middleware" ) // NewRouter returns *chi.Mux with base middlewares. @@ -27,14 +26,14 @@ func NewRouter(logger *zap.Logger) *chi.Mux { return r } -// Server ... +// Server implements a http server. type Server struct { mux *chi.Mux httpPort int logger *zap.Logger } -// Run ... +// Run starts a http server. func (s Server) Run() { httpServer := &nethttp.Server{Addr: fmt.Sprintf(":%d", s.httpPort), Handler: s.mux} idleConnsClosed := make(chan struct{}) @@ -69,7 +68,7 @@ func (s Server) Run() { <-idleConnsClosed } -// NewServer ... +// NewServer creates a new Server. func NewServer(mux *chi.Mux, httpPort int, logger *zap.Logger) *Server { return &Server{ mux: mux, diff --git a/service/worker.go b/service/worker.go index 9ac446f..b0c7764 100644 --- a/service/worker.go +++ b/service/worker.go @@ -2,6 +2,9 @@ package service import ( "context" + "os" + "os/signal" + "syscall" "time" "github.com/allisson/postmand" @@ -16,8 +19,7 @@ type Worker struct { isStop bool } -// Run sending of webhooks until the Shutdown method is called. -func (w *Worker) Run(ctx context.Context) { +func (w *Worker) run(ctx context.Context) { for { // Break forloop if isStop is true. if w.isStop { @@ -51,6 +53,30 @@ func (w *Worker) Run(ctx context.Context) { w.logger.Info("worker-shutdown-completed") } +// Run sending of webhooks until the Shutdown method is called. +func (w *Worker) Run(ctx context.Context) { + idleConnsClosed := make(chan struct{}) + go func() { + sigint := make(chan os.Signal, 1) + + // interrupt signal sent from terminal + signal.Notify(sigint, os.Interrupt) + // sigterm signal sent from kubernetes + signal.Notify(sigint, syscall.SIGTERM) + + <-sigint + + // We received an interrupt signal, shut down. + w.Shutdown(ctx) + close(idleConnsClosed) + }() + + w.logger.Info("worker-started") + w.run(ctx) + + <-idleConnsClosed +} + // Shutdown stops the forloop in Run method. func (w *Worker) Shutdown(ctx context.Context) { w.isStop = true diff --git a/service/worker_test.go b/service/worker_test.go index 8fe4852..3e6b845 100644 --- a/service/worker_test.go +++ b/service/worker_test.go @@ -16,7 +16,7 @@ func TestWorker(t *testing.T) { ctx := context.Background() pollingInterval := 10 * time.Millisecond - t.Run("Run with dispatch error", func(t *testing.T) { + t.Run("run with dispatch error", func(t *testing.T) { deliveryRepository := &mocks.DeliveryRepository{} logger, _ := zap.NewDevelopment() workerService := NewWorker(deliveryRepository, logger, pollingInterval) @@ -26,12 +26,12 @@ func TestWorker(t *testing.T) { go func() { workerService.Shutdown(ctx) }() - workerService.Run(ctx) + workerService.run(ctx) deliveryRepository.AssertExpectations(t) }) - t.Run("Run with no dispatch", func(t *testing.T) { + t.Run("run with no dispatch", func(t *testing.T) { deliveryRepository := &mocks.DeliveryRepository{} logger, _ := zap.NewDevelopment() workerService := NewWorker(deliveryRepository, logger, pollingInterval) @@ -41,12 +41,12 @@ func TestWorker(t *testing.T) { go func() { workerService.Shutdown(ctx) }() - workerService.Run(ctx) + workerService.run(ctx) deliveryRepository.AssertExpectations(t) }) - t.Run("Run with dispatch", func(t *testing.T) { + t.Run("run with dispatch", func(t *testing.T) { deliveryRepository := &mocks.DeliveryRepository{} logger, _ := zap.NewDevelopment() workerService := NewWorker(deliveryRepository, logger, pollingInterval) @@ -56,7 +56,7 @@ func TestWorker(t *testing.T) { go func() { workerService.Shutdown(ctx) }() - workerService.Run(ctx) + workerService.run(ctx) deliveryRepository.AssertExpectations(t) })