Skip to content

Commit

Permalink
Merge pull request #163 from vgarvardt/feat/db-adapter-unwrap-tx
Browse files Browse the repository at this point in the history
feat: added UnwrapTx to all db adapters to get driver-specific tx
  • Loading branch information
vgarvardt committed Feb 26, 2023
2 parents 4b253ba + d627b3f commit 3867ad3
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 0 deletions.
11 changes: 11 additions & 0 deletions adapter/libpq/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ func NewTx(tx *sql.Tx) adapter.Tx {
return &aTx{tx: tx}
}

// UnwrapTx tries to unwrap driver-specific transaction instance from the interface.
// Returns unwrap success as the second parameter.
func UnwrapTx(tx adapter.Tx) (*sql.Tx, bool) {
driverTx, ok := tx.(*aTx)
if !ok {
return nil, false
}

return driverTx.tx, ok
}

// Exec implements adapter.Tx.Exec() using github.com/lib/pq
func (tx *aTx) Exec(ctx context.Context, query string, args ...any) (adapter.CommandTag, error) {
ct, err := tx.tx.ExecContext(ctx, query, args...)
Expand Down
11 changes: 11 additions & 0 deletions adapter/pgxv4/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ func NewTx(tx pgx.Tx) adapter.Tx {
return &aTx{tx: tx}
}

// UnwrapTx tries to unwrap driver-specific transaction instance from the interface.
// Returns unwrap success as the second parameter.
func UnwrapTx(tx adapter.Tx) (pgx.Tx, bool) {
driverTx, ok := tx.(*aTx)
if !ok {
return nil, false
}

return driverTx.tx, ok
}

// Exec implements adapter.Tx.Exec() using github.com/jackc/pgx/v4
func (tx *aTx) Exec(ctx context.Context, query string, args ...any) (adapter.CommandTag, error) {
ct, err := tx.tx.Exec(ctx, query, args...)
Expand Down
11 changes: 11 additions & 0 deletions adapter/pgxv5/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ func NewTx(tx pgx.Tx) adapter.Tx {
return &aTx{tx: tx}
}

// UnwrapTx tries to unwrap driver-specific transaction instance from the interface.
// Returns unwrap success as the second parameter.
func UnwrapTx(tx adapter.Tx) (pgx.Tx, bool) {
driverTx, ok := tx.(*aTx)
if !ok {
return nil, false
}

return driverTx.tx, ok
}

// Exec implements adapter.Tx.Exec() using github.com/jackc/pgx/v5
func (tx *aTx) Exec(ctx context.Context, query string, args ...any) (adapter.CommandTag, error) {
ct, err := tx.tx.Exec(ctx, query, args...)
Expand Down
83 changes: 83 additions & 0 deletions job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package gue

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/vgarvardt/gue/v5/adapter"
"github.com/vgarvardt/gue/v5/adapter/libpq"
"github.com/vgarvardt/gue/v5/adapter/pgxv4"
"github.com/vgarvardt/gue/v5/adapter/pgxv5"
adapterTesting "github.com/vgarvardt/gue/v5/adapter/testing"
)

func TestJob_Tx(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testJobTxUnwrapTx(t, name, openFunc(t))
})
}
}

func testJobTxUnwrapTx(t *testing.T, name string, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool)
require.NoError(t, err)

newJob := &Job{Type: "MyJob", Args: []byte(`{}`)}
err = c.Enqueue(ctx, newJob)
require.NoError(t, err)
require.NotEmpty(t, newJob.ID)

j, err := c.LockJob(ctx, "")
require.NoError(t, err)
require.NotNil(t, j)
require.NotNil(t, j.tx)

t.Cleanup(func() {
err := j.Done(ctx)
assert.NoError(t, err)
})

switch name {
case "pgx/v4":
_, okPgxV5 := pgxv5.UnwrapTx(j.Tx())
require.False(t, okPgxV5)
_, okLibPQ := libpq.UnwrapTx(j.Tx())
require.False(t, okLibPQ)

tx, okPgxV4 := pgxv4.UnwrapTx(j.Tx())
require.True(t, okPgxV4)

_, err := tx.Exec(ctx, `SELECT COUNT(1) FROM gue_jobs`)
require.NoError(t, err)

case "pgx/v5":
_, okPgxV4 := pgxv4.UnwrapTx(j.Tx())
require.False(t, okPgxV4)
_, okLibPQ := libpq.UnwrapTx(j.Tx())
require.False(t, okLibPQ)

tx, okPgxV5 := pgxv5.UnwrapTx(j.Tx())
require.True(t, okPgxV5)

_, err := tx.Exec(ctx, `SELECT COUNT(1) FROM gue_jobs`)
require.NoError(t, err)

case "lib/pq":
_, okPgxV4 := pgxv4.UnwrapTx(j.Tx())
require.False(t, okPgxV4)
_, okPgxV5 := pgxv5.UnwrapTx(j.Tx())
require.False(t, okPgxV5)

tx, okLibPQ := libpq.UnwrapTx(j.Tx())
require.True(t, okLibPQ)

_, err := tx.ExecContext(ctx, `SELECT COUNT(1) FROM gue_jobs`)
require.NoError(t, err)
}
}

0 comments on commit 3867ad3

Please sign in to comment.