Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implements graceful shutdown on WorkerService.Run method #12

Merged
merged 1 commit into from Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
@@ -1,2 +1,2 @@
# postmand
Simple webhook delivery system powered by Golang and PostgreSQL
Simple webhook delivery system powered by Golang and PostgreSQL.
24 changes: 0 additions & 24 deletions cmd/postmand/main.go
Expand Up @@ -3,8 +3,6 @@ package main
import (
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/allisson/go-env"
Expand Down Expand Up @@ -70,29 +68,7 @@ func main() {
deliveryRepository := repository.NewDelivery(db)
pollingInterval := time.Duration(env.GetInt("POSTMAND_POLLING_INTERVAL", 1000)) * time.Millisecond
workerService := service.NewWorker(deliveryRepository, logger, pollingInterval)

// Graceful shutdown
idleConnsClosed := make(chan struct{})
go func() {
sigint := make(chan os.Signal, 1)

// interrupt signal sent from terminal
signal.Notify(sigint, os.Interrupt)
// sigterm signal sent from kubernetes
signal.Notify(sigint, syscall.SIGTERM)

<-sigint

// We received an interrupt signal, shut down.
workerService.Shutdown(c.Context)
close(idleConnsClosed)
}()

logger.Info("worker-started")
workerService.Run(c.Context)

<-idleConnsClosed

return nil
},
},
Expand Down
9 changes: 4 additions & 5 deletions http/server.go
Expand Up @@ -9,11 +9,10 @@ import (
"os/signal"
"syscall"

mw "github.com/allisson/postmand/http/middleware"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"go.uber.org/zap"

mw "github.com/allisson/postmand/http/middleware"
)

// NewRouter returns *chi.Mux with base middlewares.
Expand All @@ -27,14 +26,14 @@ func NewRouter(logger *zap.Logger) *chi.Mux {
return r
}

// Server ...
// Server implements a http server.
type Server struct {
mux *chi.Mux
httpPort int
logger *zap.Logger
}

// Run ...
// Run starts a http server.
func (s Server) Run() {
httpServer := &nethttp.Server{Addr: fmt.Sprintf(":%d", s.httpPort), Handler: s.mux}
idleConnsClosed := make(chan struct{})
Expand Down Expand Up @@ -69,7 +68,7 @@ func (s Server) Run() {
<-idleConnsClosed
}

// NewServer ...
// NewServer creates a new Server.
func NewServer(mux *chi.Mux, httpPort int, logger *zap.Logger) *Server {
return &Server{
mux: mux,
Expand Down
30 changes: 28 additions & 2 deletions service/worker.go
Expand Up @@ -2,6 +2,9 @@ package service

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/allisson/postmand"
Expand All @@ -16,8 +19,7 @@ type Worker struct {
isStop bool
}

// Run sending of webhooks until the Shutdown method is called.
func (w *Worker) Run(ctx context.Context) {
func (w *Worker) run(ctx context.Context) {
for {
// Break forloop if isStop is true.
if w.isStop {
Expand Down Expand Up @@ -51,6 +53,30 @@ func (w *Worker) Run(ctx context.Context) {
w.logger.Info("worker-shutdown-completed")
}

// Run sending of webhooks until the Shutdown method is called.
func (w *Worker) Run(ctx context.Context) {
idleConnsClosed := make(chan struct{})
go func() {
sigint := make(chan os.Signal, 1)

// interrupt signal sent from terminal
signal.Notify(sigint, os.Interrupt)
// sigterm signal sent from kubernetes
signal.Notify(sigint, syscall.SIGTERM)

<-sigint

// We received an interrupt signal, shut down.
w.Shutdown(ctx)
close(idleConnsClosed)
}()

w.logger.Info("worker-started")
w.run(ctx)

<-idleConnsClosed
}

// Shutdown stops the forloop in Run method.
func (w *Worker) Shutdown(ctx context.Context) {
w.isStop = true
Expand Down
12 changes: 6 additions & 6 deletions service/worker_test.go
Expand Up @@ -16,7 +16,7 @@ func TestWorker(t *testing.T) {
ctx := context.Background()
pollingInterval := 10 * time.Millisecond

t.Run("Run with dispatch error", func(t *testing.T) {
t.Run("run with dispatch error", func(t *testing.T) {
deliveryRepository := &mocks.DeliveryRepository{}
logger, _ := zap.NewDevelopment()
workerService := NewWorker(deliveryRepository, logger, pollingInterval)
Expand All @@ -26,12 +26,12 @@ func TestWorker(t *testing.T) {
go func() {
workerService.Shutdown(ctx)
}()
workerService.Run(ctx)
workerService.run(ctx)

deliveryRepository.AssertExpectations(t)
})

t.Run("Run with no dispatch", func(t *testing.T) {
t.Run("run with no dispatch", func(t *testing.T) {
deliveryRepository := &mocks.DeliveryRepository{}
logger, _ := zap.NewDevelopment()
workerService := NewWorker(deliveryRepository, logger, pollingInterval)
Expand All @@ -41,12 +41,12 @@ func TestWorker(t *testing.T) {
go func() {
workerService.Shutdown(ctx)
}()
workerService.Run(ctx)
workerService.run(ctx)

deliveryRepository.AssertExpectations(t)
})

t.Run("Run with dispatch", func(t *testing.T) {
t.Run("run with dispatch", func(t *testing.T) {
deliveryRepository := &mocks.DeliveryRepository{}
logger, _ := zap.NewDevelopment()
workerService := NewWorker(deliveryRepository, logger, pollingInterval)
Expand All @@ -56,7 +56,7 @@ func TestWorker(t *testing.T) {
go func() {
workerService.Shutdown(ctx)
}()
workerService.Run(ctx)
workerService.run(ctx)

deliveryRepository.AssertExpectations(t)
})
Expand Down