Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat!: add support for the Query Partition API (#202)
  • Loading branch information
schmidt-sebastian committed Aug 4, 2020
1 parent 15d68cd commit 3996548
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 91 deletions.
15 changes: 15 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Expand Up @@ -142,4 +142,19 @@
<to>*</to>
</difference>

<!--
Query Partition API Feature
-->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>com.google.api.gax.rpc.UnaryCallable partitionQueryPagedCallable()</method>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/firestore/Firestore</className>
<method>com.google.cloud.firestore.Query collectionGroup(java.lang.String)</method>
<to>com.google.cloud.firestore.CollectionGroup</to>
</difference>

</differences>
@@ -0,0 +1,86 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.firestore.v1.FirestoreClient;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.PartitionQueryRequest;
import javax.annotation.Nullable;

/**
* A Collection Group query matches all documents that are contained in a collection or
* subcollection with a specific collection ID.
*/
public class CollectionGroup extends Query {
CollectionGroup(FirestoreRpcContext<?> rpcContext, String collectionId) {
super(
rpcContext,
QueryOptions.builder()
.setParentPath(rpcContext.getResourcePath())
.setCollectionId(collectionId)
.setAllDescendants(true)
.build());
}

/**
* Partitions a query by returning partition cursors that can be used to run the query in
* parallel. The returned partition cursors are split points that can be used as starting/end
* points for the query results.
*
* @param desiredPartitionCount The desired maximum number of partition points. The number must be
* strictly positive. The actual number of partitions returned may be fewer.
* @param observer a stream observer that receives the result of the Partition request.
*/
public void getPartitions(
long desiredPartitionCount, ApiStreamObserver<QueryPartition> observer) {
// Partition queries require explicit ordering by __name__.
Query queryWithDefaultOrder = orderBy(FieldPath.DOCUMENT_ID);

PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder();
request.setStructuredQuery(queryWithDefaultOrder.buildQuery());
request.setParent(options.getParentPath().toString());

// Since we are always returning an extra partition (with en empty endBefore cursor), we
// reduce the desired partition count by one.
request.setPartitionCount(desiredPartitionCount - 1);

final FirestoreClient.PartitionQueryPagedResponse response;
try {
response =
ApiExceptions.callAndTranslateApiException(
rpcContext.sendRequest(
request.build(), rpcContext.getClient().partitionQueryPagedCallable()));
} catch (ApiException exception) {
throw FirestoreException.apiException(exception);
}

@Nullable Object[] lastCursor = null;
for (Cursor cursor : response.iterateAll()) {
Object[] decodedCursorValue = new Object[cursor.getValuesCount()];
for (int i = 0; i < cursor.getValuesCount(); ++i) {
decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i));
}
observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, decodedCursorValue));
lastCursor = decodedCursorValue;
}
observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, null));
observer.onCompleted();
}
}
Expand Up @@ -23,11 +23,9 @@
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Value;
import com.google.firestore.v1.Write;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -115,48 +113,6 @@ static DocumentSnapshot fromMissing(
return new DocumentSnapshot(rpcContext, documentReference, null, readTime, null, null);
}

private Object decodeValue(Value v) {
Value.ValueTypeCase typeCase = v.getValueTypeCase();
switch (typeCase) {
case NULL_VALUE:
return null;
case BOOLEAN_VALUE:
return v.getBooleanValue();
case INTEGER_VALUE:
return v.getIntegerValue();
case DOUBLE_VALUE:
return v.getDoubleValue();
case TIMESTAMP_VALUE:
return Timestamp.fromProto(v.getTimestampValue());
case STRING_VALUE:
return v.getStringValue();
case BYTES_VALUE:
return Blob.fromByteString(v.getBytesValue());
case REFERENCE_VALUE:
String pathName = v.getReferenceValue();
return new DocumentReference(rpcContext, ResourcePath.create(pathName));
case GEO_POINT_VALUE:
return new GeoPoint(
v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude());
case ARRAY_VALUE:
List<Object> list = new ArrayList<>();
List<Value> lv = v.getArrayValue().getValuesList();
for (Value iv : lv) {
list.add(decodeValue(iv));
}
return list;
case MAP_VALUE:
Map<String, Object> outputMap = new HashMap<>();
Map<String, Value> inputMap = v.getMapValue().getFieldsMap();
for (Map.Entry<String, Value> entry : inputMap.entrySet()) {
outputMap.put(entry.getKey(), decodeValue(entry.getValue()));
}
return outputMap;
default:
throw FirestoreException.invalidState(String.format("Unknown Value Type: %s", typeCase));
}
}

/**
* Returns the time at which this snapshot was read.
*
Expand Down Expand Up @@ -214,7 +170,7 @@ public Map<String, Object> getData() {

Map<String, Object> decodedFields = new HashMap<>();
for (Map.Entry<String, Value> entry : fields.entrySet()) {
Object decodedValue = decodeValue(entry.getValue());
Object decodedValue = UserDataConverter.decodeValue(rpcContext, entry.getValue());
decodedFields.put(entry.getKey(), decodedValue);
}
return decodedFields;
Expand Down Expand Up @@ -293,7 +249,7 @@ public Object get(@Nonnull FieldPath fieldPath) {
return null;
}

return decodeValue(value);
return UserDataConverter.decodeValue(rpcContext, value);
}

/**
Expand Down
Expand Up @@ -56,14 +56,14 @@ public interface Firestore extends Service<FirestoreOptions>, AutoCloseable {
Iterable<CollectionReference> listCollections();

/**
* Creates and returns a new @link{Query} that includes all documents in the database that are
* contained in a collection or subcollection with the given @code{collectionId}.
* Creates and returns a new {@link CollectionGroup} that includes all documents in the database
* that are contained in a collection or subcollection with the given @code{collectionId}.
*
* @param collectionId Identifies the collections to query over. Every collection or subcollection
* with this ID as the last segment of its path will be included. Cannot contain a slash.
* @return The created Query.
*/
Query collectionGroup(@Nonnull String collectionId);
CollectionGroup collectionGroup(@Nonnull String collectionId);

/**
* Executes the given updateFunction and then attempts to commit the changes applied within the
Expand Down
Expand Up @@ -263,12 +263,12 @@ public void onCompleted() {

@Nonnull
@Override
public Query collectionGroup(@Nonnull final String collectionId) {
public CollectionGroup collectionGroup(@Nonnull final String collectionId) {
Preconditions.checkArgument(
!collectionId.contains("/"),
String.format(
"Invalid collectionId '%s'. Collection IDs must not contain '/'.", collectionId));
return new Query(this, collectionId);
return new CollectionGroup(this, collectionId);
}

@Nonnull
Expand Down
Expand Up @@ -296,21 +296,7 @@ abstract static class Builder {
.build());
}

/**
* Creates a Collection Group query that matches all documents directly nested under a
* specifically named collection
*/
Query(FirestoreRpcContext<?> rpcContext, String collectionId) {
this(
rpcContext,
QueryOptions.builder()
.setParentPath(rpcContext.getResourcePath())
.setCollectionId(collectionId)
.setAllDescendants(true)
.build());
}

private Query(FirestoreRpcContext<?> rpcContext, QueryOptions queryOptions) {
protected Query(FirestoreRpcContext<?> rpcContext, QueryOptions queryOptions) {
this.rpcContext = rpcContext;
this.options = queryOptions;
}
Expand Down
@@ -0,0 +1,97 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;

import java.util.Arrays;
import java.util.Objects;
import javax.annotation.Nullable;

/**
* A split point that can be used in a query as a starting and/or end point for the query results.
* The cursors returned by {@link #getStartAt()} and {@link #getEndBefore()} can only be used in a
* query that matches the constraint of query that produced this partition.
*/
public class QueryPartition {
private final Query query;
@Nullable private final Object[] startAt;
@Nullable private final Object[] endBefore;

public QueryPartition(Query query, @Nullable Object[] startAt, @Nullable Object[] endBefore) {
this.query = query;
this.startAt = startAt;
this.endBefore = endBefore;
}

/**
* The cursor that defines the first result for this partition. {@code null} if this is the first
* partition.
*
* @return a cursor value that can be used with {@link Query#startAt(Object...)} or {@code null}
* if this is the first partition.
*/
@Nullable
public Object[] getStartAt() {
return startAt;
}

/**
* The cursor that defines the first result after this partition. {@code null} if this is the last
* partition.
*
* @return a cursor value that can be used with {@link Query#endBefore(Object...)} or {@code null}
* if this is the last partition.
*/
@Nullable
public Object[] getEndBefore() {
return endBefore;
}

/**
* Returns a query that only returns the documents for this partition.
*
* @return a query partitioned by a {@link Query#startAt(Object...)} and {@link
* Query#endBefore(Object...)} cursor.
*/
public Query createQuery() {
Query baseQuery = query;
if (startAt != null) {
baseQuery = baseQuery.startAt(startAt);
}
if (endBefore != null) {
baseQuery = baseQuery.endBefore(endBefore);
}
return baseQuery;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof QueryPartition)) return false;
QueryPartition partition = (QueryPartition) o;
return query.equals(partition.query)
&& Arrays.equals(startAt, partition.startAt)
&& Arrays.equals(endBefore, partition.endBefore);
}

@Override
public int hashCode() {
int result = Objects.hash(query);
result = 31 * result + Arrays.hashCode(startAt);
result = 31 * result + Arrays.hashCode(endBefore);
return result;
}
}
Expand Up @@ -22,7 +22,9 @@
import com.google.firestore.v1.MapValue;
import com.google.firestore.v1.Value;
import com.google.protobuf.NullValue;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -181,4 +183,46 @@ static Value encodeValue(

throw FirestoreException.invalidState("Cannot convert %s to Firestore Value", sanitizedObject);
}

static Object decodeValue(FirestoreRpcContext<?> rpcContext, Value v) {
Value.ValueTypeCase typeCase = v.getValueTypeCase();
switch (typeCase) {
case NULL_VALUE:
return null;
case BOOLEAN_VALUE:
return v.getBooleanValue();
case INTEGER_VALUE:
return v.getIntegerValue();
case DOUBLE_VALUE:
return v.getDoubleValue();
case TIMESTAMP_VALUE:
return Timestamp.fromProto(v.getTimestampValue());
case STRING_VALUE:
return v.getStringValue();
case BYTES_VALUE:
return Blob.fromByteString(v.getBytesValue());
case REFERENCE_VALUE:
String pathName = v.getReferenceValue();
return new DocumentReference(rpcContext, ResourcePath.create(pathName));
case GEO_POINT_VALUE:
return new GeoPoint(
v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude());
case ARRAY_VALUE:
List<Object> list = new ArrayList<>();
List<Value> lv = v.getArrayValue().getValuesList();
for (Value iv : lv) {
list.add(decodeValue(rpcContext, iv));
}
return list;
case MAP_VALUE:
Map<String, Object> outputMap = new HashMap<>();
Map<String, Value> inputMap = v.getMapValue().getFieldsMap();
for (Map.Entry<String, Value> entry : inputMap.entrySet()) {
outputMap.put(entry.getKey(), decodeValue(rpcContext, entry.getValue()));
}
return outputMap;
default:
throw FirestoreException.invalidState(String.format("Unknown Value Type: %s", typeCase));
}
}
}

0 comments on commit 3996548

Please sign in to comment.