diff --git a/google-cloud-firestore/clirr-ignored-differences.xml b/google-cloud-firestore/clirr-ignored-differences.xml index 53b346d09..0efc11414 100644 --- a/google-cloud-firestore/clirr-ignored-differences.xml +++ b/google-cloud-firestore/clirr-ignored-differences.xml @@ -142,4 +142,19 @@ * + + + 7012 + com/google/cloud/firestore/spi/v1/FirestoreRpc + com.google.api.gax.rpc.UnaryCallable partitionQueryPagedCallable() + + + 7006 + com/google/cloud/firestore/Firestore + com.google.cloud.firestore.Query collectionGroup(java.lang.String) + com.google.cloud.firestore.CollectionGroup + + diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java new file mode 100644 index 000000000..c2677f087 --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java @@ -0,0 +1,86 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ApiExceptions; +import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.cloud.firestore.v1.FirestoreClient; +import com.google.firestore.v1.Cursor; +import com.google.firestore.v1.PartitionQueryRequest; +import javax.annotation.Nullable; + +/** + * A Collection Group query matches all documents that are contained in a collection or + * subcollection with a specific collection ID. + */ +public class CollectionGroup extends Query { + CollectionGroup(FirestoreRpcContext rpcContext, String collectionId) { + super( + rpcContext, + QueryOptions.builder() + .setParentPath(rpcContext.getResourcePath()) + .setCollectionId(collectionId) + .setAllDescendants(true) + .build()); + } + + /** + * Partitions a query by returning partition cursors that can be used to run the query in + * parallel. The returned partition cursors are split points that can be used as starting/end + * points for the query results. + * + * @param desiredPartitionCount The desired maximum number of partition points. The number must be + * strictly positive. The actual number of partitions returned may be fewer. + * @param observer a stream observer that receives the result of the Partition request. + */ + public void getPartitions( + long desiredPartitionCount, ApiStreamObserver observer) { + // Partition queries require explicit ordering by __name__. + Query queryWithDefaultOrder = orderBy(FieldPath.DOCUMENT_ID); + + PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder(); + request.setStructuredQuery(queryWithDefaultOrder.buildQuery()); + request.setParent(options.getParentPath().toString()); + + // Since we are always returning an extra partition (with en empty endBefore cursor), we + // reduce the desired partition count by one. + request.setPartitionCount(desiredPartitionCount - 1); + + final FirestoreClient.PartitionQueryPagedResponse response; + try { + response = + ApiExceptions.callAndTranslateApiException( + rpcContext.sendRequest( + request.build(), rpcContext.getClient().partitionQueryPagedCallable())); + } catch (ApiException exception) { + throw FirestoreException.apiException(exception); + } + + @Nullable Object[] lastCursor = null; + for (Cursor cursor : response.iterateAll()) { + Object[] decodedCursorValue = new Object[cursor.getValuesCount()]; + for (int i = 0; i < cursor.getValuesCount(); ++i) { + decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i)); + } + observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, decodedCursorValue)); + lastCursor = decodedCursorValue; + } + observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, null)); + observer.onCompleted(); + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/DocumentSnapshot.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/DocumentSnapshot.java index 48008d4c5..6b24ae958 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/DocumentSnapshot.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/DocumentSnapshot.java @@ -23,11 +23,9 @@ import com.google.firestore.v1.Document; import com.google.firestore.v1.Value; import com.google.firestore.v1.Write; -import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Objects; import javax.annotation.Nonnull; @@ -115,48 +113,6 @@ static DocumentSnapshot fromMissing( return new DocumentSnapshot(rpcContext, documentReference, null, readTime, null, null); } - private Object decodeValue(Value v) { - Value.ValueTypeCase typeCase = v.getValueTypeCase(); - switch (typeCase) { - case NULL_VALUE: - return null; - case BOOLEAN_VALUE: - return v.getBooleanValue(); - case INTEGER_VALUE: - return v.getIntegerValue(); - case DOUBLE_VALUE: - return v.getDoubleValue(); - case TIMESTAMP_VALUE: - return Timestamp.fromProto(v.getTimestampValue()); - case STRING_VALUE: - return v.getStringValue(); - case BYTES_VALUE: - return Blob.fromByteString(v.getBytesValue()); - case REFERENCE_VALUE: - String pathName = v.getReferenceValue(); - return new DocumentReference(rpcContext, ResourcePath.create(pathName)); - case GEO_POINT_VALUE: - return new GeoPoint( - v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude()); - case ARRAY_VALUE: - List list = new ArrayList<>(); - List lv = v.getArrayValue().getValuesList(); - for (Value iv : lv) { - list.add(decodeValue(iv)); - } - return list; - case MAP_VALUE: - Map outputMap = new HashMap<>(); - Map inputMap = v.getMapValue().getFieldsMap(); - for (Map.Entry entry : inputMap.entrySet()) { - outputMap.put(entry.getKey(), decodeValue(entry.getValue())); - } - return outputMap; - default: - throw FirestoreException.invalidState(String.format("Unknown Value Type: %s", typeCase)); - } - } - /** * Returns the time at which this snapshot was read. * @@ -214,7 +170,7 @@ public Map getData() { Map decodedFields = new HashMap<>(); for (Map.Entry entry : fields.entrySet()) { - Object decodedValue = decodeValue(entry.getValue()); + Object decodedValue = UserDataConverter.decodeValue(rpcContext, entry.getValue()); decodedFields.put(entry.getKey(), decodedValue); } return decodedFields; @@ -293,7 +249,7 @@ public Object get(@Nonnull FieldPath fieldPath) { return null; } - return decodeValue(value); + return UserDataConverter.decodeValue(rpcContext, value); } /** diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java index 1b2040efb..e69a2f903 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java @@ -56,14 +56,14 @@ public interface Firestore extends Service, AutoCloseable { Iterable listCollections(); /** - * Creates and returns a new @link{Query} that includes all documents in the database that are - * contained in a collection or subcollection with the given @code{collectionId}. + * Creates and returns a new {@link CollectionGroup} that includes all documents in the database + * that are contained in a collection or subcollection with the given @code{collectionId}. * * @param collectionId Identifies the collections to query over. Every collection or subcollection * with this ID as the last segment of its path will be included. Cannot contain a slash. * @return The created Query. */ - Query collectionGroup(@Nonnull String collectionId); + CollectionGroup collectionGroup(@Nonnull String collectionId); /** * Executes the given updateFunction and then attempts to commit the changes applied within the diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index 74cf96179..b33fa2747 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -263,12 +263,12 @@ public void onCompleted() { @Nonnull @Override - public Query collectionGroup(@Nonnull final String collectionId) { + public CollectionGroup collectionGroup(@Nonnull final String collectionId) { Preconditions.checkArgument( !collectionId.contains("/"), String.format( "Invalid collectionId '%s'. Collection IDs must not contain '/'.", collectionId)); - return new Query(this, collectionId); + return new CollectionGroup(this, collectionId); } @Nonnull diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java index 9d17ad00d..6b856abb0 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java @@ -296,21 +296,7 @@ abstract static class Builder { .build()); } - /** - * Creates a Collection Group query that matches all documents directly nested under a - * specifically named collection - */ - Query(FirestoreRpcContext rpcContext, String collectionId) { - this( - rpcContext, - QueryOptions.builder() - .setParentPath(rpcContext.getResourcePath()) - .setCollectionId(collectionId) - .setAllDescendants(true) - .build()); - } - - private Query(FirestoreRpcContext rpcContext, QueryOptions queryOptions) { + protected Query(FirestoreRpcContext rpcContext, QueryOptions queryOptions) { this.rpcContext = rpcContext; this.options = queryOptions; } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/QueryPartition.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/QueryPartition.java new file mode 100644 index 000000000..b87f625c4 --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/QueryPartition.java @@ -0,0 +1,97 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import java.util.Arrays; +import java.util.Objects; +import javax.annotation.Nullable; + +/** + * A split point that can be used in a query as a starting and/or end point for the query results. + * The cursors returned by {@link #getStartAt()} and {@link #getEndBefore()} can only be used in a + * query that matches the constraint of query that produced this partition. + */ +public class QueryPartition { + private final Query query; + @Nullable private final Object[] startAt; + @Nullable private final Object[] endBefore; + + public QueryPartition(Query query, @Nullable Object[] startAt, @Nullable Object[] endBefore) { + this.query = query; + this.startAt = startAt; + this.endBefore = endBefore; + } + + /** + * The cursor that defines the first result for this partition. {@code null} if this is the first + * partition. + * + * @return a cursor value that can be used with {@link Query#startAt(Object...)} or {@code null} + * if this is the first partition. + */ + @Nullable + public Object[] getStartAt() { + return startAt; + } + + /** + * The cursor that defines the first result after this partition. {@code null} if this is the last + * partition. + * + * @return a cursor value that can be used with {@link Query#endBefore(Object...)} or {@code null} + * if this is the last partition. + */ + @Nullable + public Object[] getEndBefore() { + return endBefore; + } + + /** + * Returns a query that only returns the documents for this partition. + * + * @return a query partitioned by a {@link Query#startAt(Object...)} and {@link + * Query#endBefore(Object...)} cursor. + */ + public Query createQuery() { + Query baseQuery = query; + if (startAt != null) { + baseQuery = baseQuery.startAt(startAt); + } + if (endBefore != null) { + baseQuery = baseQuery.endBefore(endBefore); + } + return baseQuery; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof QueryPartition)) return false; + QueryPartition partition = (QueryPartition) o; + return query.equals(partition.query) + && Arrays.equals(startAt, partition.startAt) + && Arrays.equals(endBefore, partition.endBefore); + } + + @Override + public int hashCode() { + int result = Objects.hash(query); + result = 31 * result + Arrays.hashCode(startAt); + result = 31 * result + Arrays.hashCode(endBefore); + return result; + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java index 14ee9553a..79b5dacd9 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java @@ -22,7 +22,9 @@ import com.google.firestore.v1.MapValue; import com.google.firestore.v1.Value; import com.google.protobuf.NullValue; +import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -181,4 +183,46 @@ static Value encodeValue( throw FirestoreException.invalidState("Cannot convert %s to Firestore Value", sanitizedObject); } + + static Object decodeValue(FirestoreRpcContext rpcContext, Value v) { + Value.ValueTypeCase typeCase = v.getValueTypeCase(); + switch (typeCase) { + case NULL_VALUE: + return null; + case BOOLEAN_VALUE: + return v.getBooleanValue(); + case INTEGER_VALUE: + return v.getIntegerValue(); + case DOUBLE_VALUE: + return v.getDoubleValue(); + case TIMESTAMP_VALUE: + return Timestamp.fromProto(v.getTimestampValue()); + case STRING_VALUE: + return v.getStringValue(); + case BYTES_VALUE: + return Blob.fromByteString(v.getBytesValue()); + case REFERENCE_VALUE: + String pathName = v.getReferenceValue(); + return new DocumentReference(rpcContext, ResourcePath.create(pathName)); + case GEO_POINT_VALUE: + return new GeoPoint( + v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude()); + case ARRAY_VALUE: + List list = new ArrayList<>(); + List lv = v.getArrayValue().getValuesList(); + for (Value iv : lv) { + list.add(decodeValue(rpcContext, iv)); + } + return list; + case MAP_VALUE: + Map outputMap = new HashMap<>(); + Map inputMap = v.getMapValue().getFieldsMap(); + for (Map.Entry entry : inputMap.entrySet()) { + outputMap.put(entry.getKey(), decodeValue(rpcContext, entry.getValue())); + } + return outputMap; + default: + throw FirestoreException.invalidState(String.format("Unknown Value Type: %s", typeCase)); + } + } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java index 2353e0138..29bb676f5 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/FirestoreRpc.java @@ -20,6 +20,7 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.ServiceRpc; +import com.google.cloud.firestore.v1.FirestoreClient; import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse; import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse; import com.google.firestore.v1.BatchGetDocumentsRequest; @@ -32,6 +33,7 @@ import com.google.firestore.v1.ListDocumentsRequest; import com.google.firestore.v1.ListenRequest; import com.google.firestore.v1.ListenResponse; +import com.google.firestore.v1.PartitionQueryRequest; import com.google.firestore.v1.RollbackRequest; import com.google.firestore.v1.RunQueryRequest; import com.google.firestore.v1.RunQueryResponse; @@ -64,6 +66,9 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc { UnaryCallable listCollectionIdsPagedCallable(); + UnaryCallable + partitionQueryPagedCallable(); + /** Returns a list of documents. */ UnaryCallable listDocumentsPagedCallable(); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java index d2fee1507..2eccae513 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/spi/v1/GrpcFirestoreRpc.java @@ -33,6 +33,7 @@ import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; import com.google.cloud.firestore.FirestoreOptions; +import com.google.cloud.firestore.v1.FirestoreClient; import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse; import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse; import com.google.cloud.firestore.v1.FirestoreSettings; @@ -52,6 +53,7 @@ import com.google.firestore.v1.ListDocumentsRequest; import com.google.firestore.v1.ListenRequest; import com.google.firestore.v1.ListenResponse; +import com.google.firestore.v1.PartitionQueryRequest; import com.google.firestore.v1.RollbackRequest; import com.google.firestore.v1.RunQueryRequest; import com.google.firestore.v1.RunQueryResponse; @@ -189,6 +191,12 @@ public UnaryCallable rollbackCallable() { return firestoreStub.listCollectionIdsPagedCallable(); } + @Override + public UnaryCallable + partitionQueryPagedCallable() { + return firestoreStub.partitionQueryPagedCallable(); + } + @Override public UnaryCallable listDocumentsPagedCallable() { diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 04f57b6a8..c41855de9 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -19,6 +19,7 @@ import static com.google.cloud.firestore.LocalFirestoreHelper.UPDATE_SINGLE_FIELD_OBJECT; import static com.google.cloud.firestore.LocalFirestoreHelper.map; import static java.util.Arrays.asList; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -49,6 +50,7 @@ import com.google.cloud.firestore.Precondition; import com.google.cloud.firestore.Query; import com.google.cloud.firestore.QueryDocumentSnapshot; +import com.google.cloud.firestore.QueryPartition; import com.google.cloud.firestore.QuerySnapshot; import com.google.cloud.firestore.SetOptions; import com.google.cloud.firestore.Transaction; @@ -551,6 +553,45 @@ public void endBefore() throws Exception { assertEquals(1L, querySnapshot.getDocuments().get(0).get("foo")); } + @Test + public void partitionedQuery() throws Exception { + int documentCount = 2 * 128 + 127; // Minimum partition size is 128. + + WriteBatch batch = firestore.batch(); + for (int i = 0; i < documentCount; ++i) { + batch.create(randomColl.document(), map("foo", i)); + } + batch.commit().get(); + + StreamConsumer consumer = new StreamConsumer<>(); + firestore.collectionGroup(randomColl.getId()).getPartitions(3, consumer); + final List partitions = consumer.consume().get(); + + assertNull(partitions.get(0).getStartAt()); + for (int i = 0; i < partitions.size() - 1; ++i) { + assertArrayEquals(partitions.get(i).getEndBefore(), partitions.get(i + 1).getStartAt()); + } + assertNull(partitions.get(partitions.size() - 1).getEndBefore()); + + // Validate that we can use the paritions to read the original documents. + int resultCount = 0; + for (QueryPartition partition : partitions) { + resultCount += partition.createQuery().get().get().size(); + } + assertEquals(documentCount, resultCount); + } + + @Test + public void emptyPartitionedQuery() throws Exception { + StreamConsumer consumer = new StreamConsumer<>(); + firestore.collectionGroup(randomColl.getId()).getPartitions(3, consumer); + final List partitions = consumer.consume().get(); + + assertEquals(1, partitions.size()); + assertNull(partitions.get(0).getStartAt()); + assertNull(partitions.get(0).getEndBefore()); + } + @Test public void failedTransaction() { try { @@ -1225,32 +1266,12 @@ public void getAllWithObserver() throws Exception { DocumentReference ref3 = randomColl.document("doc3"); - final List documentSnapshots = - Collections.synchronizedList(new ArrayList()); final DocumentReference[] documentReferences = {ref1, ref2, ref3}; - final SettableApiFuture future = SettableApiFuture.create(); - firestore.getAll( - documentReferences, - FieldMask.of("foo"), - new ApiStreamObserver() { - - @Override - public void onNext(DocumentSnapshot documentSnapshot) { - documentSnapshots.add(documentSnapshot); - } - - @Override - public void onError(Throwable throwable) { - future.setException(throwable); - } - - @Override - public void onCompleted() { - future.set(null); - } - }); - - future.get(); + + StreamConsumer consumer = new StreamConsumer<>(); + firestore.getAll(documentReferences, FieldMask.of("foo"), consumer); + + final List documentSnapshots = consumer.consume().get(); assertEquals( ALL_SUPPORTED_TYPES_OBJECT, documentSnapshots.get(0).toObject(AllSupportedTypes.class)); @@ -1284,4 +1305,29 @@ public void deleteNestedFieldUsingFieldPath() throws Exception { documentSnapshots = documentReference.get().get(); assertNull(documentSnapshots.getData().get("c.d")); } + + /** Wrapper around ApiStreamObserver that returns the results in a list. */ + private static class StreamConsumer implements ApiStreamObserver { + SettableApiFuture> done = SettableApiFuture.create(); + List results = Collections.synchronizedList(new ArrayList()); + + @Override + public void onNext(T element) { + results.add(element); + } + + @Override + public void onError(Throwable throwable) { + done.setException(throwable); + } + + @Override + public void onCompleted() { + done.set(results); + } + + public ApiFuture> consume() { + return done; + } + } }