diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index 7a3de1a..0e70c75 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -53,5 +53,5 @@ jobs: - name: Run Tests env: - POSTMAND_DATABASE_URL: "postgres://test:test@localhost:5432/postmand?sslmode=disable" - run: make db-migrate && go test -covermode=count -coverprofile=count.out -v ./... + POSTMAND_TEST_DATABASE_URL: "postgres://test:test@localhost:5432/postmand?sslmode=disable" + run: make db-test-migrate && go test -covermode=count -coverprofile=count.out -v ./... diff --git a/Makefile b/Makefile index 4111892..e41d24b 100644 --- a/Makefile +++ b/Makefile @@ -19,4 +19,7 @@ download-golang-migrate-binary: db-migrate: download-golang-migrate-binary ./migrate.$(PLATFORM)-amd64 -source file://db/migrations -database ${POSTMAND_DATABASE_URL} up -.PHONY: lint test download-golang-migrate-binary db-migrate +db-test-migrate: download-golang-migrate-binary + ./migrate.$(PLATFORM)-amd64 -source file://db/migrations -database ${POSTMAND_TEST_DATABASE_URL} up + +.PHONY: lint test download-golang-migrate-binary db-migrate db-test-migrate diff --git a/db/migrations/000001_create_initial_schema.up.sql b/db/migrations/000001_create_initial_schema.up.sql index d0d4d5f..0096830 100644 --- a/db/migrations/000001_create_initial_schema.up.sql +++ b/db/migrations/000001_create_initial_schema.up.sql @@ -45,8 +45,7 @@ CREATE TABLE IF NOT EXISTS delivery_attempts( id UUID PRIMARY KEY, webhook_id UUID NOT NULL, delivery_id UUID NOT NULL, - response_headers TEXT NOT NULL, - response_body TEXT NOT NULL, + raw_response TEXT NOT NULL, response_status_code SMALLINT NOT NULL, execution_duration SMALLINT NOT NULL, success BOOLEAN NOT NULL, diff --git a/entity.go b/entity.go index 9413def..ecab1d1 100644 --- a/entity.go +++ b/entity.go @@ -10,10 +10,8 @@ import ( ) const ( - // DeliveryStatusTodo represents the delivery todo status - DeliveryStatusTodo = "todo" - // DeliveryStatusDoing represents the delivery doing status - DeliveryStatusDoing = "doing" + // DeliveryStatusPending represents the delivery pending status + DeliveryStatusPending = "pending" // DeliveryStatusSucceeded represents the delivery succeeded status DeliveryStatusSucceeded = "succeeded" // DeliveryStatusFailed represents the delivery failed status @@ -73,7 +71,7 @@ func (d Delivery) Validate() error { validation.Field(&d.ID, validation.Required, is.UUIDv4), validation.Field(&d.WebhookID, validation.Required, is.UUIDv4), validation.Field(&d.ScheduledAt, validation.Required), - validation.Field(&d.Status, validation.Required, validation.In("todo", "doing", "succeeded", "failed")), + validation.Field(&d.Status, validation.Required, validation.In(DeliveryStatusPending, DeliveryStatusSucceeded, DeliveryStatusFailed)), ) } @@ -82,8 +80,7 @@ type DeliveryAttempt struct { ID ID `json:"id" db:"id"` WebhookID ID `json:"webhook_id" db:"webhook_id"` DeliveryID ID `json:"delivery_id" db:"delivery_id"` - ResponseHeaders string `json:"response_headers" db:"response_headers"` - ResponseBody string `json:"response_body" db:"response_body"` + RawResponse string `json:"raw_response" db:"raw_response"` ResponseStatusCode int `json:"response_status_code" db:"response_status_code"` ExecutionDuration int `json:"execution_duration" db:"execution_duration"` Success bool `json:"success" db:"success"` diff --git a/entity_test.go b/entity_test.go index 00d24c2..c0ffcba 100644 --- a/entity_test.go +++ b/entity_test.go @@ -96,7 +96,7 @@ func TestDelivery(t *testing.T) { WebhookID: uuid.New(), Payload: `{"success": true}`, ScheduledAt: time.Now().UTC(), - Status: "todo", + Status: DeliveryStatusPending, CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), } diff --git a/go.mod b/go.mod index b068683..fcbef37 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/google/uuid v1.2.0 github.com/huandu/go-sqlbuilder v1.12.0 github.com/jmoiron/sqlx v1.3.1 + github.com/jpillora/backoff v1.0.0 github.com/lib/pq v1.9.0 github.com/stretchr/testify v1.7.0 ) diff --git a/go.sum b/go.sum index 2d1b487..ca53c0f 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE= github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8= github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= diff --git a/repository.go b/repository.go index 07e0818..b6e2891 100644 --- a/repository.go +++ b/repository.go @@ -11,6 +11,7 @@ type RepositoryListOptions struct { Limit int Offset int OrderBy string + Order string } // WebhookRepository is the interface that will be used to iterate with the Webhook data. @@ -29,6 +30,7 @@ type DeliveryRepository interface { Create(delivery *Delivery) error Update(delivery *Delivery) error Delete(id ID) error + Dispatch() error } // DeliveryAttemptRepository is the interface that will be used to iterate with the DeliveryAttempt data. diff --git a/repository/delivery.go b/repository/delivery.go index 7aee5e9..1c449c3 100644 --- a/repository/delivery.go +++ b/repository/delivery.go @@ -1,10 +1,74 @@ package repository import ( + "bytes" + "database/sql" + "net/http" + "net/http/httputil" + "time" + "github.com/allisson/postmand" + "github.com/google/uuid" "github.com/jmoiron/sqlx" + "github.com/jpillora/backoff" ) +type dispatchResponse struct { + RawResponse string + ResponseStatusCode int + ExecutionDuration int + Success bool + Error string +} + +func dispatchToURL(webhook *postmand.Webhook, delivery *postmand.Delivery) dispatchResponse { + dr := dispatchResponse{} + + // Prepare request + httpClient := &http.Client{Timeout: time.Duration(webhook.DeliveryAttemptTimeout) * time.Second} + request, err := http.NewRequest("POST", webhook.URL, bytes.NewBufferString(delivery.Payload)) + if err != nil { + dr.Success = false + dr.Error = err.Error() + return dr + } + request.Header.Set("Content-Type", webhook.ContentType) + + // Make request + start := time.Now() + response, err := httpClient.Do(request) + if err != nil { + dr.Success = false + dr.Error = err.Error() + return dr + } + latency := time.Since(start) + + // Create response dump + responseDump, err := httputil.DumpResponse(response, true) + if err != nil { + dr.Success = false + dr.Error = err.Error() + return dr + } + + // Verify response status code + success := false + for _, statusCode := range webhook.ValidStatusCodes { + if response.StatusCode == int(statusCode) { + success = true + } + } + + // Update dispatch response + dr.RawResponse = string(responseDump) + dr.ResponseStatusCode = response.StatusCode + dr.ExecutionDuration = int(latency.Milliseconds()) + dr.Success = success + + return dr +} + // Delivery implements postmand.DeliveryRepository interface. type Delivery struct { db *sqlx.DB @@ -35,7 +99,7 @@ func (d Delivery) Create(delivery *postmand.Delivery) error { // Update postmand.Delivery on database. func (d Delivery) Update(delivery *postmand.Delivery) error { - sql, args := updateQuery("deliveries", delivery) + sql, args := updateQuery("deliveries", delivery.ID, delivery) _, err := d.db.Exec(sql, args...) return err } @@ -49,6 +113,113 @@ func (d Delivery) Delete(id postmand.ID) error { return err } +// Dispatch fetchs a delivery and send to url destination. +func (d Delivery) Dispatch() error { + sqlStatement := ` + SELECT + deliveries.* + FROM + deliveries + INNER JOIN webhooks + ON deliveries.webhook_id = webhooks.id + WHERE + webhooks.active = true AND deliveries.status = $1 AND deliveries.scheduled_at <= $2 + ORDER BY + deliveries.created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT + 1 + ` + + // Starts a new transaction + tx, err := d.db.Beginx() + if err != nil { + return err + } + + // Get delivery + delivery := postmand.Delivery{} + err = tx.Get(&delivery, sqlStatement, postmand.DeliveryStatusPending, time.Now().UTC()) + if err != nil { + // Skip if no result + if err == sql.ErrNoRows { + rollback("delivery not found", tx) + return nil + } + rollback("get delivery", tx) + return err + } + + // Get webhook + webhook := postmand.Webhook{} + getOptions := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.WebhookID}} + sql, args := getQuery("webhooks", getOptions) + err = tx.Get(&webhook, sql, args...) + if err != nil { + rollback("get webhook", tx) + return err + } + + // Dispatch webhook + dr := dispatchToURL(&webhook, &delivery) + + // Update delivery + newDeliveryAttempts := delivery.DeliveryAttempts + 1 + newStatus := postmand.DeliveryStatusPending + newScheduledAt := delivery.ScheduledAt + if dr.Success { + newStatus = postmand.DeliveryStatusSucceeded + } else { + if newDeliveryAttempts >= webhook.MaxDeliveryAttempts { + newStatus = postmand.DeliveryStatusFailed + } else { + b := &backoff.Backoff{ + Min: time.Duration(webhook.RetryMinBackoff) * time.Second, + Max: time.Duration(webhook.RetryMaxBackoff) * time.Second, + Factor: 2, + Jitter: false, + } + newScheduledAt = time.Now().UTC().Add(b.ForAttempt(float64(delivery.DeliveryAttempts))) + } + } + delivery.DeliveryAttempts = newDeliveryAttempts + delivery.Status = newStatus + delivery.ScheduledAt = newScheduledAt + delivery.UpdatedAt = time.Now().UTC() + sql, args = updateQuery("deliveries", delivery.ID, delivery) + _, err = tx.Exec(sql, args...) + if err != nil { + rollback("update delivery", tx) + return err + } + + // Create delivery attempt + deliveryAttempt := postmand.DeliveryAttempt{ + ID: uuid.New(), + WebhookID: webhook.ID, + DeliveryID: delivery.ID, + RawResponse: dr.RawResponse, + ResponseStatusCode: dr.ResponseStatusCode, + ExecutionDuration: dr.ExecutionDuration, + Success: dr.Success, + Error: dr.Error, + CreatedAt: time.Now().UTC(), + } + sql, args = insertQuery("delivery_attempts", deliveryAttempt) + _, err = tx.Exec(sql, args...) + if err != nil { + rollback("create delivery attempt", tx) + return err + } + + if err := tx.Commit(); err != nil { + rollback("unable to commit", tx) + return err + } + + return nil +} + // NewDelivery returns postmand.Delivery with db connection. func NewDelivery(db *sqlx.DB) *Delivery { return &Delivery{db: db} diff --git a/repository/delivery_attempt_test.go b/repository/delivery_attempt_test.go index 1b48885..7db7362 100644 --- a/repository/delivery_attempt_test.go +++ b/repository/delivery_attempt_test.go @@ -2,21 +2,21 @@ package repository import ( "testing" + "time" "github.com/allisson/postmand" "github.com/google/uuid" "github.com/stretchr/testify/assert" ) -func makeDeliveryAttempt() *postmand.DeliveryAttempt { - return &postmand.DeliveryAttempt{ +func makeDeliveryAttempt() postmand.DeliveryAttempt { + return postmand.DeliveryAttempt{ ID: uuid.New(), - ResponseHeaders: `{"Content-Type": ["application/json; charset=utf-8"]}`, - ResponseBody: `{"message":"A requisição para registrar essa transação foi enviada."}`, ResponseStatusCode: 201, ExecutionDuration: 150, Success: true, Error: "", + CreatedAt: time.Now().UTC(), } } @@ -26,18 +26,18 @@ func TestDeliveryAttempt(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) delivery := makeDelivery() delivery.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery) + err = th.deliveryRepository.Create(&delivery) assert.Nil(t, err) deliveryAttempt := makeDeliveryAttempt() deliveryAttempt.WebhookID = webhook.ID deliveryAttempt.DeliveryID = delivery.ID - err = th.deliveryAttemptRepository.Create(deliveryAttempt) + err = th.deliveryAttemptRepository.Create(&deliveryAttempt) assert.Nil(t, err) }) @@ -46,18 +46,18 @@ func TestDeliveryAttempt(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) delivery := makeDelivery() delivery.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery) + err = th.deliveryRepository.Create(&delivery) assert.Nil(t, err) deliveryAttempt := makeDeliveryAttempt() deliveryAttempt.WebhookID = webhook.ID deliveryAttempt.DeliveryID = delivery.ID - err = th.deliveryAttemptRepository.Create(deliveryAttempt) + err = th.deliveryAttemptRepository.Create(&deliveryAttempt) assert.Nil(t, err) options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": deliveryAttempt.ID}} @@ -71,27 +71,27 @@ func TestDeliveryAttempt(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) delivery := makeDelivery() delivery.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery) + err = th.deliveryRepository.Create(&delivery) assert.Nil(t, err) deliveryAttempt1 := makeDeliveryAttempt() deliveryAttempt1.WebhookID = webhook.ID deliveryAttempt1.DeliveryID = delivery.ID - err = th.deliveryAttemptRepository.Create(deliveryAttempt1) + err = th.deliveryAttemptRepository.Create(&deliveryAttempt1) assert.Nil(t, err) deliveryAttempt2 := makeDeliveryAttempt() deliveryAttempt2.WebhookID = webhook.ID deliveryAttempt2.DeliveryID = delivery.ID - err = th.deliveryAttemptRepository.Create(deliveryAttempt2) + err = th.deliveryAttemptRepository.Create(&deliveryAttempt2) assert.Nil(t, err) - options := postmand.RepositoryListOptions{Limit: 1, Offset: 1, OrderBy: "created_at DESC"} + options := postmand.RepositoryListOptions{Limit: 1, Offset: 0, OrderBy: "created_at", Order: "DESC"} deliveryAttempts, err := th.deliveryAttemptRepository.List(options) assert.Nil(t, err) assert.Len(t, deliveryAttempts, 1) diff --git a/repository/delivery_test.go b/repository/delivery_test.go index 1fe9964..2d2ac0f 100644 --- a/repository/delivery_test.go +++ b/repository/delivery_test.go @@ -2,6 +2,8 @@ package repository import ( "database/sql" + "net/http" + "net/http/httptest" "testing" "time" @@ -10,28 +12,82 @@ import ( "github.com/stretchr/testify/assert" ) -func makeDelivery() *postmand.Delivery { - return &postmand.Delivery{ +func makeDelivery() postmand.Delivery { + return postmand.Delivery{ ID: uuid.New(), Payload: `{"success": true}`, ScheduledAt: time.Now().UTC(), DeliveryAttempts: 0, - Status: postmand.DeliveryStatusTodo, + Status: postmand.DeliveryStatusPending, + CreatedAt: time.Now().UTC(), + UpdatedAt: time.Now().UTC(), } } +func TestDispatchToURL(t *testing.T) { + t.Run("Invalid webhook url", func(t *testing.T) { + webhook := makeWebhook() + webhook.URL = "http://localhost:9999" + delivery := makeDelivery() + delivery.WebhookID = webhook.ID + + dr := dispatchToURL(&webhook, &delivery) + assert.False(t, dr.Success) + assert.Equal(t, `Post "http://localhost:9999": dial tcp [::1]:9999: connect: connection refused`, dr.Error) + }) + + t.Run("Invalid response status code", func(t *testing.T) { + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + // nolint:errcheck + w.Write([]byte("OK")) + })) + defer httpServer.Close() + + webhook := makeWebhook() + webhook.URL = httpServer.URL + delivery := makeDelivery() + delivery.WebhookID = webhook.ID + + dr := dispatchToURL(&webhook, &delivery) + assert.NotEqual(t, "", dr.RawResponse) + assert.Equal(t, http.StatusNoContent, dr.ResponseStatusCode) + assert.False(t, dr.Success) + assert.Equal(t, "", dr.Error) + }) + + t.Run("Valid response status code", func(t *testing.T) { + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // nolint:errcheck + w.Write([]byte("OK")) + })) + defer httpServer.Close() + + webhook := makeWebhook() + webhook.URL = httpServer.URL + delivery := makeDelivery() + delivery.WebhookID = webhook.ID + + dr := dispatchToURL(&webhook, &delivery) + assert.NotEqual(t, "", dr.RawResponse) + assert.Equal(t, http.StatusOK, dr.ResponseStatusCode) + assert.True(t, dr.Success) + assert.Equal(t, "", dr.Error) + }) +} + func TestDelivery(t *testing.T) { t.Run("Create delivery", func(t *testing.T) { th := newTestHelper() defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) delivery := makeDelivery() delivery.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery) + err = th.deliveryRepository.Create(&delivery) assert.Nil(t, err) }) @@ -40,22 +96,22 @@ func TestDelivery(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) delivery := makeDelivery() delivery.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery) + err = th.deliveryRepository.Create(&delivery) assert.Nil(t, err) - delivery.Status = postmand.DeliveryStatusDoing - err = th.deliveryRepository.Update(delivery) + delivery.Status = postmand.DeliveryStatusPending + err = th.deliveryRepository.Update(&delivery) assert.Nil(t, err) options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.ID}} deliveryFromRepository, err := th.deliveryRepository.Get(options) assert.Nil(t, err) - assert.Equal(t, postmand.DeliveryStatusDoing, deliveryFromRepository.Status) + assert.Equal(t, postmand.DeliveryStatusPending, deliveryFromRepository.Status) }) t.Run("Delete delivery", func(t *testing.T) { @@ -63,12 +119,12 @@ func TestDelivery(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) delivery := makeDelivery() delivery.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery) + err = th.deliveryRepository.Create(&delivery) assert.Nil(t, err) err = th.deliveryRepository.Delete(delivery.ID) @@ -84,12 +140,12 @@ func TestDelivery(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) delivery := makeDelivery() delivery.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery) + err = th.deliveryRepository.Create(&delivery) assert.Nil(t, err) options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.ID}} @@ -103,23 +159,132 @@ func TestDelivery(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) delivery1 := makeDelivery() delivery1.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery1) + err = th.deliveryRepository.Create(&delivery1) assert.Nil(t, err) delivery2 := makeDelivery() delivery2.WebhookID = webhook.ID - err = th.deliveryRepository.Create(delivery2) + err = th.deliveryRepository.Create(&delivery2) assert.Nil(t, err) - options := postmand.RepositoryListOptions{Limit: 1, Offset: 1, OrderBy: "created_at DESC"} + options := postmand.RepositoryListOptions{Limit: 1, Offset: 0, OrderBy: "created_at", Order: "DESC"} deliveries, err := th.deliveryRepository.List(options) assert.Nil(t, err) assert.Len(t, deliveries, 1) assert.Equal(t, delivery2.ID, deliveries[0].ID) }) + + t.Run("Dispatch delivery succeeded", func(t *testing.T) { + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // nolint:errcheck + w.Write([]byte("OK")) + })) + defer httpServer.Close() + + th := newTestHelper() + defer th.db.Close() + + webhook := makeWebhook() + webhook.URL = httpServer.URL + err := th.webhookRepository.Create(&webhook) + assert.Nil(t, err) + + delivery := makeDelivery() + delivery.WebhookID = webhook.ID + err = th.deliveryRepository.Create(&delivery) + assert.Nil(t, err) + + err = th.deliveryRepository.Dispatch() + assert.Nil(t, err) + + options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.ID}} + deliveryFromRepository, err := th.deliveryRepository.Get(options) + assert.Nil(t, err) + assert.Equal(t, 1, deliveryFromRepository.DeliveryAttempts) + assert.Equal(t, postmand.DeliveryStatusSucceeded, deliveryFromRepository.Status) + + options = postmand.RepositoryGetOptions{Filters: map[string]interface{}{"delivery_id": delivery.ID}} + deliveryAttemptFromRepository, err := th.deliveryAttemptRepository.Get(options) + assert.Nil(t, err) + assert.True(t, deliveryAttemptFromRepository.Success) + }) + + t.Run("Dispatch delivery retry", func(t *testing.T) { + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + // nolint:errcheck + w.Write([]byte("OK")) + })) + defer httpServer.Close() + + th := newTestHelper() + defer th.db.Close() + + webhook := makeWebhook() + webhook.MaxDeliveryAttempts = 2 + webhook.URL = httpServer.URL + err := th.webhookRepository.Create(&webhook) + assert.Nil(t, err) + + delivery := makeDelivery() + delivery.WebhookID = webhook.ID + err = th.deliveryRepository.Create(&delivery) + assert.Nil(t, err) + + err = th.deliveryRepository.Dispatch() + assert.Nil(t, err) + + options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.ID}} + deliveryFromRepository, err := th.deliveryRepository.Get(options) + assert.Nil(t, err) + assert.Equal(t, 1, deliveryFromRepository.DeliveryAttempts) + assert.Equal(t, postmand.DeliveryStatusPending, deliveryFromRepository.Status) + assert.True(t, deliveryFromRepository.ScheduledAt.After(delivery.ScheduledAt)) + + options = postmand.RepositoryGetOptions{Filters: map[string]interface{}{"delivery_id": delivery.ID}} + deliveryAttemptFromRepository, err := th.deliveryAttemptRepository.Get(options) + assert.Nil(t, err) + assert.False(t, deliveryAttemptFromRepository.Success) + }) + + t.Run("Dispatch delivery failed", func(t *testing.T) { + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + // nolint:errcheck + w.Write([]byte("OK")) + })) + defer httpServer.Close() + + th := newTestHelper() + defer th.db.Close() + + webhook := makeWebhook() + webhook.URL = httpServer.URL + err := th.webhookRepository.Create(&webhook) + assert.Nil(t, err) + + delivery := makeDelivery() + delivery.WebhookID = webhook.ID + err = th.deliveryRepository.Create(&delivery) + assert.Nil(t, err) + + err = th.deliveryRepository.Dispatch() + assert.Nil(t, err) + + options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.ID}} + deliveryFromRepository, err := th.deliveryRepository.Get(options) + assert.Nil(t, err) + assert.Equal(t, 1, deliveryFromRepository.DeliveryAttempts) + assert.Equal(t, postmand.DeliveryStatusFailed, deliveryFromRepository.Status) + + options = postmand.RepositoryGetOptions{Filters: map[string]interface{}{"delivery_id": delivery.ID}} + deliveryAttemptFromRepository, err := th.deliveryAttemptRepository.Get(options) + assert.Nil(t, err) + assert.False(t, deliveryAttemptFromRepository.Success) + }) } diff --git a/repository/query.go b/repository/query.go index ec151cd..5145acf 100644 --- a/repository/query.go +++ b/repository/query.go @@ -1,8 +1,11 @@ package repository import ( + "log" + "github.com/allisson/postmand" "github.com/huandu/go-sqlbuilder" + "github.com/jmoiron/sqlx" ) func getQuery(tableName string, getOptions postmand.RepositoryGetOptions) (string, []interface{}) { @@ -20,8 +23,14 @@ func listQuery(tableName string, listOptions postmand.RepositoryListOptions) (st for key, value := range listOptions.Filters { sb.Where(sb.Equal(key, value)) } - if listOptions.OrderBy != "" { + if listOptions.OrderBy != "" && listOptions.Order != "" { sb.OrderBy(listOptions.OrderBy) + switch listOptions.Order { + case "asc", "ASC": + sb.Asc() + case "desc", "DESC": + sb.Desc() + } } return sb.Build() } @@ -32,8 +41,15 @@ func insertQuery(tableName string, structValue interface{}) (string, []interface return ib.Build() } -func updateQuery(tableName string, structValue interface{}) (string, []interface{}) { +func updateQuery(tableName string, id postmand.ID, structValue interface{}) (string, []interface{}) { theStruct := sqlbuilder.NewStruct(structValue).For(sqlbuilder.PostgreSQL) ib := theStruct.Update(tableName, structValue) + ib.Where(ib.Equal("id", id)) return ib.Build() } + +func rollback(msg string, tx *sqlx.Tx) { + if err := tx.Rollback(); err != nil { + log.Printf("%s: unable to rollback: %v\n", msg, err) + } +} diff --git a/repository/util_test.go b/repository/util_test.go index b4443c2..be4e437 100644 --- a/repository/util_test.go +++ b/repository/util_test.go @@ -12,7 +12,7 @@ import ( ) func init() { - txdb.Register("pgx", "postgres", os.Getenv("POSTMAND_DATABASE_URL")) + txdb.Register("pgx", "postgres", os.Getenv("POSTMAND_TEST_DATABASE_URL")) rand.Seed(time.Now().UnixNano()) } diff --git a/repository/webhook.go b/repository/webhook.go index 1773423..2c2f564 100644 --- a/repository/webhook.go +++ b/repository/webhook.go @@ -35,7 +35,7 @@ func (w Webhook) Create(webhook *postmand.Webhook) error { // Update postmand.Webhook on database. func (w Webhook) Update(webhook *postmand.Webhook) error { - sql, args := updateQuery("webhooks", webhook) + sql, args := updateQuery("webhooks", webhook.ID, webhook) _, err := w.db.Exec(sql, args...) return err } diff --git a/repository/webhook_test.go b/repository/webhook_test.go index 4b3ca2a..5329402 100644 --- a/repository/webhook_test.go +++ b/repository/webhook_test.go @@ -3,6 +3,7 @@ package repository import ( "database/sql" "testing" + "time" "github.com/allisson/postmand" "github.com/google/uuid" @@ -10,17 +11,20 @@ import ( "github.com/stretchr/testify/assert" ) -func makeWebhook() *postmand.Webhook { - return &postmand.Webhook{ +func makeWebhook() postmand.Webhook { + return postmand.Webhook{ ID: uuid.New(), Name: "Test", URL: "https://httpbin.org/post", ContentType: "application/json", + Active: true, ValidStatusCodes: pq.Int32Array{200, 201}, MaxDeliveryAttempts: 1, DeliveryAttemptTimeout: 1, RetryMinBackoff: 1, RetryMaxBackoff: 1, + CreatedAt: time.Now().UTC(), + UpdatedAt: time.Now().UTC(), } } @@ -30,7 +34,7 @@ func TestTransaction(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) }) @@ -39,11 +43,11 @@ func TestTransaction(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) webhook.ValidStatusCodes = pq.Int32Array{200, 201, 204} - err = th.webhookRepository.Update(webhook) + err = th.webhookRepository.Update(&webhook) assert.Nil(t, err) options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": webhook.ID}} @@ -57,7 +61,7 @@ func TestTransaction(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) err = th.webhookRepository.Delete(webhook.ID) @@ -73,7 +77,7 @@ func TestTransaction(t *testing.T) { defer th.db.Close() webhook := makeWebhook() - err := th.webhookRepository.Create(webhook) + err := th.webhookRepository.Create(&webhook) assert.Nil(t, err) options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": webhook.ID}} @@ -87,14 +91,14 @@ func TestTransaction(t *testing.T) { defer th.db.Close() webhook1 := makeWebhook() - err := th.webhookRepository.Create(webhook1) + err := th.webhookRepository.Create(&webhook1) assert.Nil(t, err) webhook2 := makeWebhook() - err = th.webhookRepository.Create(webhook2) + err = th.webhookRepository.Create(&webhook2) assert.Nil(t, err) - options := postmand.RepositoryListOptions{Limit: 1, Offset: 1, OrderBy: "created_at DESC"} + options := postmand.RepositoryListOptions{Limit: 1, Offset: 0, OrderBy: "created_at", Order: "DESC"} webhooks, err := th.webhookRepository.List(options) assert.Nil(t, err) assert.Len(t, webhooks, 1)