Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add success and error callbacks to BulkWriter #483

Merged
merged 24 commits into from Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f8e53e7
chore: refactor BulkWriter to use retryBatch
Nov 7, 2020
f4d53b4
fix: create new batch for writes to the same doc
Nov 7, 2020
9673dfa
refactor BulkWriter's logic in UpdateBuilder into BulkCommitBatch
Nov 9, 2020
d92d1da
set MAX_BATCH_SIZE to 20 now that it won't affect WriteBatch/Transact…
Nov 9, 2020
3d895e1
cleanup
Nov 9, 2020
906165a
add clirr exceptions
Nov 9, 2020
2b5f04a
address rd. 1 comments
Nov 10, 2020
2133403
change naming to onError and expose callback executors
Nov 11, 2020
4ed2e47
add system tests
Nov 13, 2020
bee9431
fix: update BulkWriter logic to correctly track user callbacks in flush
BenWhitehead Nov 13, 2020
9a75859
add todo
Nov 13, 2020
1ece19e
Merge branch 'bc/bulk-error-with-test' of github.com:googleapis/java-…
Nov 13, 2020
1b63018
add slf4j logging to help with debugging
BenWhitehead Nov 17, 2020
49fe275
Move all BulkWriter logic to its own thread + cleanup
Dec 10, 2020
a5b372b
Merge branch 'master' into bc/bulk-error-with-test
Dec 10, 2020
028e373
remove usage of defaultThreadFactory
Dec 10, 2020
95fdcde
resolve comments rd.1
Dec 11, 2020
b984f56
remove userCallbackExecutor and add clirr exception
Dec 12, 2020
59c89bc
fix flaky test
Dec 12, 2020
c46a296
fix flaking batch limiter test
Dec 14, 2020
044f925
use directExecutor() in success/error executors by default instead of…
Dec 14, 2020
8dc465d
update comment since we're not using synchronized lists anymore
Dec 14, 2020
bf03904
resolve ben comments, add coverage
Dec 17, 2020
c1264e0
missed a codecov test
Dec 17, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Expand Up @@ -223,6 +223,30 @@
<to>java.util.List</to>
</difference>

<!--
UpdateBuilder
-->
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/firestore/UpdateBuilder</className>
<field>pendingOperations</field>
</difference>
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/firestore/UpdateBuilder</className>
<field>state</field>
</difference>
<difference>
<differenceType>6010</differenceType>
<className>com/google/cloud/firestore/UpdateBuilder</className>
<field>writes</field>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/firestore/UpdateBuilder</className>
<method>int getMutationsSize()</method>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't remove this method without a major version release since it's public. because it's public customers could be using already and we won't want to remove it and break them.

We should be able to keep the method public without any issues.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed back to public.

</difference>

<!--
FakeCredentials Refactor
com.google.cloud.firestore.FirestoreOptions$Builder$FakeCredentials -> com.google.cloud.firestore.FirestoreOptions$EmulatorCredentials
Expand Down
Expand Up @@ -16,32 +16,177 @@

package com.google.cloud.firestore;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import io.grpc.Status;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.annotation.Nullable;

/** Used to represent a batch on the BatchQueue. */
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
/**
* Used to represent the state of batch.
*
* <p>Writes can only be added while the batch is OPEN. For a batch to be sent, the batch must be
* READY_TO_SEND. After a batch is sent, it is marked as SENT.
*/
enum BatchState {
OPEN,
READY_TO_SEND,
SENT,
}

private BatchState state = BatchState.OPEN;

private final List<SettableApiFuture<BatchWriteResult>> pendingOperations = new ArrayList<>();
private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>();
private final int maxBatchSize;

BulkCommitBatch(FirestoreImpl firestore, int maxBatchSize) {
super(firestore, maxBatchSize);
super(firestore);
this.maxBatchSize = maxBatchSize;
}

BulkCommitBatch(FirestoreImpl firestore, BulkCommitBatch retryBatch) {
super(firestore);
ApiFuture<WriteResult> wrapResult(DocumentReference documentReference) {
return processLastOperation(documentReference);
}

/**
* Commits all pending operations to the database and verifies all preconditions.
*
* <p>The writes in the batch are not applied atomically and can be applied out of order.
*/
ApiFuture<List<BatchWriteResult>> bulkCommit() {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_BATCHWRITE,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size())));

Preconditions.checkState(
isReadyToSend(), "The batch should be marked as READY_TO_SEND before committing");
state = BatchState.SENT;

// Create a new BulkCommitBatch containing only the indexes from the provided indexes to retry.
for (int index : retryBatch.getPendingIndexes()) {
this.writes.add(retryBatch.writes.get(index));
final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());

for (WriteOperation writeOperation : getWrites()) {
request.addWrites(writeOperation.write);
}

ApiFuture<BatchWriteResponse> response =
firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable());

committed = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed that this is missing from Node.

Would it make sense to make this an abstract property?

In UpdateBuilder, you could do:

boolean isCommitted() {
  return committed;
}

and here you could do:

boolean isCommitted() {
  return batchState == SENT;
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I'll back-port this to node later along with some of the other changes like the retry logging message.


return ApiFutures.transform(
response,
new ApiFunction<BatchWriteResponse, List<BatchWriteResult>>() {
@Override
public List<BatchWriteResult> apply(BatchWriteResponse batchWriteResponse) {
List<com.google.firestore.v1.WriteResult> writeResults =
batchWriteResponse.getWriteResultsList();

List<com.google.rpc.Status> statuses = batchWriteResponse.getStatusList();

List<BatchWriteResult> result = new ArrayList<>();

for (int i = 0; i < writeResults.size(); ++i) {
com.google.firestore.v1.WriteResult writeResult = writeResults.get(i);
com.google.rpc.Status status = statuses.get(i);
Status code = Status.fromCodeValue(status.getCode());
@Nullable Timestamp updateTime = null;
@Nullable Exception exception = null;
if (code == Status.OK) {
updateTime = Timestamp.fromProto(writeResult.getUpdateTime());
} else {
exception = FirestoreException.serverRejected(code, status.getMessage());
}
result.add(new BatchWriteResult(updateTime, exception));
}

return result;
}
},
MoreExecutors.directExecutor());
}

int getPendingOperationCount() {
return pendingOperations.size();
}

ApiFuture<WriteResult> processLastOperation(DocumentReference documentReference) {
Preconditions.checkState(
retryBatch.state == BatchState.SENT,
"Batch should be SENT when creating a new BulkCommitBatch for retry");
this.state = retryBatch.state;
this.pendingOperations = retryBatch.pendingOperations;
!documents.contains(documentReference),
"Batch should not contain writes to the same document");
documents.add(documentReference);
Preconditions.checkState(state == BatchState.OPEN, "Batch should be OPEN when adding writes");
SettableApiFuture<BatchWriteResult> resultFuture = SettableApiFuture.create();
pendingOperations.add(resultFuture);

if (getPendingOperationCount() == maxBatchSize) {
state = BatchState.READY_TO_SEND;
}

return ApiFutures.transformAsync(
resultFuture,
new ApiAsyncFunction<BatchWriteResult, WriteResult>() {
public ApiFuture<WriteResult> apply(BatchWriteResult batchWriteResult) throws Exception {
if (batchWriteResult.getException() == null) {
return ApiFutures.immediateFuture(new WriteResult(batchWriteResult.getWriteTime()));
} else {
throw batchWriteResult.getException();
}
}
},
MoreExecutors.directExecutor());
}

/**
* Resolves the individual operations in the batch with the results and removes the entry from the
* pendingOperations map if the result is not retryable.
*/
void processResults(List<BatchWriteResult> results) {
for (int i = 0; i < results.size(); i++) {
SettableApiFuture<BatchWriteResult> resultFuture = pendingOperations.get(i);
BatchWriteResult result = results.get(i);
if (result.getException() == null) {
resultFuture.set(result);
} else {
resultFuture.setException(result.getException());
}
}
}

void markReadyToSend() {
if (state == BatchState.OPEN) {
state = BatchState.READY_TO_SEND;
}
}

boolean isOpen() {
return state == BatchState.OPEN;
}

boolean isReadyToSend() {
return state == BatchState.READY_TO_SEND;
}

ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) {
return result;
boolean has(DocumentReference documentReference) {
return documents.contains(documentReference);
}
}