Skip to content

Commit

Permalink
feat: Add Dispatch method on DeliveryRepository (#6)
Browse files Browse the repository at this point in the history
* feat: Add Dispatch method on DeliveryRepository

* fix: Fix code for ordering and update tables

* fix: Execute commit before return

* feat: Add DispatchResult entity

* fix: Update query to receive status and datetime

* fix: Remove DispatchResult and query database on tests
  • Loading branch information
allisson committed Mar 3, 2021
1 parent 7f45d2b commit 8660882
Show file tree
Hide file tree
Showing 15 changed files with 421 additions and 61 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/verify.yml
Expand Up @@ -53,5 +53,5 @@ jobs:

- name: Run Tests
env:
POSTMAND_DATABASE_URL: "postgres://test:test@localhost:5432/postmand?sslmode=disable"
run: make db-migrate && go test -covermode=count -coverprofile=count.out -v ./...
POSTMAND_TEST_DATABASE_URL: "postgres://test:test@localhost:5432/postmand?sslmode=disable"
run: make db-test-migrate && go test -covermode=count -coverprofile=count.out -v ./...
5 changes: 4 additions & 1 deletion Makefile
Expand Up @@ -19,4 +19,7 @@ download-golang-migrate-binary:
db-migrate: download-golang-migrate-binary
./migrate.$(PLATFORM)-amd64 -source file://db/migrations -database ${POSTMAND_DATABASE_URL} up

.PHONY: lint test download-golang-migrate-binary db-migrate
db-test-migrate: download-golang-migrate-binary
./migrate.$(PLATFORM)-amd64 -source file://db/migrations -database ${POSTMAND_TEST_DATABASE_URL} up

.PHONY: lint test download-golang-migrate-binary db-migrate db-test-migrate
3 changes: 1 addition & 2 deletions db/migrations/000001_create_initial_schema.up.sql
Expand Up @@ -45,8 +45,7 @@ CREATE TABLE IF NOT EXISTS delivery_attempts(
id UUID PRIMARY KEY,
webhook_id UUID NOT NULL,
delivery_id UUID NOT NULL,
response_headers TEXT NOT NULL,
response_body TEXT NOT NULL,
raw_response TEXT NOT NULL,
response_status_code SMALLINT NOT NULL,
execution_duration SMALLINT NOT NULL,
success BOOLEAN NOT NULL,
Expand Down
11 changes: 4 additions & 7 deletions entity.go
Expand Up @@ -10,10 +10,8 @@ import (
)

const (
// DeliveryStatusTodo represents the delivery todo status
DeliveryStatusTodo = "todo"
// DeliveryStatusDoing represents the delivery doing status
DeliveryStatusDoing = "doing"
// DeliveryStatusPending represents the delivery pending status
DeliveryStatusPending = "pending"
// DeliveryStatusSucceeded represents the delivery succeeded status
DeliveryStatusSucceeded = "succeeded"
// DeliveryStatusFailed represents the delivery failed status
Expand Down Expand Up @@ -73,7 +71,7 @@ func (d Delivery) Validate() error {
validation.Field(&d.ID, validation.Required, is.UUIDv4),
validation.Field(&d.WebhookID, validation.Required, is.UUIDv4),
validation.Field(&d.ScheduledAt, validation.Required),
validation.Field(&d.Status, validation.Required, validation.In("todo", "doing", "succeeded", "failed")),
validation.Field(&d.Status, validation.Required, validation.In(DeliveryStatusPending, DeliveryStatusSucceeded, DeliveryStatusFailed)),
)
}

Expand All @@ -82,8 +80,7 @@ type DeliveryAttempt struct {
ID ID `json:"id" db:"id"`
WebhookID ID `json:"webhook_id" db:"webhook_id"`
DeliveryID ID `json:"delivery_id" db:"delivery_id"`
ResponseHeaders string `json:"response_headers" db:"response_headers"`
ResponseBody string `json:"response_body" db:"response_body"`
RawResponse string `json:"raw_response" db:"raw_response"`
ResponseStatusCode int `json:"response_status_code" db:"response_status_code"`
ExecutionDuration int `json:"execution_duration" db:"execution_duration"`
Success bool `json:"success" db:"success"`
Expand Down
2 changes: 1 addition & 1 deletion entity_test.go
Expand Up @@ -96,7 +96,7 @@ func TestDelivery(t *testing.T) {
WebhookID: uuid.New(),
Payload: `{"success": true}`,
ScheduledAt: time.Now().UTC(),
Status: "todo",
Status: DeliveryStatusPending,
CreatedAt: time.Now().UTC(),
UpdatedAt: time.Now().UTC(),
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -8,6 +8,7 @@ require (
github.com/google/uuid v1.2.0
github.com/huandu/go-sqlbuilder v1.12.0
github.com/jmoiron/sqlx v1.3.1
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.9.0
github.com/stretchr/testify v1.7.0
)
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -19,6 +19,8 @@ github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE=
github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8=
github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
Expand Down
2 changes: 2 additions & 0 deletions repository.go
Expand Up @@ -11,6 +11,7 @@ type RepositoryListOptions struct {
Limit int
Offset int
OrderBy string
Order string
}

// WebhookRepository is the interface that will be used to iterate with the Webhook data.
Expand All @@ -29,6 +30,7 @@ type DeliveryRepository interface {
Create(delivery *Delivery) error
Update(delivery *Delivery) error
Delete(id ID) error
Dispatch() error
}

// DeliveryAttemptRepository is the interface that will be used to iterate with the DeliveryAttempt data.
Expand Down
173 changes: 172 additions & 1 deletion repository/delivery.go
@@ -1,10 +1,74 @@
package repository

import (
"bytes"
"database/sql"
"net/http"
"net/http/httputil"
"time"

"github.com/allisson/postmand"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/jpillora/backoff"
)

type dispatchResponse struct {
RawResponse string
ResponseStatusCode int
ExecutionDuration int
Success bool
Error string
}

func dispatchToURL(webhook *postmand.Webhook, delivery *postmand.Delivery) dispatchResponse {
dr := dispatchResponse{}

// Prepare request
httpClient := &http.Client{Timeout: time.Duration(webhook.DeliveryAttemptTimeout) * time.Second}
request, err := http.NewRequest("POST", webhook.URL, bytes.NewBufferString(delivery.Payload))
if err != nil {
dr.Success = false
dr.Error = err.Error()
return dr
}
request.Header.Set("Content-Type", webhook.ContentType)

// Make request
start := time.Now()
response, err := httpClient.Do(request)
if err != nil {
dr.Success = false
dr.Error = err.Error()
return dr
}
latency := time.Since(start)

// Create response dump
responseDump, err := httputil.DumpResponse(response, true)
if err != nil {
dr.Success = false
dr.Error = err.Error()
return dr
}

// Verify response status code
success := false
for _, statusCode := range webhook.ValidStatusCodes {
if response.StatusCode == int(statusCode) {
success = true
}
}

// Update dispatch response
dr.RawResponse = string(responseDump)
dr.ResponseStatusCode = response.StatusCode
dr.ExecutionDuration = int(latency.Milliseconds())
dr.Success = success

return dr
}

// Delivery implements postmand.DeliveryRepository interface.
type Delivery struct {
db *sqlx.DB
Expand Down Expand Up @@ -35,7 +99,7 @@ func (d Delivery) Create(delivery *postmand.Delivery) error {

// Update postmand.Delivery on database.
func (d Delivery) Update(delivery *postmand.Delivery) error {
sql, args := updateQuery("deliveries", delivery)
sql, args := updateQuery("deliveries", delivery.ID, delivery)
_, err := d.db.Exec(sql, args...)
return err
}
Expand All @@ -49,6 +113,113 @@ func (d Delivery) Delete(id postmand.ID) error {
return err
}

// Dispatch fetchs a delivery and send to url destination.
func (d Delivery) Dispatch() error {
sqlStatement := `
SELECT
deliveries.*
FROM
deliveries
INNER JOIN webhooks
ON deliveries.webhook_id = webhooks.id
WHERE
webhooks.active = true AND deliveries.status = $1 AND deliveries.scheduled_at <= $2
ORDER BY
deliveries.created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT
1
`

// Starts a new transaction
tx, err := d.db.Beginx()
if err != nil {
return err
}

// Get delivery
delivery := postmand.Delivery{}
err = tx.Get(&delivery, sqlStatement, postmand.DeliveryStatusPending, time.Now().UTC())
if err != nil {
// Skip if no result
if err == sql.ErrNoRows {
rollback("delivery not found", tx)
return nil
}
rollback("get delivery", tx)
return err
}

// Get webhook
webhook := postmand.Webhook{}
getOptions := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.WebhookID}}
sql, args := getQuery("webhooks", getOptions)
err = tx.Get(&webhook, sql, args...)
if err != nil {
rollback("get webhook", tx)
return err
}

// Dispatch webhook
dr := dispatchToURL(&webhook, &delivery)

// Update delivery
newDeliveryAttempts := delivery.DeliveryAttempts + 1
newStatus := postmand.DeliveryStatusPending
newScheduledAt := delivery.ScheduledAt
if dr.Success {
newStatus = postmand.DeliveryStatusSucceeded
} else {
if newDeliveryAttempts >= webhook.MaxDeliveryAttempts {
newStatus = postmand.DeliveryStatusFailed
} else {
b := &backoff.Backoff{
Min: time.Duration(webhook.RetryMinBackoff) * time.Second,
Max: time.Duration(webhook.RetryMaxBackoff) * time.Second,
Factor: 2,
Jitter: false,
}
newScheduledAt = time.Now().UTC().Add(b.ForAttempt(float64(delivery.DeliveryAttempts)))
}
}
delivery.DeliveryAttempts = newDeliveryAttempts
delivery.Status = newStatus
delivery.ScheduledAt = newScheduledAt
delivery.UpdatedAt = time.Now().UTC()
sql, args = updateQuery("deliveries", delivery.ID, delivery)
_, err = tx.Exec(sql, args...)
if err != nil {
rollback("update delivery", tx)
return err
}

// Create delivery attempt
deliveryAttempt := postmand.DeliveryAttempt{
ID: uuid.New(),
WebhookID: webhook.ID,
DeliveryID: delivery.ID,
RawResponse: dr.RawResponse,
ResponseStatusCode: dr.ResponseStatusCode,
ExecutionDuration: dr.ExecutionDuration,
Success: dr.Success,
Error: dr.Error,
CreatedAt: time.Now().UTC(),
}
sql, args = insertQuery("delivery_attempts", deliveryAttempt)
_, err = tx.Exec(sql, args...)
if err != nil {
rollback("create delivery attempt", tx)
return err
}

if err := tx.Commit(); err != nil {
rollback("unable to commit", tx)
return err
}

return nil
}

// NewDelivery returns postmand.Delivery with db connection.
func NewDelivery(db *sqlx.DB) *Delivery {
return &Delivery{db: db}
Expand Down

0 comments on commit 8660882

Please sign in to comment.