diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java index 3ffa564e9..bcfc3b920 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java @@ -16,16 +16,25 @@ package com.google.cloud.firestore; +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.cloud.firestore.v1.FirestoreClient; +import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; import com.google.firestore.v1.Cursor; import com.google.firestore.v1.PartitionQueryRequest; import io.opencensus.common.Scope; import io.opencensus.trace.Span; import io.opencensus.trace.Status; +import java.util.Collections; +import java.util.List; import javax.annotation.Nullable; /** @@ -33,6 +42,9 @@ * subcollection with a specific collection ID. */ public class CollectionGroup extends Query { + + final Query partitionQuery; + CollectionGroup(FirestoreRpcContext<?> rpcContext, String collectionId) { super( rpcContext, @@ -41,6 +53,9 @@ public class CollectionGroup extends Query { .setCollectionId(collectionId) .setAllDescendants(true) .build()); + + // Partition queries require explicit ordering by __name__. + partitionQuery = orderBy(FieldPath.DOCUMENT_ID); } /** @@ -53,24 +68,12 @@ public class CollectionGroup extends Query { * @param observer a stream observer that receives the result of the Partition request. 
*/ public void getPartitions( - long desiredPartitionCount, ApiStreamObserver<QueryPartition> observer) { - Preconditions.checkArgument( - desiredPartitionCount > 0, "Desired partition count must be one or greater"); - - // Partition queries require explicit ordering by __name__. - Query queryWithDefaultOrder = orderBy(FieldPath.DOCUMENT_ID); - + long desiredPartitionCount, final ApiStreamObserver<QueryPartition> observer) { if (desiredPartitionCount == 1) { // Short circuit if the user only requested a single partition. - observer.onNext(new QueryPartition(queryWithDefaultOrder, null, null)); + observer.onNext(new QueryPartition(partitionQuery, null, null)); } else { - PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder(); - request.setStructuredQuery(queryWithDefaultOrder.buildQuery()); - request.setParent(options.getParentPath().toString()); - - // Since we are always returning an extra partition (with en empty endBefore cursor), we - reduce the desired partition count by one. - request.setPartitionCount(desiredPartitionCount - 1); + PartitionQueryRequest request = buildRequest(desiredPartitionCount); final FirestoreClient.PartitionQueryPagedResponse response; final TraceUtil traceUtil = TraceUtil.getInstance(); Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_PARTITIONQUERY); try (Scope scope = traceUtil.getTracer().withSpan(span)) { @@ -79,26 +82,92 @@ public void getPartitions( response = ApiExceptions.callAndTranslateApiException( rpcContext.sendRequest( - request.build(), rpcContext.getClient().partitionQueryPagedCallable())); + request, rpcContext.getClient().partitionQueryPagedCallable())); + + consumePartitions( + response, + new Function<QueryPartition, Void>() { + @Override + public Void apply(QueryPartition queryPartition) { + observer.onNext(queryPartition); + return null; + } + }); + + observer.onCompleted(); } catch (ApiException exception) { span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage())); throw FirestoreException.forApiException(exception); } finally { span.end(TraceUtil.END_SPAN_OPTIONS); } + } + } + + public ApiFuture<List<QueryPartition>> getPartitions(long desiredPartitionCount) { + if 
(desiredPartitionCount == 1) { + // Short circuit if the user only requested a single partition. + return ApiFutures.immediateFuture( + Collections.singletonList(new QueryPartition(partitionQuery, null, null))); + } else { + PartitionQueryRequest request = buildRequest(desiredPartitionCount); - @Nullable Object[] lastCursor = null; - for (Cursor cursor : response.iterateAll()) { - Object[] decodedCursorValue = new Object[cursor.getValuesCount()]; - for (int i = 0; i < cursor.getValuesCount(); ++i) { - decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i)); - } - observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, decodedCursorValue)); - lastCursor = decodedCursorValue; + final TraceUtil traceUtil = TraceUtil.getInstance(); + Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_PARTITIONQUERY); + try (Scope scope = traceUtil.getTracer().withSpan(span)) { + return ApiFutures.transform( + rpcContext.sendRequest(request, rpcContext.getClient().partitionQueryPagedCallable()), + new ApiFunction<PartitionQueryPagedResponse, List<QueryPartition>>() { + @Override + public List<QueryPartition> apply(PartitionQueryPagedResponse response) { + final ImmutableList.Builder<QueryPartition> partitions = ImmutableList.builder(); + consumePartitions( + response, + new Function<QueryPartition, Void>() { + @Override + public Void apply(QueryPartition queryPartition) { + partitions.add(queryPartition); + return null; + } + }); + return partitions.build(); + } + }, + MoreExecutors.directExecutor()); + } catch (ApiException exception) { + span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage())); + throw FirestoreException.forApiException(exception); + } finally { + span.end(TraceUtil.END_SPAN_OPTIONS); } - observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, null)); } + } - observer.onCompleted(); + private PartitionQueryRequest buildRequest(long desiredPartitionCount) { + Preconditions.checkArgument( + desiredPartitionCount > 0, "Desired partition count must be one or greater"); + 
PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder(); + request.setStructuredQuery(partitionQuery.buildQuery()); + request.setParent(options.getParentPath().toString()); + + // Since we are always returning an extra partition (with an empty endBefore cursor), we + reduce the desired partition count by one. + request.setPartitionCount(desiredPartitionCount - 1); + return request.build(); + } + + private void consumePartitions( + PartitionQueryPagedResponse response, Function<QueryPartition, Void> consumer) { + @Nullable Object[] lastCursor = null; + for (Cursor cursor : response.iterateAll()) { + Object[] decodedCursorValue = new Object[cursor.getValuesCount()]; + for (int i = 0; i < cursor.getValuesCount(); ++i) { + decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i)); + } + consumer.apply(new QueryPartition(partitionQuery, lastCursor, decodedCursorValue)); + lastCursor = decodedCursorValue; + } + consumer.apply(new QueryPartition(partitionQuery, lastCursor, null)); } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index c61be8e77..a5387a38d 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -618,6 +618,34 @@ public void partitionedQuery() throws Exception { assertEquals(documentCount, resultCount); } + @Test + public void partitionedQuery_future() throws Exception { + int documentCount = 2 * 128 + 127; // Minimum partition size is 128. 
+ + WriteBatch batch = firestore.batch(); + for (int i = 0; i < documentCount; ++i) { + batch.create(randomColl.document(), map("foo", i)); + } + batch.commit().get(); + + ApiFuture<List<QueryPartition>> future = + firestore.collectionGroup(randomColl.getId()).getPartitions(3); + final List<QueryPartition> partitions = future.get(); + + assertNull(partitions.get(0).getStartAt()); + for (int i = 0; i < partitions.size() - 1; ++i) { + assertArrayEquals(partitions.get(i).getEndBefore(), partitions.get(i + 1).getStartAt()); + } + assertNull(partitions.get(partitions.size() - 1).getEndBefore()); + + // Validate that we can use the partitions to read the original documents. + int resultCount = 0; + for (QueryPartition partition : partitions) { + resultCount += partition.createQuery().get().get().size(); + } + assertEquals(documentCount, resultCount); + } + @Test public void emptyPartitionedQuery() throws Exception { StreamConsumer<QueryPartition> consumer = new StreamConsumer<>();