Skip to content

Commit

Permalink
feat(firestore): Support DocumentRefs in OrderBy, Add Query.Serialize…
Browse files Browse the repository at this point in the history
…, Query.Deserialize for cross machine serialization (#4347)

Adds support for serializing queries, which is useful with the PartitionQuery APIs for distributing the work of streaming results.

Also, as a side effect, DocumentRefs are now supported in OrderBy clauses.
  • Loading branch information
crwilcox committed Sep 2, 2021
1 parent d6d4a3c commit a0f7a02
Show file tree
Hide file tree
Showing 5 changed files with 345 additions and 65 deletions.
3 changes: 3 additions & 0 deletions firestore/collgroupref.go
Expand Up @@ -54,6 +54,9 @@ func newCollectionGroupRef(c *Client, dbPath, collectionID string) *CollectionGr
//
// If a Collection Group Query would return a large number of documents, this
// can help to subdivide the query to smaller working units that can be distributed.
//
// To run the queries across processes or workers, use `Query.Serialize` to encode
// each query and `Query.Deserialize` to reconstruct it on the receiving side.
func (cgr CollectionGroupRef) GetPartitionedQueries(ctx context.Context, partitionCount int) ([]Query, error) {
qp, err := cgr.getPartitions(ctx, partitionCount)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions firestore/doc.go
Expand Up @@ -193,6 +193,29 @@ as a query.
iter = client.Collection("States").Documents(ctx)
Collection Group Partition Queries
You can partition the documents of a Collection Group, allowing for smaller subqueries.
collectionGroup = client.CollectionGroup("States")
partitions, err = collectionGroup.GetPartitionedQueries(ctx, 20)
You can also serialize and deserialize queries, making it possible to run or stream
them elsewhere, for instance in another process or on another machine.
queryProtos := make([][]byte, 0)
for _, query := range partitions {
protoBytes, err := query.Serialize()
// handle err
queryProtos = append(queryProtos, protoBytes)
...
}
for _, protoBytes := range queryProtos {
query, err := client.CollectionGroup("").Deserialize(protoBytes)
...
}
Transactions
Use a transaction to execute reads and writes atomically. All reads must happen
Expand Down
23 changes: 20 additions & 3 deletions firestore/integration_test.go
Expand Up @@ -1603,7 +1603,6 @@ func TestDetectProjectID(t *testing.T) {
}

func TestIntegration_ColGroupRefPartitions(t *testing.T) {
t.Skip("https://github.com/googleapis/google-cloud-go/issues/4325")
h := testHelper{t}
coll := integrationColl(t)
ctx := context.Background()
Expand All @@ -1622,7 +1621,7 @@ func TestIntegration_ColGroupRefPartitions(t *testing.T) {
{collectionID: coll.collectionID, expectedPartitionCount: 1},
} {
colGroup := iClient.CollectionGroup(tc.collectionID)
partitions, err := colGroup.getPartitions(ctx, 10)
partitions, err := colGroup.GetPartitionedQueries(ctx, 10)
if err != nil {
t.Fatalf("getPartitions: received unexpected error: %v", err)
}
Expand Down Expand Up @@ -1675,12 +1674,30 @@ func TestIntegration_ColGroupRefPartitionsLarge(t *testing.T) {
// Verify that we retrieve 383 documents across all partitions. (128*2 + 127)
totalCount := 0
for _, query := range partitions {

allDocs, err := query.Documents(ctx).GetAll()
if err != nil {
t.Fatalf("GetAll(): received unexpected error: %v", err)
}
totalCount += len(allDocs)

// Verify that serialization round-trips: check that the same results are
// returned when we run the query rebuilt from its serialized proto.
queryBytes, err := query.Serialize()
if err != nil {
t.Fatalf("Serialize error: %v", err)
}
q, err := iClient.CollectionGroup("DNE").Deserialize(queryBytes)
if err != nil {
t.Fatalf("Deserialize error: %v", err)
}

protoReturnedDocs, err := q.Documents(ctx).GetAll()
if err != nil {
t.Fatalf("GetAll error: %v", err)
}
if len(allDocs) != len(protoReturnedDocs) {
t.Fatalf("Expected document count to be the same on both query runs: %v", err)
}
}

if got, want := totalCount, documentCount; got != want {
Expand Down

0 comments on commit a0f7a02

Please sign in to comment.