fix: flaky writeapi manual client tests (#238)
1. Some test runs emit warnings that open connections are not closed. Make sure everything is shut down after each test.
2. Some unexpected exceptions were thrown and not caught. Catch a more general exception, and fix code that incorrectly called remove on the fixed-size List returned by Arrays.asList (which does not support removal).
3. Make sure the executors in the tests have finished running, so they do not race with test shutdown.
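A minimal sketch of the List pitfall behind point 2 (standalone; the values are illustrative, not taken from the tests):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FixedSizeListPitfall {
  public static void main(String[] args) {
    // Arrays.asList returns a fixed-size view backed by the array,
    // so remove() throws UnsupportedOperationException.
    List<Long> offsets = Arrays.asList(0L, 2L, 4L);
    try {
      offsets.remove(Long.valueOf(0L));
    } catch (UnsupportedOperationException e) {
      System.out.println("remove is not supported on Arrays.asList");
    }

    // The approach taken in this commit: track expected offsets in a
    // mutable Set and synchronize removals across worker threads.
    Set<Long> expected = new HashSet<>(Arrays.asList(0L, 2L, 4L));
    synchronized (expected) {
      expected.remove(Long.valueOf(0L));
    }
  }
}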
yirutang committed May 1, 2020
1 parent e0b0fcd commit 89c8623
Showing 6 changed files with 70 additions and 20 deletions.
@@ -102,4 +102,9 @@ public static void testSetStub(
BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) {
cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
}

/** Clears the underlying cache and closes all the transport connections. */
public static void clearCache() {
cache.clear();
}
}
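A hedged usage sketch of the new clearCache hook (the tearDown shape below is illustrative, not part of this commit):

// In a test fixture that appends through DirectWriter, clearing the
// cache closes the cached StreamWriters and their transport
// connections, which silences the leaked-connection warnings.
@After
public void tearDown() throws Exception {
  DirectWriter.clearCache();
}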
@@ -20,6 +20,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -144,6 +145,22 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
return writer;
}

/** Clear the cache and close all the writers in the cache. */
public void clear() {
synchronized (this) {
ConcurrentMap<String, Cache<Descriptor, StreamWriter>> map = writerCache.asMap();
for (String key : map.keySet()) {
Cache<Descriptor, StreamWriter> entry = writerCache.getIfPresent(key);
ConcurrentMap<Descriptor, StreamWriter> entryMap = entry.asMap();
for (Descriptor descriptor : entryMap.keySet()) {
StreamWriter writer = entry.getIfPresent(descriptor);
writer.close();
}
}
writerCache.cleanUp();
}
}

@VisibleForTesting
public long cachedTableCount() {
synchronized (writerCache) {
@@ -24,15 +24,15 @@
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.common.collect.Sets;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.*;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -42,6 +42,8 @@

@RunWith(JUnit4.class)
public class DirectWriterTest {
private static final Logger LOG = Logger.getLogger(DirectWriterTest.class.getName());

private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2";
@@ -86,7 +88,7 @@ public void tearDown() throws Exception {
}

/** Response mocks for creating a new writer */
void WriterCreationResponseMock(String testStreamName, List<Long> responseOffsets) {
void WriterCreationResponseMock(String testStreamName, Set<Long> responseOffsets) {
// Response from CreateWriteStream
Stream.WriteStream expectedResponse =
Stream.WriteStream.newBuilder().setName(testStreamName).build();
@@ -117,7 +119,7 @@ public void testWriteSuccess() throws Exception {
FooType m1 = FooType.newBuilder().setFoo("m1").build();
FooType m2 = FooType.newBuilder().setFoo("m2").build();

WriterCreationResponseMock(TEST_STREAM, Arrays.asList(Long.valueOf(0L)));
WriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L)));
ApiFuture<Long> ret = DirectWriter.<FooType>append(TEST_TABLE, Arrays.asList(m1, m2));
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
@@ -159,7 +161,7 @@ public void testWriteSuccess() throws Exception {
assertEquals(expectRequest.toString(), actualRequests.get(3).toString());

// Write with a different schema.
WriterCreationResponseMock(TEST_STREAM_2, Arrays.asList(Long.valueOf(0L)));
WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L)));
AllSupportedTypes m3 = AllSupportedTypes.newBuilder().setStringValue("s").build();
ret = DirectWriter.<AllSupportedTypes>append(TEST_TABLE, Arrays.asList(m3));
verify(schemaCheck).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
@@ -181,6 +183,8 @@ public void testWriteSuccess() throws Exception {
((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType());
assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName());
assertEquals(expectRequest.toString(), actualRequests.get(6).toString());

DirectWriter.clearCache();
}

@Test
@@ -195,15 +199,17 @@ public void testWriteBadTableName() throws Exception {
} catch (IllegalArgumentException expected) {
assertEquals("Invalid table name: abc", expected.getMessage());
}

DirectWriter.clearCache();
}

@Test
public void testConcurrentAccess() throws Exception {
WriterCache cache = WriterCache.getTestInstance(client, 2, schemaCheck);
DirectWriter.testSetStub(client, 2, schemaCheck);
final FooType m1 = FooType.newBuilder().setFoo("m1").build();
final FooType m2 = FooType.newBuilder().setFoo("m2").build();
final List<Long> expectedOffset =
Arrays.asList(
final Set<Long> expectedOffset =
Sets.newHashSet(
Long.valueOf(0L),
Long.valueOf(2L),
Long.valueOf(4L),
@@ -221,12 +227,21 @@ public void run() {
try {
ApiFuture<Long> result =
DirectWriter.<FooType>append(TEST_TABLE, Arrays.asList(m1, m2));
assertTrue(expectedOffset.remove(result.get()));
} catch (IOException | InterruptedException | ExecutionException e) {
fail(e.getMessage());
synchronized (expectedOffset) {
assertTrue(expectedOffset.remove(result.get()));
}
} catch (Exception e) {
fail(e.toString());
}
}
});
}
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOG.info(e.toString());
}
DirectWriter.clearCache();
}
}
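The shutdown-then-await pattern added above keeps worker threads from racing the test teardown; a standalone sketch of the same idiom (pool size and task body are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitTerminationDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 10; i++) {
      executor.execute(() -> {});  // stand-in for the test's append tasks
    }
    // Stop accepting new tasks, then block until queued tasks finish,
    // so shared mocks are not torn down while tasks are still running.
    executor.shutdown();
    if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
      executor.shutdownNow();  // fallback so a hung task cannot wedge the run
    }
  }
}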
@@ -135,8 +135,9 @@ private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriter writer, Strin

@Test
public void testTableName() throws Exception {
StreamWriter writer = getTestStreamWriterBuilder().build();
assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
try (StreamWriter writer = getTestStreamWriterBuilder().build()) {
assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
}
}
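The try-with-resources form above works because StreamWriter implements AutoCloseable, so the writer's connection is released even when the assertion throws.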

@Test
@@ -175,7 +176,7 @@ public void testAppendByDuration() throws Exception {
.getSerializedRowsCount());
assertEquals(
true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
writer.shutdown();
writer.close();
}

@Test
@@ -228,7 +229,7 @@ public void testAppendByNumBatchedMessages() throws Exception {
.getSerializedRowsCount());
assertEquals(
false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
writer.shutdown();
writer.close();
}

@Test
@@ -264,7 +265,7 @@ public void testAppendByNumBytes() throws Exception {

assertEquals(3, testBigQueryWrite.getAppendRequests().size());

writer.shutdown();
writer.close();
}

@Test
@@ -429,7 +430,8 @@ public void testFlowControlBehaviorException() throws Exception {
try {
appendFuture2.get();
Assert.fail("This should fail");
} catch (ExecutionException e) {
} catch (Exception e) {
LOG.info("ControlFlow test exception: " + e.toString());
assertEquals("The maximum number of batch elements: 1 have been reached.", e.getMessage());
}
assertEquals(1L, appendFuture1.get().getOffset());
@@ -143,6 +143,7 @@ public void testCreateNewWriter() throws Exception {
assertEquals(TEST_TABLE, writer.getTableNameString());
assertEquals(TEST_STREAM, writer.getStreamNameString());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}

@Test
@@ -173,6 +174,7 @@ public void testWriterExpired() throws Exception {
"Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s",
e.getMessage());
}
cache.clear();
}

@Test
@@ -216,6 +218,7 @@ public void testWriterWithNewSchema() throws Exception {
assertEquals(TEST_STREAM_3, writer4.getStreamNameString());
assertEquals(TEST_STREAM_4, writer5.getStreamNameString());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}

@Test
@@ -259,6 +262,7 @@ public void testWriterWithDifferentTable() throws Exception {
assertEquals(TEST_STREAM_31, writer4.getStreamNameString());
assertEquals(TEST_STREAM, writer5.getStreamNameString());
assertEquals(2, cache.cachedTableCount());
cache.clear();
}

@Test
@@ -275,7 +279,7 @@ public void testConcurrentAccess() throws Exception {
public void run() {
try {
assertTrue(cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()) != null);
} catch (IOException | InterruptedException e) {
} catch (Exception e) {
fail(e.getMessage());
}
}
@@ -391,5 +391,12 @@ public Long call() throws IOException, InterruptedException, ExecutionException
assertTrue(expectedOffset.remove(response.get()));
}
assertTrue(expectedOffset.isEmpty());
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOG.info(e.toString());
}
DirectWriter.clearCache();
}
}
