Skip to content

Commit b6fb4c7

Browse files
authored
fix: fix retry so it won't fail when rows read == rows limit (#2925)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigtable-hbase/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️
1 parent 817c363 commit b6fb4c7

File tree

3 files changed

+55
-12
lines changed

3 files changed

+55
-12
lines changed

bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/scanner/ReadRowsRequestManager.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ class ReadRowsRequestManager {
5252
this.originalRequest = originalRequest;
5353
}
5454

55-
public ByteString getLastFoundKey() {
56-
return lastFoundKey;
57-
}
58-
5955
void updateLastFoundKey(ByteString key) {
6056
this.lastFoundKey = key;
6157
}
@@ -64,6 +60,21 @@ void incrementRowCount(int count) {
6460
rowCount += count;
6561
}
6662

63+
boolean isConsumed() {
64+
// A non-null lastFoundKey implies that we had at least one row that was returned and the
65+
// filtered request should be truncated. In this case an empty filtered row set implies that all
66+
// the rows have been seen and this stream is complete.
67+
if (this.lastFoundKey != null && filterRows().equals(RowSet.getDefaultInstance())) {
68+
return true;
69+
}
70+
// If the request has a rowsLimit and rowCount has reached the limit, it implies that the
71+
// stream is complete.
72+
if (originalRequest.getRowsLimit() > 0 && originalRequest.getRowsLimit() == rowCount) {
73+
return true;
74+
}
75+
return false;
76+
}
77+
6778
ReadRowsRequest buildUpdatedRequest() {
6879
ReadRowsRequest.Builder newRequest =
6980
ReadRowsRequest.newBuilder()

bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/scanner/RetryingReadRowsOperation.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.api.core.InternalApi;
2121
import com.google.bigtable.v2.ReadRowsRequest;
2222
import com.google.bigtable.v2.ReadRowsResponse;
23-
import com.google.bigtable.v2.RowSet;
2423
import com.google.cloud.bigtable.config.RetryOptions;
2524
import com.google.cloud.bigtable.grpc.DeadlineGenerator;
2625
import com.google.cloud.bigtable.grpc.async.AbstractRetryingOperation;
@@ -204,13 +203,11 @@ public void onClose(Status status, Metadata trailers) {
204203
// scan. To mitigate this, we will just mask the status as an ok after we are certain that we
205204
// have received all of the data.
206205
// This mitigation must only be activated after at least one row is received so that we can
207-
// distinguish from a perfectly valid initial full table scan.
208-
if (!status.isOk() && requestManager.getLastFoundKey() != null) {
209-
ReadRowsRequest retryRequest = requestManager.buildUpdatedRequest();
210-
boolean isFullTableScan = retryRequest.getRows().equals(RowSet.getDefaultInstance());
211-
if (isFullTableScan) {
212-
status = Status.OK;
213-
}
206+
// distinguish from a perfectly valid initial full table scan. And we verify the ReadRows stream
207+
// is done by checking if all the keys in the request are read, or the number of rows returned
208+
// reached the rowsLimit.
209+
if (!status.isOk() && requestManager.isConsumed()) {
210+
status = Status.OK;
214211
}
215212

216213
if (status.getCause() instanceof StreamWaitTimeoutException) {

bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/scanner/RetryingReadRowsOperationTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,41 @@ public void testErrorAfterComplete() throws UnsupportedEncodingException {
449449
Assert.assertTrue(underTest.getRowMerger().isComplete());
450450
}
451451

452+
@Test
453+
public void testErrorAfterCompleteWithRowsLimit() throws UnsupportedEncodingException {
454+
ByteString key1 = ByteString.copyFromUtf8("SomeKey1");
455+
ByteString key2 = ByteString.copyFromUtf8("SomeKey2");
456+
457+
ReadRowsRequest req =
458+
ReadRowsRequest.newBuilder()
459+
.setRows(RowSet.newBuilder().addRowKeys(key1).addRowKeys(key2))
460+
.setRowsLimit(1)
461+
.build();
462+
RetryingReadRowsOperation underTest =
463+
createOperation(DeadlineGenerator.DEFAULT, req, mockFlatRowObserver);
464+
465+
start(underTest);
466+
// Not all the rows are read yet, but because the rows limit has reached, this will still return
467+
// an OK status
468+
underTest.onMessage(buildResponse(key1));
469+
underTest.onClose(Status.CANCELLED, new Metadata());
470+
471+
verify(mockFlatRowObserver, times(1)).onCompleted();
472+
Assert.assertFalse(underTest.inRetryMode());
473+
Assert.assertTrue(underTest.getRowMerger().isComplete());
474+
}
475+
476+
@Test
477+
public void testFullTableScanRetried() {
478+
ReadRowsRequest req = ReadRowsRequest.newBuilder().build();
479+
RetryingReadRowsOperation underTest =
480+
createOperation(DeadlineGenerator.DEFAULT, req, mockFlatRowObserver);
481+
482+
start(underTest);
483+
underTest.onClose(Status.DEADLINE_EXCEEDED, new Metadata());
484+
Assert.assertTrue(underTest.inRetryMode());
485+
}
486+
452487
@SuppressWarnings("unchecked")
453488
@Test
454489
public void testImmediateOnClose() {

0 commit comments

Comments
 (0)