Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add error and success handlers to BulkWriter #458

Closed
wants to merge 13 commits into from

Conversation

thebrianchen
Copy link

@thebrianchen thebrianchen commented Nov 9, 2020

Porting of several node PRs: 1, 2, 3.

Another fix I made was reducing the MAX_BATCH_SIZE from 500 to 20. I had to do this after refactoring UpdateBuilder, since the previous code imposed the same limits on WriteBatch/Transactions, causing the system tests to fail.

I've tried to order the commits to make it more reviewable, but after finishing it up the first time, I went back and did more cleanup.

@thebrianchen thebrianchen requested a review from a team as a code owner November 9, 2020 20:14
@thebrianchen thebrianchen requested a review from a team November 9, 2020 20:14
@thebrianchen thebrianchen self-assigned this Nov 9, 2020
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Nov 9, 2020
@product-auto-label product-auto-label bot added the api: firestore Issues related to the googleapis/java-firestore API. label Nov 9, 2020
*
* <p>The writes in the batch are not applied atomically and can be applied out of order.
*/
ApiFuture<List<BatchWriteResult>> bulkCommit() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried extracting a commitHelper() from bulkCommit() and commit(), but couldn't figure out how to properly parameterize everything.

Aside from reducing code duplication, the main reason I want to do this is because moving this over requires a getter for writes and exposing committed as protected. If we could move the logic for these parts into UpdateBuilder, we could make both of those variables private.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to come up with something here as well but mostly failed. The two Proto classes don't share a common ancestor that would help us here, which rules out extracting a common function that operates on a shared base type.
My one suggestion would be to have a prepareCommit() method that returns a list of Writes and sets committed to true. I, however, don't think that this will actually save any code or make the code more readable. I am not sure if that is worth pursuing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, if that's the case I think we should just not try and refactor then.

@codecov
Copy link

codecov bot commented Nov 9, 2020

Codecov Report

Merging #458 (2133403) into master (3893c44) will increase coverage by 0.31%.
The diff coverage is 84.92%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master     #458      +/-   ##
============================================
+ Coverage     73.40%   73.71%   +0.31%     
- Complexity     1086     1091       +5     
============================================
  Files            65       66       +1     
  Lines          5786     5828      +42     
  Branches        723      724       +1     
============================================
+ Hits           4247     4296      +49     
+ Misses         1313     1306       -7     
  Partials        226      226              
Impacted Files Coverage Δ Complexity Δ
...n/java/com/google/cloud/firestore/Transaction.java 85.41% <ø> (ø) 13.00 <0.00> (ø)
...in/java/com/google/cloud/firestore/WriteBatch.java 100.00% <ø> (ø) 3.00 <0.00> (ø)
...in/java/com/google/cloud/firestore/BulkWriter.java 82.39% <80.43%> (+9.80%) 39.00 <9.00> (+8.00)
...va/com/google/cloud/firestore/BulkWriterError.java 91.66% <91.66%> (ø) 5.00 <5.00> (?)
...va/com/google/cloud/firestore/BulkCommitBatch.java 94.44% <94.20%> (+3.53%) 17.00 <15.00> (+13.00)
...java/com/google/cloud/firestore/UpdateBuilder.java 94.92% <100.00%> (-1.25%) 58.00 <3.00> (-20.00)
...java/com/google/cloud/firestore/FirestoreImpl.java 77.61% <0.00%> (ø) 26.00% <0.00%> (-1.00%)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3893c44...4c9e41a. Read the comment docs.

Copy link
Contributor

@schmidt-sebastian schmidt-sebastian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks really good!

*
* <p>The writes in the batch are not applied atomically and can be applied out of order.
*/
ApiFuture<List<BatchWriteResult>> bulkCommit() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to come up with something here as well but mostly failed. The two Proto classes don't share a common ancestor that would help us here, which rules out extracting a common function that operates on a shared base type.
My one suggestion would be to have a prepareCommit() method that returns a list of Writes and sets committed to true. I, however, don't think that this will actually save any code or make the code more readable. I am not sure if that is worth pursuing.

* Resolves the individual operations in the batch with the results and removes the entry from the
* pendingOperations map if the result is not retryable.
*/
void newProcessResults(List<BatchWriteResult> results) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the final name for this function? :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, changed.

* @param error The error object from the failed BulkWriter operation attempt.
* @return Whether to retry the operation or not.
*/
boolean shouldRetryListener(BulkWriterError error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a listener, this is the function that is invoked by the listener. I think this should be onError, shouldRetry, or shouldRetryOnError.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed.

operationType,
failedAttempts);

boolean shouldRetry = errorListener.shouldRetryListener(bulkWriterError);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be executed on the User-specified executor (via FirestoreOptions - the same as

)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done here and in the success callback.

return ApiFutures.transformAsync(
ApiFutures.catchingAsync(
batch.bulkCommit(),
Exception.class,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be a Throwable so we can also catch RuntimeException.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on Java docs, RunTimeException extends Exception. The main issue with using Throwable is that BatchWriteResult takes in Exception, and I would have to plumb that through.

bulkWriter.addWriteErrorListener(
new WriteErrorCallback() {
public boolean shouldRetryListener(BulkWriterError error) {
errorListenerCalled[0] = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe assert that we received an INTERNAL error here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

bulkWriter.addWriteErrorListener(
new WriteErrorCallback() {
public boolean shouldRetryListener(BulkWriterError error) {
throw new NullPointerException("Test code threw NullPointerException");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably not a good exception type since this is likely thrown by our code as well. I would advise to use something different - maybe UnsupportedOperationException?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

bulkWriter.addWriteResultListener(
new WriteResultCallback() {
public void onResult(DocumentReference documentReference, WriteResult result) {
throw new NullPointerException("Test code threw NullPointerException");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Comment on lines 764 to 773
* bulkWriter.addWriteResultListener(
* new WriteResultCallback() {
* public void onResult(DocumentReference documentReference, WriteResult result) {
* System.out.println(
* "Successfully executed write on document: "
* + documentReference
* + " at: "
* + result.getUpdateTime());
* }
* });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably just use Lamdba syntax here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

* System.out.println("Failed write at document: " + error.getDocumentReference());
* return false;
* }
* });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably use Lambda syntax here. If you stick with the current example, you need to add the function signature.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@@ -59,13 +61,15 @@
* @param error The error object from the failed BulkWriter operation attempt.
* @return Whether to retry the operation or not.
*/
boolean shouldRetryListener(BulkWriterError error);
boolean shouldRetryError(BulkWriterError error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking more and more that this should be onError. Otherwise, users are going to ask us how they can get the error result and why we don't expose it in WriteResultCallback. Note also that the term result is a bit ambiguous as it doesn't define whether "error" is a "result".

We should maybe also ask for a third opinion of whether the classes are "callbacks". On Android, we seem to call the outermost class "Listener". The method inside is prefixed with "on" (see EventListener).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to onError. As for WriteResult, we discussed naming for the node implementation and decided on WriteResult over WriteSuccess since a WriteResult is returned from the operation.

Would it be too sadistic to ask Ben to take a look at this PR for Java code style/convention aftewards?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should definitely ask Ben :)

successListener.onResult(documentReference, result);
public ApiFuture<WriteResult> apply(WriteResult result)
throws ExecutionException, InterruptedException {
invokeUserSuccessCallback(documentReference, result).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it may block the GRPC thread while the user code is running. Instead, you should do one of the following:

  • Run your code
  • Run user code in user thread, followed by a continuation that dispatches back on your old exectur
  • Finish running your own code

Realistically, though, no one is going to get mad if you run a little bit of your own code on the use executor (as long as it is non-blocking and doesn't throw). For that reason, you can likely just use the user executor when the callback is first invoked (e.g. replace the directExecutor call in executeWrite() if deemed safe).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue with dispatching the code onto the user executor is that we're no longer guaranteed that the write is enqueued onto the BatchQueue before flush() is called. Instead, after our discussion, I exposed two user executors that the user could pass in to run their callbacks on. We're blocking on the user-callbacks, but not on the user or grpc threads.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of conversation w/ Ben: We should not block grpc or gax threads. Instead, we should be creating a thread on each firestore instance that awaits user callbacks.

Solution: Created a separate thread for waiting on the callback. Fixed async issue by fixing bug in performFlush(), where we were incorrectly resolving the flushComplete future early if no retries were scheduled.

@@ -762,8 +806,7 @@ private void verifyNotClosed() {
* <p>For example, see the sample code: <code>
* BulkWriter bulkWriter = firestore.bulkWriter();
* bulkWriter.addWriteResultListener(
* new WriteResultCallback() {
* public void onResult(DocumentReference documentReference, WriteResult result) {
* (DocumentReference documentReference, WriteResult result -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing ")".

We should make sure these compile as users are going to copy them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. Checked that code compiles.

@schmidt-sebastian schmidt-sebastian removed their assignment Nov 10, 2020
@thebrianchen
Copy link
Author

Continued in #483.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: firestore Issues related to the googleapis/java-firestore API. cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants