Skip to content

Commit

Permalink
Updated as per gax BatchingDescriptor changes
Browse files Browse the repository at this point in the history
This is still in progess.

Adding these changes to showcase the implementation, test cases for BulkRead API.
  • Loading branch information
rahulKQL committed Dec 16, 2019
1 parent fdcf724 commit c2dd5fe
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 42 deletions.
Expand Up @@ -945,16 +945,52 @@ public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String ta
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* Filters.Filter filter = Filters.FILTERS.chain().filter()..;
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]", filter )) {
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]")) {
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // Blocks until mutations are applied on all submitted row entries.
* batcher.flush();
* }
* // Before `batcher` is closed, all remaining(If any) mutations are applied.
*
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
* }
* }</pre>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull String tableId) {
return stub.newBulkReadRowsBatcher(Query.create(tableId));
}

/**
* Reads rows in batches for given tableId and filter criteria. This method should be called in
* single thread.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* Filters.Filter rowKeyPrefix = Filters.FILTERS.key().regex("prefix");
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]", rowKeyPrefix)) {
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // Blocks until mutations are applied on all submitted row entries.
* batcher.flush();
* }
Expand All @@ -966,7 +1002,7 @@ public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String ta
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull String tableId, Filters.Filter filter) {
return stub.newBulkReadRowsBatcher(tableId, filter);
return stub.newBulkReadRowsBatcher(Query.create(tableId).filter(filter));
}

/**
Expand Down
Expand Up @@ -38,7 +38,6 @@
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
Expand Down Expand Up @@ -412,13 +411,11 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tabl
}

// TODO: javadoc
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull String tableId, @Nonnull Filters.Filter filter) {

public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query request) {
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
readRowsCallable.all(),
Query.create(tableId).filter(filter),
request,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
}
Expand Down
Expand Up @@ -513,19 +513,24 @@ private Builder() {

bulkReadRowsSettings =
BigtableBulkReadRowsSettings.newBuilder(new ReadRowsBatchingDescriptor())
.setRetryableCodes(IDEMPOTENT_RETRY_CODES)
.setRetrySettings(MUTATE_ROWS_RETRY_SETTINGS)
.setRetryableCodes(readRowSettings.getRetryableCodes())
.setRetrySettings(
readRowSettings
.getRetrySettings()
.toBuilder()
// Is this too high number?
.setMaxRetryDelay(Duration.ofSeconds(60))
.setTotalTimeout(Duration.ofMinutes(30))
.build())
.setBatchingSettings(
// TODO: Need to finalize initial thresholds for Read
BatchingSettings.newBuilder()
.setIsEnabled(true)
.setElementCountThreshold(100L)
.setRequestByteThreshold(20L * 1024 * 1024)
.setRequestByteThreshold(10_000L)
.setDelayThreshold(Duration.ofSeconds(1))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.setMaxOutstandingRequestBytes(100L * 1024 * 1024)
.setMaxOutstandingElementCount(1_000L)
.build())
.build());
Expand Down
Expand Up @@ -23,6 +23,8 @@
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
Expand All @@ -45,7 +47,9 @@ public BatchingRequestBuilder<RowMutationEntry, BulkMutation> newRequestBuilder(
}

@Override
public void splitResponse(Void response, List<SettableApiFuture<Void>> batch) {
public void splitResponse(
List<RowMutationEntry> elements, Void response, List<SettableApiFuture<Void>> batch) {
Preconditions.checkState(elements.size() == batch.size());
for (SettableApiFuture<Void> batchResponse : batch) {
batchResponse.set(null);
}
Expand Down Expand Up @@ -95,14 +99,22 @@ public long countBytes(RowMutationEntry entry) {
*/
static class RequestBuilder implements BatchingRequestBuilder<RowMutationEntry, BulkMutation> {
private BulkMutation bulkMutation;
private ImmutableList.Builder<RowMutationEntry> elements;

RequestBuilder(BulkMutation prototype) {
this.bulkMutation = prototype.clone();
this.elements = ImmutableList.builder();
}

@Override
public void add(RowMutationEntry entry) {
bulkMutation.add(entry);
elements.add(entry);
}

@Override
public List<RowMutationEntry> getElements() {
return elements.build();
}

@Override
Expand Down
Expand Up @@ -21,10 +21,19 @@
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// TODO: JavaDoc
/**
* Implementation for {@link BatchingDescriptor} to split batch response or exception into
* individual row request.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications directly.
*/
public class ReadRowsBatchingDescriptor
implements BatchingDescriptor<ByteString, Row, Query, List<Row>> {

Expand All @@ -34,18 +43,30 @@ public BatchingRequestBuilder<ByteString, Query> newRequestBuilder(Query query)
}

@Override
public void splitResponse(List<Row> rows, List<SettableApiFuture<Row>> futureResults) {
// TODO: verify if this check
Preconditions.checkState(rows.size() == futureResults.size());
public void splitResponse(
List<ByteString> requestElements,
List<Row> rowsResponse,
List<SettableApiFuture<Row>> futureResults) {
Preconditions.checkState(requestElements.size() == futureResults.size());

Map<ByteString, Row> rowKeysMap = new HashMap<>();
for (Row row : rowsResponse) {
rowKeysMap.put(row.getKey(), row);
}

for (int index = 0; index < futureResults.size(); index++) {
Row row = rowKeysMap.get(requestElements.get(index));

for (int rowCount = 0; rowCount < rows.size(); rowCount++) {
futureResults.get(rowCount).set(rows.get(rowCount));
if (row == null) {
// RowKey were not existent
futureResults.get(index).set(null);
}
futureResults.get(index).set(row);
}
}

@Override
public void splitException(Throwable throwable, List<SettableApiFuture<Row>> list) {
// TODO: do we have to consider specific exception?
for (SettableApiFuture<Row> row : list) {
row.setException(throwable);
}
Expand All @@ -57,16 +78,23 @@ public long countBytes(ByteString bytes) {
}

static class BulkReadRowsRequestBuilder implements BatchingRequestBuilder<ByteString, Query> {

private final Query query;
private final ImmutableList.Builder<ByteString> rowKeys;

BulkReadRowsRequestBuilder(Query query) {
this.query = query.clone();
this.rowKeys = ImmutableList.builder();
}

@Override
public void add(ByteString rowKey) {
query.rowKey(rowKey);
rowKeys.add(rowKey);
}

@Override
public List<ByteString> getElements() {
return rowKeys.build();
}

@Override
Expand Down
Expand Up @@ -15,20 +15,19 @@
*/
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.Batcher;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
Expand All @@ -46,18 +45,16 @@ public class BulkReadIT {
@Test
public void testBulkRead() throws InterruptedException, ExecutionException {
BigtableDataClient client = testEnvRule.env().getDataClient();
String tableId = testEnvRule.env().getTableId();
String family = testEnvRule.env().getFamilyId();
String rowPrefix = UUID.randomUUID().toString();
int numRows = 10;

BulkMutation bulkMutation = BulkMutation.create(tableId);
BulkMutation bulkMutation = BulkMutation.create(testEnvRule.env().getTableId());
List<Row> expectedRows = new ArrayList<>();
for (int i = 0; i < numRows; i++) {
bulkMutation.add(
RowMutationEntry.create(rowPrefix + "-" + i)
.setCell(family, "qualifier", 10_000L, "value-" + i));

expectedRows.add(
Row.create(
ByteString.copyFromUtf8(rowPrefix + "-" + i),
Expand All @@ -69,20 +66,32 @@ public void testBulkRead() throws InterruptedException, ExecutionException {
ImmutableList.<String>of(),
ByteString.copyFromUtf8("value-" + i)))));
}

client.bulkMutateRows(bulkMutation);

List<ApiFuture<Row>> rowFuture = new ArrayList<>();
Filters.Filter myFilters = Filters.FILTERS.family().exactMatch(family);
try (Batcher<ByteString, Row> batcher = client.newBulkReadRowsBatcher(tableId, myFilters)) {
try (Batcher<ByteString, Row> batcher =
client.newBulkReadRowsBatcher(testEnvRule.env().getTableId())) {

for (int rowCount = 0; rowCount < numRows; rowCount++) {
rowFuture.add(batcher.add(ByteString.copyFromUtf8(rowPrefix + "-" + rowCount)));
}
}
batcher.sendOutstanding();
List<Row> actualRows = ApiFutures.allAsList(rowFuture).get();
assertThat(actualRows).isEqualTo(expectedRows);

rowFuture = new ArrayList<>();
// duplicate row key
rowFuture.add(batcher.add(ByteString.copyFromUtf8(rowPrefix + "-" + 0)));
rowFuture.add(batcher.add(ByteString.copyFromUtf8(rowPrefix + "-" + 0)));

List<Row> actualRows = ApiFutures.allAsList(rowFuture).get();
// non-existent row key
rowFuture.add(batcher.add(ByteString.copyFromUtf8(UUID.randomUUID().toString())));

Truth.assertThat(actualRows).isEqualTo(expectedRows);
batcher.sendOutstanding();
actualRows = ApiFutures.allAsList(rowFuture).get();
assertThat(actualRows.get(0)).isEqualTo(expectedRows.get(0));
assertThat(actualRows.get(1)).isEqualTo(expectedRows.get(0));
assertThat(actualRows.get(2)).isEqualTo(null);
}
}
}
Expand Up @@ -41,9 +41,14 @@ public class MutateRowsBatchingDescriptorTest {
private static final String FAMILY = "fake-family";
private static final String QUALIFIER = "fake-qualifier";
private static final String VALUE = "fake-value";
private static final long TIMESTAMP = 10_000L;

private static final RequestContext requestContext =
RequestContext.create("fake-project", "fake-instance", "fake-profile");
private static final List<RowMutationEntry> ENTRIES =
ImmutableList.of(
RowMutationEntry.create(ROW_KEY).setCell(FAMILY, QUALIFIER, TIMESTAMP, VALUE),
RowMutationEntry.create("rowKey-2").setCell("family-2", "q", TIMESTAMP, VALUE));

@Test
public void countBytesTest() {
Expand All @@ -57,22 +62,20 @@ public void countBytesTest() {
@Test
public void requestBuilderTest() {
MutateRowsBatchingDescriptor underTest = new MutateRowsBatchingDescriptor();
long timestamp = 10_000L;
BulkMutation bulkMutation = BulkMutation.create("fake-table");
BatchingRequestBuilder<RowMutationEntry, BulkMutation> requestBuilder =
underTest.newRequestBuilder(bulkMutation);
requestBuilder.add(
RowMutationEntry.create(ROW_KEY).setCell(FAMILY, QUALIFIER, timestamp, VALUE));
requestBuilder.add(
RowMutationEntry.create("rowKey-2").setCell("family-2", "q", 20_000L, "some-value"));
requestBuilder.add(ENTRIES.get(0));
requestBuilder.add(ENTRIES.get(1));

BulkMutation actualBulkMutation = requestBuilder.build();
assertThat(actualBulkMutation.toProto(requestContext))
.isEqualTo(
BulkMutation.create("fake-table")
.add(ROW_KEY, Mutation.create().setCell(FAMILY, QUALIFIER, timestamp, VALUE))
.add("rowKey-2", Mutation.create().setCell("family-2", "q", 20_000L, "some-value"))
.add(ROW_KEY, Mutation.create().setCell(FAMILY, QUALIFIER, TIMESTAMP, VALUE))
.add("rowKey-2", Mutation.create().setCell("family-2", "q", TIMESTAMP, VALUE))
.toProto(requestContext));
assertThat(requestBuilder.getElements()).isEqualTo(ENTRIES);
}

@Test
Expand All @@ -83,7 +86,7 @@ public void splitResponseTest() {
assertThat(batchResponse.get(1).isDone()).isFalse();

MutateRowsBatchingDescriptor underTest = new MutateRowsBatchingDescriptor();
underTest.splitResponse(null, batchResponse);
underTest.splitResponse(ENTRIES, null, batchResponse);
assertThat(batchResponse.get(0).isDone()).isTrue();
assertThat(batchResponse.get(1).isDone()).isTrue();
}
Expand Down

0 comments on commit c2dd5fe

Please sign in to comment.