fix: more writeapi manual client test issues (#241)
* fix: more test issues

- Fix some unwanted exceptions in the test run by making sure writers are cleaned up (a cleanup sketch follows this message).
- Fix another executor race issue.
- The reconnection test failure is weird; it seems the test was shut down while it was still running. I split the test into three and reduced each one's run length. The failure could be due to the test taking too long to run (each wait is 7 seconds and there were 4 waits).

	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java

* remove some retry setting checks that were not honored
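
A minimal sketch of the cleanup pattern behind the first two fixes (an illustration only; the writer stand-in and the submitted task are hypothetical, not the library's API): close the writer and drain the executor before the test returns, so no background work races with teardown.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CleanupSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    // Stand-in for a StreamWriter; only used to show the cleanup ordering.
    AutoCloseable writer = () -> System.out.println("writer closed");
    try {
      executor.submit(() -> System.out.println("pretend append"));
    } finally {
      writer.close();                                  // make sure the writer is cleaned up
      executor.shutdown();                             // stop accepting new tasks
      executor.awaitTermination(1, TimeUnit.MINUTES);  // wait out in-flight tasks to avoid racing teardown
    }
  }
}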
yirutang committed May 4, 2020
1 parent 89c8623 commit 65c5ec9
Showing 4 changed files with 68 additions and 13 deletions.
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
@@ -165,10 +165,14 @@ private StreamWriter(Builder builder)
Instant.ofEpochSecond(
stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
backgroundResources.shutdown();
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
throw new IllegalStateException(
"Cannot write to a stream that is already committed: " + streamName);
}
if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
backgroundResources.shutdown();
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
throw new IllegalStateException(
"Cannot write to a stream that is already expired: " + streamName);
}
@@ -247,7 +251,7 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
*/
public void refreshAppend() throws IOException, InterruptedException {
synchronized (this) {
Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
Preconditions.checkState(!shutdown.get(), "Cannot shut down on a shut-down writer.");
// There could be a moment, stub is not yet initialized.
if (clientStream != null) {
clientStream.closeSend();
@@ -475,6 +479,7 @@ public RetrySettings getRetrySettings() {
public void shutdown() {
Preconditions.checkState(
!shutdown.getAndSet(true), "Cannot shut down a writer already shut-down.");
LOG.info("Shutdown called on writer");
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
@@ -684,10 +689,6 @@ > getApiMaxInflightRequests()) {
*/
public Builder setRetrySettings(RetrySettings retrySettings) {
Preconditions.checkNotNull(retrySettings);
Preconditions.checkArgument(
retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
Preconditions.checkArgument(
retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0);
this.retrySettings = retrySettings;
return this;
}
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
@@ -18,6 +18,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
@@ -53,6 +55,15 @@ private WriterCache(BigQueryWriteClient stub, int maxTableEntry, SchemaCompact c
writerCache =
CacheBuilder.newBuilder()
.maximumSize(maxTableEntry)
.removalListener(
new RemovalListener<String, Cache<Descriptor, StreamWriter>>() {
@Override
public void onRemoval(
RemovalNotification<String, Cache<Descriptor, StreamWriter>>
removalNotification) {
removalNotification.getValue().invalidateAll();
}
})
.<String, Cache<Descriptor, StreamWriter>>build();
}

@@ -135,6 +146,14 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
tableEntry =
CacheBuilder.newBuilder()
.maximumSize(MAX_WRITERS_PER_TABLE)
.removalListener(
new RemovalListener<Descriptor, StreamWriter>() {
@Override
public void onRemoval(
RemovalNotification<Descriptor, StreamWriter> removalNotification) {
removalNotification.getValue().close();
}
})
.<Descriptor, StreamWriter>build();
writer = CreateNewWriter(streamName);
tableEntry.put(userSchema, writer);
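
For context on the removal listeners added above: Guava's CacheBuilder invokes the listener when an entry is evicted from a size-bounded cache, which is what lets evicted writers be closed instead of leaked. A minimal standalone sketch of the pattern (AutoCloseable stands in for StreamWriter here):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class RemovalListenerSketch {
  public static void main(String[] args) {
    Cache<String, AutoCloseable> cache =
        CacheBuilder.newBuilder()
            .maximumSize(2)
            .removalListener(
                new RemovalListener<String, AutoCloseable>() {
                  @Override
                  public void onRemoval(RemovalNotification<String, AutoCloseable> notification) {
                    try {
                      notification.getValue().close(); // release the evicted entry's resources
                    } catch (Exception e) {
                      // a real cache would log the failure
                    }
                  }
                })
            .<String, AutoCloseable>build();

    cache.put("a", () -> System.out.println("closed a"));
    cache.put("b", () -> System.out.println("closed b"));
    cache.put("c", () -> System.out.println("closed c")); // exceeds maximumSize, so one entry is evicted and closed
  }
}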
google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
@@ -33,6 +33,7 @@
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.DataLossException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
@@ -439,7 +440,7 @@ public void testFlowControlBehaviorException() throws Exception {
}

@Test
public void testStreamReconnection() throws Exception {
public void testStreamReconnectionTransient() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
@@ -450,17 +451,27 @@ public void testStreamReconnection() throws Exception {
.build())
.build();

// Case 1: Request succeeded after retry since the error is transient.
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(false, future1.isDone());
// Retry is scheduled to be 7 seconds later.
assertEquals(0L, future1.get().getOffset());
writer.close();
}

LOG.info("======CASE II");
// Case 2 : Request failed since the error is not transient.
@Test
public void testStreamReconnectionPermanant() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.build())
.build();
StatusRuntimeException permanentError = new StatusRuntimeException(Status.INVALID_ARGUMENT);
testBigQueryWrite.addException(permanentError);
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m2"});
@@ -470,11 +481,26 @@ public void testStreamReconnection() throws Exception {
} catch (ExecutionException e) {
assertEquals(permanentError.toString(), e.getCause().getCause().toString());
}
writer.close();
}

LOG.info("======CASE III");
// Case 3: Failed after retried max retry times.
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addException(transientError);
@Test
public void testStreamReconnectionExceedRetry() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.build())
.setRetrySettings(
RetrySettings.newBuilder()
.setMaxRetryDelay(Duration.ofMillis(100))
.setMaxAttempts(1)
.build())
.build();
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addException(transientError);
ApiFuture<AppendRowsResponse> future3 = sendTestMessage(writer, new String[] {"m3"});
@@ -814,5 +840,7 @@ public void testExistingClient() throws Exception {
StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM, client).build();
writer.close();
assertFalse(client.isShutdown());
client.shutdown();
client.awaitTermination(1, TimeUnit.MINUTES);
}
}
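
The retries-exhausted case above stays fast by shrinking the gax RetrySettings instead of waiting through multiple multi-second backoffs. A minimal sketch of building such a fast-failing configuration (values are illustrative only; assumes gax's org.threeten.bp.Duration):

import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;

public class RetrySettingsSketch {
  public static void main(String[] args) {
    RetrySettings fastFail =
        RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(50)) // first backoff
            .setMaxRetryDelay(Duration.ofMillis(100))    // cap on any single backoff
            .setMaxAttempts(1)                           // give up almost immediately
            .build();
    System.out.println("max attempts: " + fastFail.getMaxAttempts());
  }
}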
google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
@@ -33,6 +33,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.*;
import org.junit.runner.RunWith;
@@ -285,5 +286,11 @@ public void run() {
}
});
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.info(e.toString());
}
}
}
