[BEAM-8376] Initial version of firestore connector JavaSDK #10187
Conversation
Thanks!
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

public class FirestoreIO {
Please add experimental annotations to this and other public interfaces.
}

/**
 * Returns a new {@link Write} that writes to the Cloud Firestore for the specified collection.
It would probably be good to add a brief description of collection IDs and/or point to a link on the Firestore website.
}

/**
 * Returns a new {@link Write} that writes to the Cloud Firestore for the specified collection key.
Ditto (on documentation). What's the difference between a collection ID and a key ID?
Collection ID is the unique identifier of Firestore collection (e.g. "userSessions"), while Key ID is the unique identifier for a specific document in a collection (e.g. "5a0062ef-3fe2-490d-b1be-06f5bf94a39d").
I added a link to the Firestore data model page which explains it pretty well.
Do you think that is enough, or should we adjust naming/docs additionally?
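To illustrate the distinction being discussed, here is a minimal sketch (hypothetical helper, not code from this PR) based only on the Firestore data model: a collection ID names a collection of documents, a key ID names one document inside it, and the two combine into a slash-separated document path.

```java
// Illustrative only: how a collection ID and a key (document) ID combine
// into a Firestore document path of the form "collectionId/documentId".
public class FirestorePaths {
    // collectionId: identifier of the collection, e.g. "userSessions".
    // keyId: identifier of one document in it, e.g. a UUID string.
    public static String documentPath(String collectionId, String keyId) {
        return collectionId + "/" + keyId;
    }
}
```

With the real client this corresponds to addressing a single document within a named collection rather than the collection as a whole.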
/**
 * Writes a batch of mutations to Cloud Firestore.
 *
 * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All mutations in
Can this result in duplicate data for records that were written in a previous try?
This doc is actually false. I've used Firestore "batched writes", which make the mutations an atomic operation: they all either succeed or fail together, so there is no risk of duplicating entries. I updated the docs.
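A minimal sketch (fake in-memory commit, not the connector's actual code) of why retrying an all-or-nothing batch commit cannot duplicate writes: the commit either applies every mutation or throws before applying any, so a retry after failure starts from a clean state.

```java
import java.util.ArrayList;
import java.util.List;

// Models the retry-around-an-atomic-commit pattern discussed above.
public class AtomicRetrySketch {
    static final int MAX_RETRIES = 5;

    // Retries an all-or-nothing commit; failuresBeforeSuccess lets the
    // caller simulate transient failures on the first N attempts.
    public static List<String> writeWithRetries(List<String> mutations, int failuresBeforeSuccess) {
        List<String> store = new ArrayList<>();
        int attempts = 0;
        while (true) {
            try {
                commit(store, mutations, attempts < failuresBeforeSuccess);
                return store; // Commit applied exactly once; no partial state from failed tries.
            } catch (RuntimeException e) {
                if (++attempts > MAX_RETRIES) {
                    throw e; // Propagate after the last retry.
                }
            }
        }
    }

    // All-or-nothing: throws before touching the store on failure.
    private static void commit(List<String> store, List<String> mutations, boolean fail) {
        if (fail) {
            throw new RuntimeException("transient commit failure");
        }
        store.addAll(mutations);
    }
}
```

Each mutation appears once in the result even though two commits failed first, mirroring the atomicity argument above.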
@VisibleForTesting
public class WriteBatcherImpl implements WriteBatcher, Serializable {
  /** Target time per RPC for writes. */
  static final int FIRESTORE_BATCH_TARGET_LATENCY_MS = 5000;
Please document these constants.
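For illustration, a documented sketch of how such constants could be explained and used (hypothetical names and bounds, modeled on the latency-targeting batcher approach used by the Datastore connector): the batch size is derived by dividing the target per-RPC latency by the observed mean latency per mutation, clamped to Firestore's 500-writes-per-commit limit.

```java
// Sketch only: documented batching constants plus a sizing function.
public class BatchSizer {
    /** Target wall-clock time per commit RPC, in milliseconds. */
    static final int FIRESTORE_BATCH_TARGET_LATENCY_MS = 5000;
    /** Hard upper bound: Firestore allows at most 500 writes per commit. */
    static final int MAX_BATCH_SIZE = 500;
    /** Lower bound (assumed here) so throughput never collapses entirely. */
    static final int MIN_BATCH_SIZE = 10;

    // Chooses the next batch size from the mean observed latency per mutation.
    public static int nextBatchSize(double meanLatencyMsPerMutation) {
        int size = (int) (FIRESTORE_BATCH_TARGET_LATENCY_MS / meanLatencyMsPerMutation);
        return Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, size));
    }
}
```

Fast mutations let the batcher grow toward the 500 cap; slow ones shrink it so each RPC stays near the latency target.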
}

for (WriteResult result : future.get()) {
  LOG.info("Update time : " + result.getUpdateTime());
It will probably be useful to log the firestoreKey here as well.
  // Break if the commit threw no exception.
  break;
} catch (FirestoreException exception) {
  if (exception.getCode() == 4) {
So 4 means DEADLINE_EXCEEDED? Please clarify in the comment.
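For reference, in gRPC's canonical status codes the value 4 is DEADLINE_EXCEEDED, so the bare 4 could be replaced with a named check. A sketch (hypothetical helper, not this PR's code; the real code could instead compare against the gRPC status-code enum directly):

```java
// Sketch: naming the magic number from the snippet above.
public class StatusCodes {
    /** gRPC canonical status code for DEADLINE_EXCEEDED. */
    static final int DEADLINE_EXCEEDED = 4;

    // True when the error code indicates a deadline was exceeded, a
    // potentially transient condition that is worth retrying.
    public static boolean isDeadlineExceeded(int code) {
        return code == DEADLINE_EXCEEDED;
    }
}
```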
// Only log the code and message for potentially-transient errors. The entire exception
// will be propagated upon the last retry.
LOG.error(
    "Error writing batch of {} mutations to Firestore ({}): {}",
Log the firestoreKey?
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.MovingFunction;

class MovingAverage {
Please add Javadocs to clarify why we need this.
 * Tests for {@link AdaptiveThrottler}.
 */
@RunWith(JUnit4.class)
public class AdaptiveThrottlerTest {
Please also add unit tests for the sink. Here are some example unit tests that we could adapt for Firestore:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
In addition, please also consider adding an integration test similar to this one: https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
}

public WriteBatch batchWithKey(List<T> input, String collection, String key) {
  WriteBatch batch = firestoreClient.batch();
Please note that WriteBatch is an atomic transaction. A large WriteBatch could lead to contention and high error rates. We are working on launching a batchWrite API for non-atomic data ingestion use cases. Until it launches, the next best option is writing each document separately. (BTW, we have a max 500 writes per commit limit.)
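A sketch (hypothetical helper, not part of this PR) of one way to honor the 500-writes-per-commit limit mentioned above: split the mutation list into chunks of at most 500, committing each chunk as its own batch.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative chunking for Firestore's per-commit write limit.
public class BatchChunker {
    /** Firestore allows at most 500 writes per commit. */
    static final int MAX_WRITES_PER_COMMIT = 500;

    // Splits mutations into consecutive chunks of at most
    // MAX_WRITES_PER_COMMIT elements each, preserving order.
    public static <T> List<List<T>> chunk(List<T> mutations) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < mutations.size(); i += MAX_WRITES_PER_COMMIT) {
            int end = Math.min(i + MAX_WRITES_PER_COMMIT, mutations.size());
            // Copy the view so chunks stay valid if the input list changes.
            chunks.add(new ArrayList<>(mutations.subList(i, end)));
        }
        return chunks;
    }
}
```

Note that chunking only addresses the size limit; per the discussion in this thread, each chunk committed as a WriteBatch is still an atomic transaction with the contention caveats described below.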
Please make sure this is addressed.
Please note that WriteBatch is an atomic transaction. We are working on launching a batchWrite API for non-atomic data ingestion use cases. Before it gets launched, the next best option is writing each document separately. (BTW, we have a max 500 writes per commit limit.)
Any updates? Thanks.
Hi @chamikaramj, yes, the tests are there and I addressed all your comments in code. However, I'm having issues running the integration tests on my machine (Windows). Once that is working, I'll bump you for one more review.

Hi @fredzqm, I understand your concerns; however, we have had better experience running jobs with the WriteBatch command. Namely, when doing imports of large amounts of data, we observed faster processing times with WriteBatch than with single writes. Can you point me to some documentation or other material that supports your point? Additionally, the code uses an AdaptiveThrottler with a max batch value of 500, so hopefully that obeys the limit.
Hi @djelekar, I work on the Firestore backend, and I'm chiming in to second @fredzqm's point. There are two interlocking issues when using an atomic WriteBatch for large ingestion (throughput) jobs.

First, under load and based on size, Firestore will split your dataset across multiple servers. When writing atomically to multiple documents, this increases the chance that the write will need to coordinate a 2-phase commit across multiple servers, which will increase the latency of the operation.

Second, Firestore uses a pessimistic locking model under the hood. If the WriteBatch takes longer to execute (because of the issue above, or just because it is doing more work), it will hold locks longer and disrupt unrelated read/write traffic on the document or index entries.

I can see reasons why the experience looks better with WriteBatch, for example:
Does that make sense? We are hoping to launch a dedicated feature for writing batches in a non-atomic fashion, but it is unclear at this point when it will be generally available; as @fredzqm points out, single writes are the best option for now.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
Not stale
What is the state of this PR? Do you need help?
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Does the new batch writer mean we could reopen this?
Added an initial version of the Firestore connector for the Java SDK.
R: @chamikaramj
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.