feat: introducing bulk read API through Batcher #99

Merged 7 commits on Jan 16, 2020
@@ -26,6 +26,7 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
@@ -936,6 +937,85 @@ public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String ta
return stub.newMutateRowsBatcher(tableId);
}

/**
* Reads rows for a given tableId in a batch. If a requested row does not exist, its value will
* be null. This operation should be called within a single thread.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]")) {
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // [Optional] Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // [Optional] Invokes sendOutstanding() and waits until all pending entries are resolved.
* batcher.flush();
* }
* // batcher.close() invokes `flush()`, which in turn invokes `sendOutstanding()` and awaits
* // pending batches until they are resolved.
*
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
* }
* }</pre>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(String tableId) {
return newBulkReadRowsBatcher(tableId, null);
}
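Note: the "called within a single thread" caveat above is easy to miss. Below is a minimal sketch of fanning reads out across workers, under the assumption that one Batcher per thread is the intended pattern; keyShards, the pool size, and the table name are illustrative and not part of this PR:

ExecutorService workers = Executors.newFixedThreadPool(4);
for (List<String> shard : keyShards) {
  workers.submit(
      () -> {
        // Each worker owns its own Batcher, honoring the single-thread contract;
        // close() flushes any entries still buffered when the shard is done.
        try (Batcher<ByteString, Row> batcher =
            bigtableDataClient.newBulkReadRowsBatcher("[TABLE]")) {
          for (String key : shard) {
            batcher.add(ByteString.copyFromUtf8(key));
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
}
workers.shutdown();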

/**
* Reads rows for a given tableId and filter criteria in a batch. If a requested row does not
* exist, its value will be null. This operation should be called within a single thread.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
*
* // Build the filter expression
* Filter filter = FILTERS.chain()
* .filter(FILTERS.key().regex("prefix.*"))
* .filter(FILTERS.limit().cellsPerRow(10));
*
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]", filter)) {
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // [Optional] Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // [Optional] Invokes sendOutstanding() and waits until all pending entries are resolved.
* batcher.flush();
* }
* // batcher.close() invokes `flush()`, which in turn invokes `sendOutstanding()` and awaits
* // pending batches until they are resolved.
*
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
* }
* }</pre>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter) {
Query query = Query.create(tableId);
if (filter != null) {
query = query.filter(filter);
}
return stub.newBulkReadRowsBatcher(query);
}
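A note on failure handling for the samples above: ApiFutures.allAsList fails as soon as any single entry fails. If failed reads should instead degrade to missing rows, a per-entry fallback can be attached. A sketch follows; treating errors as null is an illustrative policy, not something this PR prescribes:

// Inside the try-with-resources block from the samples above.
ApiFuture<Row> rowFuture = batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
ApiFuture<Row> withFallback =
    ApiFutures.catching(
        rowFuture,
        ApiException.class,
        e -> null, // degrade a failed read to "row missing"; log in real code
        MoreExecutors.directExecutor());
rows.add(withFallback);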

/**
* Convenience method to mutate multiple rows in a batch. Each individual row is mutated
* atomically as in MutateRow, but the entire batch is not executed atomically. This method
@@ -274,6 +274,12 @@ public static Query fromProto(@Nonnull ReadRowsRequest request) {
return query;
}

/** Returns a copy of this query with an independent underlying request builder. */
@Override
public Query clone() {
Query query = Query.create(tableId);
query.builder = this.builder.clone();
return query;
}
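Note: clone() here is what lets the bulk-read plumbing treat the user's Query as a reusable prototype. A sketch of the expected semantics, assuming the batching descriptor derives one copy per batch ("my-table" and the keys are placeholders; FILTERS is the static instance from Filters):

Query prototype = Query.create("my-table").filter(FILTERS.key().regex("prefix.*"));
Query batchOne = prototype.clone().rowKey("key-1");
Query batchTwo = prototype.clone().rowKey("key-2");
// Both copies keep the prototype's filter, but each holds its own row-key
// list because clone() duplicates the underlying request builder.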

private static ByteString wrapKey(String key) {
if (key == null) {
return null;
@@ -0,0 +1,150 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.batching.BatchingCallSettings;
import com.google.api.gax.batching.BatchingDescriptor;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Set;
import javax.annotation.Nonnull;

/**
* This settings class holds the batching thresholds as well as the retry configuration for the
* bulk read API.
*
* <p>Sample configuration:
*
* <pre>{@code
* BigtableBulkReadRowsCallSettings defaultBulkReadCallSettings =
* bigtableDataCallSettings.getStubSettings().bulkReadRowsSettings();
*
* BigtableBulkReadRowsCallSettings customBulkReadCallSettings = defaultBulkReadCallSettings
* .toBuilder()
* .setBatchingSettings(
* defaultBulkReadCallSettings.getBatchingSettings().toBuilder()
* .setDelayThreshold(Duration.ofSeconds(10))
* .build())
* .setRetryableCodes(Code.DEADLINE_EXCEEDED)
* .build();
* }</pre>
*
* @see BatchingSettings for an explanation of batching thresholds.
* @see RetrySettings for retry configuration.
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public class BigtableBulkReadRowsCallSettings extends UnaryCallSettings<Query, List<Row>> {

private final BatchingCallSettings<ByteString, Row, Query, List<Row>> batchingCallSettings;

private BigtableBulkReadRowsCallSettings(Builder builder) {
super(builder);
batchingCallSettings =
BatchingCallSettings.newBuilder(builder.batchingDescriptor)
.setBatchingSettings(builder.batchingSettings)
.setRetrySettings(builder.getRetrySettings())
.setRetryableCodes(builder.getRetryableCodes())
.build();
}

/** Returns the batching settings, which contain multiple batch thresholds. */
public BatchingSettings getBatchingSettings() {
return batchingCallSettings.getBatchingSettings();
}

/** Returns an adapter that packs and unpacks batching elements. */
BatchingDescriptor<ByteString, Row, Query, List<Row>> getBatchingDescriptor() {
return batchingCallSettings.getBatchingDescriptor();
}

static BigtableBulkReadRowsCallSettings.Builder newBuilder(
BatchingDescriptor<ByteString, Row, Query, List<Row>> batchingDescriptor) {
return new Builder(batchingDescriptor);
}

/**
* Get a builder with the same values as this object. See the class documentation of {@link
* BigtableBulkReadRowsCallSettings} for a sample settings configuration.
*/
@Override
public final BigtableBulkReadRowsCallSettings.Builder toBuilder() {
return new BigtableBulkReadRowsCallSettings.Builder(this);
}

public static class Builder extends UnaryCallSettings.Builder<Query, List<Row>> {

private BatchingDescriptor<ByteString, Row, Query, List<Row>> batchingDescriptor;
private BatchingSettings batchingSettings;

private Builder(
@Nonnull BatchingDescriptor<ByteString, Row, Query, List<Row>> batchingDescriptor) {
this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor can't be null");
}

private Builder(@Nonnull BigtableBulkReadRowsCallSettings settings) {
super(settings);
this.batchingDescriptor = settings.getBatchingDescriptor();
this.batchingSettings = settings.getBatchingSettings();
}

/** Sets the batching settings with various thresholds. */
public Builder setBatchingSettings(@Nonnull BatchingSettings batchingSettings) {
Preconditions.checkNotNull(batchingSettings, "batching settings can't be null");
this.batchingSettings = batchingSettings;
return this;
}

/** Returns the {@link BatchingSettings}. */
public BatchingSettings getBatchingSettings() {
return batchingSettings;
}

/** Sets the RPC failure {@link StatusCode.Code codes} for which retries should be performed. */
@Override
public Builder setRetryableCodes(StatusCode.Code... codes) {
super.setRetryableCodes(codes);
return this;
}

/** Sets the RPC failure {@link StatusCode.Code codes} for which retries should be performed. */
@Override
public Builder setRetryableCodes(Set<StatusCode.Code> retryableCodes) {
super.setRetryableCodes(retryableCodes);
return this;
}

/** Sets the {@link RetrySettings} values for each retry attempt. */
@Override
public Builder setRetrySettings(@Nonnull RetrySettings retrySettings) {
super.setRetrySettings(retrySettings);
return this;
}

/** Builds the {@link BigtableBulkReadRowsCallSettings} object with provided configuration. */
@Override
public BigtableBulkReadRowsCallSettings build() {
return new BigtableBulkReadRowsCallSettings(this);
}
}
}
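For completeness, a sketch of threading these settings into a client. This assumes the stub settings builder exposes a bulkReadRowsSettings() accessor mirroring the existing bulk-mutation one; the thresholds are arbitrary examples, and Duration is org.threeten.bp.Duration as used elsewhere in gax:

BigtableDataSettings.Builder builder =
    BigtableDataSettings.newBuilder()
        .setProjectId("[PROJECT]")
        .setInstanceId("[INSTANCE]");

// Tune when a batch is cut: by element count, request bytes, or elapsed time.
builder
    .stubSettings()
    .bulkReadRowsSettings()
    .setBatchingSettings(
        BatchingSettings.newBuilder()
            .setElementCountThreshold(100L)
            .setRequestByteThreshold(400L * 1024L)
            .setDelayThreshold(Duration.ofMillis(500))
            .build());

BigtableDataClient client = BigtableDataClient.create(builder.build());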
@@ -52,12 +52,15 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.tracing.WrappedTracerFactory;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.Tagger;
@@ -381,9 +384,8 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
}

/**
* Creates a {@link BatcherImpl} to handle {@link MutateRowsRequest.Entry} mutations. This is
* meant to be used for automatic batching with flow control.
*
* <ul>
* <li>Uses {@link MutateRowsBatchingDescriptor} to spool the {@link RowMutationEntry} mutations
@@ -409,6 +411,31 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tabl
clientContext.getExecutor());
}

/**
* Creates a {@link BatcherImpl} to handle bulk row reads specified by {@link
* Query#rowKey(String)}. This is meant for bulk reads with flow control.
*
* <ul>
* <li>Uses {@link ReadRowsBatchingDescriptor} to merge the row-keys and send them out as {@link
* Query}.
* <li>Uses {@link #readRowsCallable()} to perform the RPC.
* <li>Batching thresholds can be configured from {@link
* EnhancedBigtableStubSettings#bulkReadRowsSettings()}.
* <li>Schedules retries for retryable exceptions until there are no more entries or no retry
*     attempts remain.
* <li>Splits the responses using {@link ReadRowsBatchingDescriptor}.
* </ul>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query query) {
Preconditions.checkNotNull(query, "query cannot be null");
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
readRowsCallable().all(),
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
}
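To make the "splits the responses" step above concrete, here is a conceptual sketch, not the shipped ReadRowsBatchingDescriptor, of what a descriptor must do when a batch completes: match returned rows to the keys that requested them, and resolve absent keys to null per the batcher contract (splitResponse and pendingByKey are illustrative names):

static void splitResponse(
    List<Row> batchResponse, Map<ByteString, SettableApiFuture<Row>> pendingByKey) {
  // Index the server's rows by key for O(1) lookups.
  Map<ByteString, Row> rowsByKey = new HashMap<>();
  for (Row row : batchResponse) {
    rowsByKey.put(row.getKey(), row);
  }
  for (Map.Entry<ByteString, SettableApiFuture<Row>> entry : pendingByKey.entrySet()) {
    // Keys the server returned nothing for resolve to null, matching the
    // "if the row does not exist, the value will be null" contract.
    entry.getValue().set(rowsByKey.get(entry.getKey()));
  }
}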

/**
* Internal helper to create the base MutateRows callable chain. The chain is responsible for
* retrying individual entries in case of error.