Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(spanner): Add support for RPC priority (#3341)
* feat(spanner): add support for RPC priority

Adds support for setting a priority per RPC. This can be any read/query/update
statement and it can be set for a commit RPC.

* fix: fix ApplyAtLeastOnce + add tests

* fix: only include RequestOptions if necessary

* fix: remove unnecessary blank line

* fix: merge txOpts and options

* fix: remove commented code

* fix: create generic createRequestOptions() method

Co-authored-by: larkee <31196561+larkee@users.noreply.github.com>
  • Loading branch information
olavloite and larkee committed Mar 31, 2021
1 parent 2b02a03 commit 88cf097
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 37 deletions.
17 changes: 14 additions & 3 deletions spanner/client.go
Expand Up @@ -487,6 +487,8 @@ type applyOption struct {
// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
// Spanner at least once.
atLeastOnce bool
// priority is the RPC priority that is used for the commit operation.
priority sppb.RequestOptions_Priority
}

// An ApplyOption is an optional argument to Apply.
Expand All @@ -509,6 +511,14 @@ func ApplyAtLeastOnce() ApplyOption {
}
}

// Priority returns an ApplyOptions that sets the RPC priority to use for the
// commit operation.
func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
return func(ao *applyOption) {
ao.priority = priority
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -520,11 +530,12 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
defer func() { trace.EndSpan(ctx, err) }()

if !ao.atLeastOnce {
return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
})
}, TransactionOptions{CommitPriority: ao.priority})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{c.idleSessions}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand Down
195 changes: 194 additions & 1 deletion spanner/client_test.go
Expand Up @@ -869,7 +869,10 @@ func TestClient_ReadWriteStmtBasedTransactionWithOptions(t *testing.T) {
_, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()
tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, TransactionOptions{CommitOptions{ReturnCommitStats: true}})
tx, err := NewReadWriteStmtBasedTransactionWithOptions(
ctx,
client,
TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}})
if err != nil {
t.Fatalf("Unexpected error when creating transaction: %v", err)
}
Expand Down Expand Up @@ -2344,6 +2347,196 @@ func TestClient_DoForEachRow_ShouldEndSpanWithQueryError(t *testing.T) {
}
}

func TestClient_ReadOnlyTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_HIGH},
} {
for _, tx := range []*ReadOnlyTransaction{
client.Single(),
client.ReadOnlyTransaction(),
} {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

if tx.singleUse {
tx = client.Single()
}
iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{Priority: qo.Priority})
tx.Close()
}
}
}

func TestClient_ReadWriteTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
} {
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_MEDIUM},
} {
client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority})

return nil
}, to)
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority})
}
}
}

func TestClient_StmtBasedReadWriteTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
} {
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_LOW},
} {
tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to)
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority})
tx.Commit(context.Background())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority})
}
}
}

func TestClient_PDML_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_HIGH},
} {
client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{Priority: qo.Priority})
}
}

func TestClient_Apply_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, Priority(sppb.RequestOptions_PRIORITY_HIGH))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_HIGH})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), Priority(sppb.RequestOptions_PRIORITY_MEDIUM))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM})
}

func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
reqOptions := []*sppb.RequestOptions{}

for _, req := range reqs {
if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok {
reqOptions = append(reqOptions, sqlReq.RequestOptions)
}
if batchReq, ok := req.(*sppb.ExecuteBatchDmlRequest); ok {
reqOptions = append(reqOptions, batchReq.RequestOptions)
}
if readReq, ok := req.(*sppb.ReadRequest); ok {
reqOptions = append(reqOptions, readReq.RequestOptions)
}
}

if got, want := len(reqOptions), reqCount; got != want {
t.Fatalf("Requests length mismatch\nGot: %v\nWant: %v", got, want)
}

for _, opts := range reqOptions {
var got sppb.RequestOptions_Priority
if opts != nil {
got = opts.Priority
}
want := ro.Priority
if got != want {
t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want)
}
}
}

func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
var commit *sppb.CommitRequest
var ok bool

for _, req := range reqs {
if commit, ok = req.(*sppb.CommitRequest); ok {
break
}
}

if commit == nil {
t.Fatalf("Missing commit request")
}

var got sppb.RequestOptions_Priority
if commit.RequestOptions != nil {
got = commit.RequestOptions.Priority
}
want := ro.Priority
if got != want {
t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want)
}
}

func TestClient_Single_Read_WithNumericKey(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions spanner/integration_test.go
Expand Up @@ -1058,7 +1058,7 @@ func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) {
}
}

txOpts := TransactionOptions{CommitOptions{ReturnCommitStats: true}}
txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Query Foo's balance and Bar's balance.
bf, e := readBalance(tx.Query(ctx,
Expand Down Expand Up @@ -1237,7 +1237,7 @@ func TestIntegration_ReadWriteTransaction_StatementBasedWithOptions(t *testing.T
}

var resp CommitResponse
txOpts := TransactionOptions{CommitOptions{ReturnCommitStats: true}}
txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
for {
tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, txOpts)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions spanner/pdml.go
Expand Up @@ -64,11 +64,12 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
return 0, ToSpannerError(err)
}
req := &sppb.ExecuteSqlRequest{
Session: sh.getID(),
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
Session: sh.getID(),
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: createRequestOptions(&options),
}

// Make a retryer for Aborted and certain Internal errors.
Expand Down

0 comments on commit 88cf097

Please sign in to comment.