Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CollectionGroup#getPartitions(long) #478

Merged
merged 3 commits into from Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,23 +16,35 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.firestore.v1.FirestoreClient;
import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.PartitionQueryRequest;
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;

/**
* A Collection Group query matches all documents that are contained in a collection or
* subcollection with a specific collection ID.
*/
public class CollectionGroup extends Query {

final Query partitionQuery;

CollectionGroup(FirestoreRpcContext<?> rpcContext, String collectionId) {
super(
rpcContext,
Expand All @@ -41,6 +53,9 @@ public class CollectionGroup extends Query {
.setCollectionId(collectionId)
.setAllDescendants(true)
.build());

// Partition queries require explicit ordering by __name__.
partitionQuery = orderBy(FieldPath.DOCUMENT_ID);
}

/**
Expand All @@ -53,24 +68,12 @@ public class CollectionGroup extends Query {
* @param observer a stream observer that receives the result of the Partition request.
*/
public void getPartitions(
long desiredPartitionCount, ApiStreamObserver<QueryPartition> observer) {
Preconditions.checkArgument(
desiredPartitionCount > 0, "Desired partition count must be one or greater");

// Partition queries require explicit ordering by __name__.
Query queryWithDefaultOrder = orderBy(FieldPath.DOCUMENT_ID);

long desiredPartitionCount, final ApiStreamObserver<QueryPartition> observer) {
if (desiredPartitionCount == 1) {
// Short circuit if the user only requested a single partition.
observer.onNext(new QueryPartition(queryWithDefaultOrder, null, null));
observer.onNext(new QueryPartition(partitionQuery, null, null));
} else {
PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder();
request.setStructuredQuery(queryWithDefaultOrder.buildQuery());
request.setParent(options.getParentPath().toString());

// Since we are always returning an extra partition (with an empty endBefore cursor), we
// reduce the desired partition count by one.
request.setPartitionCount(desiredPartitionCount - 1);
PartitionQueryRequest request = buildRequest(desiredPartitionCount);

final FirestoreClient.PartitionQueryPagedResponse response;
final TraceUtil traceUtil = TraceUtil.getInstance();
Expand All @@ -79,26 +82,92 @@ public void getPartitions(
response =
ApiExceptions.callAndTranslateApiException(
rpcContext.sendRequest(
request.build(), rpcContext.getClient().partitionQueryPagedCallable()));
request, rpcContext.getClient().partitionQueryPagedCallable()));

consumePartitions(
response,
new Function<QueryPartition, Void>() {
@Override
public Void apply(QueryPartition queryPartition) {
observer.onNext(queryPartition);
return null;
}
});

observer.onCompleted();
} catch (ApiException exception) {
span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage()));
throw FirestoreException.forApiException(exception);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}
}
}

public ApiFuture<List<QueryPartition>> getPartitions(long desiredPartitionCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could do something like this:

public Iterable<CollectionReference> listCollections() {
and expose an Iterable of QueryPartitions? I assume this won't work because of some threading issues, but it would be nice to re-use an existing pattern.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we go this route we lose the ability to chain the result of this method into a series of queries with the following pattern:

    ApiFuture<List<QuerySnapshot>> allQueries = ApiFutures.transformAsync(
        firestore.collectionGroup(randomColl.getId()).getPartitions(3),
        new ApiAsyncFunction<List<QueryPartition>, List<QuerySnapshot>>() {
          @Override
          public ApiFuture<List<QuerySnapshot>> apply(List<QueryPartition> input) {
            logit("ApiAsyncFunction.apply");
            List<ApiFuture<QuerySnapshot>> futures = new ArrayList<>();
            for (QueryPartition part : input) {
              logit("part = %s", part);
              futures.add(part.createQuery().get());
            }
            return ApiFutures.allAsList(futures);
          }
        },
        MoreExecutors.directExecutor()
    );
    List<QuerySnapshot> querySnapshots = allQueries.get();
2021-01-08T23:52:57.989  [main] com.google.cloud.firestore.CollectionGroup - transformAsync...
2021-01-08T23:52:58.008  [main] com.google.cloud.firestore.CollectionGroup - transformAsync Complete
2021-01-08T23:52:58.008  [main] com.google.cloud.firestore.CollectionGroup - allQueries.get()...
2021-01-08T23:52:58.111  [Gax-4] com.google.cloud.firestore.CollectionGroup - transform->apply()
2021-01-08T23:52:58.116  [Gax-4] com.google.cloud.firestore.CollectionGroup - ApiAsyncFunction.apply
2021-01-08T23:52:58.117  [Gax-4] com.google.cloud.firestore.CollectionGroup - part = com.google.cloud.firestore.QueryPartition@a9b0a06e
2021-01-08T23:52:58.131  [Gax-4] com.google.cloud.firestore.CollectionGroup - part = com.google.cloud.firestore.QueryPartition@19a03734
2021-01-08T23:52:58.309  [main] com.google.cloud.firestore.CollectionGroup - allQueries.get() complete

There is also additional state that has to be tracked, because the iterator we return would potentially have to run past the end of the iterator returned by response.iterateAll(); not to mention that the iteration state would then have to be split across the hasNext and next methods. (It's possible, but it is actually much more code; I can swap it in if you want me to.)

if (desiredPartitionCount == 1) {
// Short circuit if the user only requested a single partition.
return ApiFutures.immediateFuture(
Collections.singletonList(new QueryPartition(partitionQuery, null, null)));
} else {
PartitionQueryRequest request = buildRequest(desiredPartitionCount);

@Nullable Object[] lastCursor = null;
for (Cursor cursor : response.iterateAll()) {
Object[] decodedCursorValue = new Object[cursor.getValuesCount()];
for (int i = 0; i < cursor.getValuesCount(); ++i) {
decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i));
}
observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, decodedCursorValue));
lastCursor = decodedCursorValue;
final TraceUtil traceUtil = TraceUtil.getInstance();
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_PARTITIONQUERY);
try (Scope scope = traceUtil.getTracer().withSpan(span)) {
return ApiFutures.transform(
rpcContext.sendRequest(request, rpcContext.getClient().partitionQueryPagedCallable()),
new ApiFunction<PartitionQueryPagedResponse, List<QueryPartition>>() {
@Override
public List<QueryPartition> apply(PartitionQueryPagedResponse response) {
final ImmutableList.Builder<QueryPartition> partitions = ImmutableList.builder();
consumePartitions(
response,
new Function<QueryPartition, Void>() {
@Override
public Void apply(QueryPartition queryPartition) {
partitions.add(queryPartition);
return null;
}
});
return partitions.build();
}
},
MoreExecutors.directExecutor());
} catch (ApiException exception) {
span.setStatus(Status.UNKNOWN.withDescription(exception.getMessage()));
throw FirestoreException.forApiException(exception);
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
}
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, null));
}
}

observer.onCompleted();
/**
 * Creates the {@code PartitionQueryRequest} that is sent to the backend for this collection
 * group.
 *
 * <p>The request carries the structured form of {@code partitionQuery} and the parent path taken
 * from {@code options}.
 *
 * @param desiredPartitionCount the number of partitions the caller wants; must be positive.
 * @return the fully built request.
 * @throws IllegalArgumentException if {@code desiredPartitionCount} is zero or negative.
 */
private PartitionQueryRequest buildRequest(long desiredPartitionCount) {
  Preconditions.checkArgument(
      desiredPartitionCount > 0, "Desired partition count must be one or greater");

  // An extra partition (the one with an empty endBefore cursor) is always returned on top of
  // what the backend produces, so the backend is asked for one partition fewer than desired.
  long requestedPartitionCount = desiredPartitionCount - 1;

  return PartitionQueryRequest.newBuilder()
      .setStructuredQuery(partitionQuery.buildQuery())
      .setParent(options.getParentPath().toString())
      .setPartitionCount(requestedPartitionCount)
      .build();
}

/**
 * Turns the cursors of a partition query response into {@code QueryPartition}s and hands each
 * one to {@code consumer}, in the order the cursors are iterated.
 *
 * <p>Every cursor is used as the {@code endBefore} bound of one partition and as the {@code
 * startAt} bound of the partition that follows it. After the last cursor, one more partition
 * with a {@code null} end bound is emitted; when the response contains no cursors at all, that
 * is the only partition and both of its bounds are {@code null}.
 *
 * @param response the paged response whose cursors are iterated via {@code iterateAll()}.
 * @param consumer callback that is applied to each partition; its return value is ignored.
 */
private void consumePartitions(
    PartitionQueryPagedResponse response, Function<QueryPartition, Void> consumer) {
  // Start bound of the partition being built; null for the very first partition.
  @Nullable Object[] startAt = null;

  for (Cursor splitPoint : response.iterateAll()) {
    final int valueCount = splitPoint.getValuesCount();
    final Object[] endBefore = new Object[valueCount];

    int index = 0;
    while (index < valueCount) {
      endBefore[index] = UserDataConverter.decodeValue(rpcContext, splitPoint.getValues(index));
      ++index;
    }

    consumer.apply(new QueryPartition(partitionQuery, startAt, endBefore));

    // The end of this partition is the beginning of the next one.
    startAt = endBefore;
  }

  // Trailing partition: begins at the last cursor seen (if any) and has no end bound.
  consumer.apply(new QueryPartition(partitionQuery, startAt, null));
}
}
Expand Up @@ -618,6 +618,34 @@ public void partitionedQuery() throws Exception {
assertEquals(documentCount, resultCount);
}

/**
 * Verifies the future-based {@code getPartitions(long)} overload: the returned partitions must
 * be contiguous, open-ended at both extremes, and together must read back every document that
 * was written.
 */
@Test
public void partitionedQuery_future() throws Exception {
  // Minimum partition size is 128.
  final int expectedDocuments = 2 * 128 + 127;

  WriteBatch writes = firestore.batch();
  for (int value = 0; value < expectedDocuments; ++value) {
    writes.create(randomColl.document(), map("foo", value));
  }
  writes.commit().get();

  final List<QueryPartition> partitions =
      firestore.collectionGroup(randomColl.getId()).getPartitions(3).get();

  // The first partition has no lower bound ...
  assertNull(partitions.get(0).getStartAt());
  // ... each partition ends exactly where its successor begins ...
  for (int next = 1; next < partitions.size(); ++next) {
    assertArrayEquals(
        partitions.get(next - 1).getEndBefore(), partitions.get(next).getStartAt());
  }
  // ... and the last partition has no upper bound.
  assertNull(partitions.get(partitions.size() - 1).getEndBefore());

  // Validate that we can use the partitions to read the original documents.
  int documentsRead = 0;
  for (QueryPartition partition : partitions) {
    documentsRead += partition.createQuery().get().get().size();
  }
  assertEquals(expectedDocuments, documentsRead);
}

@Test
public void emptyPartitionedQuery() throws Exception {
StreamConsumer<QueryPartition> consumer = new StreamConsumer<>();
Expand Down