Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: enable tests that are disabled due to breaking change and stop i…
…gnoring ALREADY_EXISTED error (#748)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️
  • Loading branch information
yirutang committed Dec 21, 2020
1 parent 7056a5f commit 8caf5a2
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 326 deletions.
Expand Up @@ -36,8 +36,8 @@
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1beta2.StorageProto.*;
import com.google.common.base.Preconditions;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
Expand Down Expand Up @@ -412,7 +412,7 @@ private static final class InflightBatch {
this.inflightRequests = inflightRequests;
this.offsetList = new ArrayList<Long>(inflightRequests.size());
for (AppendRequestAndFutureResponse request : inflightRequests) {
if (request.message.getOffset().getValue() > 0) {
if (request.message.hasOffset()) {
offsetList.add(new Long(request.message.getOffset().getValue()));
} else {
offsetList.add(new Long(-1));
Expand Down Expand Up @@ -485,17 +485,15 @@ private void onFailure(Throwable t) {
private void onSuccess(AppendRowsResponse response) {
for (int i = 0; i < inflightRequests.size(); i++) {
AppendRowsResponse.Builder singleResponse = response.toBuilder();
// if (offsetList.get(i) > 0) {
// singleResponse.setOffset(offsetList.get(i));
// } else {
// long actualOffset = response.getOffset();
// for (int j = 0; j < i; j++) {
// actualOffset +=
//
// inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount();
// }
// singleResponse.setOffset(actualOffset);
// }
if (response.getAppendResult().hasOffset()) {
long actualOffset = response.getAppendResult().getOffset().getValue();
for (int j = 0; j < i; j++) {
actualOffset +=
inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount();
}
singleResponse.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(actualOffset)));
}
inflightRequests.get(i).appendResult.set(singleResponse.build());
}
}
Expand Down Expand Up @@ -850,27 +848,28 @@ public void onResponse(AppendRowsResponse response) {
}
// Currently there is nothing retryable. If the error is already exists, then ignore it.
if (response.hasError()) {
if (response.getError().getCode() != 6 /* ALREADY_EXISTS */) {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
inflightBatch.onFailure(exception);
} else {
if (inflightBatch.getExpectedOffset() > 0
&& (response.getAppendResult().hasOffset()
&& response.getAppendResult().getOffset().getValue()
!= inflightBatch.getExpectedOffset())) {
IllegalStateException exception =
new IllegalStateException(
String.format(
"The append result offset %s does not match " + "the expected offset %s.",
response.getAppendResult().getOffset().getValue(),
inflightBatch.getExpectedOffset()));
inflightBatch.onFailure(exception);
abortInflightRequests(exception);
} else {
inflightBatch.onSuccess(response);
}
}
// Temp for Breaking Change.
// if (inflightBatch.getExpectedOffset() > 0
// && response.getOffset() != inflightBatch.getExpectedOffset()) {
// IllegalStateException exception =
// new IllegalStateException(
// String.format(
// "The append result offset %s does not match " + "the expected offset %s.",
// response.getOffset(), inflightBatch.getExpectedOffset()));
// inflightBatch.onFailure(exception);
// abortInflightRequests(exception);
// } else {
inflightBatch.onSuccess(response);
// }
} finally {
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
Expand Down
Expand Up @@ -426,6 +426,7 @@ public void testComplicateSchemaWithPendingStream()
.setParent(tableId2)
.setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
.build());
FinalizeWriteStreamResponse finalizeResponse = FinalizeWriteStreamResponse.getDefaultInstance();
try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
LOG.info("Sending two messages");
ApiFuture<AppendRowsResponse> response =
Expand All @@ -449,24 +450,25 @@ public void testComplicateSchemaWithPendingStream()
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals(false, iter.hasNext());

FinalizeWriteStreamResponse finalizeResponse =
finalizeResponse =
client.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());

ApiFuture<AppendRowsResponse> response3 =
streamWriter.append(
createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"})
.setOffset(Int64Value.of(1L))
.setOffset(Int64Value.of(2L))
.build());
try {
assertEquals(2, response3.get().getOffset());
fail("Append to finalized stream should fail.");
} catch (Exception expected) {
// The exception thrown is not stable. Opened a bug to fix it.
LOG.info("Got exception: " + expected.toString());
}
}
// Finalize row count is not populated.
// assertEquals(1, finalizeResponse.getRowCount());
assertEquals(2, finalizeResponse.getRowCount());
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
client.batchCommitWriteStreams(
BatchCommitWriteStreamsRequest.newBuilder()
Expand Down

0 comments on commit 8caf5a2

Please sign in to comment.