diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 6dd86c720..494b754f8 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -19,6 +19,7 @@ import com.google.api.core.ApiAsyncFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.firestore.v1.FirestoreSettings; @@ -41,7 +42,9 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -final class BulkWriter implements AutoCloseable { +/** A Firestore BulkWriter that can be used to perform a large number of writes in parallel. */ +@BetaApi +public final class BulkWriter implements AutoCloseable { /** * A callback set by `addWriteResultListener()` to be run every time an operation successfully * completes. @@ -182,15 +185,11 @@ public boolean onError(BulkWriterException error) { private final ScheduledExecutorService bulkWriterExecutor; BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) { - this(firestore, options, Executors.newSingleThreadScheduledExecutor()); - } - - BulkWriter( - FirestoreImpl firestore, - BulkWriterOptions options, - ScheduledExecutorService bulkWriterExecutor) { this.firestore = firestore; - this.bulkWriterExecutor = bulkWriterExecutor; + this.bulkWriterExecutor = + options.getExecutor() != null + ? options.getExecutor() + : Executors.newSingleThreadScheduledExecutor(); this.successExecutor = MoreExecutors.directExecutor(); this.errorExecutor = MoreExecutors.directExecutor(); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterException.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterException.java index 199ef6671..d85c55d13 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterException.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterException.java @@ -16,11 +16,13 @@ package com.google.cloud.firestore; +import com.google.api.core.BetaApi; import com.google.cloud.firestore.BulkWriter.OperationType; import io.grpc.Status; /** The error thrown when a BulkWriter operation fails. */ -final class BulkWriterException extends FirestoreException { +@BetaApi +public final class BulkWriterException extends FirestoreException { private final Status status; private final String message; private final DocumentReference documentReference; diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java index aa04f0bff..628601cdc 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java @@ -16,10 +16,13 @@ package com.google.cloud.firestore; +import com.google.api.core.BetaApi; import com.google.auto.value.AutoValue; +import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nullable; /** Options used to configure request throttling in BulkWriter. */ +@BetaApi @AutoValue abstract class BulkWriterOptions { /** @@ -48,11 +51,19 @@ abstract class BulkWriterOptions { @Nullable abstract Double getMaxOpsPerSecond(); + /** + * @return The {@link ScheduledExecutorService} that BulkWriter uses to schedule all operations. + * If null, the default executor will be used. + */ + @Nullable + abstract ScheduledExecutorService getExecutor(); + static Builder builder() { return new AutoValue_BulkWriterOptions.Builder() .setMaxOpsPerSecond(null) .setInitialOpsPerSecond(null) - .setThrottlingEnabled(true); + .setThrottlingEnabled(true) + .setExecutor(null); } abstract Builder toBuilder(); @@ -104,6 +115,13 @@ Builder setMaxOpsPerSecond(int maxOpsPerSecond) { return setMaxOpsPerSecond(new Double(maxOpsPerSecond)); } + /** + * Set the executor that the BulkWriter instance schedules operations on. + * + * @param executor The executor to schedule BulkWriter operations on. + */ + abstract Builder setExecutor(@Nullable ScheduledExecutorService executor); + abstract BulkWriterOptions autoBuild(); BulkWriterOptions build() { diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java index 018d92788..a1c3a5f29 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java @@ -17,6 +17,7 @@ package com.google.cloud.firestore; import com.google.api.core.ApiFuture; +import com.google.api.core.BetaApi; import com.google.api.core.InternalExtensionOnly; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.cloud.Service; @@ -168,9 +169,30 @@ void getAll( @Nonnull WriteBatch batch(); + /** + * Creates a {@link BulkWriter} instance, used for performing multiple writes in parallel. + * Gradually ramps up writes as specified by the 500/50/5 rule. + * + * @see Ramping + * up traffic + */ + @BetaApi @Nonnull BulkWriter bulkWriter(); + /** + * Creates a {@link BulkWriter} instance, used for performing multiple writes in parallel. + * Gradually ramps up writes as specified by the 500/50/5 rule unless otherwise configured by a + * BulkWriterOptions object. + * + * @see Ramping + * up traffic + * @param options An options object to configure BulkWriter. + */ + @BetaApi + @Nonnull + BulkWriter bulkWriter(BulkWriterOptions options); + /** * Returns a FirestoreBundle.Builder {@link FirestoreBundle.Builder} instance using an * automatically generated bundle ID. When loaded on clients, client SDKs use the bundle ID and diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index aa7b2a510..f92b7e3d6 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -99,15 +98,10 @@ public BulkWriter bulkWriter() { } @Nonnull - BulkWriter bulkWriter(BulkWriterOptions options) { + public BulkWriter bulkWriter(BulkWriterOptions options) { return new BulkWriter(this, options); } - @Nonnull - BulkWriter bulkWriter(BulkWriterOptions options, ScheduledExecutorService executor) { - return new BulkWriter(this, options, executor); - } - @Nonnull @Override public CollectionReference collection(@Nonnull String collectionPath) { diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index 3ab3d5626..c3d549514 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -953,7 +953,10 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) responseStubber.initializeStub(batchWriteCapture, firestoreMock); BulkWriter bulkWriter = firestoreMock.bulkWriter( - BulkWriterOptions.builder().setInitialOpsPerSecond(5).build(), timeoutExecutor); + BulkWriterOptions.builder() + .setInitialOpsPerSecond(5) + .setExecutor(timeoutExecutor) + .build()); for (int i = 0; i < 600; ++i) { bulkWriter.set(firestoreMock.document("coll/doc"), LocalFirestoreHelper.SINGLE_FIELD_MAP);