Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): Add support for RPC priority #3341

Merged
merged 12 commits into from Mar 31, 2021
17 changes: 14 additions & 3 deletions spanner/client.go
Expand Up @@ -487,6 +487,8 @@ type applyOption struct {
// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
// Spanner at least once.
atLeastOnce bool
// priority is the RPC priority that is used for the commit operation.
priority sppb.RequestOptions_Priority
}

// An ApplyOption is an optional argument to Apply.
Expand All @@ -509,6 +511,14 @@ func ApplyAtLeastOnce() ApplyOption {
}
}

// Priority returns an ApplyOptions that sets the RPC priority to use for the
// commit operation.
func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
codyoss marked this conversation as resolved.
Show resolved Hide resolved
return func(ao *applyOption) {
ao.priority = priority
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -520,11 +530,12 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
defer func() { trace.EndSpan(ctx, err) }()

if !ao.atLeastOnce {
return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
})
}, TransactionOptions{CommitPriority: ao.priority})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{c.idleSessions}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand Down
195 changes: 194 additions & 1 deletion spanner/client_test.go
Expand Up @@ -869,7 +869,10 @@ func TestClient_ReadWriteStmtBasedTransactionWithOptions(t *testing.T) {
_, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()
tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, TransactionOptions{CommitOptions{ReturnCommitStats: true}})
tx, err := NewReadWriteStmtBasedTransactionWithOptions(
ctx,
client,
TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}})
if err != nil {
t.Fatalf("Unexpected error when creating transaction: %v", err)
}
Expand Down Expand Up @@ -2344,6 +2347,196 @@ func TestClient_DoForEachRow_ShouldEndSpanWithQueryError(t *testing.T) {
}
}

func TestClient_ReadOnlyTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_HIGH},
} {
for _, tx := range []*ReadOnlyTransaction{
client.Single(),
client.ReadOnlyTransaction(),
} {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

if tx.singleUse {
tx = client.Single()
}
iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{Priority: qo.Priority})
tx.Close()
}
}
}

func TestClient_ReadWriteTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM},
} {
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_MEDIUM},
} {
client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority})

return nil
}, to)
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority})
}
}
}

func TestClient_StmtBasedReadWriteTransaction_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{CommitPriority: sppb.RequestOptions_PRIORITY_LOW},
} {
for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_LOW},
} {
tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to)
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{Priority: qo.Priority})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{Priority: qo.Priority})
tx.Commit(context.Background())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: to.CommitPriority})
}
}
}

func TestClient_PDML_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_HIGH},
} {
client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{Priority: qo.Priority})
}
}

func TestClient_Apply_Priority(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, Priority(sppb.RequestOptions_PRIORITY_HIGH))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_HIGH})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), Priority(sppb.RequestOptions_PRIORITY_MEDIUM))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM})
}

func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
reqOptions := []*sppb.RequestOptions{}

for _, req := range reqs {
if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok {
reqOptions = append(reqOptions, sqlReq.RequestOptions)
}
if batchReq, ok := req.(*sppb.ExecuteBatchDmlRequest); ok {
reqOptions = append(reqOptions, batchReq.RequestOptions)
}
if readReq, ok := req.(*sppb.ReadRequest); ok {
reqOptions = append(reqOptions, readReq.RequestOptions)
}
}

if got, want := len(reqOptions), reqCount; got != want {
t.Fatalf("Requests length mismatch\nGot: %v\nWant: %v", got, want)
}

for _, opts := range reqOptions {
var got sppb.RequestOptions_Priority
if opts != nil {
got = opts.Priority
}
want := ro.Priority
if got != want {
t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want)
}
}
}

func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
var commit *sppb.CommitRequest
var ok bool

for _, req := range reqs {
if commit, ok = req.(*sppb.CommitRequest); ok {
break
}
}

if commit == nil {
t.Fatalf("Missing commit request")
}

var got sppb.RequestOptions_Priority
if commit.RequestOptions != nil {
got = commit.RequestOptions.Priority
}
want := ro.Priority
if got != want {
t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want)
}
}

func TestClient_Single_Read_WithNumericKey(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions spanner/integration_test.go
Expand Up @@ -1058,7 +1058,7 @@ func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) {
}
}

txOpts := TransactionOptions{CommitOptions{ReturnCommitStats: true}}
txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Query Foo's balance and Bar's balance.
bf, e := readBalance(tx.Query(ctx,
Expand Down Expand Up @@ -1237,7 +1237,7 @@ func TestIntegration_ReadWriteTransaction_StatementBasedWithOptions(t *testing.T
}

var resp CommitResponse
txOpts := TransactionOptions{CommitOptions{ReturnCommitStats: true}}
txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
for {
tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, txOpts)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions spanner/pdml.go
Expand Up @@ -64,11 +64,12 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
return 0, ToSpannerError(err)
}
req := &sppb.ExecuteSqlRequest{
Session: sh.getID(),
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
Session: sh.getID(),
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: createRequestOptions(&options),
}

// Make a retryer for Aborted and certain Internal errors.
Expand Down