Skip to content

Commit

Permalink
feat: Add DeliveryRepository (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson committed Mar 2, 2021
1 parent f1bea1b commit b4e974e
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 10 deletions.
1 change: 1 addition & 0 deletions db/migrations/000001_create_initial_schema.down.sql
@@ -1 +1,2 @@
DROP TABLE IF EXISTS deliveries;
DROP TABLE IF EXISTS webhooks;
21 changes: 20 additions & 1 deletion db/migrations/000001_create_initial_schema.up.sql
@@ -1,7 +1,7 @@
-- webhooks table

CREATE TABLE IF NOT EXISTS webhooks(
id uuid PRIMARY KEY,
id UUID PRIMARY KEY,
name VARCHAR NOT NULL,
url VARCHAR NOT NULL,
content_type VARCHAR NOT NULL,
Expand All @@ -19,3 +19,22 @@ CREATE TABLE IF NOT EXISTS webhooks(
CREATE INDEX IF NOT EXISTS webhooks_name_idx ON webhooks (name);
CREATE INDEX IF NOT EXISTS webhooks_active_idx ON webhooks (active);
CREATE INDEX IF NOT EXISTS webhooks_created_at_idx ON webhooks USING BRIN(created_at);

-- deliveries table

CREATE TABLE IF NOT EXISTS deliveries(
id UUID PRIMARY KEY,
webhook_id UUID NOT NULL,
payload TEXT NOT NULL,
scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
delivery_attempts SMALLINT NOT NULL,
status VARCHAR NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
FOREIGN KEY (webhook_id) REFERENCES webhooks (id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS deliveries_webhook_id_idx ON deliveries (webhook_id);
CREATE INDEX IF NOT EXISTS deliveries_status_idx ON deliveries (status);
CREATE INDEX IF NOT EXISTS deliveries_scheduled_at_idx ON deliveries USING BRIN(scheduled_at);
CREATE INDEX IF NOT EXISTS deliveries_created_at_idx ON deliveries USING BRIN(created_at);
9 changes: 9 additions & 0 deletions repository.go
Expand Up @@ -21,3 +21,12 @@ type WebhookRepository interface {
Update(webhook *Webhook) error
Delete(id ID) error
}

// DeliveryRepository is the interface that will be used to iterate with the Delivery data.
type DeliveryRepository interface {
Get(getOptions *RepositoryGetOptions) (*Delivery, error)
List(listOptions *RepositoryListOptions) ([]*Delivery, error)
Create(delivery *Delivery) error
Update(delivery *Delivery) error
Delete(id ID) error
}
100 changes: 100 additions & 0 deletions repository/delivery.go
@@ -0,0 +1,100 @@
package repository

import (
"github.com/allisson/postmand"
"github.com/huandu/go-sqlbuilder"
"github.com/jmoiron/sqlx"
)

// Delivery implements postmand.DeliveryRepository interface.
type Delivery struct {
db *sqlx.DB
}

// Get returns postmand.Delivery by options filter.
func (d Delivery) Get(getOptions *postmand.RepositoryGetOptions) (*postmand.Delivery, error) {
delivery := postmand.Delivery{}
sb := sqlbuilder.PostgreSQL.NewSelectBuilder()
sb.Select("*").From("deliveries")
for key, value := range getOptions.Filters {
sb.Where(sb.Equal(key, value))
}
sql, args := sb.Build()
err := d.db.Get(&delivery, sql, args...)
return &delivery, err
}

// List returns a slice of postmand.Delivery by options filter.
func (d Delivery) List(listOptions *postmand.RepositoryListOptions) ([]*postmand.Delivery, error) {
deliveries := []*postmand.Delivery{}
sb := sqlbuilder.PostgreSQL.NewSelectBuilder()
sb.Select("*").From("deliveries").Limit(listOptions.Limit).Offset(listOptions.Offset)
for key, value := range listOptions.Filters {
sb.Where(sb.Equal(key, value))
}
if listOptions.OrderBy != "" {
sb.OrderBy(listOptions.OrderBy)
}
sql, args := sb.Build()
err := d.db.Select(&deliveries, sql, args...)
return deliveries, err
}

// Create postmand.Delivery on database.
func (d Delivery) Create(delivery *postmand.Delivery) error {
sqlStatement := `
INSERT INTO deliveries (
"id",
"webhook_id",
"payload",
"scheduled_at",
"delivery_attempts",
"status",
"created_at",
"updated_at"
)
VALUES (
:id,
:webhook_id,
:payload,
:scheduled_at,
:delivery_attempts,
:status,
:created_at,
:updated_at
)
`
_, err := d.db.NamedExec(sqlStatement, delivery)
return err
}

// Update postmand.Delivery on database.
func (d Delivery) Update(delivery *postmand.Delivery) error {
sqlStatement := `
UPDATE deliveries
SET webhook_id = :webhook_id,
payload = :payload,
scheduled_at = :scheduled_at,
delivery_attempts = :delivery_attempts,
status = :status,
created_at = :created_at,
updated_at = :updated_at
WHERE id = :id
`
_, err := d.db.NamedExec(sqlStatement, delivery)
return err
}

// Delete postmand.Delivery on database.
func (d Delivery) Delete(id postmand.ID) error {
sqlStatement := `
DELETE FROM deliveries WHERE id = $1
`
_, err := d.db.Exec(sqlStatement, id)
return err
}

// NewDelivery returns postmand.Delivery with db connection.
func NewDelivery(db *sqlx.DB) *Delivery {
return &Delivery{db: db}
}
125 changes: 125 additions & 0 deletions repository/delivery_test.go
@@ -0,0 +1,125 @@
package repository

import (
"database/sql"
"testing"
"time"

"github.com/allisson/postmand"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)

func makeDelivery() *postmand.Delivery {
return &postmand.Delivery{
ID: uuid.New(),
Payload: `{"success": true}`,
ScheduledAt: time.Now().UTC(),
DeliveryAttempts: 0,
Status: postmand.DeliveryStatusTodo,
}
}

func TestDelivery(t *testing.T) {
t.Run("Create delivery", func(t *testing.T) {
th := newTestHelper()
defer th.db.Close()

webhook := makeWebhook()
err := th.webhookRepository.Create(webhook)
assert.Nil(t, err)

delivery := makeDelivery()
delivery.WebhookID = webhook.ID
err = th.deliveryRepository.Create(delivery)
assert.Nil(t, err)
})

t.Run("Update delivery", func(t *testing.T) {
th := newTestHelper()
defer th.db.Close()

webhook := makeWebhook()
err := th.webhookRepository.Create(webhook)
assert.Nil(t, err)

delivery := makeDelivery()
delivery.WebhookID = webhook.ID
err = th.deliveryRepository.Create(delivery)
assert.Nil(t, err)

delivery.Status = postmand.DeliveryStatusDoing
err = th.deliveryRepository.Update(delivery)
assert.Nil(t, err)

options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.ID}}
deliveryFromRepository, err := th.deliveryRepository.Get(&options)
assert.Nil(t, err)
assert.Equal(t, postmand.DeliveryStatusDoing, deliveryFromRepository.Status)
})

t.Run("Delete delivery", func(t *testing.T) {
th := newTestHelper()
defer th.db.Close()

webhook := makeWebhook()
err := th.webhookRepository.Create(webhook)
assert.Nil(t, err)

delivery := makeDelivery()
delivery.WebhookID = webhook.ID
err = th.deliveryRepository.Create(delivery)
assert.Nil(t, err)

err = th.deliveryRepository.Delete(delivery.ID)
assert.Nil(t, err)

options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.ID}}
_, err = th.deliveryRepository.Get(&options)
assert.Equal(t, sql.ErrNoRows, err)
})

t.Run("Get delivery", func(t *testing.T) {
th := newTestHelper()
defer th.db.Close()

webhook := makeWebhook()
err := th.webhookRepository.Create(webhook)
assert.Nil(t, err)

delivery := makeDelivery()
delivery.WebhookID = webhook.ID
err = th.deliveryRepository.Create(delivery)
assert.Nil(t, err)

options := postmand.RepositoryGetOptions{Filters: map[string]interface{}{"id": delivery.ID}}
deliveryFromRepository, err := th.deliveryRepository.Get(&options)
assert.Nil(t, err)
assert.Equal(t, delivery.ID, deliveryFromRepository.ID)
})

t.Run("List deliveries", func(t *testing.T) {
th := newTestHelper()
defer th.db.Close()

webhook := makeWebhook()
err := th.webhookRepository.Create(webhook)
assert.Nil(t, err)

delivery1 := makeDelivery()
delivery1.WebhookID = webhook.ID
err = th.deliveryRepository.Create(delivery1)
assert.Nil(t, err)

delivery2 := makeDelivery()
delivery2.WebhookID = webhook.ID
err = th.deliveryRepository.Create(delivery2)
assert.Nil(t, err)

options := postmand.RepositoryListOptions{Limit: 1, Offset: 1, OrderBy: "created_at DESC"}
deliveries, err := th.deliveryRepository.List(&options)
assert.Nil(t, err)
assert.Len(t, deliveries, 1)
assert.Equal(t, delivery2.ID, deliveries[0].ID)
})
}
10 changes: 6 additions & 4 deletions repository/util_test.go
Expand Up @@ -17,15 +17,17 @@ func init() {
}

type testHelper struct {
db *sqlx.DB
webhookRepository *Webhook
db *sqlx.DB
webhookRepository *Webhook
deliveryRepository *Delivery
}

func newTestHelper() testHelper {
cName := fmt.Sprintf("connection_%d", time.Now().UnixNano())
db, _ := sqlx.Open("pgx", cName)
return testHelper{
db: db,
webhookRepository: NewWebhook(db),
db: db,
webhookRepository: NewWebhook(db),
deliveryRepository: NewDelivery(db),
}
}
8 changes: 4 additions & 4 deletions repository/webhook.go
Expand Up @@ -40,7 +40,7 @@ func (w Webhook) List(listOptions *postmand.RepositoryListOptions) ([]*postmand.
return webhooks, err
}

// Create webhook on database.
// Create postmand.Webhook on database.
func (w Webhook) Create(webhook *postmand.Webhook) error {
sqlStatement := `
INSERT INTO webhooks (
Expand Down Expand Up @@ -78,7 +78,7 @@ func (w Webhook) Create(webhook *postmand.Webhook) error {
return err
}

// Update webhook on database.
// Update postmand.Webhook on database.
func (w Webhook) Update(webhook *postmand.Webhook) error {
sqlStatement := `
UPDATE webhooks
Expand All @@ -100,7 +100,7 @@ func (w Webhook) Update(webhook *postmand.Webhook) error {
return err
}

// Delete webhook on database.
// Delete postmand.Webhook on database.
func (w Webhook) Delete(id postmand.ID) error {
sqlStatement := `
DELETE FROM webhooks WHERE id = $1
Expand All @@ -109,7 +109,7 @@ func (w Webhook) Delete(id postmand.ID) error {
return err
}

// NewWebhook returns Webhook with db connection.
// NewWebhook returns postmand.Webhook with db connection.
func NewWebhook(db *sqlx.DB) *Webhook {
return &Webhook{db: db}
}
2 changes: 1 addition & 1 deletion repository/webhook_test.go
Expand Up @@ -82,7 +82,7 @@ func TestTransaction(t *testing.T) {
assert.Equal(t, webhook.ID, webhookFromRepository.ID)
})

t.Run("List webhook", func(t *testing.T) {
t.Run("List webhooks", func(t *testing.T) {
th := newTestHelper()
defer th.db.Close()

Expand Down

0 comments on commit b4e974e

Please sign in to comment.