
[BEAM-8376] Initial version of firestore connector JavaSDK #10187

Closed
wants to merge 14 commits

Conversation

djelekar

Added initial version of firestore connector for JavaSDK.

R: @chamikaramj

Post-Commit Tests Status (on master branch) and Pre-Commit Tests Status (on master branch): the per-language/per-runner build-status badge tables are omitted here, as the badge images did not survive extraction.

See .test-infra/jenkins/README for the trigger phrase, status, and link of all Jenkins jobs.

@chamikaramj (Contributor) left a comment:

Thanks!


import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

public class FirestoreIO {
Contributor:

Please add experimental annotations to this and other public interfaces.

}

/**
* Returns a new {@link Write} that writes to the Cloud Firestore for the specified collection.
Contributor:

It would probably be good to add a brief description of collection IDs and/or point to a relevant page on the Firestore website.

}

/**
* Returns a new {@link Write} that writes to the Cloud Firestore for the specified collection key.
Contributor:

Ditto (on documentation). What's the difference between a collection ID and a key ID?

Author:

The collection ID is the unique identifier of a Firestore collection (e.g. "userSessions"), while the key ID is the unique identifier of a specific document within a collection (e.g. "5a0062ef-3fe2-490d-b1be-06f5bf94a39d").

I added a link to the Firestore data model page which explains it pretty well.

Do you think that is enough, or should we adjust naming/docs additionally?
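To illustrate the distinction, here is a minimal sketch (the helper name is hypothetical and not part of the connector's API): a Firestore document path is the collection ID followed by the document (key) ID.

```java
/**
 * Hypothetical helper illustrating the collection-ID / key-ID distinction.
 * A Firestore document path has the form "<collectionId>/<documentId>".
 */
final class FirestorePaths {
  static String documentPath(String collectionId, String documentId) {
    // Single-segment IDs must not contain the path separator.
    if (collectionId.contains("/") || documentId.contains("/")) {
      throw new IllegalArgumentException("IDs must not contain '/'");
    }
    return collectionId + "/" + documentId;
  }
}
```

So "userSessions" names the whole collection, while "userSessions/5a0062ef-3fe2-490d-b1be-06f5bf94a39d" names one document inside it.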

/**
* Writes a batch of mutations to Cloud Firestore.
*
* <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All mutations in
Contributor:

Can this result in duplicate data for records that were written in the previous try ?

Author:

That doc comment is actually incorrect. I've used Firestore "batched writes", which makes writing the mutations an atomic operation: either all succeed or all fail, so there is no risk of duplicated entries. I updated the docs.

@VisibleForTesting
public class WriteBatcherImpl implements WriteBatcher, Serializable {
/** Target time per RPC for writes. */
static final int FIRESTORE_BATCH_TARGET_LATENCY_MS = 5000;
Contributor:

Please document these constants.
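One way such a target-latency constant is typically used (a sketch of the idea only; the names and bounds here are assumptions, not the PR's actual code): derive the next batch size from the observed per-write latency so that each commit RPC lands near the target latency, clamped to sane bounds.

```java
/**
 * Sketch of latency-based batch sizing. Constants and names are illustrative
 * assumptions, not the PR's actual implementation.
 */
final class BatchSizer {
  /** Target wall-clock time for a single commit RPC, in milliseconds. */
  static final int TARGET_LATENCY_MS = 5000;
  /** Lower bound on writes per commit. */
  static final int MIN_BATCH_SIZE = 1;
  /** Upper bound on writes per commit (Firestore caps commits at 500 writes). */
  static final int MAX_BATCH_SIZE = 500;

  /** Picks a batch size so that batchSize * avgMsPerWrite is roughly TARGET_LATENCY_MS. */
  static int nextBatchSize(double avgMsPerWrite) {
    if (avgMsPerWrite <= 0) {
      return MAX_BATCH_SIZE; // No latency data yet; start optimistic.
    }
    long size = Math.round(TARGET_LATENCY_MS / avgMsPerWrite);
    return (int) Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, size));
  }
}
```

Fast writes push the batch size toward the 500-write cap; slow writes shrink it toward single-write commits.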

}

for (WriteResult result : future.get()) {
LOG.info("Update time : " + result.getUpdateTime());
Contributor:

It will probably be useful to log the firestoreKey here as well.

// Break if the commit threw no exception.
break;
} catch (FirestoreException exception) {
if (exception.getCode() == 4) {
Contributor:

So 4 means DEADLINE_EXCEEDED? Please clarify in the comment.
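For context: 4 is indeed the gRPC canonical status code for DEADLINE_EXCEEDED. A sketch of making the check self-documenting (in real code one would use the status enum shipped with the gRPC/Firestore client rather than redeclaring it):

```java
/**
 * The gRPC canonical status codes, reproduced here only to illustrate
 * replacing the magic number 4 with a named constant.
 */
enum CanonicalGrpcCode {
  OK(0), CANCELLED(1), UNKNOWN(2), INVALID_ARGUMENT(3), DEADLINE_EXCEEDED(4),
  NOT_FOUND(5), ALREADY_EXISTS(6), PERMISSION_DENIED(7), RESOURCE_EXHAUSTED(8),
  FAILED_PRECONDITION(9), ABORTED(10), OUT_OF_RANGE(11), UNIMPLEMENTED(12),
  INTERNAL(13), UNAVAILABLE(14), DATA_LOSS(15), UNAUTHENTICATED(16);

  final int value;

  CanonicalGrpcCode(int value) {
    this.value = value;
  }

  /** True when the numeric code is the retryable DEADLINE_EXCEEDED status. */
  static boolean isDeadlineExceeded(int code) {
    return code == DEADLINE_EXCEEDED.value;
  }
}
```

With this, the retry condition reads `CanonicalGrpcCode.isDeadlineExceeded(exception.getCode())` instead of `exception.getCode() == 4`.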

// Only log the code and message for potentially-transient errors. The entire exception
// will be propagated upon the last retry.
LOG.error(
"Error writing batch of {} mutations to Firestore ({}): {}",
Contributor:

Log the firestoreKey?

import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.MovingFunction;

class MovingAverage {
Contributor:

Please add Javadocs to clarify why we need this.
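For intuition, the core idea behind such a helper can be sketched with a simple count-based window (illustrative only; the PR's MovingAverage is built on Beam's time-bucketed MovingFunction, not on a sample count):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Sketch of a fixed-window moving average: keeps the last N samples and
 * reports their mean, e.g. for tracking recent per-write latency.
 */
final class SimpleMovingAverage {
  private final int windowSize;
  private final Deque<Double> samples = new ArrayDeque<>();
  private double sum;

  SimpleMovingAverage(int windowSize) {
    this.windowSize = windowSize;
  }

  void add(double sample) {
    samples.addLast(sample);
    sum += sample;
    if (samples.size() > windowSize) {
      sum -= samples.removeFirst(); // Evict the oldest sample.
    }
  }

  double get() {
    return samples.isEmpty() ? 0.0 : sum / samples.size();
  }
}
```

The connector needs this kind of smoothing so that batch sizing and throttling react to recent latency rather than to any single slow RPC.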

* Tests for {@link AdaptiveThrottler}.
*/
@RunWith(JUnit4.class)
public class AdaptiveThrottlerTest {
Contributor:

Please also add unit tests for the sink. See here for some example unit tests that we could add for Firestore:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java

In addition, please also consider adding an integration test similar to this one: https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java

}

public WriteBatch batchWithKey(List<T> input, String collection, String key) {
WriteBatch batch = firestoreClient.batch();
@fredzqm (Dec 5, 2019):

Please note that a WriteBatch is an atomic transaction. Large WriteBatches can lead to contention and high error rates.

We are working on launching a batchWrite API for non-atomic data-ingestion use cases. Until it launches, the next best option is writing each document separately.

(BTW, there is a limit of 500 writes per commit.)
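Whatever write strategy is chosen, the 500-writes-per-commit limit means the connector has to split its input into compliant chunks. A minimal sketch (names are illustrative, not the PR's code):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: split a list of mutations into chunks that respect Firestore's
 * 500-writes-per-commit limit.
 */
final class BatchChunker {
  static final int MAX_WRITES_PER_COMMIT = 500;

  static <T> List<List<T>> chunk(List<T> mutations) {
    List<List<T>> chunks = new ArrayList<>();
    for (int i = 0; i < mutations.size(); i += MAX_WRITES_PER_COMMIT) {
      // subList is a view; the last chunk may be shorter than the limit.
      chunks.add(mutations.subList(i, Math.min(i + MAX_WRITES_PER_COMMIT, mutations.size())));
    }
    return chunks;
  }
}
```

Each chunk then becomes one commit (atomic or not, depending on the API used).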

Reviewer:

Please make sure this is addressed.

@fredzqm commented Dec 14, 2019:

Please note that a WriteBatch is an atomic transaction. Large WriteBatches can lead to contention and high error rates.

We are working on launching a batchWrite API for non-atomic data-ingestion use cases. Until it launches, the next best option is writing each document separately.

(BTW, there is a limit of 500 writes per commit.)

@chamikaramj (Contributor):

Thanks @fredzqm.

@djelekar please let us know when current comments are addressed.

@chamikaramj (Contributor):

Any updates?

Thanks.

@djelekar (Author):

Hi @chamikaramj, yes, the tests are there and I've addressed all your comments in the code. However, I'm having issues running the integration tests on my machine (Windows). Once that is working, I'll ping you for one more review.

Hi @fredzqm, I understand your concerns; however, we have had better experience running jobs with the WriteBatch command. Namely, when doing imports of large amounts of data, we observed faster processing times with WriteBatch than with single writes. Can you point me to some documentation or other material that supports your point?

Additionally, the code uses an AdaptiveThrottler with a max batch size of 500, so hopefully that stays within the limit.
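For reference, the client-side adaptive throttling idea (as described in the Google SRE book, which Beam's Datastore AdaptiveThrottler is modeled on) can be sketched as follows; the field names and the overload ratio of 2.0 here are illustrative, not the PR's actual constants:

```java
/**
 * Sketch of client-side adaptive throttling: track total requests vs.
 * successful requests, and locally reject requests with a probability that
 * grows as the backend's success rate drops.
 */
final class ThrottleSketch {
  private double requests;
  private double successes;
  /** How many requests per success the client tolerates before throttling. */
  private final double overloadRatio = 2.0;

  void record(boolean success) {
    requests++;
    if (success) {
      successes++;
    }
  }

  /** Probability with which the client should reject a request locally. */
  double rejectionProbability() {
    return Math.max(0, (requests - overloadRatio * successes) / (requests + 1));
  }
}
```

While everything succeeds the probability stays at zero; as failures accumulate it rises toward one, shedding load before the backend is overwhelmed.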

@clement commented Feb 5, 2020:

Hi @djelekar, I work on the Firestore backend, and I'm chiming in to second @fredzqm's point. There are two interlocking issues with using an atomic WriteBatch for large ingestion (throughput) jobs.

First, under load and based on size, Firestore will split your dataset across multiple servers. When writing atomically to multiple documents, this increases the chance that the write will need to coordinate a two-phase commit across multiple servers, which increases the latency of the operation.

Second, Firestore uses a pessimistic locking model under the hood. If the WriteBatch takes longer to execute (because of the issue above, or just because it is doing more work), it will hold locks longer and disrupt unrelated read/write traffic on the documents or index entries.

I can see reasons why the experience looks better with WriteBatch, for example:

  • when using single writes, those should be asynchronous, and can (and should) be parallelized more aggressively
  • if the ingested key range is not yet split, or not actively accessed by other processes, there will initially be no contention and good performance with WriteBatch; however, there is a limit to how much throughput you will get from it once the ingestion runs longer and ramps up to more parallelism.

Does that make sense? We are hoping to launch a dedicated feature for writing batches in a non-atomic fashion, but it is unclear at this point when it will be generally available, and as @fredzqm points out, single writes are the best option for now.
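The "asynchronous single writes, parallelized with back-pressure" suggestion can be sketched as below. The write itself is simulated here (a counter increment); in the connector it would be an asynchronous per-document Firestore write, and all names are illustrative:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch: issue single-document writes in parallel while bounding the number
 * of in-flight requests, per the reviewers' advice.
 */
final class BoundedParallelWriter {
  static int writeAll(List<String> documents, int maxInFlight) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
    Semaphore inFlight = new Semaphore(maxInFlight);
    AtomicInteger written = new AtomicInteger();
    for (String doc : documents) {
      inFlight.acquire(); // Back-pressure: at most maxInFlight outstanding writes.
      pool.execute(
          () -> {
            try {
              written.incrementAndGet(); // Stand-in for one single-document write.
            } finally {
              inFlight.release();
            }
          });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return written.get();
  }
}
```

Because each write touches one document, there is no cross-server two-phase commit and no long-held locks, at the cost of losing batch atomicity.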

stale bot commented Apr 25, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Apr 25, 2020
willbattel commented Apr 25, 2020

Not stale

@stale stale bot removed the stale label Apr 25, 2020
@willbattel
Hey @fredzqm @clement are there any updates regarding the mentioned non-atomic batch writing capability?

@aaltay (Member) commented Jun 25, 2020:

What is the state of this PR? Do you need help?

stale bot commented Aug 29, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Aug 29, 2020
stale bot commented Sep 5, 2020

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@stale stale bot closed this Sep 5, 2020
@willbattel
I noticed the aforementioned bulk-writer feature for Firestore has been made available in some clients, such as Java and Node. Can this PR now be continued using the new API for parallelized writes?

oeclint commented Dec 26, 2020

Does the new batch writer mean we could reopen this?


7 participants