Skip to content

Commit

Permalink
feat(spanner): support rw-transaction with options (#3058)
Browse files Browse the repository at this point in the history
* feat(spanner): support rw-transaction with options
  • Loading branch information
hengfengli committed Oct 26, 2020
1 parent 5880482 commit 5130694
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 26 deletions.
26 changes: 22 additions & 4 deletions spanner/client.go
Expand Up @@ -424,11 +424,29 @@ func checkNestedTxn(ctx context.Context) error {
func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
defer func() { trace.EndSpan(ctx, err) }()
resp, err := c.rwTransaction(ctx, f, TransactionOptions{})
return resp.CommitTs, err
}

// ReadWriteTransactionWithOptions executes a read-write transaction with
// configurable options, with retries as necessary.
//
// ReadWriteTransactionWithOptions is a configurable ReadWriteTransaction.
//
// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
// more details.
func (c *Client) ReadWriteTransactionWithOptions(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransactionWithOptions")
defer func() { trace.EndSpan(ctx, err) }()
resp, err = c.rwTransaction(ctx, f, options)
return resp, err
}

func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
if err := checkNestedTxn(ctx); err != nil {
return time.Time{}, err
return resp, err
}
var (
ts time.Time
sh *sessionHandle
)
err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -457,13 +475,13 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
if err = t.begin(ctx); err != nil {
return err
}
ts, err = t.runInTransaction(ctx, f)
resp, err = t.runInTransaction(ctx, f)
return err
})
if sh != nil {
sh.recycle()
}
return ts, err
return resp, err
}

// applyOption controls the behavior of Client.Apply.
Expand Down
62 changes: 40 additions & 22 deletions spanner/transaction.go
Expand Up @@ -74,6 +74,10 @@ type txReadOnly struct {
qo QueryOptions
}

// TransactionOptions provides options for a transaction.
type TransactionOptions struct {
}

// errSessionClosed returns error for using a recycled/destroyed session
func errSessionClosed(sh *sessionHandle) error {
return spannerErrorf(codes.FailedPrecondition,
Expand Down Expand Up @@ -954,22 +958,28 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error {
return err
}

// CommitResponse provides a response of a transaction commit in a database.
type CommitResponse struct {
// CommitTs is the commit time for a transaction.
CommitTs time.Time
}

// commit tries to commit a readwrite transaction to Cloud Spanner. It also
// returns the commit timestamp for the transactions.
func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
var ts time.Time
// returns the commit response for the transactions.
func (t *ReadWriteTransaction) commit(ctx context.Context) (CommitResponse, error) {
resp := CommitResponse{}
t.mu.Lock()
t.state = txClosed // No further operations after commit.
mPb, err := mutationsProto(t.wb)
t.mu.Unlock()
if err != nil {
return ts, err
return resp, err
}
// In case that sessionHandle was destroyed but transaction body fails to
// report it.
sid, client := t.sh.getID(), t.sh.getClient()
if sid == "" || client == nil {
return ts, errSessionClosed(t.sh)
return resp, errSessionClosed(t.sh)
}

res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
Expand All @@ -980,15 +990,15 @@ func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
Mutations: mPb,
})
if e != nil {
return ts, toSpannerErrorWithCommitInfo(e, true)
return resp, toSpannerErrorWithCommitInfo(e, true)
}
if tstamp := res.GetCommitTimestamp(); tstamp != nil {
ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
}
if isSessionNotFoundError(err) {
t.sh.destroy()
}
return ts, err
return resp, err
}

// rollback is called when a commit is aborted or the transaction body runs
Expand All @@ -1014,27 +1024,27 @@ func (t *ReadWriteTransaction) rollback(ctx context.Context) {
}

// runInTransaction executes f under a read-write transaction context.
func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) {
var (
ts time.Time
resp CommitResponse
err error
errDuringCommit bool
)
if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
// Try to commit if transaction body returns no error.
ts, err = t.commit(ctx)
resp, err = t.commit(ctx)
errDuringCommit = err != nil
}
if err != nil {
if isAbortedErr(err) {
// Retry the transaction using the same session on ABORT error.
// Cloud Spanner will create the new transaction with the previous
// one's wound-wait priority.
return ts, err
return resp, err
}
if isSessionNotFoundError(err) {
t.sh.destroy()
return ts, err
return resp, err
}
// Rollback the transaction unless the error occurred during the
// commit. Executing a rollback after a commit has failed will
Expand All @@ -1045,10 +1055,10 @@ func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(cont
if !errDuringCommit {
t.rollback(ctx)
}
return ts, err
return resp, err
}
// err == nil, return commit timestamp.
return ts, nil
// err == nil, return commit response.
return resp, nil
}

// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in
Expand All @@ -1074,6 +1084,18 @@ type ReadWriteStmtBasedTransaction struct {
// For most use cases, client.ReadWriteTransaction should be used, as it will
// handle all Aborted and 'Session not found' errors automatically.
func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) {
return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{})
}

// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction
// with configurable options. Commit() or Rollback() must be called to end a
// transaction. If Commit() or Rollback() is not called, the session that is
// used by the transaction will not be returned to the pool and cause a session
// leak.
//
// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
// NewReadWriteStmtBasedTransaction.
func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
var (
sh *sessionHandle
err error
Expand Down Expand Up @@ -1105,19 +1127,15 @@ func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWrit
// Commit tries to commit a readwrite transaction to Cloud Spanner. It also
// returns the commit timestamp for the transactions.
func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) {
var (
ts time.Time
err error
)
ts, err = t.commit(ctx)
resp, err := t.commit(ctx)
// Rolling back an aborted transaction is not necessary.
if err != nil && status.Code(err) != codes.Aborted {
t.rollback(ctx)
}
if t.sh != nil {
t.sh.recycle()
}
return ts, err
return resp.CommitTs, err
}

// Rollback is called to cancel the ongoing transaction that has not been
Expand Down

0 comments on commit 5130694

Please sign in to comment.