Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enable tests that are disabled due to breaking change and stop ignoring ALREADY_EXISTED error #748

Merged
merged 14 commits into from Dec 21, 2020
Expand Up @@ -36,8 +36,8 @@
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1beta2.StorageProto.*;
import com.google.common.base.Preconditions;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
Expand Down Expand Up @@ -412,7 +412,7 @@ private static final class InflightBatch {
this.inflightRequests = inflightRequests;
this.offsetList = new ArrayList<Long>(inflightRequests.size());
for (AppendRequestAndFutureResponse request : inflightRequests) {
if (request.message.getOffset().getValue() > 0) {
if (request.message.hasOffset()) {
offsetList.add(new Long(request.message.getOffset().getValue()));
} else {
offsetList.add(new Long(-1));
Expand Down Expand Up @@ -485,17 +485,15 @@ private void onFailure(Throwable t) {
private void onSuccess(AppendRowsResponse response) {
for (int i = 0; i < inflightRequests.size(); i++) {
AppendRowsResponse.Builder singleResponse = response.toBuilder();
// if (offsetList.get(i) > 0) {
// singleResponse.setOffset(offsetList.get(i));
// } else {
// long actualOffset = response.getOffset();
// for (int j = 0; j < i; j++) {
// actualOffset +=
//
// inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount();
// }
// singleResponse.setOffset(actualOffset);
// }
if (response.getAppendResult().hasOffset()) {
long actualOffset = response.getAppendResult().getOffset().getValue();
for (int j = 0; j < i; j++) {
actualOffset +=
inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount();
}
singleResponse.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(actualOffset)));
}
inflightRequests.get(i).appendResult.set(singleResponse.build());
}
}
Expand Down Expand Up @@ -850,27 +848,28 @@ public void onResponse(AppendRowsResponse response) {
}
// Currently there is nothing retryable. If the error is already exists, then ignore it.
if (response.hasError()) {
if (response.getError().getCode() != 6 /* ALREADY_EXISTS */) {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
inflightBatch.onFailure(exception);
} else {
if (inflightBatch.getExpectedOffset() > 0
&& (response.getAppendResult().hasOffset()
&& response.getAppendResult().getOffset().getValue()
!= inflightBatch.getExpectedOffset())) {
IllegalStateException exception =
new IllegalStateException(
String.format(
"The append result offset %s does not match " + "the expected offset %s.",
response.getAppendResult().getOffset().getValue(),
inflightBatch.getExpectedOffset()));
inflightBatch.onFailure(exception);
abortInflightRequests(exception);
} else {
inflightBatch.onSuccess(response);
}
}
// Temp for Breaking Change.
// if (inflightBatch.getExpectedOffset() > 0
// && response.getOffset() != inflightBatch.getExpectedOffset()) {
// IllegalStateException exception =
// new IllegalStateException(
// String.format(
// "The append result offset %s does not match " + "the expected offset %s.",
// response.getOffset(), inflightBatch.getExpectedOffset()));
// inflightBatch.onFailure(exception);
// abortInflightRequests(exception);
// } else {
inflightBatch.onSuccess(response);
// }
} finally {
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
Expand Down
Expand Up @@ -426,6 +426,7 @@ public void testComplicateSchemaWithPendingStream()
.setParent(tableId2)
.setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
.build());
FinalizeWriteStreamResponse finalizeResponse = FinalizeWriteStreamResponse.getDefaultInstance();
try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
LOG.info("Sending two messages");
ApiFuture<AppendRowsResponse> response =
Expand All @@ -449,24 +450,25 @@ public void testComplicateSchemaWithPendingStream()
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals(false, iter.hasNext());

FinalizeWriteStreamResponse finalizeResponse =
finalizeResponse =
client.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());

ApiFuture<AppendRowsResponse> response3 =
streamWriter.append(
createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"})
.setOffset(Int64Value.of(1L))
.setOffset(Int64Value.of(2L))
.build());
try {
assertEquals(2, response3.get().getOffset());
fail("Append to finalized stream should fail.");
} catch (Exception expected) {
// The exception thrown is not stable. Opened a bug to fix it.
LOG.info("Got exception: " + expected.toString());
}
}
// Finalize row count is not populated.
// assertEquals(1, finalizeResponse.getRowCount());
assertEquals(2, finalizeResponse.getRowCount());
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
client.batchCommitWriteStreams(
BatchCommitWriteStreamsRequest.newBuilder()
Expand Down