
[BEAM-8376] Google Cloud Firestore Connector - Add Firestore V1 Write Operations #14261

Merged

Conversation

@BenWhitehead (Contributor)

First PR in a series of PRs to add sources and sinks for Google Cloud Firestore.

This PR adds a sink for writing Documents to Cloud Firestore. It also includes
common code that is shared with the sources and the Firestore SDK adapter, which
will be added in follow-up PRs. (If desired I can split the common code into a
separate PR, but I thought it wouldn't make much sense without the context of it
being used.)

Each commit has a detailed commit message; a brief summary of the four commits
included in this PR follows:

Gradle

Add the Google Cloud Firestore dependencies. Version 2.2.4+ is needed for all of
the dependencies, which include bug fixes and the necessary gRPC client, protos,
and bootstrapping code.

Common Classes

A set of base classes which underpin the added PTransforms and DoFns that will
make up the connector. This change also includes the base classes used in the
unit and integration test suites which are present for each component of the
connector.

RPC QoS Classes

All read and write RPC operations are managed in coordination with the RPC QoS
layer. Attempt counts, backoff amounts, batch sizing, success/failure metrics,
and throttling are all managed here.

Firestore v1 Write Operations

Add PTransforms for writing to Cloud Firestore via the BatchWrite API. There are
two implementations, which differ only in how errors are handled: the first
throws an exception if an error is encountered, whereas the second outputs a
failure object (functioning similarly to a dead letter queue). A sketch of both
modes follows.
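For readers skimming the description, here is a minimal sketch of the two modes, pieced together from the Javadoc excerpts quoted later in this review; the WriteFailure element type for the dead-letter-queue variant comes from the discussion below, and the `...` placeholder stands for whatever upstream step produces the writes.

```java
PCollection<Write> writes = ...;

// Default mode: a non-retryable error fails the bundle with an exception.
writes.apply(FirestoreIO.v1().write().batchWrite().build());

// Dead letter queue mode: failed writes are emitted downstream instead of failing the bundle.
PCollection<WriteFailure> failures =
    writes.apply(FirestoreIO.v1().write().batchWrite().withDeadLetterQueue().build());
```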

Integration Tests

The integration test suite will only run if GOOGLE_APPLICATION_CREDENTIALS or
FIRESTORE_EMULATOR_HOST is present in the environment. The project the tests run
against must have a Firestore in Native mode database configured (Firestore in
Native mode and Firestore in Datastore mode cannot both be present in the same
project at this time). All integration tests are self-contained, do not depend on
preconfigured external state, and clean up after themselves.

A future PR will be created for sources, and another for a Firestore SDK adapter;
both will leverage the common code added in this PR.

Reviewers

R: @chamikaramj
R: @jlara310 (Firestore Technical Writer)
R: @clement (Firestore Dataplane TL)
R: @danthev (Firestore SRE)



codecov bot commented Mar 16, 2021

Codecov Report

Merging #14261 (3ac65e2) into master (b39ff90) will decrease coverage by 0.05%.
The diff coverage is n/a.

❗ Current head 3ac65e2 differs from pull request most recent head a7e327c. Consider uploading reports for the commit a7e327c to get more accurate results.

@@            Coverage Diff             @@
##           master   #14261      +/-   ##
==========================================
- Coverage   83.81%   83.75%   -0.06%     
==========================================
  Files         435      872     +437     
  Lines       58422   116888   +58466     
==========================================
+ Hits        48966    97903   +48937     
- Misses       9456    18985    +9529     


@danthev (Contributor) left a comment

Is there a way to preview the generated Javadoc? I only scanned most of the doc strings.


Write write = FirestoreProtoHelpers.newWrite();
BatchWriteRequest expectedRequest = BatchWriteRequest.newBuilder()
.setDatabase("projects/testing-project/databases/(default)")
Contributor

Does this need to be parametrized on ENV_GOOGLE_CLOUD_PROJECT? Same on the other test cases.

Contributor Author

Not in this case; the unit tests are all in-process and use mocks for all RPCs.

Contributor

Ok.

@BenWhitehead (Contributor Author)

Thanks for the review @danthev!

Javadoc can be generated locally by running the following from your project's base directory:

./gradlew javadoc
open $(pwd)/sdks/java/io/google-cloud-platform/build/docs/javadoc/index.html

@danthev (Contributor) left a comment

Javadoc LGTM, you'll just need to add a package-info.java to get a Javadoc for the package generally.

@jlara310 left a comment

Reviewed Javadoc comments for readability. LGTM after addressing review comments.

@chamikaramj (Contributor)

Run Java PreCommit

@chamikaramj (Contributor)

Run Spotless PreCommit

@chamikaramj (Contributor) left a comment

Thanks!

Seems like pre-commit and spotless failures are related. So please address them as well.

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

/**
* Base class for all {@link DoFn} defined in the Firestore Connector.
Contributor

Are you sure this will be the base class for all DoFns used by the source and sink? A DoFn can include any generic functionality, and many sources/sinks break up functionality into multiple DoFns for various reasons (readability, modularity, persistence of input, etc.).

Contributor Author

It is true for all of the DoFns which are stateful and have a lifecycle that must be managed. For the DoFns which perform RPCs, Setup is for the RpcQos management, which we want to live longer than an individual bundle, and StartBundle is where we create the RPC clients based on the PipelineOptions from StartBundleContext#getPipelineOptions().

This base class also serves as an upper bound for the unit tests, where all DoFns are checked to ensure they are serializable, as well as performing some common mocking used across the majority of tests (e.g. StartBundleContext, ProcessContext, Credentials, etc.).

There are a few DoFns which will be added in the "Read" PR to follow which directly extend DoFn and only implement a @ProcessElement method, as they are otherwise stateless.

I'll update the comment on this class to call out that it is specifically for stateful DoFns.
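A minimal sketch of the lifecycle split described above. This is illustrative only, not the connector's actual base class; the `createRpcQos`/`createFirestoreStub` helpers and their types are placeholders.

```java
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative only: QoS state is created once per DoFn instance in @Setup so it
// outlives individual bundles, while the RPC client is (re)created per bundle in
// @StartBundle from that bundle's PipelineOptions.
abstract class StatefulFirestoreDoFn<InT, OutT> extends DoFn<InT, OutT> {
  private transient Object rpcQos;        // stands in for the connector's RpcQos state
  private transient Object firestoreStub; // stands in for the gRPC client

  @Setup
  public void setup() {
    rpcQos = createRpcQos(); // lives longer than an individual bundle
  }

  @StartBundle
  public void startBundle(StartBundleContext context) {
    PipelineOptions options = context.getPipelineOptions();
    firestoreStub = createFirestoreStub(options); // per-bundle RPC client
  }

  abstract Object createRpcQos();

  abstract Object createFirestoreStub(PipelineOptions options);
}
```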

* The default behavior is to fail a bundle if any single write fails with a non-retryable error.
* <pre>{@code
* PCollection<Write> writes = ...;
* PDone sink = writes
Contributor

Returning PDone from a sink is an anti-pattern since users will not be able to continue the pipeline (for example, write to system X after writing to Firestore). We should think of some useful PCollection that we can return from the Firestore sink. If nothing else, this can just be 'null' values (per window) or the original input PCollection.

Contributor Author

This is in addition to providing an optional dead letter queue via .withDeadLetterQueue()? Is there a good example to follow for something like this? I got the PDone from the datastore connector.

Contributor

I think still returning PDone (even for the general case) is an anti-pattern. Some sources (including Datastore) currently do this, but we'd like to change such sources. See the following thread.
https://lists.apache.org/thread.html/r1fa3599ef9c0034d2bfa0614012bc864ce7b296228d8cf804c931065%40%3Cuser.beam.apache.org%3E

Contributor Author

I've read through both of the threads you linked and I'm leaning toward following the PCollection<TypeDescribingWhatIsComplete> approach from the threads.

I would refactor BatchWrite to return a PCollection<NewWriteResultType> instead of PDone.

Two different things jump to mind for the NewWriteResultType, and I'm curious if you think I should pick one over the other:

  1. WriteSuccessSummary, which would contain a simple summary of the BatchWrite, including the number of writes and the number of bytes.

     public final class WriteSuccessSummary {
       private final int numWrites;
       private final long numBytes;
     }

  2. WriteSuccess, which would be emitted for each processed Write and include the Write, the WriteResult, and the Status from Firestore.

     public static final class WriteSuccess implements Serializable {
       private final Write write;
       private final WriteResult writeResult;
       private final Status status;
     }

I'm personally leaning toward number 1, as it follows the SQL pattern and reduces the amount of data in the pipeline, reducing any overhead if it is ignored.

Contributor

I think the question to ask is what information will be more valuable for a Firestore user.
The pattern is:

"Generate data to write" -> "Write to firestore" -> "Do something else with the result"

I think most users will find failed results more useful, but continuation when there are no failures is also important.

I don't think you should worry about performance too much in either case. Overhead when the returned PCollection is ignored should be minimal for all major runners.

Contributor Author

I've refactored things to use the WriteSuccessSummary approach and pushed the commit.
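A hedged sketch of what the refactor enables, assuming the default batchWrite() now returns PCollection<WriteSuccessSummary> and that the summary exposes a getter for the numWrites field proposed above (the getter name is an assumption):

```java
PCollection<Write> writes = ...;

// The write step now returns a result PCollection instead of PDone, so the
// pipeline can continue after the Firestore write.
PCollection<WriteSuccessSummary> summaries =
    writes.apply(FirestoreIO.v1().write().batchWrite().build());

// Example continuation: total number of successfully written documents.
summaries
    .apply(MapElements.into(TypeDescriptors.integers()).via(summary -> summary.getNumWrites()))
    .apply(Sum.integersGlobally());
```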

* <pre>{@code
* PCollection<Write> writes = ...;
* PDone sink = writes
* .apply(FirestoreIO.v1().write().batchWrite().build());
Contributor

Please make sure that these examples show any required parameters.

Contributor Author

They do; RpcQosOptions is the only optional parameter to these write builders.
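For completeness, a sketch of supplying that one optional parameter. RpcQosOptions is the type named in this thread, but the specific builder and setter names below are assumptions for illustration and are not confirmed by this PR's diff.

```java
// Assumed names: newBuilder()/withMaxAttempts()/withBatchMaxCount()/withRpcQosOptions()
// are illustrative placeholders.
RpcQosOptions qosOptions =
    RpcQosOptions.newBuilder()
        .withMaxAttempts(5)      // hypothetical: retry attempts per RPC
        .withBatchMaxCount(250)  // hypothetical: cap on writes per BatchWrite request
        .build();

writes.apply(
    FirestoreIO.v1().write().batchWrite().withRpcQosOptions(qosOptions).build());
```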

* Alternatively, if you'd rather output write failures to a Dead Letter Queue add
* {@link BatchWrite.Builder#withDeadLetterQueue() withDeadLetterQueue} when building your writer.
* <pre>{@code
* PCollection<Write> writes = ...;
Contributor

One option might be to return a WriteResult with getters to get a regular result PCollection or a dead-letter queue. We have something like that for BigQuery sink.

public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> {

Contributor Author

Looking at https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java#L79-L107, it seems that the PCollections off of WriteResult are mutually exclusive and dependent on how it was constructed. I believe the builders I have accomplish the same functionality and are typed, allowing for compile-time validation rather than runtime validation.

Contributor

Note that there WriteResult is an output type (not a PCollection that we return).
I'm not saying you should do that; it was just an example.
See here for a previous discussion on this and various options:
https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E


@Override
public PDone expand(PCollection<com.google.firestore.v1.Write> input) {
input.apply("batchWrite", ParDo.of(new DefaultBatchWriteFn(clock, firestoreStatefulComponentFactory, rpcQosOptions, CounterFactory.DEFAULT)));
Contributor

Are writes to Firestore idempotent? Note that bundles of this write step may fail and may be retried by the runner. So if the writes are not idempotent you'll have to handle this to prevent duplicate data from being written.

Contributor Author

Writes to Firestore are generally idempotent; there are a few narrow cases where a precondition can be specified on an individual write (something like "perform an update as long as the updatedAt matches timeX"). If a precondition fails, Firestore will respond with a failure status for that individual write, and if the dead letter queue is used it will be output as an individual WriteFailure{write, writeResult, status}.
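As an illustration of the non-idempotent case described above, a write guarded by an update-time precondition can be built from the Firestore v1 protos roughly like this; the document path and timestamp are placeholders.

```java
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Precondition;
import com.google.firestore.v1.Write;
import com.google.protobuf.Timestamp;

// A conditional update: Firestore only applies it if the stored document's
// updateTime still matches. If the precondition fails, that individual write
// gets a failure status, and with the dead letter queue enabled the connector
// emits it as a WriteFailure instead of failing the bundle.
Timestamp lastSeenUpdateTime = Timestamp.newBuilder().setSeconds(1622505600L).build();
Write guardedWrite =
    Write.newBuilder()
        .setUpdate(
            Document.newBuilder()
                .setName("projects/my-project/databases/(default)/documents/users/alice"))
        .setCurrentDocument(Precondition.newBuilder().setUpdateTime(lastSeenUpdateTime))
        .build();
```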

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
Contributor

It should be possible to trigger this IT from the PR to make sure it passes. Lemme know if you need help with this.

Contributor Author

I'm not sure how to trigger the ITs for the PR. They do need to run against a project that has a Firestore Native Mode database, otherwise they will be skipped (currently Firestore Native Mode and Datastore are mutually exclusive in a project).

@BenWhitehead (Contributor Author)

I've fixed the Spotless and Firestore-related failures in "Run Java PreCommit".

@pabloem pabloem requested a review from chamikaramj April 9, 2021 21:53
@pabloem pabloem closed this Apr 15, 2021
@pabloem pabloem reopened this Apr 15, 2021
@pabloem (Member)

pabloem commented Apr 15, 2021

reopening to rerun tests

@BenWhitehead BenWhitehead force-pushed the firestore-connector/feat/02-v1-write branch from e2b7a39 to 75ee8eb Compare April 16, 2021 17:54
@suztomo (Contributor)

suztomo commented Apr 20, 2021

In another PR, I upgraded the version of the libraries-bom to 20.0.0 which has com.google.cloud:google-cloud-firestore:2.2.5. This may affect this PR.

@BenWhitehead (Contributor Author)

Thanks for the heads up @suztomo! It shouldn't have a material impact on this PR. My changes to the Gradle config already use the version from libraries-bom, so I'd expect things to merge pretty easily, and I can do that when I rebase and squash things before merging.

@BenWhitehead BenWhitehead force-pushed the firestore-connector/feat/02-v1-write branch from 61d5e97 to c285ec4 Compare April 23, 2021 01:14
@chamikaramj (Contributor)

Sorry about the delay here. Lemme know if this is ready for another look.

@chamikaramj (Contributor)

Run Java PreCommit

@chamikaramj (Contributor)

Run Java_Examples_Dataflow PreCommit

@BenWhitehead (Contributor Author)

@chamikaramj Any preference related to this comment? #14261 (comment)

Other than that, someone from the Firestore Backend team should be taking a look and posting a review soon.

@cynthiachi left a comment

No real changes, just a question about the 555 implementation, otherwise looks good!

* Determines batch sizes based on past performance.
*
* <p>It aims for a target response time per RPC: it uses the response times for previous RPCs and
* the number of documents contained in them, calculates a rolling average time-per-document, and


So this doesn't take document sizes into account, right? I'm not really advocating for taking size into account, mostly just curious how sensitive this might be to fluctuations in average document size across batches.

Contributor Author

Correct, right now it only tracks the number of documents, not the size of those documents. It could be extended if we felt so inclined at some point.
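An illustrative sketch (not the connector's actual implementation) of the document-count-based sizing described in the Javadoc excerpt above; the target latency, smoothing factor, and class/method names are assumptions, and the 500 cap matches the BatchWrite limit mentioned later in this thread.

```java
// Rolling average of milliseconds-per-document drives the next batch size.
final class BatchSizer {
  private static final long TARGET_RPC_MILLIS = 5_000; // assumed target response time per RPC
  private static final double ALPHA = 0.2;              // assumed rolling-average weight
  private double avgMillisPerDocument = 10.0;            // seed before any samples arrive

  void recordRpc(long rpcMillis, int documentsInRpc) {
    double sample = (double) rpcMillis / Math.max(1, documentsInRpc);
    avgMillisPerDocument = ALPHA * sample + (1 - ALPHA) * avgMillisPerDocument;
  }

  int nextBatchSize() {
    long size = Math.round(TARGET_RPC_MILLIS / avgMillisPerDocument);
    return (int) Math.min(500, Math.max(1, size)); // BatchWrite accepts at most 500 writes
  }
}
```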

distributionFactory.get(RpcQos.class.getName(), "qos_rampUp_availableWriteCountBudget");
}

int getAvailableWriteCountBudget(Instant instant) {


IIUC, this computation is at the worker-second granularity and does not account for behavior in past seconds or of other workers? So if workers are slower than sending 1 request per second, would this suppress overall write rate across workers to be lower than budgeted by 555? In practice did you achieve something close to 555?

Contributor Author

Correct, this only tracks data for up to one second. In the case of this ramp-up calculation we're not taking past performance into consideration (instead, the write batcher and adaptive throttler take care of that). This class only attempts to limit based on the 555 budget and what has been used within that specific one-second window.

The budget value grows relative to the firstInstant captured, so even if a worker is only sending a request every 5 seconds, the budget calculation will still be for the next time window. If the worker is sending more than one request a second, it will decrement from the budget and be throttled until the next second once the budget has been exhausted for that specific second.

For hintMaxNumWorkers = 1 the budget will fill along this line.

I've got a test org.apache.beam.sdk.io.gcp.firestore.RpcQosSimulationTest#writeRampUp_shouldScaleAlongTheExpectedLine which runs a series of operations against the RampUp to ensure values fall on the expected line.
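A rough sketch of how a 500/50/5 budget for a single one-second window could be computed, following the linked best-practices rule (start at 500 operations per second, increase by 50% every 5 minutes). This is illustrative only and not the connector's exact formula; the class and method names are assumptions.

```java
import org.joda.time.Duration;
import org.joda.time.Instant;

final class WriteRampUpSketch {
  // Budget available for the one-second window containing `now`, assuming traffic
  // is split evenly across hintMaxNumWorkers workers. Requests beyond the budget
  // in a given second are throttled until the next second.
  int availableWriteCountBudget(Instant firstInstant, Instant now, int hintMaxNumWorkers) {
    long minutesSinceStart = new Duration(firstInstant, now).getStandardMinutes();
    long rampUpIntervals = minutesSinceStart / 5;                   // 5-minute growth steps
    double base = 500.0 / Math.max(1, hintMaxNumWorkers);           // 500 ops/sec shared across workers
    double budgetPerSecond = base * Math.pow(1.5, rampUpIntervals); // +50% per interval
    return (int) Math.max(1, Math.floor(budgetPerSecond));
  }
}
```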

Contributor Author

This is a screenshot of a job that ran for ~90 min, where we can see the growth steps for the ramp-up budget (the teal line/square line). Since this is aggregated across all workers it's not as exact as each of the 5-minute windows a single worker would see, but it does show the progression.

[screenshot: qos batch size calculation]

For the same job we can see the actual (aggregated) min/mean/max batchCapacityCount over the duration of the job. We can see the steps pretty clearly on the left before capping out at 500, while at the same time the ramp up budget keeps increasing all the way up to almost 20k by the end of the job.

[screenshot: qos_write_batchCapacityCount distribution]

@@ -204,7 +210,8 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
state.checkActive();
Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
logger.info("Delaying request by {}ms", shouldThrottleRequest.getMillis());
long throttleRequestMillis = shouldThrottleRequest.getMillis();
logger.debug("Delaying request by {}ms", throttleRequestMillis);
Contributor

This is when the adaptive throttler backs off, right? I think I'd tend toward an INFO log here, and a "due to previous failures" note in the message.

Contributor Author

I dropped it to debug after adding the diagnostic distribution qos_adaptiveThrottler_throttlingMs, so as not to blow out the logs, as this scenario can happen thousands of times for a job.

This screenshot shows the average number of times the adaptive throttler throttled, grouped by docCount and fieldCount.


Contributor

Is throttlingMs always reported, or does it hinge on that flag? As long as a user can easily find out what's happening that's fine.

Contributor Author

throttlingMs is always reported; it is the magic metric the job uses to signal to the platform that the job is running but is slowish for a reason.

@@ -344,7 +354,8 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
Optional<Duration> shouldThrottle = writeRampUp.shouldThrottle(instant);
if (shouldThrottle.isPresent()) {
Duration throttleDuration = shouldThrottle.get();
getLogger().debug("Still ramping up, Delaying request by {}ms", throttleDuration.getMillis());
long throttleDurationMillis = throttleDuration.getMillis();
getLogger().debug("Still ramping up, Delaying request by {}ms", throttleDurationMillis);
Contributor

Same here with INFO logs. If there's a better way that's fine too, what's important is that it's not too hard for the user to find out if they're being throttled by ramp-up or the adaptive throttler.

Contributor Author

Similar to the adaptive throttler, this can happen a lot.


@BenWhitehead (Contributor Author)

@chamikaramj Both SRE and the Backend team have approved the changes; I've removed PDone from the non-failure write type and fixed all related build failures. I think this is ready for the final review before I rebase it against master and prepare for merge.

@chamikaramj (Contributor)

Run Java PreCommit

@chamikaramj (Contributor)

Run SQL PreCommit

@chamikaramj (Contributor)

Run Java PreCommit

* add com.google.cloud:google-cloud-firestore:2.2.4
* add com.google.api.grpc:grpc-google-cloud-firestore-v1:2.2.4
* add com.google.api.grpc:proto-google-cloud-firestore-v1:2.2.4

In preparation, this change adds common classes used by each of the three
sub-components of the Firestore Connector. The three sub-components to follow
are:
1. Firestore v1 Read operations
2. Firestore v1 Write operations
3. Firestore SDK Adapter

### General Common classes
* FirestoreDoFn: base DoFn for all Firestore DoFns
* FirestoreIO: DSL entry point
* FirestoreIOOptions: those options which are configurable at the PipelineOptions level
* FirestoreStatefulComponentFactory: factory for the stateful components used by DoFns; provides a mockable boundary in unit tests
* JodaClock: simplistic clock interface which provides a mockable boundary

### Integration Tests

A GCP service account with read & write permissions for Cloud Firestore is
required, and the `GOOGLE_APPLICATION_CREDENTIALS` environment variable should be
configured to point to the service account file. Alternatively, the integration
test suite can be run against the Cloud Firestore emulator by setting
`FIRESTORE_EMULATOR_HOST` to the host-port of the running Cloud Firestore
Emulator, and setting `GOOGLE_CLOUD_PROJECT` to the name of the project that
will be used by the emulator.

All integration tests are integrated with the JUnit test lifecycle and will have
any documents or collections created during the course of a test run cleaned up
after the test finishes running (regardless of success). See the
`org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper` JUnit rule for
more details.

In preparation, this change adds RPC QoS classes used by the Firestore v1 Read
and Write operations.

### RPC Quality of Service (QoS)
In an effort not to overwhelm the live Cloud Firestore service, client-side
RPC Quality of Service has been built into FirestoreV1. All attempted,
successful, and failed requests are recorded and used to determine the rate at
which requests are sent to Cloud Firestore. In the case of `Write` requests,
writes are enqueued and flushed once a batch size threshold is reached, or a
bundle finishes. A combination of Adaptive Throttling[1], 500/50/5 Ramp Up[2],
and historical batch throughput is used to determine the maximum size of a batch.

Per-RPC-method metrics will be surfaced, allowing insight into the values that
feed into the QoS system.

For each RPC method the following counters are available (see the sketch below
for one way they could be queried from a PipelineResult):
* throttlingMs
* rpcFailures
* rpcSuccesses
* rpcStreamValueReceived

[1] https://sre.google/sre-book/handling-overload/#client-side-throttling-a7sYUg
[2] https://cloud.google.com/firestore/docs/best-practices#ramping_up_traffic
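As a hedged illustration of consuming those counters, something like the following could query them from a finished pipeline using Beam's standard metrics API. The namespace filter is omitted because the exact metric namespace isn't shown in this PR, and the connector may prefix the names per RPC method; the names below are matched exactly as listed in the commit message.

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

// Print attempted values for the QoS counters listed above after the pipeline finishes.
void printFirestoreQosCounters(PipelineResult result) {
  MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder().build());
  for (MetricResult<Long> counter : metrics.getCounters()) {
    String name = counter.getName().getName();
    if (name.equals("throttlingMs")
        || name.equals("rpcFailures")
        || name.equals("rpcSuccesses")
        || name.equals("rpcStreamValueReceived")) {
      System.out.println(counter.getName() + " = " + counter.getAttempted());
    }
  }
}
```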
@BenWhitehead BenWhitehead force-pushed the firestore-connector/feat/02-v1-write branch from fcfd0e5 to 988aee2 Compare June 1, 2021 21:26
@chamikaramj (Contributor)

Run Java_Examples_Dataflow PreCommit

@chamikaramj (Contributor)

Run Python PreCommit

@chamikaramj (Contributor)

Run Java PostCommit

@chamikaramj (Contributor)

Run Java_Examples_Dataflow PreCommit

@chamikaramj (Contributor)

Run Python PreCommit

@chamikaramj (Contributor) left a comment

Thanks. This looks great. We can merge after tests pass.

LGTM

@BenWhitehead (Contributor Author)

Awesome, thanks @chamikaramj! I'll squash all the commits and clean up the commit message today.

… Operations

Entry point for accessing Firestore V1 write methods is `FirestoreIO.v1().write()`.

Currently supported write RPC methods:
* `BatchWrite`

### Unit Tests

No external dependencies are needed for this suite.

A large suite of unit tests has been added to cover most branches and error
scenarios in the various components. Tests for input validation and bounds
checking are also included in this suite.

### Integration Tests

Integration tests for each type of RPC are present in
`org.apache.beam.sdk.io.gcp.firestore.it.FirestoreV1IT`. All of these tests
leverage `TestPipeline` and verify that the expected Documents/Collections are
all operated on during the test.
@BenWhitehead BenWhitehead force-pushed the firestore-connector/feat/02-v1-write branch from 3ac65e2 to a7e327c Compare June 4, 2021 18:33
@BenWhitehead (Contributor Author)

Failures in Java ("Run Java PreCommit") seem unrelated to my change.

All of my commits have been cleaned up, are marked with the JIRA issue for the feature, and should be ready to merge.

@chamikaramj (Contributor)

Run Java PreCommit

@chamikaramj (Contributor)

Run Java PostCommit

@chamikaramj (Contributor)

Run Java PreCommit

@chamikaramj (Contributor)

BTW, did you confirm that the new ITs get captured by "Run Java PostCommit"?

@chamikaramj (Contributor)

Run PostCommit_Java_Dataflow

@BenWhitehead (Contributor Author)

I've dug into why the integration tests aren't running. The integration tests currently use JUnit assumptions to skip running if the client isn't able to bootstrap, and unfortunately this fact isn't being reported in the test runs, seemingly due to gradle/gradle#5511.

I'll prepare a separate PR with a fix to not use assumptions and to update the Gradle config to run the Firestore tests against a project which has a Firestore Native mode database present.

@chamikaramj (Contributor)

Run Java PreCommit

@chamikaramj chamikaramj merged commit 468897a into apache:master Jun 11, 2021
@BenWhitehead BenWhitehead deleted the firestore-connector/feat/02-v1-write branch June 11, 2021 19:49
@willbattel

@BenWhitehead what was the rationale for using the BatchWriter over the new BulkWriter API (googleapis/java-firestore#323)? The new BulkWriter was recommended by the Firestore team in a previous Beam PR (#10187 (comment)), so I'm curious if this was taken into account or forgotten about. It would seem to me that the BulkWriter would be more appropriate for usage as a sink, compared to the added overhead and error frequency of BatchWriter for no apparent gain in this case. Or maybe I'm wrong, or there was another conversation about this that I'm not privy to.

@BenWhitehead (Contributor Author)

@willbattel After digging into the implementation of the Firestore connector, it became apparent that the Firestore SDK's model wouldn't be a good fit for use in the connector. (None of the model is Serializable, and there is no separation between data and operations, which would have made it difficult to update the Firestore SDK to be Beam compatible.) So here we've opted to use the gRPC client itself and forgo the Firestore SDK.

The com.google.cloud.firestore.BulkWriter from the Firestore SDK ultimately uses the same gRPC API, google.firestore.v1.Firestore.BatchWrite, which is being used in the connector here. The Firestore SDK has a WriteBatch feature which uses the google.firestore.v1.Firestore.Commit API rather than BatchWrite.

Additionally, the SRE team in particular wanted to build more client-side traffic controls into the Beam connector to ensure it is well behaved, since Beam is so easy to scale up really fast. So in this sink we have 500/50/5 ramp-up budgeting and adaptive throttling, in conjunction with dynamic throughput-based batch sizing, which are not features supported by the Firestore SDK.

Secondarily, the BulkWriter from the Firestore SDK doesn't currently provide any request lifecycle awareness hooks to allow for the metrics which are surfaced by this sink.
