Skip to content

Commit

Permalink
revert(spanner): Revert enable request options for batch read (#3905)" (
Browse files Browse the repository at this point in the history
#3987)

BREAKING CHANGE: Removes
- BatchReadOnlyTransaction.PartitionReadUsingIndexWithOptions
- BatchReadOnlyTransaction.PartitionReadWithOptions

This reverts commit 0c73db2.
  • Loading branch information
olavloite committed Apr 22, 2021
1 parent a5bbdb5 commit 9928131
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 94 deletions.
44 changes: 12 additions & 32 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,9 @@ func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table stri
return t.PartitionReadUsingIndex(ctx, table, "", keys, columns, opt)
}

// PartitionReadWithOptions returns a list of Partitions that can be used to
// read rows from the database. These partitions can be executed across multiple
// processes, even across different machines. The partition size and count hints
// can be configured using PartitionOptions. Pass a ReadOptions to modify the
// read operation.
func (t *BatchReadOnlyTransaction) PartitionReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions, readOptions ReadOptions) ([]*Partition, error) {
return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, readOptions)
}

// PartitionReadUsingIndex returns a list of Partitions that can be used to read
// rows from the database using an index.
func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, ReadOptions{})
}

// PartitionReadUsingIndexWithOptions returns a list of Partitions that can be
// used to read rows from the database using an index. Pass a ReadOptions to
// modify the read operation.
func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions, readOptions ReadOptions) ([]*Partition, error) {
sh, ts, err := t.acquire(ctx)
if err != nil {
return nil, err
Expand All @@ -139,13 +123,12 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
})
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
}
// Generate partitions.
for _, p := range resp.GetPartitions() {
Expand Down Expand Up @@ -194,13 +177,12 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
}

// generate Partitions
Expand Down Expand Up @@ -288,7 +270,6 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
})
}
Expand All @@ -302,7 +283,6 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
})
}
Expand Down
56 changes: 5 additions & 51 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2633,54 +2633,6 @@ func TestClient_Apply_Tagging(t *testing.T) {
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"})
}

func TestClient_PartitionQuery_RequestOptions(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

for _, qo := range []QueryOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_LOW},
{RequestTag: "batch-query-tag"},
{Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "batch-query-with-medium-prio"},
} {
ctx := context.Background()
txn, _ := client.BatchReadOnlyTransaction(ctx, StrongRead())
partitions, _ := txn.PartitionQueryWithOptions(ctx, NewStatement(SelectFooFromBar), PartitionOptions{MaxPartitions: 10}, qo)
for _, p := range partitions {
iter := txn.Execute(ctx, p)
iter.Next()
iter.Stop()
}
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, len(partitions), sppb.RequestOptions{RequestTag: qo.RequestTag, Priority: qo.Priority})
}
}

func TestClient_PartitionRead_RequestOptions(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

for _, ro := range []ReadOptions{
{},
{Priority: sppb.RequestOptions_PRIORITY_LOW},
{RequestTag: "batch-read-tag"},
{Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "batch-read-with-medium-prio"},
} {
ctx := context.Background()
txn, _ := client.BatchReadOnlyTransaction(ctx, StrongRead())
partitions, _ := txn.PartitionReadWithOptions(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{MaxPartitions: 10}, ro)
for _, p := range partitions {
iter := txn.Execute(ctx, p)
iter.Next()
iter.Stop()
}
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, len(partitions), sppb.RequestOptions{RequestTag: ro.RequestTag, Priority: ro.Priority})
}
}

func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
reqOptions := []*sppb.RequestOptions{}
Expand All @@ -2702,10 +2654,12 @@ func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerSer
}

for _, opts := range reqOptions {
if opts == nil {
opts = &sppb.RequestOptions{}
var got sppb.RequestOptions_Priority
if opts != nil {
got = opts.Priority
}
if got, want := opts.Priority, ro.Priority; got != want {
want := ro.Priority
if got != want {
t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := opts.RequestTag, ro.RequestTag; got != want {
Expand Down
19 changes: 8 additions & 11 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,17 +1057,14 @@ func (s *inMemSpannerServer) PartitionQuery(ctx context.Context, req *spannerpb.
}

func (s *inMemSpannerServer) PartitionRead(ctx context.Context, req *spannerpb.PartitionReadRequest) (*spannerpb.PartitionResponse, error) {
return s.PartitionQuery(ctx, &spannerpb.PartitionQueryRequest{
Session: req.Session,
Transaction: req.Transaction,
PartitionOptions: req.PartitionOptions,
// KeySet is currently ignored.
Sql: fmt.Sprintf(
"SELECT %s FROM %s",
strings.Join(req.Columns, ", "),
req.Table,
),
})
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
return nil, gstatus.Error(codes.Unavailable, "server has been stopped")
}
s.receivedRequests <- req
s.mu.Unlock()
return nil, gstatus.Error(codes.Unimplemented, "Method not yet implemented")
}

// EncodeResumeToken return mock resume token encoding for an uint64 integer.
Expand Down

0 comments on commit 9928131

Please sign in to comment.