New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add success and error callbacks to BulkWriter #483
Changes from 16 commits
f8e53e7
f4d53b4
9673dfa
d92d1da
3d895e1
906165a
2b5f04a
2133403
4ed2e47
bee9431
9a75859
1ece19e
1b63018
49fe275
a5b372b
028e373
95fdcde
b984f56
59c89bc
c46a296
044f925
8dc465d
bf03904
c1264e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,32 +16,177 @@ | |
|
||
package com.google.cloud.firestore; | ||
|
||
import com.google.api.core.ApiAsyncFunction; | ||
import com.google.api.core.ApiFunction; | ||
import com.google.api.core.ApiFuture; | ||
import com.google.api.core.ApiFutures; | ||
import com.google.api.core.SettableApiFuture; | ||
import com.google.cloud.Timestamp; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.ImmutableMap; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import com.google.firestore.v1.BatchWriteRequest; | ||
import com.google.firestore.v1.BatchWriteResponse; | ||
import io.grpc.Status; | ||
import io.opencensus.trace.AttributeValue; | ||
import io.opencensus.trace.Tracing; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.CopyOnWriteArraySet; | ||
import javax.annotation.Nullable; | ||
|
||
/** Used to represent a batch on the BatchQueue. */ | ||
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> { | ||
/** | ||
* Used to represent the state of batch. | ||
* | ||
* <p>Writes can only be added while the batch is OPEN. For a batch to be sent, the batch must be | ||
* READY_TO_SEND. After a batch is sent, it is marked as SENT. | ||
*/ | ||
enum BatchState { | ||
OPEN, | ||
READY_TO_SEND, | ||
SENT, | ||
} | ||
|
||
private BatchState state = BatchState.OPEN; | ||
|
||
private final List<SettableApiFuture<BatchWriteResult>> pendingOperations = new ArrayList<>(); | ||
private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>(); | ||
private final int maxBatchSize; | ||
|
||
BulkCommitBatch(FirestoreImpl firestore, int maxBatchSize) { | ||
super(firestore, maxBatchSize); | ||
super(firestore); | ||
this.maxBatchSize = maxBatchSize; | ||
} | ||
|
||
BulkCommitBatch(FirestoreImpl firestore, BulkCommitBatch retryBatch) { | ||
super(firestore); | ||
ApiFuture<WriteResult> wrapResult(DocumentReference documentReference) { | ||
return processLastOperation(documentReference); | ||
} | ||
|
||
/** | ||
* Commits all pending operations to the database and verifies all preconditions. | ||
* | ||
* <p>The writes in the batch are not applied atomically and can be applied out of order. | ||
*/ | ||
ApiFuture<List<BatchWriteResult>> bulkCommit() { | ||
Tracing.getTracer() | ||
.getCurrentSpan() | ||
.addAnnotation( | ||
TraceUtil.SPAN_NAME_BATCHWRITE, | ||
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size()))); | ||
|
||
Preconditions.checkState( | ||
isReadyToSend(), "The batch should be marked as READY_TO_SEND before committing"); | ||
state = BatchState.SENT; | ||
|
||
// Create a new BulkCommitBatch containing only the indexes from the provided indexes to retry. | ||
for (int index : retryBatch.getPendingIndexes()) { | ||
this.writes.add(retryBatch.writes.get(index)); | ||
final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder(); | ||
request.setDatabase(firestore.getDatabaseName()); | ||
|
||
for (WriteOperation writeOperation : getWrites()) { | ||
request.addWrites(writeOperation.write); | ||
} | ||
|
||
ApiFuture<BatchWriteResponse> response = | ||
firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable()); | ||
|
||
committed = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just noticed that this is missing from Node. Would it make sense to make this an abstract property? In UpdateBuilder, you could do:
and here you could do:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. I'll back-port this to node later along with some of the other changes like the retry logging message. |
||
|
||
return ApiFutures.transform( | ||
response, | ||
new ApiFunction<BatchWriteResponse, List<BatchWriteResult>>() { | ||
@Override | ||
public List<BatchWriteResult> apply(BatchWriteResponse batchWriteResponse) { | ||
List<com.google.firestore.v1.WriteResult> writeResults = | ||
batchWriteResponse.getWriteResultsList(); | ||
|
||
List<com.google.rpc.Status> statuses = batchWriteResponse.getStatusList(); | ||
|
||
List<BatchWriteResult> result = new ArrayList<>(); | ||
|
||
for (int i = 0; i < writeResults.size(); ++i) { | ||
com.google.firestore.v1.WriteResult writeResult = writeResults.get(i); | ||
com.google.rpc.Status status = statuses.get(i); | ||
Status code = Status.fromCodeValue(status.getCode()); | ||
@Nullable Timestamp updateTime = null; | ||
@Nullable Exception exception = null; | ||
if (code == Status.OK) { | ||
updateTime = Timestamp.fromProto(writeResult.getUpdateTime()); | ||
} else { | ||
exception = FirestoreException.serverRejected(code, status.getMessage()); | ||
} | ||
result.add(new BatchWriteResult(updateTime, exception)); | ||
} | ||
|
||
return result; | ||
} | ||
}, | ||
MoreExecutors.directExecutor()); | ||
} | ||
|
||
int getPendingOperationCount() { | ||
return pendingOperations.size(); | ||
} | ||
|
||
ApiFuture<WriteResult> processLastOperation(DocumentReference documentReference) { | ||
Preconditions.checkState( | ||
retryBatch.state == BatchState.SENT, | ||
"Batch should be SENT when creating a new BulkCommitBatch for retry"); | ||
this.state = retryBatch.state; | ||
this.pendingOperations = retryBatch.pendingOperations; | ||
!documents.contains(documentReference), | ||
"Batch should not contain writes to the same document"); | ||
documents.add(documentReference); | ||
Preconditions.checkState(state == BatchState.OPEN, "Batch should be OPEN when adding writes"); | ||
SettableApiFuture<BatchWriteResult> resultFuture = SettableApiFuture.create(); | ||
pendingOperations.add(resultFuture); | ||
|
||
if (getPendingOperationCount() == maxBatchSize) { | ||
state = BatchState.READY_TO_SEND; | ||
} | ||
|
||
return ApiFutures.transformAsync( | ||
resultFuture, | ||
new ApiAsyncFunction<BatchWriteResult, WriteResult>() { | ||
public ApiFuture<WriteResult> apply(BatchWriteResult batchWriteResult) throws Exception { | ||
if (batchWriteResult.getException() == null) { | ||
return ApiFutures.immediateFuture(new WriteResult(batchWriteResult.getWriteTime())); | ||
} else { | ||
throw batchWriteResult.getException(); | ||
} | ||
} | ||
}, | ||
MoreExecutors.directExecutor()); | ||
} | ||
|
||
/** | ||
* Resolves the individual operations in the batch with the results and removes the entry from the | ||
* pendingOperations map if the result is not retryable. | ||
*/ | ||
void processResults(List<BatchWriteResult> results) { | ||
for (int i = 0; i < results.size(); i++) { | ||
SettableApiFuture<BatchWriteResult> resultFuture = pendingOperations.get(i); | ||
BatchWriteResult result = results.get(i); | ||
if (result.getException() == null) { | ||
resultFuture.set(result); | ||
} else { | ||
resultFuture.setException(result.getException()); | ||
} | ||
} | ||
} | ||
|
||
void markReadyToSend() { | ||
if (state == BatchState.OPEN) { | ||
state = BatchState.READY_TO_SEND; | ||
} | ||
} | ||
|
||
boolean isOpen() { | ||
return state == BatchState.OPEN; | ||
} | ||
|
||
boolean isReadyToSend() { | ||
return state == BatchState.READY_TO_SEND; | ||
} | ||
|
||
ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) { | ||
return result; | ||
boolean has(DocumentReference documentReference) { | ||
return documents.contains(documentReference); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't remove this method without a major version release since it's public. because it's public customers could be using already and we won't want to remove it and break them.
We should be able to keep the method public without any issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed back to public.