Skip to content

Commit

Permalink
feat(spanner): add metadata to RowIterator
Browse files Browse the repository at this point in the history
Adds ResultSetMetaData to the RowIterator struct. The metadata
are available after the first call to RowIterator.Next() as long
as that call did not return any other error than iterator.Done.

Fixes #1805
  • Loading branch information
olavloite committed Oct 19, 2020
1 parent a713912 commit 1ee538e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 7 deletions.
45 changes: 45 additions & 0 deletions spanner/client_test.go
Expand Up @@ -1880,6 +1880,51 @@ func TestClient_QueryWithCallOptions(t *testing.T) {
}
}

func TestClient_ShouldReceiveMetadataForEmptyResultSet(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
// This creates an empty result set.
res := server.CreateSingleRowSingersResult(SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
sql := "SELECT SingerId, AlbumId, AlbumTitle FROM Albums WHERE 1=2"
server.TestSpanner.PutStatementResult(sql, res)
defer teardown()
ctx := context.Background()
iter := client.Single().Query(ctx, NewStatement(sql))
defer iter.Stop()
row, err := iter.Next()
if err != iterator.Done {
t.Errorf("Query result mismatch:\nGot: %v\nWant: <no rows>", row)
}
metadata := iter.Metadata
if metadata == nil {
t.Fatalf("Missing ResultSet Metadata")
}
if metadata.RowType == nil {
t.Fatalf("Missing ResultSet RowType")
}
if metadata.RowType.Fields == nil {
t.Fatalf("Missing ResultSet Fields")
}
if g, w := len(metadata.RowType.Fields), 3; g != w {
t.Fatalf("Field count mismatch\nGot: %v\nWant: %v", g, w)
}
wantFieldNames := []string{"SingerId", "AlbumId", "AlbumTitle"}
for i, w := range wantFieldNames {
g := metadata.RowType.Fields[i].Name
if g != w {
t.Fatalf("Field[%v] name mismatch\nGot: %v\nWant: %v", i, g, w)
}
}
wantFieldTypes := []sppb.TypeCode{sppb.TypeCode_INT64, sppb.TypeCode_INT64, sppb.TypeCode_STRING}
for i, w := range wantFieldTypes {
g := metadata.RowType.Fields[i].Type.Code
if g != w {
t.Fatalf("Field[%v] type mismatch\nGot: %v\nWant: %v", i, g, w)
}
}
}

func TestClient_EncodeCustomFieldType(t *testing.T) {
t.Parallel()

Expand Down
5 changes: 5 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -3156,6 +3156,11 @@ func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
for {
row, err := iter.Next()
if err == iterator.Done {
if iter.Metadata == nil {
// All queries should always return metadata, regardless whether
// they return any rows or not.
return nil, errors.New("missing metadata from query")
}
return vals, nil
}
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -148,6 +148,10 @@ func (s *StatementResult) ToPartialResultSets(resumeToken []byte) (result []*spa
break
}
}
} else {
result = append(result, &spannerpb.PartialResultSet{
Metadata: s.ResultSet.Metadata,
})
}
return result, nil
}
Expand Down
21 changes: 15 additions & 6 deletions spanner/read.go
Expand Up @@ -102,6 +102,11 @@ type RowIterator struct {
// iterator.Done.
RowCount int64

// The metadata of the results of the query. The metadata are available
// after the first call to RowIterator.Next(), unless the first call to
// RowIterator.Next() returned an error that is not equal to iterator.Done.
Metadata *sppb.ResultSetMetadata

streamd *resumableStreamDecoder
rowd *partialResultSetDecoder
setTimestamp func(time.Time)
Expand Down Expand Up @@ -133,7 +138,11 @@ func (r *RowIterator) Next() (*Row, error) {
r.RowCount = rc
}
}
r.rows, r.err = r.rowd.add(prs)
var metadata *sppb.ResultSetMetadata
r.rows, metadata, r.err = r.rowd.add(prs)
if metadata != nil {
r.Metadata = metadata
}
if r.err != nil {
return nil, r.err
}
Expand Down Expand Up @@ -648,7 +657,7 @@ func errChunkedEmptyRow() error {

// add tries to merge a new PartialResultSet into buffered Row. It returns any
// rows that have been completed as a result.
func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error) {
func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, *sppb.ResultSetMetadata, error) {
var rows []*Row
if r.Metadata != nil {
// Metadata should only be returned in the first result.
Expand All @@ -663,20 +672,20 @@ func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error)
}
}
if len(r.Values) == 0 {
return nil, nil
return nil, r.Metadata, nil
}
if p.chunked {
p.chunked = false
// Try to merge first value in r.Values into uncompleted row.
last := len(p.row.vals) - 1
if last < 0 { // confidence check
return nil, errChunkedEmptyRow()
return nil, nil, errChunkedEmptyRow()
}
var err error
// If p is chunked, then we should always try to merge p.last with
// r.first.
if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil {
return nil, err
return nil, r.Metadata, err
}
r.Values = r.Values[1:]
// Merge is done, try to yield a complete Row.
Expand All @@ -698,7 +707,7 @@ func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error)
// also chunked.
p.chunked = true
}
return rows, nil
return rows, r.Metadata, nil
}

// isMergeable returns if a protobuf Value can be potentially merged with other
Expand Down
2 changes: 1 addition & 1 deletion spanner/read_test.go
Expand Up @@ -584,7 +584,7 @@ nextTest:
var rows []*Row
p := &partialResultSetDecoder{}
for j, v := range test.input {
rs, err := p.add(v)
rs, _, err := p.add(v)
if err != nil {
t.Errorf("test %d.%d: partialResultSetDecoder.add(%v) = %v; want nil", i, j, v, err)
continue nextTest
Expand Down

0 comments on commit 1ee538e

Please sign in to comment.