Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add support for the Query Partition API #202

Merged
merged 16 commits into from Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,86 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.firestore.v1.FirestoreClient;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.PartitionQueryRequest;
import javax.annotation.Nullable;

/**
* A Collection Group query matches all documents that are contained in a collection or
* subcollection with a specific collection ID.
*/
public class CollectionGroup extends Query {
CollectionGroup(FirestoreRpcContext<?> rpcContext, String collectionId) {
super(
rpcContext,
QueryOptions.builder()
.setParentPath(rpcContext.getResourcePath())
.setCollectionId(collectionId)
.setAllDescendants(true)
.build());
}

/**
* Partitions a query by returning partition cursors that can be used to run the query in
* parallel. The returned partition cursors are split points that can be used as starting/end
* points for the query results.
*
* @param desiredPartitionCount The desired maximum number of partition points. The number must be
* strictly positive. The actual number of partitions returned may be fewer.
* @param observer a stream observer that receives the result of the Partition request.
*/
public void getPartitions(
long desiredPartitionCount, ApiStreamObserver<QueryPartition> observer) {
// Partition queries require explicit ordering by __name__.
Query queryWithDefaultOrder = orderBy(FieldPath.DOCUMENT_ID);

PartitionQueryRequest.Builder request = PartitionQueryRequest.newBuilder();
request.setStructuredQuery(queryWithDefaultOrder.buildQuery());
request.setParent(options.getParentPath().toString());

// Since we are always returning an extra partition (with en empty endBefore cursor), we
// reduce the desired partition count by one.
request.setPartitionCount(desiredPartitionCount - 1);

final FirestoreClient.PartitionQueryPagedResponse response;
try {
response =
ApiExceptions.callAndTranslateApiException(
rpcContext.sendRequest(
request.build(), rpcContext.getClient().partitionQueryPagedCallable()));
} catch (ApiException exception) {
throw FirestoreException.apiException(exception);
}

@Nullable Object[] lastCursor = null;
for (Cursor cursor : response.iterateAll()) {
Object[] decodedCursorValue = new Object[cursor.getValuesCount()];
for (int i = 0; i < cursor.getValuesCount(); ++i) {
decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i));
}
observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, decodedCursorValue));
lastCursor = decodedCursorValue;
}
observer.onNext(new QueryPartition(queryWithDefaultOrder, lastCursor, null));
observer.onCompleted();
}
}
Expand Up @@ -23,11 +23,9 @@
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Value;
import com.google.firestore.v1.Write;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -115,48 +113,6 @@ static DocumentSnapshot fromMissing(
return new DocumentSnapshot(rpcContext, documentReference, null, readTime, null, null);
}

private Object decodeValue(Value v) {
Value.ValueTypeCase typeCase = v.getValueTypeCase();
switch (typeCase) {
case NULL_VALUE:
return null;
case BOOLEAN_VALUE:
return v.getBooleanValue();
case INTEGER_VALUE:
return v.getIntegerValue();
case DOUBLE_VALUE:
return v.getDoubleValue();
case TIMESTAMP_VALUE:
return Timestamp.fromProto(v.getTimestampValue());
case STRING_VALUE:
return v.getStringValue();
case BYTES_VALUE:
return Blob.fromByteString(v.getBytesValue());
case REFERENCE_VALUE:
String pathName = v.getReferenceValue();
return new DocumentReference(rpcContext, ResourcePath.create(pathName));
case GEO_POINT_VALUE:
return new GeoPoint(
v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude());
case ARRAY_VALUE:
List<Object> list = new ArrayList<>();
List<Value> lv = v.getArrayValue().getValuesList();
for (Value iv : lv) {
list.add(decodeValue(iv));
}
return list;
case MAP_VALUE:
Map<String, Object> outputMap = new HashMap<>();
Map<String, Value> inputMap = v.getMapValue().getFieldsMap();
for (Map.Entry<String, Value> entry : inputMap.entrySet()) {
outputMap.put(entry.getKey(), decodeValue(entry.getValue()));
}
return outputMap;
default:
throw FirestoreException.invalidState(String.format("Unknown Value Type: %s", typeCase));
}
}

/**
* Returns the time at which this snapshot was read.
*
Expand Down Expand Up @@ -214,7 +170,7 @@ public Map<String, Object> getData() {

Map<String, Object> decodedFields = new HashMap<>();
for (Map.Entry<String, Value> entry : fields.entrySet()) {
Object decodedValue = decodeValue(entry.getValue());
Object decodedValue = UserDataConverter.decodeValue(rpcContext, entry.getValue());
decodedFields.put(entry.getKey(), decodedValue);
}
return decodedFields;
Expand Down Expand Up @@ -293,7 +249,7 @@ public Object get(@Nonnull FieldPath fieldPath) {
return null;
}

return decodeValue(value);
return UserDataConverter.decodeValue(rpcContext, value);
}

/**
Expand Down
Expand Up @@ -56,14 +56,14 @@ public interface Firestore extends Service<FirestoreOptions>, AutoCloseable {
Iterable<CollectionReference> listCollections();

/**
* Creates and returns a new @link{Query} that includes all documents in the database that are
* contained in a collection or subcollection with the given @code{collectionId}.
* Creates and returns a new {@link CollectionGroup} that includes all documents in the database
* that are contained in a collection or subcollection with the given @code{collectionId}.
*
* @param collectionId Identifies the collections to query over. Every collection or subcollection
* with this ID as the last segment of its path will be included. Cannot contain a slash.
* @return The created Query.
*/
Query collectionGroup(@Nonnull String collectionId);
CollectionGroup collectionGroup(@Nonnull String collectionId);
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved

/**
* Executes the given updateFunction and then attempts to commit the changes applied within the
Expand Down
Expand Up @@ -263,12 +263,12 @@ public void onCompleted() {

@Nonnull
@Override
public Query collectionGroup(@Nonnull final String collectionId) {
public CollectionGroup collectionGroup(@Nonnull final String collectionId) {
Preconditions.checkArgument(
!collectionId.contains("/"),
String.format(
"Invalid collectionId '%s'. Collection IDs must not contain '/'.", collectionId));
return new Query(this, collectionId);
return new CollectionGroup(this, collectionId);
}

@Nonnull
Expand Down
Expand Up @@ -296,21 +296,7 @@ abstract static class Builder {
.build());
}

/**
* Creates a Collection Group query that matches all documents directly nested under a
* specifically named collection
*/
Query(FirestoreRpcContext<?> rpcContext, String collectionId) {
this(
rpcContext,
QueryOptions.builder()
.setParentPath(rpcContext.getResourcePath())
.setCollectionId(collectionId)
.setAllDescendants(true)
.build());
}

private Query(FirestoreRpcContext<?> rpcContext, QueryOptions queryOptions) {
protected Query(FirestoreRpcContext<?> rpcContext, QueryOptions queryOptions) {
this.rpcContext = rpcContext;
this.options = queryOptions;
}
Expand Down
@@ -0,0 +1,97 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;
schmidt-sebastian marked this conversation as resolved.
Show resolved Hide resolved
schmidt-sebastian marked this conversation as resolved.
Show resolved Hide resolved

import java.util.Arrays;
import java.util.Objects;
import javax.annotation.Nullable;

/**
* A split point that can be used in a query as a starting and/or end point for the query results.
* The cursors returned by {@link #getStartAt()} and {@link #getEndBefore()} can only be used in a
* query that matches the constraint of query that produced this partition.
*/
public class QueryPartition {
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
private final Query query;
@Nullable private final Object[] startAt;
@Nullable private final Object[] endBefore;

public QueryPartition(Query query, @Nullable Object[] startAt, @Nullable Object[] endBefore) {
this.query = query;
this.startAt = startAt;
this.endBefore = endBefore;
}

/**
* The cursor that defines the first result for this partition. {@code null} if this is the first
* partition.
*
* @return a cursor value that can be used with {@link Query#startAt(Object...)} or {@code null}
* if this is the first partition.
*/
@Nullable
public Object[] getStartAt() {
return startAt;
}

/**
* The cursor that defines the first result after this partition. {@code null} if this is the last
* partition.
*
* @return a cursor value that can be used with {@link Query#endBefore(Object...)} or {@code null}
* if this is the last partition.
*/
@Nullable
public Object[] getEndBefore() {
return endBefore;
}

/**
* Returns a query that only returns the documents for this partition.
*
* @return a query partitioned by a {@link Query#startAt(Object...)} and {@link
* Query#endBefore(Object...)} cursor.
*/
public Query createQuery() {
Query baseQuery = query;
if (startAt != null) {
baseQuery = baseQuery.startAt(startAt);
}
if (endBefore != null) {
baseQuery = baseQuery.endBefore(endBefore);
}
return baseQuery;
}
schmidt-sebastian marked this conversation as resolved.
Show resolved Hide resolved

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof QueryPartition)) return false;
QueryPartition partition = (QueryPartition) o;
return query.equals(partition.query)
&& Arrays.equals(startAt, partition.startAt)
&& Arrays.equals(endBefore, partition.endBefore);
}

@Override
public int hashCode() {
int result = Objects.hash(query);
result = 31 * result + Arrays.hashCode(startAt);
result = 31 * result + Arrays.hashCode(endBefore);
return result;
}
}
Expand Up @@ -22,7 +22,9 @@
import com.google.firestore.v1.MapValue;
import com.google.firestore.v1.Value;
import com.google.protobuf.NullValue;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -181,4 +183,46 @@ static Value encodeValue(

throw FirestoreException.invalidState("Cannot convert %s to Firestore Value", sanitizedObject);
}

static Object decodeValue(FirestoreRpcContext<?> rpcContext, Value v) {
Value.ValueTypeCase typeCase = v.getValueTypeCase();
switch (typeCase) {
case NULL_VALUE:
return null;
case BOOLEAN_VALUE:
return v.getBooleanValue();
case INTEGER_VALUE:
return v.getIntegerValue();
case DOUBLE_VALUE:
return v.getDoubleValue();
case TIMESTAMP_VALUE:
return Timestamp.fromProto(v.getTimestampValue());
case STRING_VALUE:
return v.getStringValue();
case BYTES_VALUE:
return Blob.fromByteString(v.getBytesValue());
case REFERENCE_VALUE:
String pathName = v.getReferenceValue();
return new DocumentReference(rpcContext, ResourcePath.create(pathName));
case GEO_POINT_VALUE:
return new GeoPoint(
v.getGeoPointValue().getLatitude(), v.getGeoPointValue().getLongitude());
case ARRAY_VALUE:
List<Object> list = new ArrayList<>();
List<Value> lv = v.getArrayValue().getValuesList();
for (Value iv : lv) {
list.add(decodeValue(rpcContext, iv));
}
return list;
case MAP_VALUE:
Map<String, Object> outputMap = new HashMap<>();
Map<String, Value> inputMap = v.getMapValue().getFieldsMap();
for (Map.Entry<String, Value> entry : inputMap.entrySet()) {
outputMap.put(entry.getKey(), decodeValue(rpcContext, entry.getValue()));
}
return outputMap;
default:
throw FirestoreException.invalidState(String.format("Unknown Value Type: %s", typeCase));
}
}
}