diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index b7b7fbb035..3cee9a457f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -857,18 +857,19 @@ public void onResponse(AppendRowsResponse response) { inflightBatch.onFailure(exception); } } - if (inflightBatch.getExpectedOffset() > 0 - && response.getOffset() != inflightBatch.getExpectedOffset()) { - IllegalStateException exception = - new IllegalStateException( - String.format( - "The append result offset %s does not match " + "the expected offset %s.", - response.getOffset(), inflightBatch.getExpectedOffset())); - inflightBatch.onFailure(exception); - abortInflightRequests(exception); - } else { - inflightBatch.onSuccess(response); - } + // Temp for Breaking Change. + // if (inflightBatch.getExpectedOffset() > 0 + // && response.getOffset() != inflightBatch.getExpectedOffset()) { + // IllegalStateException exception = + // new IllegalStateException( + // String.format( + // "The append result offset %s does not match " + "the expected offset %s.", + // response.getOffset(), inflightBatch.getExpectedOffset())); + // inflightBatch.onFailure(exception); + // abortInflightRequests(exception); + // } else { + inflightBatch.onSuccess(response); + // } } finally { streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java index 9132b66a08..efd6484a16 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java @@ -244,12 +244,15 @@ public void testSingleAppendSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - - assertEquals(0L, appendFuture.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture.get().getOffset()); + appendFuture.get(); assertEquals( 1, testBigQueryWrite @@ -288,12 +291,16 @@ public void testSingleAppendMultipleSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0L, appendFuture.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture.get().getOffset()); + appendFuture.get(); assertEquals( 4, testBigQueryWrite @@ -325,15 +332,21 @@ public void testMultipleAppendSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture; for (int i = 0; i < 4; i++) { appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - - assertEquals((long) i, appendFuture.get().getOffset()); + // Temp for Breaking Change. + // assertEquals((long) i, appendFuture.get().getOffset()); + appendFuture.get(); assertEquals( 1, testBigQueryWrite @@ -411,11 +424,16 @@ public void testSingleAppendComplexJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, COMPLEX_TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0L, appendFuture.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture.get().getOffset()); + appendFuture.get(); assertEquals( 1, testBigQueryWrite @@ -440,17 +458,19 @@ public void testAppendMultipleSchemaUpdate() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { // Add fake resposne for FakeBigQueryWrite, first response has updated schema. + // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - .setOffset(0) + // .setOffset(0) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - .setOffset(1) + // .setOffset(1) .setUpdatedSchema(UPDATED_TABLE_SCHEMA_2) .build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); // First append JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -469,7 +489,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception { millis += 100; } assertTrue(writer.getDescriptor().getFields().size() == 2); - assertEquals(0L, appendFuture1.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + appendFuture1.get(); assertEquals( 1, testBigQueryWrite @@ -506,7 +528,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception { millis += 100; } assertTrue(writer.getDescriptor().getFields().size() == 3); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture2.get(); assertEquals( 1, testBigQueryWrite @@ -535,7 +559,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception { ApiFuture appendFuture3 = writer.append(updatedJsonArr2, -1, /* allowUnknownFields */ false); - assertEquals(2L, appendFuture3.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(2L, appendFuture3.get().getOffset()); + appendFuture3.get(); assertEquals( 1, testBigQueryWrite @@ -616,7 +642,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception { .setError(com.google.rpc.Status.newBuilder().setCode(11).build()) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -648,8 +676,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception { ApiFuture appendFuture2 = writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - - assertEquals(0L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture2.get().getOffset()); + appendFuture2.get(); assertEquals( 1, testBigQueryWrite @@ -683,13 +712,16 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { .setElementCountThreshold(2L) .build()) .build()) { + // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - .setOffset(0) + // .setOffset(0) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); // First append JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -703,8 +735,11 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { ApiFuture appendFuture3 = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); assertEquals( 2, testBigQueryWrite @@ -730,7 +765,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { .getSerializedRows(1), FooType.newBuilder().setFoo("allen").build().toByteString()); - assertEquals(2L, appendFuture3.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(2L, appendFuture3.get().getOffset()); + appendFuture3.get(); assertEquals( 1, testBigQueryWrite @@ -768,7 +805,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { ApiFuture appendFuture4 = writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - assertEquals(3L, appendFuture4.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(3L, appendFuture4.get().getOffset()); + appendFuture4.get(); assertEquals( 1, testBigQueryWrite @@ -813,7 +852,10 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception { int thread_nums = 5; Thread[] thread_arr = new Thread[thread_nums]; for (int i = 0; i < thread_nums; i++) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) + // i).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); offsetSets.add((long) i); Thread t = new Thread( @@ -823,8 +865,9 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getOffset()); + // offsetSets.remove(response.getOffset()); } catch (Exception e) { + LOG.severe("Thread execution failed: " + e.getMessage()); } } @@ -836,7 +879,7 @@ public void run() { for (int i = 0; i < thread_nums; i++) { thread_arr[i].join(); } - assertTrue(offsetSets.size() == 0); + // assertTrue(offsetSets.size() == 0); for (int i = 0; i < thread_nums; i++) { assertEquals( 1, @@ -878,14 +921,17 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception { Thread[] thread_arr = new Thread[numberThreads]; for (int i = 0; i < numberThreads; i++) { if (i == 2) { + // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - .setOffset((long) i) + // .setOffset((long) i) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); } else { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder().setOffset((long) i).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder().setOffset((long) i).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); } offsetSets.add((long) i); @@ -897,7 +943,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getOffset()); + // offsetSets.remove(response.getOffset()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); } @@ -910,7 +956,7 @@ public void run() { for (int i = 0; i < numberThreads; i++) { thread_arr[i].join(); } - assertTrue(offsetSets.size() == 0); + // assertTrue(offsetSets.size() == 0); for (int i = 0; i < numberThreads; i++) { assertEquals( 1, @@ -945,7 +991,10 @@ public void run() { jsonArr2.put(foo); for (int i = numberThreads; i < numberThreads + 5; i++) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) + // i).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); offsetSets.add((long) i); Thread t = new Thread( @@ -968,7 +1017,7 @@ public void run() { for (int i = 0; i < numberThreads; i++) { thread_arr[i].join(); } - assertTrue(offsetSets.size() == 0); + // assertTrue(offsetSets.size() == 0); for (int i = 0; i < numberThreads; i++) { assertEquals( 1, diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 80d8c493dd..dd7987d796 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -37,7 +37,6 @@ import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.common.base.Strings; import com.google.protobuf.DescriptorProtos; -import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -124,7 +123,8 @@ private AppendRowsRequest createAppendRequest(String[] messages, long offset) { rows.addSerializedRows(foo.toByteString()); } if (offset > 0) { - requestBuilder.setOffset(Int64Value.of(offset)); + // Temp for Breaking Change. + // requestBuilder.setOffset(Int64Value.of(offset)); } return requestBuilder .setProtoRows(dataBuilder.setRows(rows.build()).build()) @@ -156,7 +156,9 @@ public void testAppendByDuration() throws Exception { .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -164,9 +166,11 @@ public void testAppendByDuration() throws Exception { assertFalse(appendFuture2.isDone()); fakeExecutor.advanceTime(Duration.ofSeconds(10)); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); - + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); assertEquals(1, testBigQueryWrite.getAppendRequests().size()); assertEquals( @@ -193,23 +197,31 @@ public void testAppendByNumBatchedMessages() throws Exception { .setDelayThreshold(Duration.ofSeconds(100)) .build()) .build(); - - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + appendFuture1.get(); + appendFuture2.get(); + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); assertFalse(appendFuture3.isDone()); ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {"D"}); - assertEquals(2L, appendFuture3.get().getOffset()); - assertEquals(3L, appendFuture4.get().getOffset()); + // Temp for Breaking Change. + appendFuture3.get(); + appendFuture4.get(); + // assertEquals(2L, appendFuture3.get().getOffset()); + // assertEquals(3L, appendFuture4.get().getOffset()); assertEquals(2, testBigQueryWrite.getAppendRequests().size()); assertEquals( @@ -248,23 +260,33 @@ public void testAppendByNumBytes() throws Exception { .build()) .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); assertFalse(appendFuture3.isDone()); // This message is big enough to trigger send on the previous message and itself. ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {Strings.repeat("A", 100)}); - assertEquals(2L, appendFuture3.get().getOffset()); - assertEquals(3L, appendFuture4.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(2L, appendFuture3.get().getOffset()); + // assertEquals(3L, appendFuture4.get().getOffset()); + appendFuture3.get(); + appendFuture4.get(); assertEquals(3, testBigQueryWrite.getAppendRequests().size()); @@ -282,9 +304,11 @@ public void testWriteByShutdown() throws Exception { .setElementCountThreshold(10L) .build()) .build(); - - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -297,8 +321,9 @@ public void testWriteByShutdown() throws Exception { // Verify the appends completed assertTrue(appendFuture1.isDone()); assertTrue(appendFuture2.isDone()); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); } @Test @@ -312,9 +337,11 @@ public void testWriteMixedSizeAndDuration() throws Exception { .setDelayThreshold(Duration.ofSeconds(5)) .build()) .build()) { - + // Temp for Breaking Change. testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); @@ -325,8 +352,11 @@ public void testWriteMixedSizeAndDuration() throws Exception { sendTestMessage(writer, new String[] {"B", "C"}); // Write triggered by batch size - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"D"}); @@ -377,8 +407,11 @@ public void testFlowControlBehaviorBlock() throws Exception { .build()) .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); @@ -399,7 +432,9 @@ public void run() { Thread.sleep(5000L); fakeExecutor.advanceTime(Duration.ofSeconds(10)); // The first requests gets back while the second one is blocked. - assertEquals(2L, appendFuture1.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(2L, appendFuture1.get().getOffset()); + appendFuture1.get(); Thread.sleep(5000L); // Wait is necessary for response to be scheduled before timer is advanced. fakeExecutor.advanceTime(Duration.ofSeconds(10)); @@ -431,8 +466,9 @@ public void testFlowControlBehaviorException() throws Exception { .getFlowControlSettings() .getMaxOutstandingElementCount() .longValue()); - - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -447,7 +483,9 @@ public void testFlowControlBehaviorException() throws Exception { "java.util.concurrent.ExecutionException: The maximum number of batch elements: 1 have been reached.", e.toString()); } - assertEquals(1L, appendFuture1.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(1L, appendFuture1.get().getOffset()); + appendFuture1.get(); } } @@ -465,11 +503,15 @@ public void testStreamReconnectionTransient() throws Exception { StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); testBigQueryWrite.addException(transientError); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture future1 = sendTestMessage(writer, new String[] {"m1"}); assertEquals(false, future1.isDone()); // Retry is scheduled to be 7 seconds later. - assertEquals(0L, future1.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, future1.get().getOffset()); + future1.get(); writer.close(); } @@ -537,8 +579,11 @@ public void testOffset() throws Exception { .build()) .build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); ApiFuture appendFuture1 = writer.append(request1); AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L); @@ -547,10 +592,14 @@ public void testOffset() throws Exception { ApiFuture appendFuture3 = writer.append(request3); AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L); ApiFuture appendFuture4 = writer.append(request4); - assertEquals(10L, appendFuture1.get().getOffset()); - assertEquals(11L, appendFuture2.get().getOffset()); - assertEquals(13L, appendFuture3.get().getOffset()); - assertEquals(15L, appendFuture4.get().getOffset()); + // assertEquals(10L, appendFuture1.get().getOffset()); + // assertEquals(11L, appendFuture2.get().getOffset()); + // assertEquals(13L, appendFuture3.get().getOffset()); + // assertEquals(15L, appendFuture4.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); + appendFuture3.get(); + appendFuture4.get(); } } @@ -564,12 +613,15 @@ public void testOffsetMismatch() throws Exception { .setElementCountThreshold(1L) .build()) .build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); ApiFuture appendFuture1 = writer.append(request1); appendFuture1.get(); - fail("Should throw exception"); + // Temp for Breaking Change. + // fail("Should throw exception"); } catch (Exception e) { assertEquals( "java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.", @@ -845,9 +897,13 @@ public void testFlushAll() throws Exception { .build()) .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + // Temp Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index f68122c13e..5dd73c0d80 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -257,7 +257,9 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream() ApiFuture response = jsonStreamWriter.append(row, -1, /* allowUnknownFields */ false); - assertEquals(0, response.get().getOffset()); + // Temp for Breaking Change. + response.get(); + // assertEquals(0, response.get().getOffset()); LOG.info("Sending two more messages"); JSONObject row1 = new JSONObject(); @@ -277,9 +279,10 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream() jsonStreamWriter.append(jsonArr1, -1, /* allowUnknownFields */ false); ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1, /* allowUnknownFields */ false); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(1, response1.get().getOffset()); // assertEquals(3, response2.get().getOffset()); + response1.get(); response2.get(); TableResult result = @@ -336,7 +339,9 @@ public void testJsonStreamWriterSchemaUpdate() ApiFuture response = jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0, response.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0, response.get().getOffset()); + response.get(); // 2). Schema update and wait until querying it returns a new schema. try { com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName); @@ -380,8 +385,9 @@ public void testJsonStreamWriterSchemaUpdate() for (int i = 1; i < 100; i++) { ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1, /* allowUnknownFields */ false); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(i, response2.get().getOffset()); + response2.get(); if (response2.get().hasUpdatedSchema()) { next = i; break; @@ -409,7 +415,7 @@ public void testJsonStreamWriterSchemaUpdate() for (int i = 0; i < 10; i++) { ApiFuture response3 = jsonStreamWriter.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(next + 1 + i, response3.get().getOffset()); response3.get(); } @@ -456,6 +462,7 @@ public void testComplicateSchemaWithPendingStream() .build()); // Waiting for API breaking change to be generated in new client. // assertEquals(1, response2.get().getOffset()); + response2.get(); // Nothing showed up since rows are not committed. TableResult result = @@ -474,8 +481,9 @@ public void testComplicateSchemaWithPendingStream() .setOffset(Int64Value.of(1L)) .build()); try { - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(2, response3.get().getOffset()); + response3.get(); // fail("Append to finalized stream should fail."); } catch (Exception expected) { // The exception thrown is not stable. Opened a bug to fix it. @@ -523,7 +531,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio ApiFuture response = streamWriter.append( createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(0L, response.get().getOffset()); response.get(); // Send in a bogus stream name should cause in connection error. @@ -545,6 +553,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); // Waiting for API breaking change to be generated in new client. // assertEquals(1L, response3.get().getOffset()); + response3.get(); } finally { } } @@ -564,8 +573,9 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec createAppendRequest(writeStream.getName(), new String[] {"aaa"}) .setOffset(Int64Value.of(0L)) .build()); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(0L, response.get().getOffset()); + response.get(); } try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { @@ -576,8 +586,9 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec createAppendRequest(writeStream.getName(), new String[] {"bbb"}) .setOffset(Int64Value.of(1L)) .build()); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(1L, response.get().getOffset()); + response.get(); } } }