Skip to content

Commit

Permalink
fix: add @BetaApi, make BulkWriter public, and refactor Executor (#497)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Jan 14, 2021
1 parent f78720a commit 27ff9f6
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 19 deletions.
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.firestore.v1.FirestoreSettings;
Expand All @@ -41,7 +42,9 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

final class BulkWriter implements AutoCloseable {
/** A Firestore BulkWriter that can be used to perform a large number of writes in parallel. */
@BetaApi
public final class BulkWriter implements AutoCloseable {
/**
* A callback set by `addWriteResultListener()` to be run every time an operation successfully
* completes.
Expand Down Expand Up @@ -182,15 +185,11 @@ public boolean onError(BulkWriterException error) {
private final ScheduledExecutorService bulkWriterExecutor;

BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
this(firestore, options, Executors.newSingleThreadScheduledExecutor());
}

BulkWriter(
FirestoreImpl firestore,
BulkWriterOptions options,
ScheduledExecutorService bulkWriterExecutor) {
this.firestore = firestore;
this.bulkWriterExecutor = bulkWriterExecutor;
this.bulkWriterExecutor =
options.getExecutor() != null
? options.getExecutor()
: Executors.newSingleThreadScheduledExecutor();
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();

Expand Down
Expand Up @@ -16,11 +16,13 @@

package com.google.cloud.firestore;

import com.google.api.core.BetaApi;
import com.google.cloud.firestore.BulkWriter.OperationType;
import io.grpc.Status;

/** The error thrown when a BulkWriter operation fails. */
final class BulkWriterException extends FirestoreException {
@BetaApi
public final class BulkWriterException extends FirestoreException {
private final Status status;
private final String message;
private final DocumentReference documentReference;
Expand Down
Expand Up @@ -16,10 +16,13 @@

package com.google.cloud.firestore;

import com.google.api.core.BetaApi;
import com.google.auto.value.AutoValue;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;

/** Options used to configure request throttling in BulkWriter. */
@BetaApi
@AutoValue
abstract class BulkWriterOptions {
/**
Expand Down Expand Up @@ -48,11 +51,19 @@ abstract class BulkWriterOptions {
@Nullable
abstract Double getMaxOpsPerSecond();

/**
* @return The {@link ScheduledExecutorService} that BulkWriter uses to schedule all operations.
* If null, the default executor will be used.
*/
@Nullable
abstract ScheduledExecutorService getExecutor();

static Builder builder() {
return new AutoValue_BulkWriterOptions.Builder()
.setMaxOpsPerSecond(null)
.setInitialOpsPerSecond(null)
.setThrottlingEnabled(true);
.setThrottlingEnabled(true)
.setExecutor(null);
}

abstract Builder toBuilder();
Expand Down Expand Up @@ -104,6 +115,13 @@ Builder setMaxOpsPerSecond(int maxOpsPerSecond) {
return setMaxOpsPerSecond(new Double(maxOpsPerSecond));
}

/**
* Set the executor that the BulkWriter instance schedules operations on.
*
* @param executor The executor to schedule BulkWriter operations on.
*/
abstract Builder setExecutor(@Nullable ScheduledExecutorService executor);

abstract BulkWriterOptions autoBuild();

BulkWriterOptions build() {
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.firestore;

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.Service;
Expand Down Expand Up @@ -168,9 +169,30 @@ void getAll(
@Nonnull
WriteBatch batch();

/**
* Creates a {@link BulkWriter} instance, used for performing multiple writes in parallel.
* Gradually ramps up writes as specified by the 500/50/5 rule.
*
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
* up traffic</a>
*/
@BetaApi
@Nonnull
BulkWriter bulkWriter();

/**
* Creates a {@link BulkWriter} instance, used for performing multiple writes in parallel.
* Gradually ramps up writes as specified by the 500/50/5 rule unless otherwise configured by a
* BulkWriterOptions object.
*
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
* up traffic</a>
* @param options An options object to configure BulkWriter.
*/
@BetaApi
@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options);

/**
* Returns a FirestoreBundle.Builder {@link FirestoreBundle.Builder} instance using an
* automatically generated bundle ID. When loaded on clients, client SDKs use the bundle ID and
Expand Down
Expand Up @@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -99,15 +98,10 @@ public BulkWriter bulkWriter() {
}

@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options) {
public BulkWriter bulkWriter(BulkWriterOptions options) {
return new BulkWriter(this, options);
}

@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options, ScheduledExecutorService executor) {
return new BulkWriter(this, options, executor);
}

@Nonnull
@Override
public CollectionReference collection(@Nonnull String collectionPath) {
Expand Down
Expand Up @@ -953,7 +953,10 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
responseStubber.initializeStub(batchWriteCapture, firestoreMock);
BulkWriter bulkWriter =
firestoreMock.bulkWriter(
BulkWriterOptions.builder().setInitialOpsPerSecond(5).build(), timeoutExecutor);
BulkWriterOptions.builder()
.setInitialOpsPerSecond(5)
.setExecutor(timeoutExecutor)
.build());

for (int i = 0; i < 600; ++i) {
bulkWriter.set(firestoreMock.document("coll/doc"), LocalFirestoreHelper.SINGLE_FIELD_MAP);
Expand Down

0 comments on commit 27ff9f6

Please sign in to comment.