Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): support rw-transaction with options #3058

Merged
merged 3 commits into from Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 22 additions & 4 deletions spanner/client.go
Expand Up @@ -424,11 +424,29 @@ func checkNestedTxn(ctx context.Context) error {
func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
defer func() { trace.EndSpan(ctx, err) }()
resp, err := c.rwTransaction(ctx, f, TransactionOptions{})
return resp.CommitTs, err
}

// ReadWriteTransactionWithOptions executes a read-write transaction with
// configurable options, with retries as necessary.
//
// ReadWriteTransactionWithOptions is a configurable ReadWriteTransaction.
//
// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
// more details.
func (c *Client) ReadWriteTransactionWithOptions(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
olavloite marked this conversation as resolved.
Show resolved Hide resolved
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransactionWithOptions")
defer func() { trace.EndSpan(ctx, err) }()
resp, err = c.rwTransaction(ctx, f, options)
return resp, err
}

func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
if err := checkNestedTxn(ctx); err != nil {
return time.Time{}, err
return resp, err
}
var (
ts time.Time
sh *sessionHandle
)
err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -457,13 +475,13 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
if err = t.begin(ctx); err != nil {
return err
}
ts, err = t.runInTransaction(ctx, f)
resp, err = t.runInTransaction(ctx, f)
return err
})
if sh != nil {
sh.recycle()
}
return ts, err
return resp, err
}

// applyOption controls the behavior of Client.Apply.
Expand Down
62 changes: 40 additions & 22 deletions spanner/transaction.go
Expand Up @@ -74,6 +74,10 @@ type txReadOnly struct {
qo QueryOptions
}

// TransactionOptions provides options for a transaction.
type TransactionOptions struct {
}

// errSessionClosed returns error for using a recycled/destroyed session
func errSessionClosed(sh *sessionHandle) error {
return spannerErrorf(codes.FailedPrecondition,
Expand Down Expand Up @@ -954,22 +958,28 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error {
return err
}

// CommitResponse provides a response of a transaction commit in a database.
type CommitResponse struct {
// CommitTs is the commit time for a transaction.
CommitTs time.Time
hengfengli marked this conversation as resolved.
Show resolved Hide resolved
}

// commit tries to commit a readwrite transaction to Cloud Spanner. It also
// returns the commit timestamp for the transactions.
func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
var ts time.Time
// returns the commit response for the transactions.
func (t *ReadWriteTransaction) commit(ctx context.Context) (CommitResponse, error) {
resp := CommitResponse{}
t.mu.Lock()
t.state = txClosed // No further operations after commit.
mPb, err := mutationsProto(t.wb)
t.mu.Unlock()
if err != nil {
return ts, err
return resp, err
}
// In case that sessionHandle was destroyed but transaction body fails to
// report it.
sid, client := t.sh.getID(), t.sh.getClient()
if sid == "" || client == nil {
return ts, errSessionClosed(t.sh)
return resp, errSessionClosed(t.sh)
}

res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
Expand All @@ -980,15 +990,15 @@ func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
Mutations: mPb,
})
if e != nil {
return ts, toSpannerErrorWithCommitInfo(e, true)
return resp, toSpannerErrorWithCommitInfo(e, true)
}
if tstamp := res.GetCommitTimestamp(); tstamp != nil {
ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
}
if isSessionNotFoundError(err) {
t.sh.destroy()
}
return ts, err
return resp, err
}

// rollback is called when a commit is aborted or the transaction body runs
Expand All @@ -1014,27 +1024,27 @@ func (t *ReadWriteTransaction) rollback(ctx context.Context) {
}

// runInTransaction executes f under a read-write transaction context.
func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) {
var (
ts time.Time
resp CommitResponse
err error
errDuringCommit bool
)
if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
// Try to commit if transaction body returns no error.
ts, err = t.commit(ctx)
resp, err = t.commit(ctx)
errDuringCommit = err != nil
}
if err != nil {
if isAbortedErr(err) {
// Retry the transaction using the same session on ABORT error.
// Cloud Spanner will create the new transaction with the previous
// one's wound-wait priority.
return ts, err
return resp, err
}
if isSessionNotFoundError(err) {
t.sh.destroy()
return ts, err
return resp, err
}
// Rollback the transaction unless the error occurred during the
// commit. Executing a rollback after a commit has failed will
Expand All @@ -1045,10 +1055,10 @@ func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(cont
if !errDuringCommit {
t.rollback(ctx)
}
return ts, err
return resp, err
}
// err == nil, return commit timestamp.
return ts, nil
// err == nil, return commit response.
return resp, nil
}

// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in
Expand All @@ -1074,6 +1084,18 @@ type ReadWriteStmtBasedTransaction struct {
// For most use cases, client.ReadWriteTransaction should be used, as it will
// handle all Aborted and 'Session not found' errors automatically.
func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) {
return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{})
}

// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction
// with configurable options. Commit() or Rollback() must be called to end a
// transaction. If Commit() or Rollback() is not called, the session that is
// used by the transaction will not be returned to the pool and cause a session
// leak.
//
// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
// NewReadWriteStmtBasedTransaction.
func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
var (
sh *sessionHandle
err error
Expand Down Expand Up @@ -1105,19 +1127,15 @@ func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWrit
// Commit tries to commit a readwrite transaction to Cloud Spanner. It also
// returns the commit timestamp for the transactions.
func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) {
hengfengli marked this conversation as resolved.
Show resolved Hide resolved
var (
ts time.Time
err error
)
ts, err = t.commit(ctx)
resp, err := t.commit(ctx)
// Rolling back an aborted transaction is not necessary.
if err != nil && status.Code(err) != codes.Aborted {
t.rollback(ctx)
}
if t.sh != nil {
t.sh.recycle()
}
return ts, err
return resp.CommitTs, err
}

// Rollback is called to cancel the ongoing transaction that has not been
Expand Down