fix: more writeapi manual client test issues #241

Merged: 2 commits, May 4, 2020
@@ -165,10 +165,14 @@ private StreamWriter(Builder builder)
Instant.ofEpochSecond(
stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
backgroundResources.shutdown();
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
throw new IllegalStateException(
"Cannot write to a stream that is already committed: " + streamName);
}
if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
backgroundResources.shutdown();
backgroundResources.awaitTermination(1, TimeUnit.MINUTES);
throw new IllegalStateException(
"Cannot write to a stream that is already expired: " + streamName);
}
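The added lines shut down the writer's background resources before throwing, so a failed validation check does not leak threads. A minimal standalone sketch of this clean-up-before-throw pattern (names are hypothetical, not the PR's code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FailFastCleanupSketch {
  // Hypothetical stand-in for the client's background resources.
  private final ExecutorService backgroundTask = Executors.newSingleThreadExecutor();

  public FailFastCleanupSketch(boolean streamAlreadyCommitted) throws InterruptedException {
    if (streamAlreadyCommitted) {
      // Release what was already started before failing the constructor.
      backgroundTask.shutdown();
      backgroundTask.awaitTermination(1, TimeUnit.MINUTES);
      throw new IllegalStateException("Cannot write to a stream that is already committed");
    }
  }
}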
@@ -247,7 +251,7 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
*/
public void refreshAppend() throws IOException, InterruptedException {
synchronized (this) {
Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
Preconditions.checkState(!shutdown.get(), "Cannot shut down on a shut-down writer.");
// There could be a moment, stub is not yet initialized.
if (clientStream != null) {
clientStream.closeSend();
@@ -475,6 +479,7 @@ public RetrySettings getRetrySettings() {
public void shutdown() {
Preconditions.checkState(
!shutdown.getAndSet(true), "Cannot shut down a writer already shut-down.");
LOG.info("Shutdown called on writer");
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
@@ -684,10 +689,6 @@ > getApiMaxInflightRequests()) {
*/
public Builder setRetrySettings(RetrySettings retrySettings) {
Preconditions.checkNotNull(retrySettings);
Preconditions.checkArgument(
retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
Preconditions.checkArgument(
retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0);
this.retrySettings = retrySettings;
return this;
}
@@ -18,6 +18,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
import java.util.logging.Logger;
@@ -52,6 +54,15 @@ private WriterCache(BigQueryWriteClient stub, int maxTableEntry, SchemaCompact c
writerCache =
CacheBuilder.newBuilder()
.maximumSize(maxTableEntry)
.removalListener(
new RemovalListener<String, Cache<Descriptor, StreamWriter>>() {
@Override
public void onRemoval(
RemovalNotification<String, Cache<Descriptor, StreamWriter>>
removalNotification) {
removalNotification.getValue().invalidateAll();
}
})
.<String, Cache<Descriptor, StreamWriter>>build();
}
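The new removal listener makes sure that entries evicted from the writer cache do not leak their underlying resources. A minimal standalone sketch of the Guava pattern, with FakeWriter as a hypothetical stand-in for StreamWriter (illustrative only, not the PR's code):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class EvictionCleanupSketch {
  // Stand-in for a resource-holding value such as a StreamWriter.
  static class FakeWriter {
    private final String name;
    FakeWriter(String name) { this.name = name; }
    void close() { System.out.println("closed " + name); }
  }

  public static void main(String[] args) {
    Cache<String, FakeWriter> writers =
        CacheBuilder.newBuilder()
            .maximumSize(2)
            .removalListener(
                new RemovalListener<String, FakeWriter>() {
                  @Override
                  public void onRemoval(RemovalNotification<String, FakeWriter> notification) {
                    // Invoked for size-based eviction and explicit invalidation alike,
                    // so an evicted writer is always closed.
                    notification.getValue().close();
                  }
                })
            .<String, FakeWriter>build();

    writers.put("t1", new FakeWriter("t1"));
    writers.put("t2", new FakeWriter("t2"));
    writers.put("t3", new FakeWriter("t3")); // exceeds maximumSize; the least recently used writer is evicted and closed
    writers.invalidateAll();                 // closes the remaining writers
  }
}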

@@ -134,6 +145,14 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
tableEntry =
CacheBuilder.newBuilder()
.maximumSize(MAX_WRITERS_PER_TABLE)
.removalListener(
new RemovalListener<Descriptor, StreamWriter>() {
@Override
public void onRemoval(
RemovalNotification<Descriptor, StreamWriter> removalNotification) {
removalNotification.getValue().close();
}
})
.<Descriptor, StreamWriter>build();
writer = CreateNewWriter(streamName);
tableEntry.put(userSchema, writer);
@@ -33,6 +33,7 @@
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.DataLossException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
@@ -437,7 +438,7 @@ public void testFlowControlBehaviorException() throws Exception {
}

@Test
public void testStreamReconnection() throws Exception {
public void testStreamReconnectionTransient() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
@@ -448,17 +449,27 @@
.build())
.build())
.build();

// Case 1: Request succeeded after retry since the error is transient.
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(false, future1.isDone());
// Retry is scheduled to be 7 seconds later.
assertEquals(0L, future1.get().getOffset());
writer.close();
}

LOG.info("======CASE II");
// Case 2 : Request failed since the error is not transient.
@Test
public void testStreamReconnectionPermanant() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.build())
.build();
StatusRuntimeException permanentError = new StatusRuntimeException(Status.INVALID_ARGUMENT);
testBigQueryWrite.addException(permanentError);
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m2"});
@@ -468,11 +479,26 @@
} catch (ExecutionException e) {
assertEquals(permanentError.toString(), e.getCause().getCause().toString());
}
writer.close();
}

LOG.info("======CASE III");
// Case 3: Failed after retried max retry times.
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addException(transientError);
@Test
public void testStreamReconnectionExceedRetry() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100000))
.setElementCountThreshold(1L)
.build())
.setRetrySettings(
RetrySettings.newBuilder()
.setMaxRetryDelay(Duration.ofMillis(100))
.setMaxAttempts(1)
.build())
.build();
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addException(transientError);
ApiFuture<AppendRowsResponse> future3 = sendTestMessage(writer, new String[] {"m3"});
@@ -812,5 +838,7 @@ public void testExistingClient() throws Exception {
StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM, client).build();
writer.close();
assertFalse(client.isShutdown());
client.shutdown();
client.awaitTermination(1, TimeUnit.MINUTES);
}
}
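The new testStreamReconnectionExceedRetry caps retries through gax RetrySettings so the transient error is eventually surfaced to the caller. A minimal sketch of building such settings, assuming the threeten-bp Duration type that gax accepts; the values are illustrative:

import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;

public class RetrySettingsSketch {
  public static void main(String[] args) {
    RetrySettings settings =
        RetrySettings.newBuilder()
            .setMaxAttempts(1)                        // cap attempts so retries are exhausted quickly
            .setMaxRetryDelay(Duration.ofMillis(100)) // keep the test fast
            .build();
    System.out.println("max attempts: " + settings.getMaxAttempts());
  }
}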
@@ -33,6 +33,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.*;
import org.junit.runner.RunWith;
@@ -281,5 +282,11 @@ public void run() {
}
});
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.info(e.toString());
}
}
}
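The added shutdown()/awaitTermination(...) call gives the test's worker threads a bounded window to finish instead of leaving them running when the test returns. A minimal standalone sketch of that graceful-shutdown pattern (illustrative only):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    executor.submit(() -> System.out.println("work"));
    executor.shutdown(); // no new tasks accepted; queued tasks still run
    try {
      if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
        executor.shutdownNow(); // give up and interrupt anything still running
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      executor.shutdownNow();
    }
  }
}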