Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flaky writeapi manual client tests #238

Merged
merged 2 commits into from May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -102,4 +102,9 @@ public static void testSetStub(
BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) {
cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
}

/** Clears the underlying cache and all the transport connections. */
public static void clearCache() {
cache.clear();
}
}
Expand Up @@ -20,6 +20,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -144,6 +145,22 @@ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
return writer;
}

/** Clear the cache and close all the writers in the cache. */
public void clear() {
synchronized (this) {
ConcurrentMap<String, Cache<Descriptor, StreamWriter>> map = writerCache.asMap();
for (String key : map.keySet()) {
Cache<Descriptor, StreamWriter> entry = writerCache.getIfPresent(key);
ConcurrentMap<Descriptor, StreamWriter> entryMap = entry.asMap();
for (Descriptor descriptor : entryMap.keySet()) {
StreamWriter writer = entry.getIfPresent(descriptor);
writer.close();
}
}
writerCache.cleanUp();
}
}

@VisibleForTesting
public long cachedTableCount() {
synchronized (writerCache) {
Expand Down
Expand Up @@ -24,15 +24,15 @@
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.common.collect.Sets;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.*;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -42,6 +42,8 @@

@RunWith(JUnit4.class)
public class DirectWriterTest {
private static final Logger LOG = Logger.getLogger(DirectWriterTest.class.getName());

private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2";
Expand Down Expand Up @@ -86,7 +88,7 @@ public void tearDown() throws Exception {
}

/** Response mocks for create a new writer */
void WriterCreationResponseMock(String testStreamName, List<Long> responseOffsets) {
void WriterCreationResponseMock(String testStreamName, Set<Long> responseOffsets) {
// Response from CreateWriteStream
Stream.WriteStream expectedResponse =
Stream.WriteStream.newBuilder().setName(testStreamName).build();
Expand Down Expand Up @@ -117,7 +119,7 @@ public void testWriteSuccess() throws Exception {
FooType m1 = FooType.newBuilder().setFoo("m1").build();
FooType m2 = FooType.newBuilder().setFoo("m2").build();

WriterCreationResponseMock(TEST_STREAM, Arrays.asList(Long.valueOf(0L)));
WriterCreationResponseMock(TEST_STREAM, Sets.newHashSet(Long.valueOf(0L)));
ApiFuture<Long> ret = DirectWriter.<FooType>append(TEST_TABLE, Arrays.asList(m1, m2));
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
Expand Down Expand Up @@ -159,7 +161,7 @@ public void testWriteSuccess() throws Exception {
assertEquals(expectRequest.toString(), actualRequests.get(3).toString());

// Write with a different schema.
WriterCreationResponseMock(TEST_STREAM_2, Arrays.asList(Long.valueOf(0L)));
WriterCreationResponseMock(TEST_STREAM_2, Sets.newHashSet(Long.valueOf(0L)));
AllSupportedTypes m3 = AllSupportedTypes.newBuilder().setStringValue("s").build();
ret = DirectWriter.<AllSupportedTypes>append(TEST_TABLE, Arrays.asList(m3));
verify(schemaCheck).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
Expand All @@ -181,6 +183,8 @@ public void testWriteSuccess() throws Exception {
((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType());
assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName());
assertEquals(expectRequest.toString(), actualRequests.get(6).toString());

DirectWriter.clearCache();
}

@Test
Expand All @@ -195,15 +199,17 @@ public void testWriteBadTableName() throws Exception {
} catch (IllegalArgumentException expected) {
assertEquals("Invalid table name: abc", expected.getMessage());
}

DirectWriter.clearCache();
}

@Test
public void testConcurrentAccess() throws Exception {
WriterCache cache = WriterCache.getTestInstance(client, 2, schemaCheck);
DirectWriter.testSetStub(client, 2, schemaCheck);
final FooType m1 = FooType.newBuilder().setFoo("m1").build();
final FooType m2 = FooType.newBuilder().setFoo("m2").build();
final List<Long> expectedOffset =
Arrays.asList(
final Set<Long> expectedOffset =
Sets.newHashSet(
Long.valueOf(0L),
Long.valueOf(2L),
Long.valueOf(4L),
Expand All @@ -221,12 +227,21 @@ public void run() {
try {
ApiFuture<Long> result =
DirectWriter.<FooType>append(TEST_TABLE, Arrays.asList(m1, m2));
assertTrue(expectedOffset.remove(result.get()));
} catch (IOException | InterruptedException | ExecutionException e) {
fail(e.getMessage());
synchronized (expectedOffset) {
assertTrue(expectedOffset.remove(result.get()));
}
} catch (Exception e) {
fail(e.toString());
}
}
});
}
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOG.info(e.toString());
}
DirectWriter.clearCache();
}
}
Expand Up @@ -135,8 +135,9 @@ private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriter writer, Strin

@Test
public void testTableName() throws Exception {
StreamWriter writer = getTestStreamWriterBuilder().build();
assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
try (StreamWriter writer = getTestStreamWriterBuilder().build()) {
assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
}
}

@Test
Expand Down Expand Up @@ -175,7 +176,7 @@ public void testAppendByDuration() throws Exception {
.getSerializedRowsCount());
assertEquals(
true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
writer.shutdown();
writer.close();
}

@Test
Expand Down Expand Up @@ -228,7 +229,7 @@ public void testAppendByNumBatchedMessages() throws Exception {
.getSerializedRowsCount());
assertEquals(
false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
writer.shutdown();
writer.close();
}

@Test
Expand Down Expand Up @@ -264,7 +265,7 @@ public void testAppendByNumBytes() throws Exception {

assertEquals(3, testBigQueryWrite.getAppendRequests().size());

writer.shutdown();
writer.close();
}

@Test
Expand Down Expand Up @@ -429,7 +430,8 @@ public void testFlowControlBehaviorException() throws Exception {
try {
appendFuture2.get();
Assert.fail("This should fail");
} catch (ExecutionException e) {
} catch (Exception e) {
LOG.info("ControlFlow test exception: " + e.toString());
assertEquals("The maximum number of batch elements: 1 have been reached.", e.getMessage());
}
assertEquals(1L, appendFuture1.get().getOffset());
Expand Down
Expand Up @@ -143,6 +143,7 @@ public void testCreateNewWriter() throws Exception {
assertEquals(TEST_TABLE, writer.getTableNameString());
assertEquals(TEST_STREAM, writer.getStreamNameString());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}

@Test
Expand Down Expand Up @@ -173,6 +174,7 @@ public void testWriterExpired() throws Exception {
"Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s",
e.getMessage());
}
cache.clear();
}

@Test
Expand Down Expand Up @@ -216,6 +218,7 @@ public void testWriterWithNewSchema() throws Exception {
assertEquals(TEST_STREAM_3, writer4.getStreamNameString());
assertEquals(TEST_STREAM_4, writer5.getStreamNameString());
assertEquals(1, cache.cachedTableCount());
cache.clear();
}

@Test
Expand Down Expand Up @@ -259,6 +262,7 @@ public void testWriterWithDifferentTable() throws Exception {
assertEquals(TEST_STREAM_31, writer4.getStreamNameString());
assertEquals(TEST_STREAM, writer5.getStreamNameString());
assertEquals(2, cache.cachedTableCount());
cache.clear();
}

@Test
Expand All @@ -275,7 +279,7 @@ public void testConcurrentAccess() throws Exception {
public void run() {
try {
assertTrue(cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()) != null);
} catch (IOException | InterruptedException e) {
} catch (Exception e) {
fail(e.getMessage());
}
}
Expand Down
Expand Up @@ -391,5 +391,12 @@ public Long call() throws IOException, InterruptedException, ExecutionException
assertTrue(expectedOffset.remove(response.get()));
}
assertTrue(expectedOffset.isEmpty());
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOG.info(e.toString());
}
DirectWriter.clearCache();
}
}