Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add @BetaApi, make BulkWriter public, and refactor Executor #497

Merged
merged 3 commits into from Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.firestore.v1.FirestoreSettings;
Expand All @@ -41,7 +42,9 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

final class BulkWriter implements AutoCloseable {
/** A Firestore BulkWriter that can be used to perform a large number of writes in parallel. */
@BetaApi
public final class BulkWriter implements AutoCloseable {
/**
* A callback set by `addWriteResultListener()` to be run every time an operation successfully
* completes.
Expand Down Expand Up @@ -182,15 +185,11 @@ public boolean onError(BulkWriterException error) {
private final ScheduledExecutorService bulkWriterExecutor;

BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
this(firestore, options, Executors.newSingleThreadScheduledExecutor());
}

BulkWriter(
FirestoreImpl firestore,
BulkWriterOptions options,
ScheduledExecutorService bulkWriterExecutor) {
this.firestore = firestore;
this.bulkWriterExecutor = bulkWriterExecutor;
this.bulkWriterExecutor =
options.getExecutor() != null
? options.getExecutor()
: Executors.newSingleThreadScheduledExecutor();
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();

Expand Down
Expand Up @@ -16,11 +16,13 @@

package com.google.cloud.firestore;

import com.google.api.core.BetaApi;
import com.google.cloud.firestore.BulkWriter.OperationType;
import io.grpc.Status;

/** The error thrown when a BulkWriter operation fails. */
final class BulkWriterException extends FirestoreException {
@BetaApi
public final class BulkWriterException extends FirestoreException {
private final Status status;
private final String message;
private final DocumentReference documentReference;
Expand Down
Expand Up @@ -16,10 +16,13 @@

package com.google.cloud.firestore;

import com.google.api.core.BetaApi;
import com.google.auto.value.AutoValue;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;

/** Options used to configure request throttling in BulkWriter. */
@BetaApi
@AutoValue
abstract class BulkWriterOptions {
/**
Expand Down Expand Up @@ -48,11 +51,19 @@ abstract class BulkWriterOptions {
@Nullable
abstract Double getMaxOpsPerSecond();

/**
* @return The {@link ScheduledExecutorService} that BulkWriter uses to schedule all operations.
* If null, the default executor will be used.
*/
@Nullable
abstract ScheduledExecutorService getExecutor();

static Builder builder() {
return new AutoValue_BulkWriterOptions.Builder()
.setMaxOpsPerSecond(null)
.setInitialOpsPerSecond(null)
.setThrottlingEnabled(true);
.setThrottlingEnabled(true)
.setExecutor(null);
}

abstract Builder toBuilder();
Expand Down Expand Up @@ -104,6 +115,13 @@ Builder setMaxOpsPerSecond(int maxOpsPerSecond) {
return setMaxOpsPerSecond(new Double(maxOpsPerSecond));
}

/**
* Set the executor that the BulkWriter instance schedules operations on.
*
* @param executor The executor to schedule BulkWriter operations on.
*/
abstract Builder setExecutor(@Nullable ScheduledExecutorService executor);

abstract BulkWriterOptions autoBuild();

BulkWriterOptions build() {
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.firestore;

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.Service;
Expand Down Expand Up @@ -168,9 +169,29 @@ void getAll(
@Nonnull
WriteBatch batch();

/**
* Creates a {@link BulkWriter} instance, used for performing multiple writes in parallel.
* Gradually ramps up writes as specified by the 500/50/5 rule.
*
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
* up traffic</a>
*/
@BetaApi
@Nonnull
BulkWriter bulkWriter();

/**
* Creates a {@link BulkWriter} instance, used for performing multiple writes in parallel.
* Gradually ramps up writes as specified by the 500/50/5 rule.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't necessarily true for all options object, is it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added "unless otherwise configured by a BulkWriterOptions object".

*
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
* up traffic</a>
* @param options An options object to configure BulkWriter.
*/
@BetaApi
@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options);

/**
* Returns a FirestoreBundle.Builder {@link FirestoreBundle.Builder} instance using an
* automatically generated bundle ID. When loaded on clients, client SDKs use the bundle ID and
Expand Down
Expand Up @@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -99,15 +98,10 @@ public BulkWriter bulkWriter() {
}

@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options) {
public BulkWriter bulkWriter(BulkWriterOptions options) {
return new BulkWriter(this, options);
}

@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options, ScheduledExecutorService executor) {
return new BulkWriter(this, options, executor);
}

@Nonnull
@Override
public CollectionReference collection(@Nonnull String collectionPath) {
Expand Down
Expand Up @@ -953,7 +953,10 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
responseStubber.initializeStub(batchWriteCapture, firestoreMock);
BulkWriter bulkWriter =
firestoreMock.bulkWriter(
BulkWriterOptions.builder().setInitialOpsPerSecond(5).build(), timeoutExecutor);
BulkWriterOptions.builder()
.setInitialOpsPerSecond(5)
.setExecutor(timeoutExecutor)
.build());

for (int i = 0; i < 600; ++i) {
bulkWriter.set(firestoreMock.document("coll/doc"), LocalFirestoreHelper.SINGLE_FIELD_MAP);
Expand Down