From a7785550cb80d9bafa5113631eb30633a63b8088 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 12 Nov 2020 16:15:52 -0500 Subject: [PATCH 1/3] feat: add CollectionGroup#getPartitions(long) Add new method to CollectionGroup allowing getPartitions to return a Future>. This new method behaves similar to the CollectionGroup#getPartitions(long, ApiStreamObserver) providing a Future centric api. --- .../cloud/firestore/CollectionGroup.java | 120 ++++++++++++++---- .../cloud/firestore/it/ITSystemTest.java | 28 ++++ 2 files changed, 123 insertions(+), 25 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java index 3ffa564e9..3f3b3869e 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java @@ -16,16 +16,25 @@ package com.google.cloud.firestore; +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.cloud.firestore.v1.FirestoreClient; +import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; import com.google.firestore.v1.Cursor; import com.google.firestore.v1.PartitionQueryRequest; import io.opencensus.common.Scope; import io.opencensus.trace.Span; import io.opencensus.trace.Status; +import java.util.Collections; +import java.util.List; import javax.annotation.Nullable; /** @@ -33,6 +42,9 @@ * subcollection with a specific collection ID. */ public class CollectionGroup extends Query { + + final Query partitionQuery; + CollectionGroup(FirestoreRpcContext rpcContext, String collectionId) { super( rpcContext, @@ -41,6 +53,9 @@ public class CollectionGroup extends Query { .setCollectionId(collectionId) .setAllDescendants(true) .build()); + + // Partition queries require explicit ordering by __name__. + partitionQuery = orderBy(FieldPath.DOCUMENT_ID); } /** @@ -53,24 +68,12 @@ public class CollectionGroup extends Query { * @param observer a stream observer that receives the result of the Partition request. */ public void getPartitions( - long desiredPartitionCount, ApiStreamObserver observer) { - Preconditions.checkArgument( - desiredPartitionCount > 0, "Desired partition count must be one or greater"); - - // Partition queries require explicit ordering by __name__. - Query queryWithDefaultOrder = orderBy(FieldPath.DOCUMENT_ID); - + long desiredPartitionCount, final ApiStreamObserver observer) { if (desiredPartitionCount == 1) { // Short circuit if the user only requested a single partition. - observer.onNext(new QueryPartition(queryWithDefaultOrder, null, null)); + observer.onNext(new QueryPartition(partitionQuery, null, null)); } else { - PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder(); - request.setStructuredQuery(queryWithDefaultOrder.buildQuery()); - request.setParent(options.getParentPath().toString()); - - // Since we are always returning an extra partition (with en empty endBefore cursor), we - // reduce the desired partition count by one. - request.setPartitionCount(desiredPartitionCount - 1); + PartitionQueryRequest.Builder request = buildRequest(desiredPartitionCount); final FirestoreClient.PartitionQueryPagedResponse response; final TraceUtil traceUtil = TraceUtil.getInstance(); @@ -80,25 +83,92 @@ public void getPartitions( ApiExceptions.callAndTranslateApiException( rpcContext.sendRequest( request.build(), rpcContext.getClient().partitionQueryPagedCallable())); + + consumePartitions( + response, + new Function() { + @Override + public Void apply(QueryPartition queryPartition) { + observer.onNext(queryPartition); + return null; + } + }); + + observer.onCompleted(); } catch (ApiException exception) { span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage())); throw FirestoreException.forApiException(exception); } finally { span.end(TraceUtil.END_SPAN_OPTIONS); } + } + } + + public ApiFuture> getPartitions(long desiredPartitionCount) { + if (desiredPartitionCount == 1) { + // Short circuit if the user only requested a single partition. + return ApiFutures.immediateFuture( + Collections.singletonList(new QueryPartition(partitionQuery, null, null))); + } else { + PartitionQueryRequest.Builder request = buildRequest(desiredPartitionCount); - @Nullable Object[] lastCursor = null; - for (Cursor cursor : response.iterateAll()) { - Object[] decodedCursorValue = new Object[cursor.getValuesCount()]; - for (int i = 0; i < cursor.getValuesCount(); ++i) { - decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i)); - } - observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, decodedCursorValue)); - lastCursor = decodedCursorValue; + final TraceUtil traceUtil = TraceUtil.getInstance(); + Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_PARTITIONQUERY); + try (Scope scope = traceUtil.getTracer().withSpan(span)) { + return ApiFutures.transform( + rpcContext.sendRequest( + request.build(), rpcContext.getClient().partitionQueryPagedCallable()), + new ApiFunction>() { + @Override + public List apply(PartitionQueryPagedResponse response) { + final ImmutableList.Builder partitions = ImmutableList.builder(); + consumePartitions( + response, + new Function() { + @Override + public Void apply(QueryPartition queryPartition) { + partitions.add(queryPartition); + return null; + } + }); + return partitions.build(); + } + }, + MoreExecutors.directExecutor()); + } catch (ApiException exception) { + span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage())); + throw FirestoreException.forApiException(exception); + } finally { + span.end(TraceUtil.END_SPAN_OPTIONS); } - observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, null)); } + } - observer.onCompleted(); + private PartitionQueryRequest.Builder buildRequest(long desiredPartitionCount) { + Preconditions.checkArgument( + desiredPartitionCount > 0, "Desired partition count must be one or greater"); + + PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder(); + request.setStructuredQuery(partitionQuery.buildQuery()); + request.setParent(options.getParentPath().toString()); + + // Since we are always returning an extra partition (with en empty endBefore cursor), we + // reduce the desired partition count by one. + request.setPartitionCount(desiredPartitionCount - 1); + return request; + } + + private void consumePartitions( + PartitionQueryPagedResponse response, Function consumer) { + @Nullable Object[] lastCursor = null; + for (Cursor cursor : response.iterateAll()) { + Object[] decodedCursorValue = new Object[cursor.getValuesCount()]; + for (int i = 0; i < cursor.getValuesCount(); ++i) { + decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i)); + } + consumer.apply(new QueryPartition(partitionQuery, lastCursor, decodedCursorValue)); + lastCursor = decodedCursorValue; + } + consumer.apply(new QueryPartition(partitionQuery, lastCursor, null)); } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index c61be8e77..a5387a38d 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -618,6 +618,34 @@ public void partitionedQuery() throws Exception { assertEquals(documentCount, resultCount); } + @Test + public void partitionedQuery_future() throws Exception { + int documentCount = 2 * 128 + 127; // Minimum partition size is 128. + + WriteBatch batch = firestore.batch(); + for (int i = 0; i < documentCount; ++i) { + batch.create(randomColl.document(), map("foo", i)); + } + batch.commit().get(); + + ApiFuture> future = + firestore.collectionGroup(randomColl.getId()).getPartitions(3); + final List partitions = future.get(); + + assertNull(partitions.get(0).getStartAt()); + for (int i = 0; i < partitions.size() - 1; ++i) { + assertArrayEquals(partitions.get(i).getEndBefore(), partitions.get(i + 1).getStartAt()); + } + assertNull(partitions.get(partitions.size() - 1).getEndBefore()); + + // Validate that we can use the partitions to read the original documents. + int resultCount = 0; + for (QueryPartition partition : partitions) { + resultCount += partition.createQuery().get().get().size(); + } + assertEquals(documentCount, resultCount); + } + @Test public void emptyPartitionedQuery() throws Exception { StreamConsumer consumer = new StreamConsumer<>(); From d4cd5cc127d898b5306cf1f629d9ef31189d5d63 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 15 Jan 2021 14:31:58 -0500 Subject: [PATCH 2/3] chore: return built request instead of builder --- .../com/google/cloud/firestore/CollectionGroup.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java index 3f3b3869e..4de86ab19 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java @@ -73,7 +73,7 @@ public void getPartitions( // Short circuit if the user only requested a single partition. observer.onNext(new QueryPartition(partitionQuery, null, null)); } else { - PartitionQueryRequest.Builder request = buildRequest(desiredPartitionCount); + PartitionQueryRequest request = buildRequest(desiredPartitionCount); final FirestoreClient.PartitionQueryPagedResponse response; final TraceUtil traceUtil = TraceUtil.getInstance(); @@ -82,7 +82,7 @@ public void getPartitions( response = ApiExceptions.callAndTranslateApiException( rpcContext.sendRequest( - request.build(), rpcContext.getClient().partitionQueryPagedCallable())); + request, rpcContext.getClient().partitionQueryPagedCallable())); consumePartitions( response, @@ -110,14 +110,14 @@ public ApiFuture> getPartitions(long desiredPartitionCount) return ApiFutures.immediateFuture( Collections.singletonList(new QueryPartition(partitionQuery, null, null))); } else { - PartitionQueryRequest.Builder request = buildRequest(desiredPartitionCount); + PartitionQueryRequest request = buildRequest(desiredPartitionCount); final TraceUtil traceUtil = TraceUtil.getInstance(); Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_PARTITIONQUERY); try (Scope scope = traceUtil.getTracer().withSpan(span)) { return ApiFutures.transform( rpcContext.sendRequest( - request.build(), rpcContext.getClient().partitionQueryPagedCallable()), + request, rpcContext.getClient().partitionQueryPagedCallable()), new ApiFunction>() { @Override public List apply(PartitionQueryPagedResponse response) { @@ -144,7 +144,7 @@ public Void apply(QueryPartition queryPartition) { } } - private PartitionQueryRequest.Builder buildRequest(long desiredPartitionCount) { + private PartitionQueryRequest buildRequest(long desiredPartitionCount) { Preconditions.checkArgument( desiredPartitionCount > 0, "Desired partition count must be one or greater"); @@ -155,7 +155,7 @@ private PartitionQueryRequest.Builder buildRequest(long desiredPartitionCount) { // Since we are always returning an extra partition (with en empty endBefore cursor), we // reduce the desired partition count by one. request.setPartitionCount(desiredPartitionCount - 1); - return request; + return request.build(); } private void consumePartitions( From a65f94f5975cddb78ddf0d4cb2092732da89685f Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 15 Jan 2021 14:40:32 -0500 Subject: [PATCH 3/3] chore: format --- .../main/java/com/google/cloud/firestore/CollectionGroup.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java index 4de86ab19..bcfc3b920 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java @@ -116,8 +116,7 @@ public ApiFuture> getPartitions(long desiredPartitionCount) Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_PARTITIONQUERY); try (Scope scope = traceUtil.getTracer().withSpan(span)) { return ApiFutures.transform( - rpcContext.sendRequest( - request, rpcContext.getClient().partitionQueryPagedCallable()), + rpcContext.sendRequest(request, rpcContext.getClient().partitionQueryPagedCallable()), new ApiFunction>() { @Override public List apply(PartitionQueryPagedResponse response) {