New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add error and success handlers to BulkWriter #458
Conversation
843cd2e
to
d92d1da
Compare
* | ||
* <p>The writes in the batch are not applied atomically and can be applied out of order. | ||
*/ | ||
ApiFuture<List<BatchWriteResult>> bulkCommit() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried extracting a commitHelper()
from bulkCommit()
and commit()
, but couldn't figure out how to properly parameterize everything.
Aside from reducing code duplication, the main reason I want to do this is because moving this over requires a getter for writes
and exposing committed
as protected
. If we could move the logic for these parts into UpdateBuilder
, we could make both of those variables private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to come up with something here as well but mostly failed. The two Proto classes don't share a common ancestor that would help us here, which rules out extracting a common function that operates on a shared base type.
My one suggestion would be to have a prepareCommit()
method that returns a list of Writes and sets committed
to true
. I, however, don't think that this will actually save any code or make the code more readable. I am not sure if that is worth pursuing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, if that's the case I think we should just not try and refactor then.
dceab85
to
906165a
Compare
Codecov Report
@@ Coverage Diff @@
## master #458 +/- ##
============================================
+ Coverage 73.40% 73.71% +0.31%
- Complexity 1086 1091 +5
============================================
Files 65 66 +1
Lines 5786 5828 +42
Branches 723 724 +1
============================================
+ Hits 4247 4296 +49
+ Misses 1313 1306 -7
Partials 226 226
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks really good!
* | ||
* <p>The writes in the batch are not applied atomically and can be applied out of order. | ||
*/ | ||
ApiFuture<List<BatchWriteResult>> bulkCommit() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to come up with something here as well but mostly failed. The two Proto classes don't share a common ancestor that would help us here, which rules out extracting a common function that operates on a shared base type.
My one suggestion would be to have a prepareCommit()
method that returns a list of Writes and sets committed
to true
. I, however, don't think that this will actually save any code or make the code more readable. I am not sure if that is worth pursuing.
* Resolves the individual operations in the batch with the results and removes the entry from the | ||
* pendingOperations map if the result is not retryable. | ||
*/ | ||
void newProcessResults(List<BatchWriteResult> results) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the final name for this function? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, changed.
* @param error The error object from the failed BulkWriter operation attempt. | ||
* @return Whether to retry the operation or not. | ||
*/ | ||
boolean shouldRetryListener(BulkWriterError error); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a listener, this is the function that is invoked by the listener. I think this should be onError
, shouldRetry
, or shouldRetryOnError
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed.
operationType, | ||
failedAttempts); | ||
|
||
boolean shouldRetry = errorListener.shouldRetryListener(bulkWriterError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be executed on the User-specified executor (via FirestoreOptions - the same as
Line 87 in 5905438
this.userCallbackExecutor = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done here and in the success callback.
return ApiFutures.transformAsync( | ||
ApiFutures.catchingAsync( | ||
batch.bulkCommit(), | ||
Exception.class, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be a Throwable
so we can also catch RuntimeException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on Java docs, RunTimeException
extends Exception
. The main issue with using Throwable is that BatchWriteResult
takes in Exception
, and I would have to plumb that through.
bulkWriter.addWriteErrorListener( | ||
new WriteErrorCallback() { | ||
public boolean shouldRetryListener(BulkWriterError error) { | ||
errorListenerCalled[0] = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe assert that we received an INTERNAL
error here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
bulkWriter.addWriteErrorListener( | ||
new WriteErrorCallback() { | ||
public boolean shouldRetryListener(BulkWriterError error) { | ||
throw new NullPointerException("Test code threw NullPointerException"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably not a good exception type since this is likely thrown by our code as well. I would advise to use something different - maybe UnsupportedOperationException?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
bulkWriter.addWriteResultListener( | ||
new WriteResultCallback() { | ||
public void onResult(DocumentReference documentReference, WriteResult result) { | ||
throw new NullPointerException("Test code threw NullPointerException"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
* bulkWriter.addWriteResultListener( | ||
* new WriteResultCallback() { | ||
* public void onResult(DocumentReference documentReference, WriteResult result) { | ||
* System.out.println( | ||
* "Successfully executed write on document: " | ||
* + documentReference | ||
* + " at: " | ||
* + result.getUpdateTime()); | ||
* } | ||
* }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should probably just use Lamdba syntax here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
* System.out.println("Failed write at document: " + error.getDocumentReference()); | ||
* return false; | ||
* } | ||
* }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should probably use Lambda syntax here. If you stick with the current example, you need to add the function signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@@ -59,13 +61,15 @@ | |||
* @param error The error object from the failed BulkWriter operation attempt. | |||
* @return Whether to retry the operation or not. | |||
*/ | |||
boolean shouldRetryListener(BulkWriterError error); | |||
boolean shouldRetryError(BulkWriterError error); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking more and more that this should be onError
. Otherwise, users are going to ask us how they can get the error result and why we don't expose it in WriteResultCallback
. Note also that the term result
is a bit ambiguous as it doesn't define whether "error" is a "result".
We should maybe also ask for a third opinion of whether the classes are "callbacks". On Android, we seem to call the outermost class "Listener". The method inside is prefixed with "on" (see EventListener).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to onError
. As for WriteResult, we discussed naming for the node implementation and decided on WriteResult
over WriteSuccess
since a WriteResult
is returned from the operation.
Would it be too sadistic to ask Ben to take a look at this PR for Java code style/convention aftewards?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should definitely ask Ben :)
successListener.onResult(documentReference, result); | ||
public ApiFuture<WriteResult> apply(WriteResult result) | ||
throws ExecutionException, InterruptedException { | ||
invokeUserSuccessCallback(documentReference, result).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it may block the GRPC thread while the user code is running. Instead, you should do one of the following:
- Run your code
- Run user code in user thread, followed by a continuation that dispatches back on your old exectur
- Finish running your own code
Realistically, though, no one is going to get mad if you run a little bit of your own code on the use executor (as long as it is non-blocking and doesn't throw). For that reason, you can likely just use the user executor when the callback is first invoked (e.g. replace the directExecutor call in executeWrite()
if deemed safe).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main issue with dispatching the code onto the user executor is that we're no longer guaranteed that the write is enqueued onto the BatchQueue before flush() is called. Instead, after our discussion, I exposed two user executors that the user could pass in to run their callbacks on. We're blocking on the user-callbacks, but not on the user or grpc threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of conversation w/ Ben: We should not block grpc or gax threads. Instead, we should be creating a thread on each firestore instance that awaits user callbacks.
Solution: Created a separate thread for waiting on the callback. Fixed async issue by fixing bug in performFlush()
, where we were incorrectly resolving the flushComplete
future early if no retries were scheduled.
@@ -762,8 +806,7 @@ private void verifyNotClosed() { | |||
* <p>For example, see the sample code: <code> | |||
* BulkWriter bulkWriter = firestore.bulkWriter(); | |||
* bulkWriter.addWriteResultListener( | |||
* new WriteResultCallback() { | |||
* public void onResult(DocumentReference documentReference, WriteResult result) { | |||
* (DocumentReference documentReference, WriteResult result -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing ")".
We should make sure these compile as users are going to copy them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added. Checked that code compiles.
Continued in #483. |
Porting of several node PRs: 1, 2, 3.
Another fix I made was reducing the
MAX_BATCH_SIZE
from 500 to 20. I had to do this after refactoring UpdateBuilder, since the previous code imposed the same limits on WriteBatch/Transactions, causing the system tests to fail.I've tried to order the commits to make it more reviewable, but after finishing it up the first time, I went back and did more cleanup.