Skip to content

Commit

Permalink
fix(spanner): retry session not found for read (googleapis#2724)
Browse files Browse the repository at this point in the history
* fix(spanner): retry session not found for read

`Session not found` errors were not retried for read operations on a
single-use read-only transaction.

Fixes googleapis#2718

* fix: rename method to satisfy lint check

* fix: remove unused variable
  • Loading branch information
olavloite authored and tritone committed Aug 25, 2020
1 parent 1f56da3 commit cab4945
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 11 deletions.
47 changes: 47 additions & 0 deletions spanner/client_test.go
Expand Up @@ -162,6 +162,53 @@ func TestClient_Single_SessionNotFound(t *testing.T) {
}
}

func TestClient_Single_Read_SessionNotFound(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodStreamingRead,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
iter := client.Single().Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"})
defer iter.Stop()
rowCount := int64(0)
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Fatal(err)
}
rowCount++
}
if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
}

func TestClient_Single_ReadRow_SessionNotFound(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodStreamingRead,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
row, err := client.Single().ReadRow(ctx, "Albums", Key{"foo"}, []string{"SingerId", "AlbumId", "AlbumTitle"})
if err != nil {
t.Fatalf("Unexpected error for read row: %v", err)
}
if row == nil {
t.Fatal("ReadRow did not return a row")
}
}

func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServer(t)
Expand Down
27 changes: 20 additions & 7 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -84,6 +84,7 @@ const (
MethodExecuteSql string = "EXECUTE_SQL"
MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL"
MethodExecuteBatchDml string = "EXECUTE_BATCH_DML"
MethodStreamingRead string = "EXECUTE_STREAMING_READ"
)

// StatementResult represents a mocked result on the test server. The result is
Expand Down Expand Up @@ -794,6 +795,10 @@ func (s *inMemSpannerServer) ExecuteStreamingSql(req *spannerpb.ExecuteSqlReques
if err := s.simulateExecutionTime(MethodExecuteStreamingSql, req); err != nil {
return err
}
return s.executeStreamingSQL(req, stream)
}

func (s *inMemSpannerServer) executeStreamingSQL(req *spannerpb.ExecuteSqlRequest, stream spannerpb.Spanner_ExecuteStreamingSqlServer) error {
if req.Session == "" {
return gstatus.Error(codes.InvalidArgument, "Missing session name")
}
Expand Down Expand Up @@ -917,14 +922,22 @@ func (s *inMemSpannerServer) Read(ctx context.Context, req *spannerpb.ReadReques
}

func (s *inMemSpannerServer) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error {
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
return gstatus.Error(codes.Unavailable, "server has been stopped")
if err := s.simulateExecutionTime(MethodStreamingRead, req); err != nil {
return err
}
s.receivedRequests <- req
s.mu.Unlock()
return gstatus.Error(codes.Unimplemented, "Method not yet implemented")
sqlReq := &spannerpb.ExecuteSqlRequest{
Session: req.Session,
Transaction: req.Transaction,
PartitionToken: req.PartitionToken,
ResumeToken: req.ResumeToken,
// KeySet is currently ignored.
Sql: fmt.Sprintf(
"SELECT %s FROM %s",
strings.Join(req.Columns, ", "),
req.Table,
),
}
return s.executeStreamingSQL(sqlReq, stream)
}

func (s *inMemSpannerServer) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest) (*spannerpb.Transaction, error) {
Expand Down
9 changes: 5 additions & 4 deletions spanner/transaction.go
Expand Up @@ -120,8 +120,8 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
return &RowIterator{err: err}
}
// Cloud Spanner will return "Session not found" on bad sessions.
sid, client := sh.getID(), sh.getClient()
if sid == "" || client == nil {
client := sh.getClient()
if client == nil {
// Might happen if transaction is closed in the middle of a API call.
return &RowIterator{err: errSessionClosed(sh)}
}
Expand All @@ -133,13 +133,13 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
limit = opts.Limit
}
}
return stream(
return streamWithReplaceSessionFunc(
contextWithOutgoingMetadata(ctx, sh.getMetadata()),
sh.session.logger,
func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.StreamingRead(ctx,
&sppb.ReadRequest{
Session: sid,
Session: t.sh.getID(),
Transaction: ts,
Table: table,
Index: index,
Expand All @@ -149,6 +149,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
Limit: int64(limit),
})
},
t.replaceSessionFunc,
t.setTimestamp,
t.release,
)
Expand Down

0 comments on commit cab4945

Please sign in to comment.