feat: add success and error callbacks to BulkWriter #483

Merged
merged 24 commits into from Jan 14, 2021

thebrianchen

Continuation of #458 but without blocking gax/grpc executor!

Brian Chen and others added 14 commits November 6, 2020 23:05
* Add new userCallbackExecutor, which ensures user callbacks can't block any gax thread. Currently this creates a cached thread pool per writer; we should likely treat this thread pool as a singleton and pass it from the builder so the threads can be reused even past the lifetime of the writer.
* Update instances of setting a SettableFuture to only set after the state operation (add/remove) has completed, to ensure any downstream future will only start after the state update.
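
For readers following the first commit note, here is a minimal sketch of the hand-off idea: the thread that completes an operation only enqueues the user callback on a separate executor and returns, so a slow callback cannot stall it. The class and method names (`CallbackHandoffSketch`, `onOperationComplete`, `callerStaysUnblocked`) are hypothetical and not taken from the PR; only the use of a cached thread pool follows the commit message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; class and method names are illustrative, not taken from the PR. */
final class CallbackHandoffSketch {

  /** Called on the "network" thread: hands the user callback off and returns immediately. */
  static void onOperationComplete(ExecutorService userCallbackExecutor, Runnable userCallback) {
    userCallbackExecutor.execute(userCallback);
  }

  /** Returns true if the caller got control back while a slow user callback was still running. */
  static boolean callerStaysUnblocked() {
    ExecutorService userCallbackExecutor = Executors.newCachedThreadPool();
    final CountDownLatch release = new CountDownLatch(1);
    final CountDownLatch callbackDone = new CountDownLatch(1);
    try {
      onOperationComplete(
          userCallbackExecutor,
          () -> {
            try {
              release.await(); // Simulates a user callback that blocks.
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
            callbackDone.countDown();
          });
      // The callback cannot have finished yet because it is still waiting on `release`.
      boolean returnedFirst = callbackDone.getCount() == 1;
      release.countDown();
      return returnedFirst && callbackDone.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      userCallbackExecutor.shutdown();
    }
  }
}
```

Sharing one pool across writers, as the commit note suggests, would mean passing the executor in rather than creating it per call as this sketch does.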
@thebrianchen thebrianchen requested a review from a team as a code owner December 10, 2020 18:10
@thebrianchen thebrianchen self-assigned this Dec 10, 2020
@thebrianchen thebrianchen requested a review from a team December 10, 2020 18:10
@thebrianchen thebrianchen requested a review from a team as a code owner December 10, 2020 18:10
@product-auto-label product-auto-label bot added the api: firestore Issues related to the googleapis/java-firestore API. label Dec 10, 2020
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Dec 10, 2020
Contributor

@schmidt-sebastian schmidt-sebastian left a comment

Very nice. I am happy that we managed to get this to work!

ApiFuture<BatchWriteResponse> response =
firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable());

committed = true;
Contributor

Just noticed that this is missing from Node.

Would it make sense to make this an abstract property?

In UpdateBuilder, you could do:

boolean isCommitted() {
  return committed;
}

and here you could do:

boolean isCommitted() {
  return batchState == SENT;
}

Author

Done. I'll back-port this to node later along with some of the other changes like the retry logging message.

@@ -42,9 +44,44 @@
import javax.annotation.Nullable;

final class BulkWriter implements AutoCloseable {
/**
* A callback set by `addWriteResultListener()` to be run every time an operation successfully
Contributor

Maybe s/set/used ? I am not sure though.

Author

I think it should be "set" since the addWriteResultListener() function sets the callback, which is then run on operation success.

* modification errors (as this list is modified from both the user thread and the network
* thread).
*/
private final List<BulkCommitBatch> retryBatchQueue = new CopyOnWriteArrayList<>();
Contributor

Would it be possible to schedule everything on bulkWriterExecutor now and use a normal ArrayList? The enqueue part is already on the executor, so the rest should either be on it as well or be relatively easy to move.

Author

Good point, done.
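
A minimal sketch of the thread-confinement idea discussed in this thread, assuming a single-threaded executor: if every read and write of the queue runs on that one executor, a plain `ArrayList` is enough and no concurrent collection is needed. `ConfinedRetryQueueSketch` and its methods are hypothetical names, and string IDs stand in for the real batch type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch; string IDs stand in for the real batch objects. */
final class ConfinedRetryQueueSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  /** Plain list: safe only because it is touched exclusively on `executor`'s thread. */
  private final List<String> retryQueue = new ArrayList<>();

  /** May be called from any thread; the mutation itself runs on the executor. */
  void enqueue(final String batchId) {
    executor.execute(() -> retryQueue.add(batchId));
  }

  /** Copies the queue on the executor thread and returns the copy to the caller. */
  List<String> snapshot() {
    Callable<List<String>> copy = () -> new ArrayList<>(retryQueue);
    try {
      return executor.submit(copy).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  void shutdown() {
    executor.shutdown();
  }
}
```

The trade-off is that callers can no longer read the list directly; every access has to be expressed as a task submitted to the executor.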


/**
* A set of futures that represent pending BulkWriter operations. Each future is completed when
* the BulkWriter operation resolves. This set includes retries. Each retry's promise is added,
Contributor

"This set includes retries" is not very specific. Do we:

  • Add a new operation for each retry (looks like we don't)
  • Or do we re-use the existing operations?

Author

Updated the documentation and removed the incorrect parts.


// Verify that the 2nd operation did not complete as a result of the flush() call.
assertArrayEquals(new String[] {"BEFORE_FLUSH", "FLUSH"}, operations.toArray());
bulkWriter.close();
Contributor

It might make sense to add another flush here and verify "AFTER_FLUSH" as well. That way, it doesn't look unused.

Author

done.

@@ -422,8 +743,8 @@ public void retriesIndividualWritesThatFailWithAbortedOrUnavailable() throws Exc
result1.get();
fail("set() should have failed");
} catch (Exception e) {
- assertTrue(e.getCause() instanceof FirestoreException);
- assertEquals(Status.DEADLINE_EXCEEDED, ((FirestoreException) e.getCause()).getStatus());
+ assertTrue(e.getCause() instanceof BulkWriterError);
Contributor

Could BulkWriterError extend FirestoreException? That way, it can be commonly treated by our users.

BulkWriterError should likely also be BulkWriterException. See the first part of the first answer here: https://stackoverflow.com/questions/5813614/what-is-difference-between-errors-and-exceptions

Author

I attempted it. I had to make FirestoreException not final in order to extend, and I had to make one of the private constructors package-private.
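
To illustrate why the subclass relationship helps callers, here is a minimal sketch using hypothetical stand-in classes (`StoreExceptionSketch`, `BulkWriteExceptionSketch`), not the real Firestore types: code that only catches the base exception still handles the bulk-write failure, and the base class has to be non-final for the subclass to exist at all.

```java
/** Hypothetical stand-in for a non-final base exception. */
class StoreExceptionSketch extends RuntimeException {
  private final String status;

  StoreExceptionSketch(String message, String status) {
    super(message);
    this.status = status;
  }

  String getStatus() {
    return status;
  }
}

/** Bulk-write failure that adds context while remaining catchable as the base type. */
final class BulkWriteExceptionSketch extends StoreExceptionSketch {
  private final int failedAttempts;

  BulkWriteExceptionSketch(String message, String status, int failedAttempts) {
    super(message, status);
    this.failedAttempts = failedAttempts;
  }

  int getFailedAttempts() {
    return failedAttempts;
  }
}

final class CommonHandlingSketch {
  /** A caller that only knows the base type still handles bulk-write failures. */
  static String statusOf(Runnable operation) {
    try {
      operation.run();
      return "OK";
    } catch (StoreExceptionSketch e) {
      return e.getStatus();
    }
  }
}
```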

@codecov

codecov bot commented Dec 12, 2020

Codecov Report

Merging #483 (c1264e0) into master (a979c02) will increase coverage by 1.11%.
The diff coverage is 96.27%.

@@             Coverage Diff              @@
##             master     #483      +/-   ##
============================================
+ Coverage     73.33%   74.45%   +1.11%     
- Complexity     1095     1118      +23     
============================================
  Files            65       66       +1     
  Lines          5790     5852      +62     
  Branches        721      722       +1     
============================================
+ Hits           4246     4357     +111     
+ Misses         1317     1267      -50     
- Partials        227      228       +1     
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...com/google/cloud/firestore/FirestoreException.java | 83.33% <ø> (ø) | 9.00 <0.00> (ø) |
| ...n/java/com/google/cloud/firestore/Transaction.java | 85.41% <ø> (ø) | 13.00 <0.00> (ø) |
| ...in/java/com/google/cloud/firestore/WriteBatch.java | 100.00% <ø> (ø) | 3.00 <0.00> (ø) |
| ...java/com/google/cloud/firestore/UpdateBuilder.java | 94.97% <88.88%> (-1.20%) | 59.00 <3.00> (-19.00) |
| ...om/google/cloud/firestore/BulkWriterException.java | 91.66% <91.66%> (ø) | 5.00 <5.00> (?) |
| ...va/com/google/cloud/firestore/BulkCommitBatch.java | 93.05% <92.75%> (+2.14%) | 18.00 <16.00> (+14.00) |
| ...in/java/com/google/cloud/firestore/BulkWriter.java | 98.23% <98.03%> (+26.12%) | 54.00 <15.00> (+23.00) |
| ...java/com/google/cloud/firestore/FirestoreImpl.java | 76.08% <100.00%> (+0.17%) | 27.00 <1.00> (ø) |

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a979c02...c1264e0. Read the comment docs.

Contributor

@schmidt-sebastian schmidt-sebastian left a comment

LGTM. Thanks for not giving up!

/**
* A queue of batches to be retried. Use a synchronized list to avoid multi-thread concurrent
* modification errors (as this list is modified from both the user thread and the network
* thread).
Contributor

Comment is outdated.

Author

updated.

void onResult(DocumentReference documentReference, WriteResult result);
};

/** A callback set by `addWriteErrorListener()` to be run every time an operation fails. */
Collaborator

... "And determines if an operation should be retried"

Author

Added.
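
As a rough illustration of an error listener that also decides whether to retry, the sketch below uses a hypothetical callback interface (`ErrorCallbackSketch`) and a hypothetical retry loop; the status string and the method signatures are assumptions, not the PR's actual API.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical callback shape; the PR's real interface may differ. */
interface ErrorCallbackSketch {
  /** Returns true if the failed operation should be retried. */
  boolean onError(String statusCode, int failedAttempts);
}

final class RetryDecisionSketch {
  /** Runs `operation` until it succeeds or the callback declines another retry. */
  static boolean runWithRetries(BooleanSupplier operation, ErrorCallbackSketch onError) {
    int failedAttempts = 0;
    while (!operation.getAsBoolean()) {
      failedAttempts++;
      // "UNAVAILABLE" is a placeholder status for this sketch.
      if (!onError.onError("UNAVAILABLE", failedAttempts)) {
        return false; // The listener gave up, so the failure is surfaced.
      }
    }
    return true;
  }
}
```

A listener such as `(status, attempts) -> attempts < 5` would retry up to four times, while `(status, attempts) -> false` would fail on the first error.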


/** Whether this BulkWriter instance is closed. Once closed, it cannot be opened again. */
private boolean closed = false;

/** Rate limiter used to throttle requests as per the 500/50/5 rule. */
private final RateLimiter rateLimiter;

private final FirestoreImpl firestore;
private WriteResultCallback successListener =
Collaborator

The value of this can be turned into a static final constant and then referenced as a default so there is only one instance per JVM, rather than an instance per BulkWriter.

Author

moved.

public void onResult(DocumentReference documentReference, WriteResult result) {}
};

private WriteErrorCallback errorListener =
Collaborator

Same note as successListener

Author

moved.
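
A minimal sketch of the suggestion in these two threads, assuming a stateless no-op default: the default listener lives in a `static final` field, so every writer starts out pointing at the same instance. `ResultCallbackSketch` and `ListenerDefaultsSketch` are hypothetical stand-ins for the PR's callback interface and for BulkWriter.

```java
/** Hypothetical stand-in for the success callback interface. */
interface ResultCallbackSketch {
  void onResult(String documentPath, String result);
}

final class ListenerDefaultsSketch {
  /** One shared no-op instance per JVM instead of one per writer. */
  private static final ResultCallbackSketch NO_OP_LISTENER =
      new ResultCallbackSketch() {
        @Override
        public void onResult(String documentPath, String result) {}
      };

  /** Defaults to the shared no-op until the user registers a listener. */
  private ResultCallbackSketch successListener = NO_OP_LISTENER;

  void addWriteResultListener(ResultCallbackSketch listener) {
    this.successListener = listener;
  }

  ResultCallbackSketch getSuccessListener() {
    return successListener;
  }
}
```

Sharing is safe here only because the no-op holds no state; a default that captured per-writer data would still need one instance per writer.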

return flushComplete;
}

/**
* Commits all enqueued writes and marks the BulkWriter instance as closed.
*
- * <p>After calling `close()`, calling any method wil return an error.
+ * <p>After calling `close()`, calling any method wil return an error. Any retries scheduled with
Collaborator

Suggested change
- * <p>After calling `close()`, calling any method wil return an error. Any retries scheduled with
+ * <p>After calling `close()`, calling any method will return an error. Any retries scheduled with

Author

good catch, thanks

return flushComplete;
}

/**
* Commits all enqueued writes and marks the BulkWriter instance as closed.
*
- * <p>After calling `close()`, calling any method wil return an error.
+ * <p>After calling `close()`, calling any method wil return an error. Any retries scheduled with
+ * `addWriteErrorListener()` will be run before the `close()` completes.
Collaborator

Is the same true for success listeners? If so let's document it.

Author

done.

<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/firestore/UpdateBuilder</className>
<method>int getMutationsSize()</method>
Collaborator

We can't remove this method without a major version release since it's public. Because it's public, customers could already be using it, and we don't want to remove it and break them.

We should be able to keep the method public without any issues.

Author

Changed back to public.

}
/** Get the number of writes. */
@VisibleForTesting
int getWriteCount() {
Collaborator

We need to keep this method public for binary compatibility as I noted in the clirr file.

After making this public we no longer need the @VisibleForTesting.

Author

Done.

@thebrianchen thebrianchen merged commit 3c05037 into master Jan 14, 2021
@thebrianchen thebrianchen deleted the bc/bulk-error-with-test branch January 14, 2021 00:43
gcf-merge-on-green bot pushed a commit that referenced this pull request Jan 20, 2021
🤖 I have created a release \*beep\* \*boop\* 
---
## [2.2.0](https://www.github.com/googleapis/java-firestore/compare/v2.1.0...v2.2.0) (2021-01-20)


### Features

* Add bundle proto building ([#271](https://www.github.com/googleapis/java-firestore/issues/271)) ([994835c](https://www.github.com/googleapis/java-firestore/commit/994835c0a3be077404afa60abd4d4685d17fb533))
* add bundle.proto from googleapis/googleapis ([#407](https://www.github.com/googleapis/java-firestore/issues/407)) ([37da386](https://www.github.com/googleapis/java-firestore/commit/37da386875d1b65121e8a9a92b1a000537f07625))
* add CollectionGroup#getPartitions(long) ([#478](https://www.github.com/googleapis/java-firestore/issues/478)) ([bab064e](https://www.github.com/googleapis/java-firestore/commit/bab064edde26325bf0041ffe28d4c63b97a089c5))
* add implicit ordering for startAt(DocumentReference) calls ([#417](https://www.github.com/googleapis/java-firestore/issues/417)) ([aae6dc9](https://www.github.com/googleapis/java-firestore/commit/aae6dc960f7c42830ceed23c65acaad3e457dcff))
* add max/min throttling options to BulkWriterOptions ([#400](https://www.github.com/googleapis/java-firestore/issues/400)) ([27a9397](https://www.github.com/googleapis/java-firestore/commit/27a9397f67e151d723241c80ccb2ec9f1bfbba1c))
* add success and error callbacks to BulkWriter ([#483](https://www.github.com/googleapis/java-firestore/issues/483)) ([3c05037](https://www.github.com/googleapis/java-firestore/commit/3c05037e8fce8d3ce4907fde85bd254fc98ea588))
* Implementation of Firestore Bundle Builder ([#293](https://www.github.com/googleapis/java-firestore/issues/293)) ([fd5ef90](https://www.github.com/googleapis/java-firestore/commit/fd5ef90b6681cc67aeee6c95f3de80267798dcd0))
* Release bundles ([#466](https://www.github.com/googleapis/java-firestore/issues/466)) ([3af065e](https://www.github.com/googleapis/java-firestore/commit/3af065e32b193931c805b576f410ad90124b43a7))


### Bug Fixes

* add @BetaApi, make BulkWriter public, and refactor Executor ([#497](https://www.github.com/googleapis/java-firestore/issues/497)) ([27ff9f6](https://www.github.com/googleapis/java-firestore/commit/27ff9f6dfa92cac9119d2014c24a0759baa44fb7))
* **build:** sample checkstyle violations ([#457](https://www.github.com/googleapis/java-firestore/issues/457)) ([777ecab](https://www.github.com/googleapis/java-firestore/commit/777ecabd1ce12cbc5f4169de6c23a90f98deac06))
* bulkWriter: writing to the same doc doesn't create a new batch ([#394](https://www.github.com/googleapis/java-firestore/issues/394)) ([259ece8](https://www.github.com/googleapis/java-firestore/commit/259ece8511db71ea79cc1a080eb785a15db88756))
* empty commit to trigger release-please ([fcef0d3](https://www.github.com/googleapis/java-firestore/commit/fcef0d302cd0a9339d82db73152289d6f9f67ff2))
* make BulkWriterOptions public ([#502](https://www.github.com/googleapis/java-firestore/issues/502)) ([6ea05be](https://www.github.com/googleapis/java-firestore/commit/6ea05beb3f27337bef910ca64f0e3f32de6b7e98))
* retry Query streams ([#426](https://www.github.com/googleapis/java-firestore/issues/426)) ([3513cd3](https://www.github.com/googleapis/java-firestore/commit/3513cd39ff43d26c8432c05ce20693350539ae8f))
* retry transactions that fail with expired transaction IDs ([#447](https://www.github.com/googleapis/java-firestore/issues/447)) ([5905438](https://www.github.com/googleapis/java-firestore/commit/5905438af6501353e978210808834a26947aae95))
* verify partition count before invoking GetPartition RPC ([#418](https://www.github.com/googleapis/java-firestore/issues/418)) ([2054ae9](https://www.github.com/googleapis/java-firestore/commit/2054ae971083277e1cf81c2b57500c40a6faa0ef))


### Documentation

* **sample:** normalize firestore sample's region tags ([#453](https://www.github.com/googleapis/java-firestore/issues/453)) ([b529245](https://www.github.com/googleapis/java-firestore/commit/b529245c75f770e1b47ca5d9561bab55a7610677))


### Dependencies

* remove explicit version for jackson ([#479](https://www.github.com/googleapis/java-firestore/issues/479)) ([e2aecfe](https://www.github.com/googleapis/java-firestore/commit/e2aecfec51465b8fb3413337a76f9a3de57b8500))
* update dependency com.google.cloud:google-cloud-conformance-tests to v0.0.12 ([#367](https://www.github.com/googleapis/java-firestore/issues/367)) ([2bdd846](https://www.github.com/googleapis/java-firestore/commit/2bdd84693bbd968cafabd0e7ee56d1a9a7dc31ca))
* update dependency com.google.cloud:google-cloud-conformance-tests to v0.0.13 ([#411](https://www.github.com/googleapis/java-firestore/issues/411)) ([e6157b5](https://www.github.com/googleapis/java-firestore/commit/e6157b5cb532e0184125355b12115058e72afa67))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.10.0 ([#383](https://www.github.com/googleapis/java-firestore/issues/383)) ([cb39ee8](https://www.github.com/googleapis/java-firestore/commit/cb39ee820c2f67e22da623f5a6eaa7ee6bf351e2))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.10.2 ([#403](https://www.github.com/googleapis/java-firestore/issues/403)) ([991dd81](https://www.github.com/googleapis/java-firestore/commit/991dd810360e654fca0b53e0611da0cd77febc7c))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.12.1 ([#425](https://www.github.com/googleapis/java-firestore/issues/425)) ([b897ffa](https://www.github.com/googleapis/java-firestore/commit/b897ffa90427a8f597c02c24f80d1d162be48b23))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.13.0 ([#430](https://www.github.com/googleapis/java-firestore/issues/430)) ([0f8f218](https://www.github.com/googleapis/java-firestore/commit/0f8f218678c3ddebb73748c382cab8e38c2f140d))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.14.1 ([#446](https://www.github.com/googleapis/java-firestore/issues/446)) ([e241f8e](https://www.github.com/googleapis/java-firestore/commit/e241f8ebbfdf202f00424177c69962311b37fc88))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.15.0 ([#460](https://www.github.com/googleapis/java-firestore/issues/460)) ([b82fc35](https://www.github.com/googleapis/java-firestore/commit/b82fc3561d1a397438829ab69df24141481369a2))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.16.0 ([#481](https://www.github.com/googleapis/java-firestore/issues/481)) ([ae98824](https://www.github.com/googleapis/java-firestore/commit/ae988245e6d6391c85414e9b6f7ae1b8148c3a6d))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.16.1 ([4ace93c](https://www.github.com/googleapis/java-firestore/commit/4ace93c7be580a8f7870e71cad2dc19bb5fdef29))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.17.0 ([#487](https://www.github.com/googleapis/java-firestore/issues/487)) ([e11e472](https://www.github.com/googleapis/java-firestore/commit/e11e4723bc75727086bee0436492f458def29ff5))
* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.18.0 ([#495](https://www.github.com/googleapis/java-firestore/issues/495)) ([f78720a](https://www.github.com/googleapis/java-firestore/commit/f78720a155f1294321f05266b9a546bbf2cb9a04))
* update jackson dependencies to v2.11.3 ([#396](https://www.github.com/googleapis/java-firestore/issues/396)) ([2e176e2](https://www.github.com/googleapis/java-firestore/commit/2e176e2f864262f31e6f93705fa7e794023b9649))
---


This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Labels
api: firestore Issues related to the googleapis/java-firestore API. cla: yes This human has signed the Contributor License Agreement.

3 participants