Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(spanner): add metadata to RowIterator (#3050)
Adds ResultSetMetaData to the RowIterator struct. The metadata
are available after the first call to RowIterator.Next() as long
as that call did not return any other error than iterator.Done.

Fixes #1805
  • Loading branch information
olavloite committed Oct 20, 2020
1 parent 7d2d83e commit 9a2289c
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 7 deletions.
45 changes: 45 additions & 0 deletions spanner/client_test.go
Expand Up @@ -1880,6 +1880,51 @@ func TestClient_QueryWithCallOptions(t *testing.T) {
}
}

func TestClient_ShouldReceiveMetadataForEmptyResultSet(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
// This creates an empty result set.
res := server.CreateSingleRowSingersResult(SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
sql := "SELECT SingerId, AlbumId, AlbumTitle FROM Albums WHERE 1=2"
server.TestSpanner.PutStatementResult(sql, res)
defer teardown()
ctx := context.Background()
iter := client.Single().Query(ctx, NewStatement(sql))
defer iter.Stop()
row, err := iter.Next()
if err != iterator.Done {
t.Errorf("Query result mismatch:\nGot: %v\nWant: <no rows>", row)
}
metadata := iter.Metadata
if metadata == nil {
t.Fatalf("Missing ResultSet Metadata")
}
if metadata.RowType == nil {
t.Fatalf("Missing ResultSet RowType")
}
if metadata.RowType.Fields == nil {
t.Fatalf("Missing ResultSet Fields")
}
if g, w := len(metadata.RowType.Fields), 3; g != w {
t.Fatalf("Field count mismatch\nGot: %v\nWant: %v", g, w)
}
wantFieldNames := []string{"SingerId", "AlbumId", "AlbumTitle"}
for i, w := range wantFieldNames {
g := metadata.RowType.Fields[i].Name
if g != w {
t.Fatalf("Field[%v] name mismatch\nGot: %v\nWant: %v", i, g, w)
}
}
wantFieldTypes := []sppb.TypeCode{sppb.TypeCode_INT64, sppb.TypeCode_INT64, sppb.TypeCode_STRING}
for i, w := range wantFieldTypes {
g := metadata.RowType.Fields[i].Type.Code
if g != w {
t.Fatalf("Field[%v] type mismatch\nGot: %v\nWant: %v", i, g, w)
}
}
}

func TestClient_EncodeCustomFieldType(t *testing.T) {
t.Parallel()

Expand Down
5 changes: 5 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -3156,6 +3156,11 @@ func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
for {
row, err := iter.Next()
if err == iterator.Done {
if iter.Metadata == nil {
// All queries should always return metadata, regardless whether
// they return any rows or not.
return nil, errors.New("missing metadata from query")
}
return vals, nil
}
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -148,6 +148,10 @@ func (s *StatementResult) ToPartialResultSets(resumeToken []byte) (result []*spa
break
}
}
} else {
result = append(result, &spannerpb.PartialResultSet{
Metadata: s.ResultSet.Metadata,
})
}
return result, nil
}
Expand Down
21 changes: 15 additions & 6 deletions spanner/read.go
Expand Up @@ -102,6 +102,11 @@ type RowIterator struct {
// iterator.Done.
RowCount int64

// The metadata of the results of the query. The metadata are available
// after the first call to RowIterator.Next(), unless the first call to
// RowIterator.Next() returned an error that is not equal to iterator.Done.
Metadata *sppb.ResultSetMetadata

streamd *resumableStreamDecoder
rowd *partialResultSetDecoder
setTimestamp func(time.Time)
Expand Down Expand Up @@ -133,7 +138,11 @@ func (r *RowIterator) Next() (*Row, error) {
r.RowCount = rc
}
}
r.rows, r.err = r.rowd.add(prs)
var metadata *sppb.ResultSetMetadata
r.rows, metadata, r.err = r.rowd.add(prs)
if metadata != nil {
r.Metadata = metadata
}
if r.err != nil {
return nil, r.err
}
Expand Down Expand Up @@ -648,7 +657,7 @@ func errChunkedEmptyRow() error {

// add tries to merge a new PartialResultSet into buffered Row. It returns any
// rows that have been completed as a result.
func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error) {
func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, *sppb.ResultSetMetadata, error) {
var rows []*Row
if r.Metadata != nil {
// Metadata should only be returned in the first result.
Expand All @@ -663,20 +672,20 @@ func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error)
}
}
if len(r.Values) == 0 {
return nil, nil
return nil, r.Metadata, nil
}
if p.chunked {
p.chunked = false
// Try to merge first value in r.Values into uncompleted row.
last := len(p.row.vals) - 1
if last < 0 { // confidence check
return nil, errChunkedEmptyRow()
return nil, nil, errChunkedEmptyRow()
}
var err error
// If p is chunked, then we should always try to merge p.last with
// r.first.
if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil {
return nil, err
return nil, r.Metadata, err
}
r.Values = r.Values[1:]
// Merge is done, try to yield a complete Row.
Expand All @@ -698,7 +707,7 @@ func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error)
// also chunked.
p.chunked = true
}
return rows, nil
return rows, r.Metadata, nil
}

// isMergeable returns if a protobuf Value can be potentially merged with other
Expand Down
2 changes: 1 addition & 1 deletion spanner/read_test.go
Expand Up @@ -584,7 +584,7 @@ nextTest:
var rows []*Row
p := &partialResultSetDecoder{}
for j, v := range test.input {
rs, err := p.add(v)
rs, _, err := p.add(v)
if err != nil {
t.Errorf("test %d.%d: partialResultSetDecoder.add(%v) = %v; want nil", i, j, v, err)
continue nextTest
Expand Down

0 comments on commit 9a2289c

Please sign in to comment.