From 487d5b214e13c765d51853b270cb94bdfb2615c8 Mon Sep 17 00:00:00 2001 From: yirutang Date: Thu, 10 Dec 2020 10:16:17 -0800 Subject: [PATCH 1/5] fix: a race condition in test --- .../bigquery/storage/v1alpha2/JsonStreamWriterTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java index 78c943e1c6..d0a084a0a7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java @@ -34,9 +34,7 @@ import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Timestamp; import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.logging.Logger; import org.json.JSONArray; @@ -777,7 +775,7 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception { final JSONArray jsonArr = new JSONArray(); jsonArr.put(foo); - final HashSet offset_sets = new HashSet(); + final Collection offset_sets = Collections.synchronizedCollection(new HashSet()); int thread_nums = 5; Thread[] thread_arr = new Thread[thread_nums]; for (int i = 0; i < thread_nums; i++) { From a6a56827e80a94ab8385e4bbc025032727875f3e Mon Sep 17 00:00:00 2001 From: yirutang Date: Thu, 10 Dec 2020 10:19:13 -0800 Subject: [PATCH 2/5] . --- .../v1alpha2/JsonStreamWriterTest.java | 2 +- .../storage/v1beta2/JsonStreamWriterTest.java | 20 +++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java index d0a084a0a7..103b89657e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java @@ -840,7 +840,7 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception { final JSONArray jsonArr = new JSONArray(); jsonArr.put(foo); - final HashSet offsetSets = new HashSet(); + final Collection offsetSets = Collections.synchronizedCollection(new HashSet()); int numberThreads = 5; Thread[] thread_arr = new Thread[numberThreads]; for (int i = 0; i < numberThreads; i++) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java index 4fc3e13ef5..a9de2cb49e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java @@ -33,9 +33,7 @@ import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Timestamp; import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.logging.Logger; import org.json.JSONArray; @@ -775,7 +773,7 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception { final JSONArray jsonArr = new JSONArray(); jsonArr.put(foo); - final HashSet offset_sets = new HashSet(); + final Collection offset_sets = Collections.synchronizedCollection(new HashSet()); int thread_nums = 5; Thread[] thread_arr = new Thread[thread_nums]; for (int i = 0; i < thread_nums; i++) { @@ -839,7 +837,7 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception { final JSONArray jsonArr = new JSONArray(); jsonArr.put(foo); - final HashSet offsetSets = new HashSet(); + final Collection offset_sets = Collections.synchronizedCollection(new HashSet()); int numberThreads = 5; Thread[] thread_arr = new Thread[numberThreads]; for (int i = 0; i < numberThreads; i++) { @@ -854,7 +852,7 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception { AppendRowsResponse.newBuilder().setOffset((long) i).build()); } - offsetSets.add((long) i); + offset_sets.add((long) i); Thread t = new Thread( new Runnable() { @@ -863,7 +861,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getOffset()); + offset_sets.remove(response.getOffset()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); } @@ -876,7 +874,7 @@ public void run() { for (int i = 0; i < numberThreads; i++) { thread_arr[i].join(); } - assertTrue(offsetSets.size() == 0); + assertTrue(offset_sets.size() == 0); for (int i = 0; i < numberThreads; i++) { assertEquals( 1, @@ -912,7 +910,7 @@ public void run() { for (int i = numberThreads; i < numberThreads + 5; i++) { testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build()); - offsetSets.add((long) i); + offset_sets.add((long) i); Thread t = new Thread( new Runnable() { @@ -921,7 +919,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr2, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getOffset()); + offset_sets.remove(response.getOffset()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); } @@ -934,7 +932,7 @@ public void run() { for (int i = 0; i < numberThreads; i++) { thread_arr[i].join(); } - assertTrue(offsetSets.size() == 0); + assertTrue(offset_sets.size() == 0); for (int i = 0; i < numberThreads; i++) { assertEquals( 1, From bc8de1b9152b23ba8da36876cdc089a1335306ff Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 11 Dec 2020 10:53:19 -0800 Subject: [PATCH 3/5] . --- .../storage/v1beta2/StreamWriter.java | 25 +-- .../storage/v1beta2/JsonStreamWriterTest.java | 132 ++++++++++----- .../storage/v1beta2/StreamWriterTest.java | 150 ++++++++++++------ .../it/ITBigQueryWriteManualClientTest.java | 29 ++-- 4 files changed, 226 insertions(+), 110 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index b7b7fbb035..3cee9a457f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -857,18 +857,19 @@ public void onResponse(AppendRowsResponse response) { inflightBatch.onFailure(exception); } } - if (inflightBatch.getExpectedOffset() > 0 - && response.getOffset() != inflightBatch.getExpectedOffset()) { - IllegalStateException exception = - new IllegalStateException( - String.format( - "The append result offset %s does not match " + "the expected offset %s.", - response.getOffset(), inflightBatch.getExpectedOffset())); - inflightBatch.onFailure(exception); - abortInflightRequests(exception); - } else { - inflightBatch.onSuccess(response); - } + // Temp for Breaking Change. + // if (inflightBatch.getExpectedOffset() > 0 + // && response.getOffset() != inflightBatch.getExpectedOffset()) { + // IllegalStateException exception = + // new IllegalStateException( + // String.format( + // "The append result offset %s does not match " + "the expected offset %s.", + // response.getOffset(), inflightBatch.getExpectedOffset())); + // inflightBatch.onFailure(exception); + // abortInflightRequests(exception); + // } else { + inflightBatch.onSuccess(response); + // } } finally { streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java index 21dd4db13a..a3924c4e17 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java @@ -216,12 +216,15 @@ public void testSingleAppendSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - - assertEquals(0L, appendFuture.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture.get().getOffset()); + appendFuture.get(); assertEquals( 1, testBigQueryWrite @@ -260,12 +263,16 @@ public void testSingleAppendMultipleSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0L, appendFuture.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture.get().getOffset()); + appendFuture.get(); assertEquals( 4, testBigQueryWrite @@ -297,15 +304,21 @@ public void testMultipleAppendSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture; for (int i = 0; i < 4; i++) { appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - - assertEquals((long) i, appendFuture.get().getOffset()); + // Temp for Breaking Change. + // assertEquals((long) i, appendFuture.get().getOffset()); + appendFuture.get(); assertEquals( 1, testBigQueryWrite @@ -375,11 +388,16 @@ public void testSingleAppendComplexJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, COMPLEX_TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0L, appendFuture.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture.get().getOffset()); + appendFuture.get(); assertEquals( 1, testBigQueryWrite @@ -404,17 +422,19 @@ public void testAppendMultipleSchemaUpdate() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { // Add fake resposne for FakeBigQueryWrite, first response has updated schema. + // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - .setOffset(0) + // .setOffset(0) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - .setOffset(1) + // .setOffset(1) .setUpdatedSchema(UPDATED_TABLE_SCHEMA_2) .build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); // First append JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -433,7 +453,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception { millis += 100; } assertTrue(writer.getDescriptor().getFields().size() == 2); - assertEquals(0L, appendFuture1.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + appendFuture1.get(); assertEquals( 1, testBigQueryWrite @@ -470,7 +492,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception { millis += 100; } assertTrue(writer.getDescriptor().getFields().size() == 3); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture2.get(); assertEquals( 1, testBigQueryWrite @@ -499,7 +523,9 @@ public void testAppendMultipleSchemaUpdate() throws Exception { ApiFuture appendFuture3 = writer.append(updatedJsonArr2, -1, /* allowUnknownFields */ false); - assertEquals(2L, appendFuture3.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(2L, appendFuture3.get().getOffset()); + appendFuture3.get(); assertEquals( 1, testBigQueryWrite @@ -580,7 +606,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception { .setError(com.google.rpc.Status.newBuilder().setCode(11).build()) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -612,8 +640,9 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception { ApiFuture appendFuture2 = writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - - assertEquals(0L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture2.get().getOffset()); + appendFuture2.get(); assertEquals( 1, testBigQueryWrite @@ -647,13 +676,16 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { .setElementCountThreshold(2L) .build()) .build()) { + // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - .setOffset(0) + // .setOffset(0) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); // First append JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -667,8 +699,11 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { ApiFuture appendFuture3 = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); assertEquals( 2, testBigQueryWrite @@ -694,7 +729,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { .getSerializedRows(1), FooType.newBuilder().setFoo("allen").build().toByteString()); - assertEquals(2L, appendFuture3.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(2L, appendFuture3.get().getOffset()); + appendFuture3.get(); assertEquals( 1, testBigQueryWrite @@ -732,7 +769,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { ApiFuture appendFuture4 = writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - assertEquals(3L, appendFuture4.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(3L, appendFuture4.get().getOffset()); + appendFuture4.get(); assertEquals( 1, testBigQueryWrite @@ -777,7 +816,10 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception { int thread_nums = 5; Thread[] thread_arr = new Thread[thread_nums]; for (int i = 0; i < thread_nums; i++) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) + // i).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); offsetSets.add((long) i); Thread t = new Thread( @@ -787,7 +829,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - offsetSets.remove(response.getOffset()); + // offsetSets.remove(response.getOffset()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); @@ -801,7 +843,7 @@ public void run() { for (int i = 0; i < thread_nums; i++) { thread_arr[i].join(); } - assertTrue(offsetSets.size() == 0); + // assertTrue(offsetSets.size() == 0); for (int i = 0; i < thread_nums; i++) { assertEquals( 1, @@ -843,17 +885,20 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception { Thread[] thread_arr = new Thread[numberThreads]; for (int i = 0; i < numberThreads; i++) { if (i == 2) { + // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - .setOffset((long) i) + // .setOffset((long) i) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); } else { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder().setOffset((long) i).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse( + // AppendRowsResponse.newBuilder().setOffset((long) i).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); } - offset_sets.add((long) i); + offsetSets.add((long) i); Thread t = new Thread( new Runnable() { @@ -862,7 +907,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - offset_sets.remove(response.getOffset()); + // offsetSets.remove(response.getOffset()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); } @@ -875,7 +920,7 @@ public void run() { for (int i = 0; i < numberThreads; i++) { thread_arr[i].join(); } - assertTrue(offset_sets.size() == 0); + // assertTrue(offsetSets.size() == 0); for (int i = 0; i < numberThreads; i++) { assertEquals( 1, @@ -910,8 +955,11 @@ public void run() { jsonArr2.put(foo); for (int i = numberThreads; i < numberThreads + 5; i++) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) i).build()); - offset_sets.add((long) i); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) + // i).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + offsetSets.add((long) i); Thread t = new Thread( new Runnable() { @@ -920,7 +968,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr2, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - offset_sets.remove(response.getOffset()); + offsetSets.remove(response.getOffset()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); } @@ -933,7 +981,7 @@ public void run() { for (int i = 0; i < numberThreads; i++) { thread_arr[i].join(); } - assertTrue(offset_sets.size() == 0); + // assertTrue(offsetSets.size() == 0); for (int i = 0; i < numberThreads; i++) { assertEquals( 1, diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 80d8c493dd..dd7987d796 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -37,7 +37,6 @@ import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.common.base.Strings; import com.google.protobuf.DescriptorProtos; -import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -124,7 +123,8 @@ private AppendRowsRequest createAppendRequest(String[] messages, long offset) { rows.addSerializedRows(foo.toByteString()); } if (offset > 0) { - requestBuilder.setOffset(Int64Value.of(offset)); + // Temp for Breaking Change. + // requestBuilder.setOffset(Int64Value.of(offset)); } return requestBuilder .setProtoRows(dataBuilder.setRows(rows.build()).build()) @@ -156,7 +156,9 @@ public void testAppendByDuration() throws Exception { .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -164,9 +166,11 @@ public void testAppendByDuration() throws Exception { assertFalse(appendFuture2.isDone()); fakeExecutor.advanceTime(Duration.ofSeconds(10)); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); - + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); assertEquals(1, testBigQueryWrite.getAppendRequests().size()); assertEquals( @@ -193,23 +197,31 @@ public void testAppendByNumBatchedMessages() throws Exception { .setDelayThreshold(Duration.ofSeconds(100)) .build()) .build(); - - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + appendFuture1.get(); + appendFuture2.get(); + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); assertFalse(appendFuture3.isDone()); ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {"D"}); - assertEquals(2L, appendFuture3.get().getOffset()); - assertEquals(3L, appendFuture4.get().getOffset()); + // Temp for Breaking Change. + appendFuture3.get(); + appendFuture4.get(); + // assertEquals(2L, appendFuture3.get().getOffset()); + // assertEquals(3L, appendFuture4.get().getOffset()); assertEquals(2, testBigQueryWrite.getAppendRequests().size()); assertEquals( @@ -248,23 +260,33 @@ public void testAppendByNumBytes() throws Exception { .build()) .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); assertFalse(appendFuture3.isDone()); // This message is big enough to trigger send on the previous message and itself. ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {Strings.repeat("A", 100)}); - assertEquals(2L, appendFuture3.get().getOffset()); - assertEquals(3L, appendFuture4.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(2L, appendFuture3.get().getOffset()); + // assertEquals(3L, appendFuture4.get().getOffset()); + appendFuture3.get(); + appendFuture4.get(); assertEquals(3, testBigQueryWrite.getAppendRequests().size()); @@ -282,9 +304,11 @@ public void testWriteByShutdown() throws Exception { .setElementCountThreshold(10L) .build()) .build(); - - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -297,8 +321,9 @@ public void testWriteByShutdown() throws Exception { // Verify the appends completed assertTrue(appendFuture1.isDone()); assertTrue(appendFuture2.isDone()); - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); } @Test @@ -312,9 +337,11 @@ public void testWriteMixedSizeAndDuration() throws Exception { .setDelayThreshold(Duration.ofSeconds(5)) .build()) .build()) { - + // Temp for Breaking Change. testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); @@ -325,8 +352,11 @@ public void testWriteMixedSizeAndDuration() throws Exception { sendTestMessage(writer, new String[] {"B", "C"}); // Write triggered by batch size - assertEquals(0L, appendFuture1.get().getOffset()); - assertEquals(1L, appendFuture2.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, appendFuture1.get().getOffset()); + // assertEquals(1L, appendFuture2.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"D"}); @@ -377,8 +407,11 @@ public void testFlowControlBehaviorBlock() throws Exception { .build()) .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); @@ -399,7 +432,9 @@ public void run() { Thread.sleep(5000L); fakeExecutor.advanceTime(Duration.ofSeconds(10)); // The first requests gets back while the second one is blocked. - assertEquals(2L, appendFuture1.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(2L, appendFuture1.get().getOffset()); + appendFuture1.get(); Thread.sleep(5000L); // Wait is necessary for response to be scheduled before timer is advanced. fakeExecutor.advanceTime(Duration.ofSeconds(10)); @@ -431,8 +466,9 @@ public void testFlowControlBehaviorException() throws Exception { .getFlowControlSettings() .getMaxOutstandingElementCount() .longValue()); - - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -447,7 +483,9 @@ public void testFlowControlBehaviorException() throws Exception { "java.util.concurrent.ExecutionException: The maximum number of batch elements: 1 have been reached.", e.toString()); } - assertEquals(1L, appendFuture1.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(1L, appendFuture1.get().getOffset()); + appendFuture1.get(); } } @@ -465,11 +503,15 @@ public void testStreamReconnectionTransient() throws Exception { StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); testBigQueryWrite.addException(transientError); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture future1 = sendTestMessage(writer, new String[] {"m1"}); assertEquals(false, future1.isDone()); // Retry is scheduled to be 7 seconds later. - assertEquals(0L, future1.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0L, future1.get().getOffset()); + future1.get(); writer.close(); } @@ -537,8 +579,11 @@ public void testOffset() throws Exception { .build()) .build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); ApiFuture appendFuture1 = writer.append(request1); AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L); @@ -547,10 +592,14 @@ public void testOffset() throws Exception { ApiFuture appendFuture3 = writer.append(request3); AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L); ApiFuture appendFuture4 = writer.append(request4); - assertEquals(10L, appendFuture1.get().getOffset()); - assertEquals(11L, appendFuture2.get().getOffset()); - assertEquals(13L, appendFuture3.get().getOffset()); - assertEquals(15L, appendFuture4.get().getOffset()); + // assertEquals(10L, appendFuture1.get().getOffset()); + // assertEquals(11L, appendFuture2.get().getOffset()); + // assertEquals(13L, appendFuture3.get().getOffset()); + // assertEquals(15L, appendFuture4.get().getOffset()); + appendFuture1.get(); + appendFuture2.get(); + appendFuture3.get(); + appendFuture4.get(); } } @@ -564,12 +613,15 @@ public void testOffsetMismatch() throws Exception { .setElementCountThreshold(1L) .build()) .build()) { - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build()); + // Temp for Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); ApiFuture appendFuture1 = writer.append(request1); appendFuture1.get(); - fail("Should throw exception"); + // Temp for Breaking Change. + // fail("Should throw exception"); } catch (Exception e) { assertEquals( "java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.", @@ -845,9 +897,13 @@ public void testFlushAll() throws Exception { .build()) .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + // Temp Breaking Change. + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index af4c58341a..3cd3ca3671 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -246,7 +246,9 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream() ApiFuture response = jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0, response.get().getOffset()); + // Temp for Breaking Change. + response.get(); + // assertEquals(0, response.get().getOffset()); LOG.info("Sending two more messages"); JSONObject foo1 = new JSONObject(); @@ -266,9 +268,10 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream() jsonStreamWriter.append(jsonArr1, -1, /* allowUnknownFields */ false); ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1, /* allowUnknownFields */ false); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(1, response1.get().getOffset()); // assertEquals(3, response2.get().getOffset()); + response1.get(); response2.get(); TableResult result = @@ -324,7 +327,9 @@ public void testJsonStreamWriterSchemaUpdate() ApiFuture response = jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false); - assertEquals(0, response.get().getOffset()); + // Temp for Breaking Change. + // assertEquals(0, response.get().getOffset()); + response.get(); // 2). Schema update and wait until querying it returns a new schema. try { com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName); @@ -368,8 +373,9 @@ public void testJsonStreamWriterSchemaUpdate() for (int i = 1; i < 100; i++) { ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1, /* allowUnknownFields */ false); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(i, response2.get().getOffset()); + response2.get(); if (response2.get().hasUpdatedSchema()) { next = i; break; @@ -397,7 +403,7 @@ public void testJsonStreamWriterSchemaUpdate() for (int i = 0; i < 10; i++) { ApiFuture response3 = jsonStreamWriter.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(next + 1 + i, response3.get().getOffset()); response3.get(); } @@ -444,6 +450,7 @@ public void testComplicateSchemaWithPendingStream() .build()); // Waiting for API breaking change to be generated in new client. // assertEquals(1, response2.get().getOffset()); + response2.get(); // Nothing showed up since rows are not committed. TableResult result = @@ -462,8 +469,9 @@ public void testComplicateSchemaWithPendingStream() .setOffset(Int64Value.of(1L)) .build()); try { - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(2, response3.get().getOffset()); + response3.get(); // fail("Append to finalized stream should fail."); } catch (Exception expected) { // The exception thrown is not stable. Opened a bug to fix it. @@ -511,7 +519,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio ApiFuture response = streamWriter.append( createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(0L, response.get().getOffset()); response.get(); // Send in a bogus stream name should cause in connection error. @@ -533,6 +541,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); // Waiting for API breaking change to be generated in new client. // assertEquals(1L, response3.get().getOffset()); + response3.get(); } finally { } } @@ -552,8 +561,9 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec createAppendRequest(writeStream.getName(), new String[] {"aaa"}) .setOffset(Int64Value.of(0L)) .build()); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(0L, response.get().getOffset()); + response.get(); } try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { @@ -564,8 +574,9 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec createAppendRequest(writeStream.getName(), new String[] {"bbb"}) .setOffset(Int64Value.of(1L)) .build()); - // Waiting for API breaking change to be generated in new client. + // Temp for Breaking Change. // assertEquals(1L, response.get().getOffset()); + response.get(); } } } From cc23c5491731a372c64a5c3a5a18886b1c961476 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 11 Dec 2020 10:53:19 -0800 Subject: [PATCH 4/5] . --- .../storage/v1beta2/it/ITBigQueryWriteManualClientTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index 4a6fd0263d..c487da2df8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -256,7 +256,11 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream() JSONArray row = new JSONArray(new JSONObject[] {testStr, testNumerics, testDateTime}); ApiFuture response = +<<<<<<< HEAD jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false); +======= + jsonStreamWriter.append(row, -1, /* allowUnknownFields */ false); +>>>>>>> 59f9f5c (.) // Temp for Breaking Change. response.get(); // assertEquals(0, response.get().getOffset()); From 424a2dc781389f86fb3b5174aae752dc619e52a1 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 11 Dec 2020 14:44:17 -0500 Subject: [PATCH 5/5] resolve merge conflict --- .../storage/v1beta2/it/ITBigQueryWriteManualClientTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index c487da2df8..5dd73c0d80 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -256,11 +256,7 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream() JSONArray row = new JSONArray(new JSONObject[] {testStr, testNumerics, testDateTime}); ApiFuture response = -<<<<<<< HEAD - jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false); -======= jsonStreamWriter.append(row, -1, /* allowUnknownFields */ false); ->>>>>>> 59f9f5c (.) // Temp for Breaking Change. response.get(); // assertEquals(0, response.get().getOffset());