diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java index e2027545fa..3b287594fc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java @@ -26,6 +26,7 @@ import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.Filters; import com.google.cloud.bigtable.data.v2.models.Filters.Filter; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; @@ -936,6 +937,85 @@ public Batcher newBulkMutationBatcher(@Nonnull String ta return stub.newMutateRowsBatcher(tableId); } + /** + * Reads rows for given tableId in a batch. If the row does not exist, the value will be null. + * This operation should be called with in a single thread. + * + *

Sample Code: + * + *

{@code
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
+   *   List> rows = new ArrayList<>();
+   *
+   *   try (Batcher batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]")) {
+   *     for (String someValue : someCollection) {
+   *       ApiFuture rowFuture =
+   *           batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
+   *       rows.add(rowFuture);
+   *     }
+   *
+   *     // [Optional] Sends collected elements for batching asynchronously.
+   *     batcher.sendOutstanding();
+   *
+   *     // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
+   *     batcher.flush();
+   *   }
+   *   // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
+   *   pending batches until its resolved.
+   *
+   *   List actualRows = ApiFutures.allAsList(rows).get();
+   * }
+   * }
+ */ + public Batcher newBulkReadRowsBatcher(String tableId) { + return newBulkReadRowsBatcher(tableId, null); + } + + /** + * Reads rows for given tableId and filter criteria in a batch. If the row does not exist, the + * value will be null. This operation should be called with in a single thread. + * + *

Sample Code: + * + *

{@code
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
+   *
+   *  // Build the filter expression
+   *  Filter filter = FILTERS.chain()
+   *         .filter(FILTERS.key().regex("prefix.*"))
+   *         .filter(FILTERS.limit().cellsPerRow(10));
+   *
+   *   List> rows = new ArrayList<>();
+   *
+   *   try (Batcher batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]", filter)) {
+   *     for (String someValue : someCollection) {
+   *       ApiFuture rowFuture =
+   *           batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
+   *       rows.add(rowFuture);
+   *     }
+   *
+   *     // [Optional] Sends collected elements for batching asynchronously.
+   *     batcher.sendOutstanding();
+   *
+   *     // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
+   *     batcher.flush();
+   *   }
+   *   // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
+   *   pending batches until its resolved.
+   *
+   *   List actualRows = ApiFutures.allAsList(rows).get();
+   * }
+   * }
+ */ + public Batcher newBulkReadRowsBatcher( + String tableId, @Nullable Filters.Filter filter) { + Query query = Query.create(tableId); + if (filter != null) { + query = query.filter(filter); + } + return stub.newBulkReadRowsBatcher(query); + } + /** * Convenience method to mutate multiple rows in a batch. Each individual row is mutated * atomically as in MutateRow, but the entire batch is not executed atomically. This method diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java index 2cbb6d6f37..8924556127 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java @@ -274,6 +274,12 @@ public static Query fromProto(@Nonnull ReadRowsRequest request) { return query; } + public Query clone() { + Query query = Query.create(tableId); + query.builder = this.builder.clone(); + return query; + } + private static ByteString wrapKey(String key) { if (key == null) { return null; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBulkReadRowsCallSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBulkReadRowsCallSettings.java new file mode 100644 index 0000000000..f4852765db --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableBulkReadRowsCallSettings.java @@ -0,0 +1,150 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.BetaApi; +import com.google.api.gax.batching.BatchingCallSettings; +import com.google.api.gax.batching.BatchingDescriptor; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Set; +import javax.annotation.Nonnull; + +/** + * This settings holds the batching thresholds as well as retry configuration for bulk read API. + * + *

Sample configuration: + * + *

{@code
+ * BigtableBulkReadRowsCallSettings defaultBulkReadCallSettings =
+ *     bigtableDataCallSettings.getStubSettings().bulkReadRowsSettings();
+ *
+ * BigtableBulkReadRowsCallSettings customBulkReadCallSettings = defaultBulkReadCallSettings
+ *     .toBuilder()
+ *     .setBatchingSettings(
+ *         defaultBulkReadCallSettings.getBatchingSettings().toBuilder()
+ *             .setDelayThreshold(Duration.ofSeconds(10))
+ *             .build())
+ *     .setRetryableCodes(Code.DEADLINE_EXCEEDED)
+ *     .build();
+ * }
+ * + * @see BatchingSettings for batching thresholds explantion. + * @see RetrySettings for retry configuration. + */ +@BetaApi("This surface is likely to change as the batching surface evolves.") +public class BigtableBulkReadRowsCallSettings extends UnaryCallSettings> { + + private final BatchingCallSettings> batchingCallSettings; + + private BigtableBulkReadRowsCallSettings(Builder builder) { + super(builder); + batchingCallSettings = + BatchingCallSettings.newBuilder(builder.batchingDescriptor) + .setBatchingSettings(builder.batchingSettings) + .setRetrySettings(builder.getRetrySettings()) + .setRetryableCodes(builder.getRetryableCodes()) + .build(); + } + + /** Returns batching settings which contains multiple batch threshold levels. */ + public BatchingSettings getBatchingSettings() { + return batchingCallSettings.getBatchingSettings(); + } + + /** Returns an adapter that packs and unpacks batching elements. */ + BatchingDescriptor> getBatchingDescriptor() { + return batchingCallSettings.getBatchingDescriptor(); + } + + static BigtableBulkReadRowsCallSettings.Builder newBuilder( + BatchingDescriptor> batchingDescriptor) { + return new Builder(batchingDescriptor); + } + + /** + * Get a builder with the same values as this object. See the class documentation of {@link + * BigtableBatchingCallSettings} for a sample settings configuration. + */ + @Override + public final BigtableBulkReadRowsCallSettings.Builder toBuilder() { + return new BigtableBulkReadRowsCallSettings.Builder(this); + } + + public static class Builder extends UnaryCallSettings.Builder> { + + private BatchingDescriptor> batchingDescriptor; + private BatchingSettings batchingSettings; + + private Builder( + @Nonnull BatchingDescriptor> batchingDescriptor) { + this.batchingDescriptor = + Preconditions.checkNotNull(batchingDescriptor, "batching descriptor can't be null"); + } + + private Builder(@Nonnull BigtableBulkReadRowsCallSettings settings) { + super(settings); + this.batchingDescriptor = settings.getBatchingDescriptor(); + this.batchingSettings = settings.getBatchingSettings(); + } + + /** Sets the batching settings with various thresholds. */ + public Builder setBatchingSettings(@Nonnull BatchingSettings batchingSettings) { + Preconditions.checkNotNull(batchingSettings, "batching settings can't be null"); + this.batchingSettings = batchingSettings; + return this; + } + + /** Returns the {@link BatchingSettings}. */ + public BatchingSettings getBatchingSettings() { + return batchingSettings; + } + + /** Sets the rpc failure {@link StatusCode.Code code}, for which retries should be performed. */ + @Override + public Builder setRetryableCodes(StatusCode.Code... codes) { + super.setRetryableCodes(codes); + return this; + } + + /** Sets the rpc failure {@link StatusCode.Code code}, for which retries should be performed. */ + @Override + public Builder setRetryableCodes(Set retryableCodes) { + super.setRetryableCodes(retryableCodes); + return this; + } + + /** Sets the {@link RetrySettings} values for each retry attempts. */ + @Override + public Builder setRetrySettings(@Nonnull RetrySettings retrySettings) { + super.setRetrySettings(retrySettings); + return this; + } + + /** Builds the {@link BigtableBulkReadRowsCallSettings} object with provided configuration. */ + @Override + public BigtableBulkReadRowsCallSettings build() { + return new BigtableBulkReadRowsCallSettings(this); + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 301bd3eff4..db76aca681 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -52,12 +52,15 @@ import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; import com.google.cloud.bigtable.gaxx.tracing.WrappedTracerFactory; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import io.opencensus.stats.Stats; import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.Tagger; @@ -381,9 +384,8 @@ private UnaryCallable createBulkMutateRowsCallable() { } /** - * Creates a {@link com.google.api.gax.batching.BatcherImpl} to handle {@link - * MutateRowsRequest.Entry} mutations. This is meant to be used for automatic batching with flow - * control. + * Creates a {@link BatcherImpl} to handle {@link MutateRowsRequest.Entry} mutations. This is + * meant to be used for automatic batching with flow control. * *
    *
  • Uses {@link MutateRowsBatchingDescriptor} to spool the {@link RowMutationEntry} mutations @@ -409,6 +411,31 @@ public Batcher newMutateRowsBatcher(@Nonnull String tabl clientContext.getExecutor()); } + /** + * Creates a {@link BatcherImpl} to handle {@link Query#rowKey(String)}. This is meant for bulk + * read with flow control. + * + *
      + *
    • Uses {@link ReadRowsBatchingDescriptor} to merge the row-keys and send them out as {@link + * Query}. + *
    • Uses {@link #readRowsCallable()} to perform RPC. + *
    • Batching thresholds can be configured from {@link + * EnhancedBigtableStubSettings#bulkReadRowsSettings()}. + *
    • Schedule retries for retryable exceptions until there are no more entries or there are no + * more retry attempts left. + *
    • Split the responses using {@link ReadRowsBatchingDescriptor}. + *
    + */ + public Batcher newBulkReadRowsBatcher(@Nonnull Query query) { + Preconditions.checkNotNull(query, "query cannot be null"); + return new BatcherImpl<>( + settings.bulkReadRowsSettings().getBatchingDescriptor(), + readRowsCallable().all(), + query, + settings.bulkReadRowsSettings().getBatchingSettings(), + clientContext.getExecutor()); + } + /** * Internal helper to create the base MutateRows callable chain. The chain is responsible for * retrying individual entry in case of error. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 4fc544061f..682b1fe757 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.stub; import com.google.api.core.BetaApi; +import com.google.api.gax.batching.BatchingCallSettings; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; @@ -38,6 +39,7 @@ import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -158,6 +160,7 @@ public class EnhancedBigtableStubSettings extends StubSettings> sampleRowKeysSettings; private final UnaryCallSettings mutateRowSettings; private final BigtableBatchingCallSettings bulkMutateRowsSettings; + private final BigtableBulkReadRowsCallSettings bulkReadRowsSettings; private final UnaryCallSettings checkAndMutateRowSettings; private final UnaryCallSettings readModifyWriteRowSettings; @@ -170,14 +173,20 @@ private EnhancedBigtableStubSettings(Builder builder) { + " This is currently an experimental feature and should not be used in production."); } - // Since point reads & streaming reads share the same base callable that converts grpc errors - // into ApiExceptions, they must have the same retry codes. + // Since point reads, streaming reads, bulk reads share the same base callable that converts + // grpc errors into ApiExceptions, they must have the same retry codes. Preconditions.checkState( builder .readRowSettings .getRetryableCodes() .equals(builder.readRowsSettings.getRetryableCodes()), "Single ReadRow retry codes must match ReadRows retry codes"); + Preconditions.checkState( + builder + .bulkReadRowsSettings + .getRetryableCodes() + .equals(builder.readRowsSettings.getRetryableCodes()), + "Bulk ReadRow retry codes must match ReadRows retry codes"); projectId = builder.projectId; instanceId = builder.instanceId; @@ -190,6 +199,7 @@ private EnhancedBigtableStubSettings(Builder builder) { sampleRowKeysSettings = builder.sampleRowKeysSettings.build(); mutateRowSettings = builder.mutateRowSettings.build(); bulkMutateRowsSettings = builder.bulkMutateRowsSettings.build(); + bulkReadRowsSettings = builder.bulkReadRowsSettings.build(); checkAndMutateRowSettings = builder.checkAndMutateRowSettings.build(); readModifyWriteRowSettings = builder.readModifyWriteRowSettings.build(); } @@ -223,11 +233,15 @@ public boolean isRefreshingChannel() { /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { return BigtableStubSettings.defaultGrpcTransportProviderBuilder() - // TODO: tune channels - .setChannelsPerCpu(2) + .setPoolSize(getDefaultChannelPoolSize()) .setMaxInboundMessageSize(MAX_MESSAGE_SIZE); } + static int getDefaultChannelPoolSize() { + // TODO: tune channels + return 2 * Runtime.getRuntime().availableProcessors(); + } + @SuppressWarnings("WeakerAccess") public static TransportChannelProvider defaultTransportChannelProvider() { return defaultGrpcTransportProviderBuilder().build(); @@ -347,8 +361,8 @@ public UnaryCallSettings mutateRowSettings() { *

    Default retry and timeout settings: * *

      - *
    • Retry {@link com.google.api.gax.batching.BatchingCallSettings.Builder#setRetryableCodes - * error codes} are: {@link Code#DEADLINE_EXCEEDED} and {@link Code#UNAVAILABLE}. + *
    • Retry {@link BatchingCallSettings.Builder#setRetryableCodes error codes} are: {@link + * Code#DEADLINE_EXCEEDED} and {@link Code#UNAVAILABLE}. *
    • RetryDelay between failed attempts {@link RetrySettings.Builder#setInitialRetryDelay * starts} at 10ms and {@link RetrySettings.Builder#setRetryDelayMultiplier increases * exponentially} by a factor of 2 until a {@link RetrySettings.Builder#setMaxRetryDelay @@ -384,6 +398,47 @@ public BigtableBatchingCallSettings bulkMutateRowsSettings() { return bulkMutateRowsSettings; } + /** + * Returns the call settings used for bulk read rows. + * + *

      Default retry and timeout settings: + * + *

        + *
      • Retry {@link BatchingCallSettings.Builder#setRetryableCodes error codes} are: {@link + * Code#DEADLINE_EXCEEDED}, {@link Code#UNAVAILABLE} and {@link Code#ABORTED}. + *
      • RetryDelay between failed attempts {@link RetrySettings.Builder#setInitialRetryDelay + * starts} at 10ms and {@link RetrySettings.Builder#setRetryDelayMultiplier increases + * exponentially} by a factor of 2 until a {@link RetrySettings.Builder#setMaxRetryDelay + * maximum of} 1 minute. + *
      • The default timeout for {@link RetrySettings.Builder#setMaxRpcTimeout each attempt} is 5 + * minute and the timeout for the {@link RetrySettings.Builder#setTotalTimeout entire + * operation} across all of the attempts is 10 mins. + *
      + * + *

      On breach of certain triggers, the operation initiates processing of accumulated request for + * which the default settings are: + * + *

        + *
      • When the {@link BatchingSettings.Builder#setElementCountThreshold request count} reaches + * 100. + *
      • When accumulated {@link BatchingSettings.Builder#setRequestByteThreshold request size} + * reaches to 400KB. + *
      • When an {@link BatchingSettings.Builder#setDelayThreshold interval of} 1 second passes + * after batching initialization or last processed batch. + *
      + * + *

      When the pending {@link FlowControlSettings.Builder#setMaxOutstandingElementCount request + * count} reaches a default of 1000 outstanding row keys per channel then this operation will by + * default be {@link FlowControlSettings.Builder#setLimitExceededBehavior blocked} until some of + * the pending batch are resolved. + * + * @see RetrySettings for more explanation. + * @see BatchingSettings for batch related configuration explanation. + */ + public BigtableBulkReadRowsCallSettings bulkReadRowsSettings() { + return bulkReadRowsSettings; + } + /** * Returns the object with the settings used for calls to CheckAndMutateRow. * @@ -430,6 +485,7 @@ public static class Builder extends StubSettings.Builder> sampleRowKeysSettings; private final UnaryCallSettings.Builder mutateRowSettings; private final BigtableBatchingCallSettings.Builder bulkMutateRowsSettings; + private final BigtableBulkReadRowsCallSettings.Builder bulkReadRowsSettings; private final UnaryCallSettings.Builder checkAndMutateRowSettings; private final UnaryCallSettings.Builder readModifyWriteRowSettings; @@ -515,6 +571,28 @@ private Builder() { .build()) .build()); + long maxBulkReadElementPerBatch = 100L; + long maxBulkReadRequestSizePerBatch = 400L * 1024L; + // Enables bulkRead to support 10 outstanding batches per channel + long maxBulkReadOutstandingElementCount = + 10L * maxBulkReadElementPerBatch * getDefaultChannelPoolSize(); + + bulkReadRowsSettings = + BigtableBulkReadRowsCallSettings.newBuilder(new ReadRowsBatchingDescriptor()) + .setRetryableCodes(readRowsSettings.getRetryableCodes()) + .setRetrySettings(IDEMPOTENT_RETRY_SETTINGS) + .setBatchingSettings( + BatchingSettings.newBuilder() + .setElementCountThreshold(maxBulkReadElementPerBatch) + .setRequestByteThreshold(maxBulkReadRequestSizePerBatch) + .setDelayThreshold(Duration.ofSeconds(1)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .setMaxOutstandingElementCount(maxBulkReadOutstandingElementCount) + .build()) + .build()); + checkAndMutateRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); copyRetrySettings(baseDefaults.checkAndMutateRowSettings(), checkAndMutateRowSettings); @@ -535,6 +613,7 @@ private Builder(EnhancedBigtableStubSettings settings) { sampleRowKeysSettings = settings.sampleRowKeysSettings.toBuilder(); mutateRowSettings = settings.mutateRowSettings.toBuilder(); bulkMutateRowsSettings = settings.bulkMutateRowsSettings.toBuilder(); + bulkReadRowsSettings = settings.bulkReadRowsSettings.toBuilder(); checkAndMutateRowSettings = settings.checkAndMutateRowSettings.toBuilder(); readModifyWriteRowSettings = settings.readModifyWriteRowSettings.toBuilder(); } @@ -673,6 +752,11 @@ public BigtableBatchingCallSettings.Builder bulkMutateRowsSettings() { return bulkMutateRowsSettings; } + /** Returns the builder for the settings used for calls to MutateRows. */ + public BigtableBulkReadRowsCallSettings.Builder bulkReadRowsSettings() { + return bulkReadRowsSettings; + } + /** Returns the builder for the settings used for calls to CheckAndMutateRow. */ public UnaryCallSettings.Builder checkAndMutateRowSettings() { return checkAndMutateRowSettings; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsBatchingDescriptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsBatchingDescriptor.java new file mode 100644 index 0000000000..36a9dec1e7 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsBatchingDescriptor.java @@ -0,0 +1,86 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.BatchEntry; +import com.google.api.gax.batching.BatchingDescriptor; +import com.google.api.gax.batching.BatchingRequestBuilder; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.protobuf.ByteString; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Implementation for {@link BatchingDescriptor} to split batch response or exception into + * individual row request. + * + *

      This class is considered an internal implementation detail and not meant to be used by + * applications directly. + */ +@InternalApi("For internal use only") +public class ReadRowsBatchingDescriptor + implements BatchingDescriptor> { + + @Override + public BatchingRequestBuilder newRequestBuilder(Query query) { + return new BulkReadRequestBuilder(query); + } + + @Override + public void splitResponse(List rowsResponse, List> entries) { + Map rowKeys = new HashMap<>(); + for (Row row : rowsResponse) { + rowKeys.put(row.getKey(), row); + } + + for (BatchEntry entry : entries) { + entry.getResultFuture().set(rowKeys.get(entry.getElement())); + } + } + + @Override + public void splitException(Throwable throwable, List> entries) { + for (BatchEntry resultEntry : entries) { + resultEntry.getResultFuture().setException(throwable); + } + } + + @Override + public long countBytes(ByteString bytes) { + return bytes.size(); + } + + static class BulkReadRequestBuilder implements BatchingRequestBuilder { + private final Query query; + + BulkReadRequestBuilder(Query query) { + this.query = query.clone(); + } + + @Override + public void add(ByteString rowKey) { + query.rowKey(rowKey); + } + + @Override + public Query build() { + return query; + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java index 1754f8f678..b83800c565 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java @@ -65,7 +65,8 @@ public class BigtableDataClientTest { @Mock private UnaryCallable mockCheckAndMutateRowCallable; @Mock private UnaryCallable mockReadModifyWriteRowCallable; @Mock private UnaryCallable mockBulkMutateRowsCallable; - @Mock private Batcher mockBulkMutationbatcher; + @Mock private Batcher mockBulkMutationBatcher; + @Mock private Batcher mockBulkReadRowsBatcher; private BigtableDataClient bigtableDataClient; @@ -80,7 +81,9 @@ public void setUp() { Mockito.when(mockStub.checkAndMutateRowCallable()).thenReturn(mockCheckAndMutateRowCallable); Mockito.when(mockStub.readModifyWriteRowCallable()).thenReturn(mockReadModifyWriteRowCallable); Mockito.when(mockStub.newMutateRowsBatcher(Mockito.any(String.class))) - .thenReturn(mockBulkMutationbatcher); + .thenReturn(mockBulkMutationBatcher); + Mockito.when(mockStub.newBulkReadRowsBatcher(Mockito.any(Query.class))) + .thenReturn(mockBulkReadRowsBatcher); } @Test @@ -315,7 +318,7 @@ public void proxyNewBulkMutationBatcherTest() { bigtableDataClient.newBulkMutationBatcher("fake-table"); RowMutationEntry request = RowMutationEntry.create("some-key").setCell("some-family", "fake-qualifier", "fake-value"); - Mockito.when(mockBulkMutationbatcher.add(request)).thenReturn(expectedResponse); + Mockito.when(mockBulkMutationBatcher.add(request)).thenReturn(expectedResponse); ApiFuture actualRes = batcher.add(request); assertThat(actualRes).isSameInstanceAs(expectedResponse); @@ -323,6 +326,39 @@ public void proxyNewBulkMutationBatcherTest() { Mockito.verify(mockStub).newMutateRowsBatcher(Mockito.any(String.class)); } + @Test + public void proxyNewBulkReadRowsTest() { + ApiFuture expectedResponse = + ApiFutures.immediateFuture( + Row.create(ByteString.copyFromUtf8("fake-row-key"), Collections.emptyList())); + ByteString request = ByteString.copyFromUtf8("fake-row-key"); + + Batcher batcher = bigtableDataClient.newBulkReadRowsBatcher("fake-table"); + Mockito.when(mockBulkReadRowsBatcher.add(request)).thenReturn(expectedResponse); + + ApiFuture actualResponse = batcher.add(request); + assertThat(actualResponse).isSameInstanceAs(expectedResponse); + + Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class)); + } + + @Test + public void proxyNewBulkReadRowsWithFilterTest() { + ApiFuture expectedResponse = + ApiFutures.immediateFuture( + Row.create(ByteString.copyFromUtf8("fake-row-key"), Collections.emptyList())); + ByteString request = ByteString.copyFromUtf8("fake-row-key"); + + Batcher batcher = + bigtableDataClient.newBulkReadRowsBatcher("fake-table", FILTERS.key().regex("fake-row")); + Mockito.when(mockBulkReadRowsBatcher.add(request)).thenReturn(expectedResponse); + + ApiFuture actualResponse = batcher.add(request); + assertThat(actualResponse).isSameInstanceAs(expectedResponse); + + Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class)); + } + @Test public void proxyCheckAndMutateRowCallableTest() { assertThat(bigtableDataClient.checkAndMutateRowCallable()) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BulkReadIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BulkReadIT.java new file mode 100644 index 0000000000..99c14ccc4f --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/BulkReadIT.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.it; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.batching.Batcher; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowCell; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.cloud.bigtable.test_helpers.env.TestEnvRule; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BulkReadIT { + + @ClassRule public static TestEnvRule testEnvRule = new TestEnvRule(); + + @Test + public void testBulkRead() throws InterruptedException, ExecutionException { + BigtableDataClient client = testEnvRule.env().getDataClient(); + String family = testEnvRule.env().getFamilyId(); + String rowPrefix = UUID.randomUUID().toString(); + int numRows = 10; + + BulkMutation bulkMutation = BulkMutation.create(testEnvRule.env().getTableId()); + List expectedRows = new ArrayList<>(); + + for (int i = 0; i < numRows; i++) { + bulkMutation.add( + RowMutationEntry.create(rowPrefix + "-" + i) + .setCell(family, "qualifier", 10_000L, "value-" + i)); + expectedRows.add( + Row.create( + ByteString.copyFromUtf8(rowPrefix + "-" + i), + ImmutableList.of( + RowCell.create( + family, + ByteString.copyFromUtf8("qualifier"), + 10_000L, + ImmutableList.of(), + ByteString.copyFromUtf8("value-" + i))))); + } + client.bulkMutateRows(bulkMutation); + + try (Batcher batcher = + client.newBulkReadRowsBatcher(testEnvRule.env().getTableId())) { + + List> rowFutures = new ArrayList<>(numRows); + + for (int rowCount = 0; rowCount < numRows; rowCount++) { + ApiFuture entryResponse = + batcher.add(ByteString.copyFromUtf8(rowPrefix + "-" + rowCount)); + + rowFutures.add(entryResponse); + } + + batcher.flush(); + List actualRows = ApiFutures.allAsList(rowFutures).get(); + assertThat(actualRows).isEqualTo(expectedRows); + + // To verify non-existent and duplicate row keys + rowFutures = new ArrayList<>(); + + // non-existent row key + rowFutures.add(batcher.add(ByteString.copyFromUtf8(UUID.randomUUID().toString()))); + + // duplicate row key + rowFutures.add(batcher.add(ByteString.copyFromUtf8(rowPrefix + "-" + 0))); + rowFutures.add(batcher.add(ByteString.copyFromUtf8(rowPrefix + "-" + 0))); + + batcher.flush(); + actualRows = ApiFutures.allAsList(rowFutures).get(); + assertThat(actualRows.get(0)).isNull(); + assertThat(actualRows.get(1)).isEqualTo(expectedRows.get(0)); + assertThat(actualRows.get(2)).isEqualTo(expectedRows.get(0)); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java index 7cf4246943..b073f979ba 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java @@ -286,4 +286,23 @@ public void testEquality() { .isNotEqualTo(Query.create(TABLE_ID).filter(FILTERS.family().exactMatch("test-one"))); assertThat(Query.create(TABLE_ID).limit(4)).isNotEqualTo(Query.create(TABLE_ID).limit(5)); } + + @Test + public void testClone() { + Query query = Query.create(TABLE_ID).filter(FILTERS.key().regex("temp")).limit(10); + ReadRowsRequest request = + ReadRowsRequest.newBuilder() + .setTableName(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID)) + .setAppProfileId(APP_PROFILE_ID) + .setRowsLimit(10) + .setFilter( + RowFilter.newBuilder() + .setRowKeyRegexFilter(ByteString.copyFromUtf8("temp")) + .build()) + .build(); + + Query clonedReq = query.clone(); + assertThat(clonedReq).isEqualTo(query); + assertThat(clonedReq.toProto(requestContext)).isEqualTo(request); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBulkReadRowsCallSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBulkReadRowsCallSettingsTest.java new file mode 100644 index 0000000000..a343bf4667 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableBulkReadRowsCallSettingsTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class BigtableBulkReadRowsCallSettingsTest { + + private static final BatchingSettings BATCHING_SETTINGS = + BatchingSettings.newBuilder() + .setElementCountThreshold(10L) + .setRequestByteThreshold(20L) + .setDelayThreshold(Duration.ofMillis(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(100L) + .setMaxOutstandingRequestBytes(100L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build()) + .build(); + + @Test + public void testEmptyBuilder() { + BigtableBulkReadRowsCallSettings.Builder builder = + BigtableBulkReadRowsCallSettings.newBuilder(new ReadRowsBatchingDescriptor()); + assertThat(builder.getBatchingSettings()).isNull(); + assertThat(builder.getRetryableCodes()).isEmpty(); + assertThat(builder.getRetrySettings()).isNotNull(); + } + + @Test + public void testBuilder() { + BigtableBulkReadRowsCallSettings.Builder builder = + BigtableBulkReadRowsCallSettings.newBuilder(new ReadRowsBatchingDescriptor()); + + Set retryCodes = ImmutableSet.of(StatusCode.Code.UNAVAILABLE); + RetrySettings retrySettings = RetrySettings.newBuilder().build(); + builder + .setBatchingSettings(BATCHING_SETTINGS) + .setRetryableCodes(retryCodes) + .setRetrySettings(retrySettings); + + BigtableBulkReadRowsCallSettings settings = builder.build(); + assertThat(settings.getBatchingSettings()).isEqualTo(BATCHING_SETTINGS); + assertThat(settings.getRetryableCodes()).isEqualTo(retryCodes); + assertThat(settings.getRetrySettings()).isEqualTo(retrySettings); + } + + @Test + public void testBuilderFromSettings() { + BigtableBulkReadRowsCallSettings.Builder builder = + BigtableBulkReadRowsCallSettings.newBuilder(new ReadRowsBatchingDescriptor()); + RetrySettings retrySettings = + RetrySettings.newBuilder().setTotalTimeout(Duration.ofMinutes(1)).build(); + builder + .setBatchingSettings(BATCHING_SETTINGS) + .setRetryableCodes(StatusCode.Code.UNAVAILABLE, StatusCode.Code.UNAUTHENTICATED) + .setRetrySettings(retrySettings); + + BigtableBulkReadRowsCallSettings settings = builder.build(); + BigtableBulkReadRowsCallSettings.Builder newBuilder = settings.toBuilder(); + + assertThat(newBuilder.getBatchingSettings()).isEqualTo(BATCHING_SETTINGS); + assertThat(newBuilder.getRetryableCodes()) + .containsExactly(StatusCode.Code.UNAVAILABLE, StatusCode.Code.UNAUTHENTICATED); + assertThat(newBuilder.getRetrySettings()).isEqualTo(retrySettings); + } + + @Test + public void testMandatorySettings() { + Exception actualEx = null; + try { + BigtableBulkReadRowsCallSettings.newBuilder(null); + } catch (Exception ex) { + actualEx = ex; + } + assertThat(actualEx).isInstanceOf(NullPointerException.class); + actualEx = null; + try { + BigtableBulkReadRowsCallSettings.newBuilder(new ReadRowsBatchingDescriptor()).build(); + } catch (Exception ex) { + actualEx = ex; + } + assertThat(actualEx).isInstanceOf(IllegalStateException.class); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index b9b6a69cde..ec759c0bf0 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -193,8 +193,9 @@ public void readRowsIsNotLostTest() { .setRetrySettings(retrySettings) .build(); - // Point readRow settings must match streaming settings + // Point readRow & bulk read settings must match streaming settings builder.readRowSettings().setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED); + builder.bulkReadRowsSettings().setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED); assertThat(builder.readRowsSettings().getIdleTimeout()).isEqualTo(Duration.ofMinutes(5)); assertThat(builder.readRowsSettings().getRetryableCodes()) @@ -249,8 +250,9 @@ public void readRowIsNotLostTest() { .setRetrySettings(retrySettings) .build(); - // Streaming readRows settings must match point lookup settings. + // Streaming readRows & bulk read settings must match point lookup settings. builder.readRowsSettings().setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED); + builder.bulkReadRowsSettings().setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED); assertThat(builder.readRowSettings().getRetryableCodes()) .containsAtLeast(Code.ABORTED, Code.DEADLINE_EXCEEDED); @@ -297,6 +299,7 @@ public void readRowRetryCodesMustMatch() { assertThat(actualError).isNotNull(); builder.readRowSettings().setRetryableCodes(Code.DEADLINE_EXCEEDED); + builder.bulkReadRowsSettings().setRetryableCodes(Code.DEADLINE_EXCEEDED); actualError = null; try { @@ -451,6 +454,59 @@ public void bulkMutateRowsSettingsAreNotLostTest() { .isSameInstanceAs(batchingSettings); } + @Test + public void bulkReadRowsSettingsAreNotLostTest() { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId); + + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setMaxAttempts(10) + .setTotalTimeout(Duration.ofHours(1)) + .setInitialRpcTimeout(Duration.ofSeconds(10)) + .setRpcTimeoutMultiplier(1) + .setMaxRpcTimeout(Duration.ofSeconds(10)) + .setJittered(true) + .build(); + + BatchingSettings batchingSettings = BatchingSettings.newBuilder().build(); + + builder + .bulkReadRowsSettings() + .setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED) + .setRetrySettings(retrySettings) + .setBatchingSettings(batchingSettings) + .build(); + + // Point read & streaming readRows settings must match point lookup settings. + builder.readRowSettings().setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED); + builder.readRowsSettings().setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED); + + assertThat(builder.bulkReadRowsSettings().getRetryableCodes()) + .containsAtLeast(Code.ABORTED, Code.DEADLINE_EXCEEDED); + assertThat(builder.bulkReadRowsSettings().getRetrySettings()).isEqualTo(retrySettings); + assertThat(builder.bulkReadRowsSettings().getBatchingSettings()) + .isSameInstanceAs(batchingSettings); + + assertThat(builder.build().bulkReadRowsSettings().getRetryableCodes()) + .containsAtLeast(Code.ABORTED, Code.DEADLINE_EXCEEDED); + assertThat(builder.build().bulkReadRowsSettings().getRetrySettings()).isEqualTo(retrySettings); + assertThat(builder.build().bulkReadRowsSettings().getBatchingSettings()) + .isSameInstanceAs(batchingSettings); + + assertThat(builder.build().toBuilder().bulkReadRowsSettings().getRetryableCodes()) + .containsAtLeast(Code.ABORTED, Code.DEADLINE_EXCEEDED); + assertThat(builder.build().toBuilder().bulkReadRowsSettings().getRetrySettings()) + .isEqualTo(retrySettings); + assertThat(builder.build().toBuilder().bulkReadRowsSettings().getBatchingSettings()) + .isSameInstanceAs(batchingSettings); + } + @Test public void mutateRowsHasSaneDefaultsTest() { BigtableBatchingCallSettings.Builder builder = diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsBatchingDescriptorTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsBatchingDescriptorTest.java new file mode 100644 index 0000000000..4bcf17674c --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsBatchingDescriptorTest.java @@ -0,0 +1,135 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.BatchEntry; +import com.google.api.gax.batching.BatchingRequestBuilder; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.cloud.bigtable.data.v2.internal.RequestContext; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowCell; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ReadRowsBatchingDescriptorTest { + + private static final RowCell ROW_CELL = + RowCell.create( + "cf", + ByteString.copyFromUtf8("qualifier"), + 10000, + ImmutableList.of("label-1", "label-2"), + ByteString.copyFromUtf8("qualifier")); + private static final Row ROW_KEY_1_RESPONSE = + Row.create(ByteString.copyFromUtf8("row-key-1"), ImmutableList.of(ROW_CELL)); + private static final Row ROW_KEY_2_RESPONSE = + Row.create(ByteString.copyFromUtf8("row-key-2"), ImmutableList.of(ROW_CELL)); + + private ReadRowsBatchingDescriptor underTest = new ReadRowsBatchingDescriptor(); + + @Test + public void splitResponseTest() throws Exception { + List> batchEntries = createBatchEntries("row-key-1", "row-key-2"); + underTest.splitResponse(ImmutableList.of(ROW_KEY_1_RESPONSE, ROW_KEY_2_RESPONSE), batchEntries); + + assertThat(batchEntries.get(0).getResultFuture().get()).isEqualTo(ROW_KEY_1_RESPONSE); + assertThat(batchEntries.get(1).getResultFuture().get()).isEqualTo(ROW_KEY_2_RESPONSE); + } + + @Test + public void splitResponseWithDuplicateKeysTest() throws Exception { + List> batchEntries = + createBatchEntries("row-key-1", "row-key-2", "row-key-1", "row-key-1"); + + underTest.splitResponse(ImmutableList.of(ROW_KEY_1_RESPONSE, ROW_KEY_2_RESPONSE), batchEntries); + + assertThat(batchEntries.get(0).getResultFuture().get()).isEqualTo(ROW_KEY_1_RESPONSE); + assertThat(batchEntries.get(1).getResultFuture().get()).isEqualTo(ROW_KEY_2_RESPONSE); + assertThat(batchEntries.get(2).getResultFuture().get()).isEqualTo(ROW_KEY_1_RESPONSE); + assertThat(batchEntries.get(3).getResultFuture().get()).isEqualTo(ROW_KEY_1_RESPONSE); + } + + @Test + public void splitResponseWithNonExistent() throws Exception { + List> batchEntries = + createBatchEntries("non-existent-1", "non-existent-2", "row-key-1"); + + underTest.splitResponse(ImmutableList.of(ROW_KEY_1_RESPONSE), batchEntries); + + assertThat(batchEntries.size()).isEqualTo(3); + assertThat(batchEntries.get(0).getResultFuture().get()).isNull(); + assertThat(batchEntries.get(1).getResultFuture().get()).isNull(); + assertThat(batchEntries.get(2).getResultFuture().get()).isEqualTo(ROW_KEY_1_RESPONSE); + } + + @Test + public void splitExceptionTest() { + RuntimeException expectedException = new RuntimeException("cannot scan the table"); + List> batchEntries = createBatchEntries("row-key-1", "row-key-2"); + underTest.splitException(expectedException, batchEntries); + for (BatchEntry resultEntry : batchEntries) { + try { + resultEntry.getResultFuture().get(); + } catch (Exception actualEx) { + assertThat(actualEx).hasCauseThat().isEqualTo(expectedException); + } + } + } + + @Test + public void countBytesTest() { + ByteString rowKey = ByteString.copyFromUtf8("testRowKey"); + long len = underTest.countBytes(rowKey); + assertThat(len).isEqualTo(rowKey.size()); + } + + @Test + public void requestBuilderTest() { + BatchingRequestBuilder requestBuilder = + underTest.newRequestBuilder(Query.create("table-Id")); + requestBuilder.add(ByteString.copyFromUtf8("row-key-1")); + requestBuilder.add(ByteString.copyFromUtf8("row-key-2")); + + Query request = requestBuilder.build(); + + ReadRowsRequest readRowsRequest = + request.toProto(RequestContext.create("project", "instance", "appProfile")); + assertThat(readRowsRequest.getTableName()).contains("table-Id"); + assertThat(readRowsRequest.getRows().getRowKeysList()) + .isEqualTo( + ImmutableList.of( + ByteString.copyFromUtf8("row-key-1"), ByteString.copyFromUtf8("row-key-2"))); + } + + private List> createBatchEntries(String... rowKeys) { + ImmutableList.Builder> builder = ImmutableList.builder(); + + for (String rowKey : rowKeys) { + builder.add( + BatchEntry.create(ByteString.copyFromUtf8(rowKey), SettableApiFuture.create())); + } + return builder.build(); + } +}