Skip to content

Commit

Permalink
Add support for the Parition API
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jul 28, 2020
1 parent 03ef755 commit c0a428c
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 91 deletions.
@@ -0,0 +1,69 @@
package com.google.cloud.firestore;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.firestore.v1.FirestoreClient;
import com.google.common.collect.ImmutableList;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.PartitionQueryRequest;
import javax.annotation.Nullable;

/**
* A Collection Group query matches all documents that are contained in a collection or
* subcollection with a specific collection ID.
*/
public class CollectionGroup extends Query {
CollectionGroup(FirestoreRpcContext<?> rpcContext, String collectionId) {
super(
rpcContext,
QueryOptions.builder()
.setParentPath(rpcContext.getResourcePath())
.setCollectionId(collectionId)
.setFieldOrders(ImmutableList.of(FieldOrder.defaultOrder()))
.setAllDescendants(true)
.build());
}

/**
* Partitions a query by returning partition cursors that can be used to run the query in
* parallel. The returned partition cursors are split points that an be used starting/end points
* for the query results.
*
* @param desiredPartitionCount The desired maximum number of partition points. The number must be
* strictly positive. The actual number of partitions returned may be fewer.
* @param observer a stream observer that receives the result of the Partition request.
*/
public void getPartitions(
long desiredPartitionCount, ApiStreamObserver<QueryPartition> observer) {
PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder();
request.setStructuredQuery(buildQuery());
request.setParent(options.getParentPath().toString());

// Since we are always returning an extra partition (with en empty endBefore cursor), we
// reduce the desired partition count by one.
request.setPartitionCount(desiredPartitionCount - 1);

final FirestoreClient.PartitionQueryPagedResponse response;
try {
response =
ApiExceptions.callAndTranslateApiException(
rpcContext.sendRequest(
request.build(), rpcContext.getClient().partitionQueryPagedCallable()));
} catch (ApiException exception) {
throw FirestoreException.apiException(exception);
}

@Nullable Object[] lastCursor = null;
for (Cursor cursor : response.iterateAll()) {
Object[] decodedCursorValue = new Object[cursor.getValuesCount()];
for (int i = 0; i < cursor.getValuesCount(); ++i) {
decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i));
}
observer.onNext(new QueryPartition(this, lastCursor, decodedCursorValue));
lastCursor = decodedCursorValue;
}
observer.onNext(new QueryPartition(this, lastCursor, null));
observer.onCompleted();
}
}
Expand Up @@ -23,11 +23,9 @@
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Value;
import com.google.firestore.v1.Write;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -115,48 +113,6 @@ static DocumentSnapshot fromMissing(
return new DocumentSnapshot(rpcContext, documentReference, null, readTime, null, null);
}

private Object decodeValue(Value v) {
Value.ValueTypeCase typeCase = v.getValueTypeCase();
switch (typeCase) {
case NULL_VALUE:
return null;
case BOOLEAN_VALUE:
return v.getBooleanValue();
case INTEGER_VALUE:
return v.getIntegerValue();
case DOUBLE_VALUE:
return v.getDoubleValue();
case TIMESTAMP_VALUE:
return Timestamp.fromProto(v.getTimestampValue());
case STRING_VALUE:
return v.getStringValue();
case BYTES_VALUE:
return Blob.fromByteString(v.getBytesValue());
case REFERENCE_VALUE:
String pathName = v.getReferenceValue();
return new DocumentReference(rpcContext, ResourcePath.create(pathName));
case GEO_POINT_VALUE:
return new GeoPoint(
v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude());
case ARRAY_VALUE:
List<Object> list = new ArrayList<>();
List<Value> lv = v.getArrayValue().getValuesList();
for (Value iv : lv) {
list.add(decodeValue(iv));
}
return list;
case MAP_VALUE:
Map<String, Object> outputMap = new HashMap<>();
Map<String, Value> inputMap = v.getMapValue().getFieldsMap();
for (Map.Entry<String, Value> entry : inputMap.entrySet()) {
outputMap.put(entry.getKey(), decodeValue(entry.getValue()));
}
return outputMap;
default:
throw FirestoreException.invalidState(String.format("Unknown Value Type: %s", typeCase));
}
}

/**
* Returns the time at which this snapshot was read.
*
Expand Down Expand Up @@ -214,7 +170,7 @@ public Map<String, Object> getData() {

Map<String, Object> decodedFields = new HashMap<>();
for (Map.Entry<String, Value> entry : fields.entrySet()) {
Object decodedValue = decodeValue(entry.getValue());
Object decodedValue = UserDataConverter.decodeValue(rpcContext, entry.getValue());
decodedFields.put(entry.getKey(), decodedValue);
}
return decodedFields;
Expand Down Expand Up @@ -293,7 +249,7 @@ public Object get(@Nonnull FieldPath fieldPath) {
return null;
}

return decodeValue(value);
return UserDataConverter.decodeValue(rpcContext, value);
}

/**
Expand Down
Expand Up @@ -56,14 +56,14 @@ public interface Firestore extends Service<FirestoreOptions>, AutoCloseable {
Iterable<CollectionReference> listCollections();

/**
* Creates and returns a new @link{Query} that includes all documents in the database that are
* contained in a collection or subcollection with the given @code{collectionId}.
* Creates and returns a new {@link CollectionGroup} that includes all documents in the database
* that are contained in a collection or subcollection with the given @code{collectionId}.
*
* @param collectionId Identifies the collections to query over. Every collection or subcollection
* with this ID as the last segment of its path will be included. Cannot contain a slash.
* @return The created Query.
*/
Query collectionGroup(@Nonnull String collectionId);
CollectionGroup collectionGroup(@Nonnull String collectionId);

/**
* Executes the given updateFunction and then attempts to commit the changes applied within the
Expand Down
Expand Up @@ -263,12 +263,12 @@ public void onCompleted() {

@Nonnull
@Override
public Query collectionGroup(@Nonnull final String collectionId) {
public CollectionGroup collectionGroup(@Nonnull final String collectionId) {
Preconditions.checkArgument(
!collectionId.contains("/"),
String.format(
"Invalid collectionId '%s'. Collection IDs must not contain '/'.", collectionId));
return new Query(this, collectionId);
return new CollectionGroup(this, collectionId);
}

@Nonnull
Expand Down
Expand Up @@ -196,6 +196,12 @@ static final class FieldOrder {
this.direction = direction;
}

/** Returns a FieldOrder that orders by __name__ ascending. */
static FieldOrder defaultOrder() {
return new FieldOrder(
FieldReference.newBuilder().setFieldPath("__name__").build(), Direction.ASCENDING);
}

Order toProto() {
Order.Builder result = Order.newBuilder();
result.setField(fieldReference);
Expand Down Expand Up @@ -296,21 +302,7 @@ abstract static class Builder {
.build());
}

/**
* Creates a Collection Group query that matches all documents directly nested under a
* specifically named collection
*/
Query(FirestoreRpcContext<?> rpcContext, String collectionId) {
this(
rpcContext,
QueryOptions.builder()
.setParentPath(rpcContext.getResourcePath())
.setCollectionId(collectionId)
.setAllDescendants(true)
.build());
}

private Query(FirestoreRpcContext<?> rpcContext, QueryOptions queryOptions) {
protected Query(FirestoreRpcContext<?> rpcContext, QueryOptions queryOptions) {
this.rpcContext = rpcContext;
this.options = queryOptions;
}
Expand Down
@@ -0,0 +1,61 @@
package com.google.cloud.firestore;

import javax.annotation.Nullable;

/**
* A split point that can be used by in a query as a starting or end point for the query results.
* The cursors returned by {@link #getStartAt()} and {@link #getEndBefore()} can only be used in a
* query that matches the constraint of query that produced this partition.
*/
public class QueryPartition {
private final Query query;
@Nullable private final Object[] startAt;
@Nullable private final Object[] endBefore;

public QueryPartition(Query query, Object[] startAt, @Nullable Object[] endBefore) {
this.query = query;
this.startAt = startAt;
this.endBefore = endBefore;
}

/**
* The cursor that defines the first result for this partition. {@code null} if this is the first
* partition.
*
* @return a cursor value that can be used with {@link Query#startAt(Object...)} or {@code null}
* if this is the first partition.
*/
@Nullable
public Object[] getStartAt() {
return startAt;
}

/**
* The cursor that defines the first result after this partition. {@code null} if this is the last
* partition.
*
* @return a cursor value that can be used with {@link Query#endBefore(Object...)} or {@code null}
* if this is the last partition.
*/
@Nullable
public Object[] getEndBefore() {
return endBefore;
}

/**
* Returns a query that only returns the documents for this partition.
*
* @return a query partitioned by a {@link Query#startAt(Object...)} and {@link
* Query#endBefore(Object...)} cursor.
*/
public Query createQuery() {
Query baseQuery = query;
if (startAt != null) {
baseQuery = baseQuery.startAt(startAt);
}
if (endBefore != null) {
baseQuery = baseQuery.endBefore(endBefore);
}
return baseQuery;
}
}
Expand Up @@ -22,7 +22,9 @@
import com.google.firestore.v1.MapValue;
import com.google.firestore.v1.Value;
import com.google.protobuf.NullValue;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -181,4 +183,46 @@ static Value encodeValue(

throw FirestoreException.invalidState("Cannot convert %s to Firestore Value", sanitizedObject);
}

static Object decodeValue(FirestoreRpcContext<?> rpcContext, Value v) {
Value.ValueTypeCase typeCase = v.getValueTypeCase();
switch (typeCase) {
case NULL_VALUE:
return null;
case BOOLEAN_VALUE:
return v.getBooleanValue();
case INTEGER_VALUE:
return v.getIntegerValue();
case DOUBLE_VALUE:
return v.getDoubleValue();
case TIMESTAMP_VALUE:
return Timestamp.fromProto(v.getTimestampValue());
case STRING_VALUE:
return v.getStringValue();
case BYTES_VALUE:
return Blob.fromByteString(v.getBytesValue());
case REFERENCE_VALUE:
String pathName = v.getReferenceValue();
return new DocumentReference(rpcContext, ResourcePath.create(pathName));
case GEO_POINT_VALUE:
return new GeoPoint(
v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude());
case ARRAY_VALUE:
List<Object> list = new ArrayList<>();
List<Value> lv = v.getArrayValue().getValuesList();
for (Value iv : lv) {
list.add(decodeValue(rpcContext, iv));
}
return list;
case MAP_VALUE:
Map<String, Object> outputMap = new HashMap<>();
Map<String, Value> inputMap = v.getMapValue().getFieldsMap();
for (Map.Entry<String, Value> entry : inputMap.entrySet()) {
outputMap.put(entry.getKey(), decodeValue(rpcContext, entry.getValue()));
}
return outputMap;
default:
throw FirestoreException.invalidState(String.format("Unknown Value Type: %s", typeCase));
}
}
}
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.ServiceRpc;
import com.google.cloud.firestore.v1.FirestoreClient;
import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
import com.google.firestore.v1.BatchGetDocumentsRequest;
Expand All @@ -32,6 +33,7 @@
import com.google.firestore.v1.ListDocumentsRequest;
import com.google.firestore.v1.ListenRequest;
import com.google.firestore.v1.ListenResponse;
import com.google.firestore.v1.PartitionQueryRequest;
import com.google.firestore.v1.RollbackRequest;
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.RunQueryResponse;
Expand Down Expand Up @@ -64,6 +66,9 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc {
UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse>
listCollectionIdsPagedCallable();

UnaryCallable<PartitionQueryRequest, FirestoreClient.PartitionQueryPagedResponse>
partitionQueryPagedCallable();

/** Returns a list of documents. */
UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> listDocumentsPagedCallable();

Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.firestore.FirestoreOptions;
import com.google.cloud.firestore.v1.FirestoreClient;
import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
import com.google.cloud.firestore.v1.FirestoreSettings;
Expand All @@ -52,6 +53,7 @@
import com.google.firestore.v1.ListDocumentsRequest;
import com.google.firestore.v1.ListenRequest;
import com.google.firestore.v1.ListenResponse;
import com.google.firestore.v1.PartitionQueryRequest;
import com.google.firestore.v1.RollbackRequest;
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.RunQueryResponse;
Expand Down Expand Up @@ -189,6 +191,12 @@ public UnaryCallable<RollbackRequest, Empty> rollbackCallable() {
return firestoreStub.listCollectionIdsPagedCallable();
}

@Override
public UnaryCallable<PartitionQueryRequest, FirestoreClient.PartitionQueryPagedResponse>
partitionQueryPagedCallable() {
return firestoreStub.partitionQueryPagedCallable();
}

@Override
public UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse>
listDocumentsPagedCallable() {
Expand Down

0 comments on commit c0a428c

Please sign in to comment.