Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Temporally disable test/code where breaking change is used to help pushing out the breaking change in unreleased Beta #727

Merged
merged 8 commits into from Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -857,18 +857,19 @@ public void onResponse(AppendRowsResponse response) {
inflightBatch.onFailure(exception);
}
}
if (inflightBatch.getExpectedOffset() > 0
&& response.getOffset() != inflightBatch.getExpectedOffset()) {
IllegalStateException exception =
new IllegalStateException(
String.format(
"The append result offset %s does not match " + "the expected offset %s.",
response.getOffset(), inflightBatch.getExpectedOffset()));
inflightBatch.onFailure(exception);
abortInflightRequests(exception);
} else {
inflightBatch.onSuccess(response);
}
// Temp for Breaking Change.
// if (inflightBatch.getExpectedOffset() > 0
// && response.getOffset() != inflightBatch.getExpectedOffset()) {
// IllegalStateException exception =
// new IllegalStateException(
// String.format(
// "The append result offset %s does not match " + "the expected offset %s.",
// response.getOffset(), inflightBatch.getExpectedOffset()));
// inflightBatch.onFailure(exception);
// abortInflightRequests(exception);
// } else {
inflightBatch.onSuccess(response);
// }
} finally {
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
Expand Down
Expand Up @@ -244,12 +244,15 @@ public void testSingleAppendSimpleJson() throws Exception {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture.get().getOffset());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -288,12 +291,16 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture.get().getOffset());
appendFuture.get();
assertEquals(
4,
testBigQueryWrite
Expand Down Expand Up @@ -325,15 +332,21 @@ public void testMultipleAppendSimpleJson() throws Exception {

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
ApiFuture<AppendRowsResponse> appendFuture;
for (int i = 0; i < 4; i++) {
appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals((long) i, appendFuture.get().getOffset());
// Temp for Breaking Change.
// assertEquals((long) i, appendFuture.get().getOffset());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -411,11 +424,16 @@ public void testSingleAppendComplexJson() throws Exception {

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, COMPLEX_TABLE_SCHEMA).build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture.get().getOffset());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
Expand All @@ -440,17 +458,19 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
// Add fake resposne for FakeBigQueryWrite, first response has updated schema.
// Temp for Breaking Change.
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setOffset(0)
// .setOffset(0)
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setOffset(1)
// .setOffset(1)
.setUpdatedSchema(UPDATED_TABLE_SCHEMA_2)
.build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
// First append
JSONObject foo = new JSONObject();
foo.put("foo", "allen");
Expand All @@ -469,7 +489,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
millis += 100;
}
assertTrue(writer.getDescriptor().getFields().size() == 2);
assertEquals(0L, appendFuture1.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture1.get().getOffset());
appendFuture1.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -506,7 +528,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
millis += 100;
}
assertTrue(writer.getDescriptor().getFields().size() == 3);
assertEquals(1L, appendFuture2.get().getOffset());
// Temp for Breaking Change.
// assertEquals(1L, appendFuture2.get().getOffset());
appendFuture2.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -535,7 +559,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
ApiFuture<AppendRowsResponse> appendFuture3 =
writer.append(updatedJsonArr2, -1, /* allowUnknownFields */ false);

assertEquals(2L, appendFuture3.get().getOffset());
// Temp for Breaking Change.
// assertEquals(2L, appendFuture3.get().getOffset());
appendFuture3.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -616,7 +642,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {
.setError(com.google.rpc.Status.newBuilder().setCode(11).build())
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
// Temp for Breaking Change.
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());

JSONObject foo = new JSONObject();
foo.put("foo", "allen");
Expand Down Expand Up @@ -648,8 +676,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {

ApiFuture<AppendRowsResponse> appendFuture2 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture2.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture2.get().getOffset());
appendFuture2.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -683,13 +712,16 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
.setElementCountThreshold(2L)
.build())
.build()) {
// Temp for Breaking Change.
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setOffset(0)
// .setOffset(0)
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
// First append
JSONObject foo = new JSONObject();
foo.put("foo", "allen");
Expand All @@ -703,8 +735,11 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
ApiFuture<AppendRowsResponse> appendFuture3 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);

assertEquals(0L, appendFuture1.get().getOffset());
assertEquals(1L, appendFuture2.get().getOffset());
// Temp for Breaking Change.
// assertEquals(0L, appendFuture1.get().getOffset());
// assertEquals(1L, appendFuture2.get().getOffset());
appendFuture1.get();
appendFuture2.get();
assertEquals(
2,
testBigQueryWrite
Expand All @@ -730,7 +765,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
.getSerializedRows(1),
FooType.newBuilder().setFoo("allen").build().toByteString());

assertEquals(2L, appendFuture3.get().getOffset());
// Temp for Breaking Change.
// assertEquals(2L, appendFuture3.get().getOffset());
appendFuture3.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -768,7 +805,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
ApiFuture<AppendRowsResponse> appendFuture4 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);

assertEquals(3L, appendFuture4.get().getOffset());
// Temp for Breaking Change.
// assertEquals(3L, appendFuture4.get().getOffset());
appendFuture4.get();
assertEquals(
1,
testBigQueryWrite
Expand Down Expand Up @@ -813,7 +852,10 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception {
int thread_nums = 5;
Thread[] thread_arr = new Thread[thread_nums];
for (int i = 0; i < thread_nums; i++) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long)
// i).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
offsetSets.add((long) i);
Thread t =
new Thread(
Expand All @@ -823,8 +865,9 @@ public void run() {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getOffset());
// offsetSets.remove(response.getOffset());
} catch (Exception e) {

LOG.severe("Thread execution failed: " + e.getMessage());
}
}
Expand All @@ -836,7 +879,7 @@ public void run() {
for (int i = 0; i < thread_nums; i++) {
thread_arr[i].join();
}
assertTrue(offsetSets.size() == 0);
// assertTrue(offsetSets.size() == 0);
for (int i = 0; i < thread_nums; i++) {
assertEquals(
1,
Expand Down Expand Up @@ -878,14 +921,17 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception {
Thread[] thread_arr = new Thread[numberThreads];
for (int i = 0; i < numberThreads; i++) {
if (i == 2) {
// Temp for Breaking Change.
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setOffset((long) i)
// .setOffset((long) i)
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
.build());
} else {
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder().setOffset((long) i).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(
// AppendRowsResponse.newBuilder().setOffset((long) i).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
}

offsetSets.add((long) i);
Expand All @@ -897,7 +943,7 @@ public void run() {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getOffset());
// offsetSets.remove(response.getOffset());
} catch (Exception e) {
LOG.severe("Thread execution failed: " + e.getMessage());
}
Expand All @@ -910,7 +956,7 @@ public void run() {
for (int i = 0; i < numberThreads; i++) {
thread_arr[i].join();
}
assertTrue(offsetSets.size() == 0);
// assertTrue(offsetSets.size() == 0);
for (int i = 0; i < numberThreads; i++) {
assertEquals(
1,
Expand Down Expand Up @@ -945,7 +991,10 @@ public void run() {
jsonArr2.put(foo);

for (int i = numberThreads; i < numberThreads + 5; i++) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build());
// Temp for Breaking Change.
// testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long)
// i).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
offsetSets.add((long) i);
Thread t =
new Thread(
Expand All @@ -968,7 +1017,7 @@ public void run() {
for (int i = 0; i < numberThreads; i++) {
thread_arr[i].join();
}
assertTrue(offsetSets.size() == 0);
// assertTrue(offsetSets.size() == 0);
for (int i = 0; i < numberThreads; i++) {
assertEquals(
1,
Expand Down