Skip to content

Commit

Permalink
fix: Remove flushAll method (#850)
Browse files Browse the repository at this point in the history
* .

* .

* .
  • Loading branch information
yirutang committed Feb 19, 2021
1 parent 6021920 commit 33a4502
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 66 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -23,4 +23,9 @@
<from>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject, boolean)</from>
<to>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject)</to>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/StreamWriter</className>
<differenceType>7002</differenceType>
<method>void flushAll(long)</method>
</difference>
</differences>
Expand Up @@ -272,37 +272,9 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
messagesBatchLock.unlock();
appendAndRefreshAppendLock.unlock();
}

return outstandingAppend.appendResult;
}

/**
* This is the general flush method for asynchronise append operation. When you have outstanding
* append requests, calling flush will make sure all outstanding append requests completed and
* successful. Otherwise there will be an exception thrown.
*
* @throws Exception
*/
public void flushAll(long timeoutMillis) throws Exception {
appendAndRefreshAppendLock.lock();
try {
writeAllOutstanding();
synchronized (messagesWaiter) {
messagesWaiter.waitComplete(timeoutMillis);
}
} finally {
appendAndRefreshAppendLock.unlock();
}
exceptionLock.lock();
try {
if (streamException != null) {
throw new Exception(streamException);
}
} finally {
exceptionLock.unlock();
}
}

/**
* Re-establishes a stream connection.
*
Expand Down
Expand Up @@ -863,44 +863,6 @@ public void testExistingClient() throws Exception {
client.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testFlushAll() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100000))
.build())
.build();

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
.build());
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
assertFalse(appendFuture3.isDone());
writer.flushAll(100000);
assertTrue(appendFuture3.isDone());

writer.close();
}

@Test
public void testDatasetTraceId() throws Exception {
StreamWriter writer =
Expand Down

0 comments on commit 33a4502

Please sign in to comment.