From ed718c14289a3ea41f6ef7cccd8b00d7c7c0ba6c Mon Sep 17 00:00:00 2001
From: Yiru Tang
Date: Thu, 16 Apr 2020 12:49:37 -0700
Subject: [PATCH] feat: Direct writer (#165)

* feat: Direct Writer
* feat: Direct Writer 2
* feat: Direct Writer 3
* Fix a logging statement
* Add a very basic schema compatibility check
* Fix e2e tests
--- .../storage/v1alpha2/DirectWriter.java | 105 +++++++ .../storage/v1alpha2/SchemaCompact.java | 99 ++++++ .../storage/v1alpha2/StreamWriter.java | 92 ++++-- .../storage/v1alpha2/WriterCache.java | 153 ++++++++++ .../storage/v1alpha2/DirectWriterTest.java | 232 ++++++++++++++ .../storage/v1alpha2/FakeBigQueryWrite.java | 6 + .../v1alpha2/FakeBigQueryWriteImpl.java | 28 ++ .../v1alpha2/MockBigQueryWriteImpl.java | 3 +- .../storage/v1alpha2/SchemaCompactTest.java | 127 ++++++ .../storage/v1alpha2/StreamWriterTest.java | 16 + .../storage/v1alpha2/WriterCacheTest.java | 285 ++++++++++++++++++ .../it/ITBigQueryWriteManualClientTest.java | 123 +++++--- 12 files changed, 1211 insertions(+), 58 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java new file mode 100644 index 0000000000..295638f74f --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.api.core.*; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows; +import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import io.grpc.Status; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; + +/** + * A writer that helps users write data to BigQuery. This is a simplified version of the Write + * API. For users who write with a COMMITTED stream and do not need row deduplication, this + * writer is recommended. + *
+ * <pre>{@code
+ * DataProto data;
+ * ApiFuture<Long> response = DirectWriter.append("projects/pid/datasets/did/tables/tid", Arrays.asList(data));
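+ * // Not in the original sample: the future resolves to the offset at which the
+ * // rows were appended, so callers can block on it if they need confirmation.
+ * Long offset = response.get();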
+ * }</pre>
+ * + *

+ * <p>{@link DirectWriter} will use the credentials set on the channel, which uses application + * default credentials through {@link GoogleCredentials#getApplicationDefault} by default. + */ +public class DirectWriter { + private static final Logger LOG = Logger.getLogger(DirectWriter.class.getName()); + private static WriterCache cache = null; + private static Lock cacheLock = new ReentrantLock(); + + /** + * Appends rows to the given table. + * + * @param tableName table name in the form of "projects/{pName}/datasets/{dName}/tables/{tName}" + * @param protoRows rows in proto buffer format. + * @return a future that contains the offset at which the append happened; the append has only + *     actually happened once the future completes with a valid offset. + * @throws IOException + * @throws InterruptedException + * @throws InvalidArgumentException + */ + public static ApiFuture<Long> append(String tableName, List<Message> protoRows) + throws IOException, InterruptedException, InvalidArgumentException { + if (protoRows.isEmpty()) { + throw new InvalidArgumentException( + new Exception("Empty rows are not allowed"), + GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT), + false); + } + try { + cacheLock.lock(); + if (cache == null) { + cache = WriterCache.getInstance(); + } + } finally { + cacheLock.unlock(); + } + + StreamWriter writer = cache.getTableWriter(tableName, protoRows.get(0).getDescriptorForType()); + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + Descriptors.Descriptor descriptor = null; + for (Message protoRow : protoRows) { + rowsBuilder.addSerializedRows(protoRow.toByteString()); + } + + AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder(); + data.setWriterSchema(ProtoSchemaConverter.convert(protoRows.get(0).getDescriptorForType())); + data.setRows(rowsBuilder.build()); + + return ApiFutures.transform( + writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build()), + new ApiFunction<Storage.AppendRowsResponse, Long>() { + @Override + public Long apply(Storage.AppendRowsResponse appendRowsResponse) { + return Long.valueOf(appendRowsResponse.getOffset()); + } + }, + MoreExecutors.directExecutor()); + } + + @VisibleForTesting + public static void testSetStub( + BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) { + cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java new file mode 100644 index 0000000000..3f17f44951 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java @@ -0,0 +1,99 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A class that checks schema compatibility between a user schema, given as a proto descriptor, + * and the BigQuery table schema. If this check passes, the user can write to the BigQuery table + * using the user schema; otherwise the write will fail. + * + *

+ * <p>The implementation as of now is not complete, which means that even if this check passes, + * the write may still fail. + */ +public class SchemaCompact { + private BigQuery bigquery; + private static SchemaCompact compact; + private static String tablePatternString = "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)"; + private static Pattern tablePattern = Pattern.compile(tablePatternString); + + private SchemaCompact(BigQuery bigquery) { + this.bigquery = bigquery; + } + + /** + * Gets a singleton {@code SchemaCompact} object. + * + * @return the singleton {@code SchemaCompact} instance + */ + public static SchemaCompact getInstance() { + if (compact == null) { + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + compact = new SchemaCompact(bigqueryHelper.getOptions().getService()); + } + return compact; + } + + /** + * Gets a {@code SchemaCompact} object with a custom BigQuery stub. + * + * @param bigquery the BigQuery client to back the checker + * @return a {@code SchemaCompact} instance backed by the given client + */ + @VisibleForTesting + public static SchemaCompact getInstance(BigQuery bigquery) { + return new SchemaCompact(bigquery); + } + + private TableId getTableId(String tableName) { + Matcher matcher = tablePattern.matcher(tableName); + if (!matcher.matches() || matcher.groupCount() != 3) { + throw new IllegalArgumentException("Invalid table name: " + tableName); + } + return TableId.of(matcher.group(1), matcher.group(2), matcher.group(3)); + } + + /** + * Checks if the userSchema is compatible with the table's current schema for writing. The + * current implementation is not complete; if the check fails, the write cannot succeed. + * + * @param tableName The name of the table to write to. + * @param userSchema The schema the user uses to append data. + * @throws IllegalArgumentException if the check fails. + */ + public void check(String tableName, Descriptors.Descriptor userSchema) + throws IllegalArgumentException { + Table table = bigquery.getTable(getTableId(tableName)); + Schema schema = table.getDefinition().getSchema(); + // TODO: We only have a very limited check here. More checks to be added. + if (schema.getFields().size() != userSchema.getFields().size()) { + throw new IllegalArgumentException( + "User schema doesn't have expected field number with BigQuery table schema, expected: " + + schema.getFields().size() + + " actual: " + + userSchema.getFields().size()); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index 5b8a07177b..78e23458ab 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -46,11 +46,18 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.threeten.bp.Duration; +import org.threeten.bp.Instant; /** * A BigQuery Stream Writer that can be used to write data into BigQuery Table. * +

+ * <p>This is to be used for managed streaming writes when you are working with PENDING streams + * or want to manage offsets explicitly. For the most common case, writing with a COMMITTED + * stream without offsets, please use the simpler writer {@code DirectWriter}. + * *

A {@link StreamWriter} provides built-in capabilities to: handle batching of messages; * control memory utilization (through flow control); and automatically re-establish connections and * clean up requests (only the write schema on the first request in the stream is kept). @@ -68,7 +75,13 @@ public class StreamWriter implements AutoCloseable { private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName()); + private static String streamPatternString = + "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; + + private static Pattern streamPattern = Pattern.compile(streamPatternString); + private final String streamName; + private final String tableName; private final BatchingSettings batchingSettings; private final RetrySettings retrySettings; @@ -92,6 +105,9 @@ public class StreamWriter implements AutoCloseable { private final AtomicBoolean activeAlarm; private ScheduledFuture<?> currentAlarmFuture; + private Instant createTime; + private Duration streamTTL = Duration.ofDays(1); + private Integer currentRetries = 0; /** The maximum size of one request. Defined by the API. */ @@ -104,12 +120,18 @@ public static long getApiMaxInflightRequests() { return 5000L; } - private StreamWriter(Builder builder) throws IOException { + private StreamWriter(Builder builder) + throws IllegalArgumentException, IOException, InterruptedException { + Matcher matcher = streamPattern.matcher(builder.streamName); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid stream name: " + builder.streamName); + } streamName = builder.streamName; + tableName = matcher.group(1); this.batchingSettings = builder.batchingSettings; this.retrySettings = builder.retrySettings; - this.messagesBatch = new MessagesBatch(batchingSettings); + this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); @@ -129,6 +151,19 @@ private StreamWriter(Builder builder) throws IOException { .build(); shutdown = new AtomicBoolean(false); refreshAppend(); + Stream.WriteStream stream = + stub.getWriteStream(Storage.GetWriteStreamRequest.newBuilder().setName(streamName).build()); + createTime = + Instant.ofEpochSecond( + stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos()); + if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) { + throw new IllegalStateException( + "Cannot write to a stream that is already committed: " + streamName); + } + if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) { + throw new IllegalStateException( + "Cannot write to a stream that is already expired: " + streamName); + } } /** Stream name we are writing to. */ @@ -136,6 +171,16 @@ public String getStreamNameString() { return streamName; } + /** Table name we are writing to. */ + public String getTableNameString() { + return tableName; + } + + /** Returns whether the stream has expired. */ + public Boolean expired() { + return createTime.plus(streamTTL).compareTo(Instant.now()) < 0; + } + /** * Schedules the writing of a message. The write of the message may occur immediately or be * delayed based on the writer batching options.
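A minimal usage sketch of the append path above (an editorial illustration, not part of the patch; the stream name is a placeholder and FooType is the test proto used by the tests in this change):

    // The builder validates the stream name and fetches the stream's create time.
    StreamWriter writer =
        StreamWriter.newBuilder("projects/p/datasets/d/tables/t/streams/s").build();
    // The first request on a connection must carry the writer schema; StreamWriter
    // strips it (and the stream name) from subsequent batched requests.
    FooType row = FooType.newBuilder().setFoo("a").build();
    AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
    data.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()));
    data.setRows(
        ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(row.toByteString()).build());
    ApiFuture<Storage.AppendRowsResponse> response =
        writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build());
    long offset = response.get().getOffset(); // completes once the batch is flushed and acked
    writer.close();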
@@ -192,11 +237,13 @@ public ApiFuture append(AppendRowsRequest message) { * * @throws IOException */ - private void refreshAppend() throws IOException { + public void refreshAppend() throws IOException, InterruptedException { synchronized (this) { Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer."); if (stub != null) { + clientStream.closeSend(); stub.shutdown(); + stub.awaitTermination(1, TimeUnit.MINUTES); } backgroundResourceList.remove(stub); stub = BigQueryWriteClient.create(stubSettings); @@ -212,13 +259,14 @@ private void refreshAppend() throws IOException { } } catch (InterruptedException expected) { } + LOG.info("Write Stream " + streamName + " connection established"); } private void setupAlarm() { if (!messagesBatch.isEmpty()) { if (!activeAlarm.getAndSet(true)) { long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis(); - LOG.log(Level.INFO, "Setting up alarm for the next {0} ms.", delayThresholdMs); + LOG.log(Level.FINE, "Setting up alarm for the next {0} ms.", delayThresholdMs); currentAlarmFuture = executor.schedule( new Runnable() { @@ -281,6 +329,7 @@ private void writeBatch(final InflightBatch inflightBatch) { /** Close the stream writer. Shut down all resources. */ @Override public void close() { + LOG.info("Closing stream writer"); shutdown(); try { awaitTermination(1, TimeUnit.MINUTES); @@ -300,10 +349,12 @@ private static final class InflightBatch { int batchSizeBytes; long expectedOffset; Boolean attachSchema; + String streamName; InflightBatch( List inflightRequests, int batchSizeBytes, + String streamName, Boolean attachSchema) { this.inflightRequests = inflightRequests; this.offsetList = new ArrayList(inflightRequests.size()); @@ -319,6 +370,7 @@ private static final class InflightBatch { creationTime = System.currentTimeMillis(); this.batchSizeBytes = batchSizeBytes; this.attachSchema = attachSchema; + this.streamName = streamName; } int count() { @@ -345,15 +397,18 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException { } AppendRowsRequest.ProtoData.Builder data = inflightRequests.get(0).message.getProtoRows().toBuilder().setRows(rowsBuilder.build()); + AppendRowsRequest.Builder requestBuilder = inflightRequests.get(0).message.toBuilder(); if (!attachSchema) { data.clearWriterSchema(); + requestBuilder.clearWriteStream(); } else { if (!data.hasWriterSchema()) { throw new IllegalStateException( "The first message on the connection must have writer schema set"); } + requestBuilder.setWriteStream(streamName); } - return inflightRequests.get(0).message.toBuilder().setProtoRows(data.build()).build(); + return requestBuilder.setProtoRows(data.build()).build(); } private void onFailure(Throwable t) { @@ -453,13 +508,8 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted * WriteStream response = bigQueryWriteClient.createWriteStream(request); * stream = response.getName(); * } - * WriteStream writer = WriteStream.newBuilder(stream).build(); - * try { - * // ... - * } finally { - * // When finished with the writer, make sure to shutdown to free up resources. - * writer.shutdown(); - * writer.awaitTermination(1, TimeUnit.MINUTES); + * try (WriteStream writer = WriteStream.newBuilder(stream).build()) { + * //... * } * } */ @@ -467,7 +517,7 @@ public static Builder newBuilder(String streamName) { return new Builder(streamName); } - /** A builder of {@link Publisher}s. */ + /** A builder of {@link StreamWriter}s. 
*/ public static final class Builder { static final Duration MIN_TOTAL_TIMEOUT = Duration.ofSeconds(10); static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10); @@ -475,7 +525,7 @@ public static final class Builder { // Meaningful defaults. static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L; static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 100 * 1024L; // 100 kB - static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(1); + static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(10); static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS = FlowControlSettings.newBuilder() .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) @@ -515,9 +565,6 @@ public static final class Builder { private TransportChannelProvider channelProvider = BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); - private HeaderProvider headerProvider = new NoHeaderProvider(); - private HeaderProvider internalHeaderProvider = - BigQueryWriteSettings.defaultApiClientHeaderProviderBuilder().build(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; private CredentialsProvider credentialsProvider = BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); @@ -647,7 +694,7 @@ public Builder setEndpoint(String endpoint) { } /** Builds the {@code StreamWriter}. */ - public StreamWriter build() throws IOException { + public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException { return new StreamWriter(this); } } @@ -785,7 +832,7 @@ public void onError(Throwable t) { try { // Establish a new connection. streamWriter.refreshAppend(); - } catch (IOException e) { + } catch (IOException | InterruptedException e) { LOG.info("Failed to establish a new connection"); } } @@ -805,15 +852,18 @@ private static class MessagesBatch { private int batchedBytes; private final BatchingSettings batchingSettings; private Boolean attachSchema = true; + private final String streamName; - private MessagesBatch(BatchingSettings batchingSettings) { + private MessagesBatch(BatchingSettings batchingSettings, String streamName) { this.batchingSettings = batchingSettings; + this.streamName = streamName; reset(); } // Get all the messages out in a batch. private InflightBatch popBatch() { - InflightBatch batch = new InflightBatch(messages, batchedBytes, this.attachSchema); + InflightBatch batch = + new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema); this.attachSchema = false; reset(); return batch; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java new file mode 100644 index 0000000000..9b7cb1fd18 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java @@ -0,0 +1,153 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.protobuf.Descriptors.Descriptor; +import java.io.IOException; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A cache of StreamWriters that can be looked up by table name. Entries are evicted in + * least-recently-used order once the maximum table count is exceeded. Code sample: WriterCache + * cache = WriterCache.getInstance(); StreamWriter writer = cache.getTableWriter(tableName, + * userSchema); // Use the writer... + */ +public class WriterCache { + private static final Logger LOG = Logger.getLogger(WriterCache.class.getName()); + + private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)"; + private static Pattern tablePattern = Pattern.compile(tablePatternString); + + private static WriterCache instance; + private Cache<String, Cache<Descriptor, StreamWriter>> writerCache; + + // Maximum number of tables to hold in the cache; once the maximum is exceeded, entries are + // evicted in least-recently-used order. + private static final int MAX_TABLE_ENTRY = 100; + private static final int MAX_WRITERS_PER_TABLE = 2; + + private final BigQueryWriteClient stub; + private final SchemaCompact compact; + + private WriterCache(BigQueryWriteClient stub, int maxTableEntry, SchemaCompact compact) { + this.stub = stub; + this.compact = compact; + writerCache = + CacheBuilder.newBuilder() + .maximumSize(maxTableEntry) + .<String, Cache<Descriptor, StreamWriter>>build(); + } + + public static WriterCache getInstance() throws IOException { + if (instance == null) { + BigQueryWriteSettings stubSettings = BigQueryWriteSettings.newBuilder().build(); + BigQueryWriteClient stub = BigQueryWriteClient.create(stubSettings); + instance = new WriterCache(stub, MAX_TABLE_ENTRY, SchemaCompact.getInstance()); + } + return instance; + } + + /** Returns a cache with a custom stub, used by tests. */ + @VisibleForTesting + public static WriterCache getTestInstance( + BigQueryWriteClient stub, int maxTableEntry, SchemaCompact compact) { + return new WriterCache(stub, maxTableEntry, compact); + } + + /** Creates a new write stream for the given table and returns the stream name. */ + private String CreateNewStream(String tableName) { + Stream.WriteStream stream = + Stream.WriteStream.newBuilder().setType(Stream.WriteStream.Type.COMMITTED).build(); + stream = + stub.createWriteStream( + Storage.CreateWriteStreamRequest.newBuilder() + .setParent(tableName) + .setWriteStream(stream) + .build()); + LOG.info("Created write stream: " + stream.getName()); + return stream.getName(); + } + + StreamWriter CreateNewWriter(String streamName) + throws IllegalArgumentException, IOException, InterruptedException { + return StreamWriter.newBuilder(streamName) + .setChannelProvider(stub.getSettings().getTransportChannelProvider()) + .setCredentialsProvider(stub.getSettings().getCredentialsProvider()) + .setExecutorProvider(stub.getSettings().getExecutorProvider()) + .build(); + } + + /** + * Gets a writer for a given table with a given user schema from the global cache.
+ * + * @param tableName the table name, in the form "projects/{pName}/datasets/{dName}/tables/{tName}" + * @param userSchema the proto descriptor of the rows to be appended + * @return a {@code StreamWriter} for the given table and schema + * @throws IllegalArgumentException if the table name is invalid or the schema check fails + * @throws IOException + * @throws InterruptedException + */ + public StreamWriter getTableWriter(String tableName, Descriptor userSchema) + throws IllegalArgumentException, IOException, InterruptedException { + Matcher matcher = tablePattern.matcher(tableName); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid table name: " + tableName); + } + + String streamName = null; + Boolean streamExpired = false; + StreamWriter writer = null; + Cache<Descriptor, StreamWriter> tableEntry = null; + + synchronized (this) { + tableEntry = writerCache.getIfPresent(tableName); + if (tableEntry != null) { + writer = tableEntry.getIfPresent(userSchema); + if (writer != null) { + if (!writer.expired()) { + return writer; + } else { + writer.close(); + } + } + compact.check(tableName, userSchema); + streamName = CreateNewStream(tableName); + writer = CreateNewWriter(streamName); + tableEntry.put(userSchema, writer); + } else { + compact.check(tableName, userSchema); + streamName = CreateNewStream(tableName); + tableEntry = + CacheBuilder.newBuilder() + .maximumSize(MAX_WRITERS_PER_TABLE) + .<Descriptor, StreamWriter>build(); + writer = CreateNewWriter(streamName); + tableEntry.put(userSchema, writer); + writerCache.put(tableName, tableEntry); + } + } + + return writer; + } + + @VisibleForTesting + public long cachedTableCount() { + synchronized (writerCache) { + return writerCache.size(); + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java new file mode 100644 index 0000000000..f6a0948802 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java @@ -0,0 +1,232 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.cloud.bigquery.storage.test.Test.*; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.*; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.threeten.bp.Instant; + +@RunWith(JUnit4.class) +public class DirectWriterTest { + private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; + private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2"; + + private static MockBigQueryWrite mockBigQueryWrite; + private static MockServiceHelper serviceHelper; + private BigQueryWriteClient client; + private LocalChannelProvider channelProvider; + + @Mock private static SchemaCompact schemaCheck; + + @BeforeClass + public static void startStaticServer() { + mockBigQueryWrite = new MockBigQueryWrite(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.asList(mockBigQueryWrite)); + serviceHelper.start(); + } + + @AfterClass + public static void stopServer() { + serviceHelper.stop(); + } + + @Before + public void setUp() throws IOException { + serviceHelper.reset(); + channelProvider = serviceHelper.createChannelProvider(); + BigQueryWriteSettings settings = + BigQueryWriteSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build(); + client = BigQueryWriteClient.create(settings); + MockitoAnnotations.initMocks(this); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + /** Response mocks for create a new writer */ + void WriterCreationResponseMock(String testStreamName, List responseOffsets) { + // Response from CreateWriteStream + Stream.WriteStream expectedResponse = + Stream.WriteStream.newBuilder().setName(testStreamName).build(); + mockBigQueryWrite.addResponse(expectedResponse); + + // Response from GetWriteStream + Instant time = Instant.now(); + Timestamp timestamp = + Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); + Stream.WriteStream expectedResponse2 = + Stream.WriteStream.newBuilder() + .setName(testStreamName) + .setType(Stream.WriteStream.Type.COMMITTED) + .setCreateTime(timestamp) + .build(); + mockBigQueryWrite.addResponse(expectedResponse2); + + for (Long offset : responseOffsets) { + Storage.AppendRowsResponse response = + Storage.AppendRowsResponse.newBuilder().setOffset(offset).build(); + mockBigQueryWrite.addResponse(response); + } + } + + @Test + public void testWriteSuccess() throws Exception { + DirectWriter.testSetStub(client, 10, schemaCheck); + FooType m1 = FooType.newBuilder().setFoo("m1").build(); + FooType m2 = FooType.newBuilder().setFoo("m2").build(); + + 
WriterCreationResponseMock(TEST_STREAM, Arrays.asList(Long.valueOf(0L))); + ApiFuture ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m1, m2)); + verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor()); + assertEquals(Long.valueOf(0L), ret.get()); + List actualRequests = mockBigQueryWrite.getRequests(); + Assert.assertEquals(3, actualRequests.size()); + assertEquals( + TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); + assertEquals( + Stream.WriteStream.Type.COMMITTED, + ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType()); + assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); + + Storage.AppendRowsRequest.ProtoData.Builder dataBuilder = + Storage.AppendRowsRequest.ProtoData.newBuilder(); + dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())); + dataBuilder.setRows( + ProtoBufProto.ProtoRows.newBuilder() + .addSerializedRows(m1.toByteString()) + .addSerializedRows(m2.toByteString()) + .build()); + Storage.AppendRowsRequest expectRequest = + Storage.AppendRowsRequest.newBuilder() + .setWriteStream(TEST_STREAM) + .setProtoRows(dataBuilder.build()) + .build(); + assertEquals(expectRequest.toString(), actualRequests.get(2).toString()); + + Storage.AppendRowsResponse response = + Storage.AppendRowsResponse.newBuilder().setOffset(2).build(); + mockBigQueryWrite.addResponse(response); + // Append again, write stream name and schema are cleared. + ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m1)); + assertEquals(Long.valueOf(2L), ret.get()); + dataBuilder = Storage.AppendRowsRequest.ProtoData.newBuilder(); + dataBuilder.setRows( + ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(m1.toByteString()).build()); + expectRequest = + Storage.AppendRowsRequest.newBuilder().setProtoRows(dataBuilder.build()).build(); + assertEquals(expectRequest.toString(), actualRequests.get(3).toString()); + + // Write with a different schema. 
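+ // A different schema maps to a separate entry in the per-table writer cache, so this
+ // append is expected to create a second stream (TEST_STREAM_2) with its own
+ // CreateWriteStream/GetWriteStream pair.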
+ WriterCreationResponseMock(TEST_STREAM_2, Arrays.asList(Long.valueOf(0L))); + AllSupportedTypes m3 = AllSupportedTypes.newBuilder().setStringValue("s").build(); + ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m3)); + verify(schemaCheck).check(TEST_TABLE, AllSupportedTypes.getDescriptor()); + assertEquals(Long.valueOf(0L), ret.get()); + dataBuilder = Storage.AppendRowsRequest.ProtoData.newBuilder(); + dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(AllSupportedTypes.getDescriptor())); + dataBuilder.setRows( + ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(m3.toByteString()).build()); + expectRequest = + Storage.AppendRowsRequest.newBuilder() + .setWriteStream(TEST_STREAM_2) + .setProtoRows(dataBuilder.build()) + .build(); + Assert.assertEquals(7, actualRequests.size()); + assertEquals( + TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getParent()); + assertEquals( + Stream.WriteStream.Type.COMMITTED, + ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType()); + assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName()); + assertEquals(expectRequest.toString(), actualRequests.get(6).toString()); + } + + @Test + public void testWriteBadTableName() throws Exception { + DirectWriter.testSetStub(client, 10, schemaCheck); + FooType m1 = FooType.newBuilder().setFoo("m1").build(); + FooType m2 = FooType.newBuilder().setFoo("m2").build(); + + try { + ApiFuture ret = DirectWriter.append("abc", Arrays.asList(m1, m2)); + fail("should fail"); + } catch (IllegalArgumentException expected) { + assertEquals("Invalid table name: abc", expected.getMessage()); + } + } + + @Test + public void testConcurrentAccess() throws Exception { + WriterCache cache = WriterCache.getTestInstance(client, 2, schemaCheck); + final FooType m1 = FooType.newBuilder().setFoo("m1").build(); + final FooType m2 = FooType.newBuilder().setFoo("m2").build(); + final List expectedOffset = + Arrays.asList( + Long.valueOf(0L), + Long.valueOf(2L), + Long.valueOf(4L), + Long.valueOf(8L), + Long.valueOf(10L)); + // Make sure getting the same table writer in multiple thread only cause create to be called + // once. 
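+ // DirectWriter initializes the process-wide WriterCache under cacheLock, and
+ // WriterCache.getTableWriter is synchronized, so the five threads below should share a
+ // single StreamWriter and a single created stream.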
+ WriterCreationResponseMock(TEST_STREAM, expectedOffset); + ExecutorService executor = Executors.newFixedThreadPool(5); + for (int i = 0; i < 5; i++) { + executor.execute( + new Runnable() { + @Override + public void run() { + try { + ApiFuture result = + DirectWriter.append(TEST_TABLE, Arrays.asList(m1, m2)); + assertTrue(expectedOffset.remove(result.get())); + } catch (IOException | InterruptedException | ExecutionException e) { + fail(e.getMessage()); + } + } + }); + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java index 5298e80ae4..f1350ce7e7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java @@ -44,10 +44,16 @@ public List getAppendRequests() { return serviceImpl.getCapturedRequests(); } + public List getWriteStreamRequests() { + return serviceImpl.getCapturedWriteRequests(); + } + @Override public void addResponse(AbstractMessage response) { if (response instanceof AppendRowsResponse) { serviceImpl.addResponse((AppendRowsResponse) response); + } else if (response instanceof Stream.WriteStream) { + serviceImpl.addWriteStreamResponse((Stream.WriteStream) response); } else { throw new IllegalStateException("Unsupported service"); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java index aa3f7e734d..0a3aa2e622 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java @@ -36,7 +36,11 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private static final Logger LOG = Logger.getLogger(FakeBigQueryWriteImpl.class.getName()); private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue writeRequests = + new LinkedBlockingQueue<>(); private final LinkedBlockingQueue responses = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue writeResponses = + new LinkedBlockingQueue<>(); private final AtomicInteger nextMessageId = new AtomicInteger(1); private boolean autoPublishResponse; private ScheduledExecutorService executor = null; @@ -78,6 +82,21 @@ public String toString() { } } + @Override + public void getWriteStream( + GetWriteStreamRequest request, StreamObserver responseObserver) { + Object response = writeResponses.remove(); + if (response instanceof Stream.WriteStream) { + writeRequests.add(request); + responseObserver.onNext((Stream.WriteStream) response); + responseObserver.onCompleted(); + } else if (response instanceof Exception) { + responseObserver.onError((Exception) response); + } else { + responseObserver.onError(new IllegalArgumentException("Unrecognized response type")); + } + } + @Override public StreamObserver appendRows( final StreamObserver responseObserver) { @@ -149,6 +168,11 @@ public FakeBigQueryWriteImpl addResponse(AppendRowsResponse.Builder appendRespon return addResponse(appendResponseBuilder.build()); } + public 
FakeBigQueryWriteImpl addWriteStreamResponse(Stream.WriteStream response) { + writeResponses.add(response); + return this; + } + public FakeBigQueryWriteImpl addConnectionError(Throwable error) { responses.add(new Response(error)); return this; @@ -158,6 +182,10 @@ public List getCapturedRequests() { return new ArrayList(requests); } + public List getCapturedWriteRequests() { + return new ArrayList(writeRequests); + } + public void reset() { requests.clear(); responses.clear(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java index 32e15e28d0..a82a3dbdb3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java @@ -83,11 +83,12 @@ public void createWriteStream( @Override public StreamObserver appendRows( final StreamObserver responseObserver) { - final Object response = responses.remove(); StreamObserver requestObserver = new StreamObserver() { @Override public void onNext(AppendRowsRequest value) { + requests.add(value); + final Object response = responses.remove(); if (response instanceof AppendRowsResponse) { responseObserver.onNext((AppendRowsResponse) response); } else if (response instanceof Exception) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java new file mode 100644 index 0000000000..205d814968 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java @@ -0,0 +1,127 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.google.cloud.bigquery.*; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.storage.test.Test.FooType; +import java.io.IOException; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +@RunWith(JUnit4.class) +public class SchemaCompactTest { + @Mock private BigQuery mockBigquery; + @Mock private Table mockBigqueryTable; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(mockBigquery.getTable(any(TableId.class))).thenReturn(mockBigqueryTable); + } + + @After + public void tearDown() { + verifyNoMoreInteractions(mockBigquery); + verifyNoMoreInteractions(mockBigqueryTable); + } + + @Test + public void testSuccess() throws Exception { + TableDefinition definition = + new TableDefinition() { + @Override + public Type getType() { + return null; + } + + @Nullable + @Override + public Schema getSchema() { + return Schema.of(Field.of("Foo", LegacySQLTypeName.STRING)); + } + + @Override + public Builder toBuilder() { + return null; + } + }; + when(mockBigqueryTable.getDefinition()).thenReturn(definition); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor()); + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testFailed() throws Exception { + TableDefinition definition = + new TableDefinition() { + @Override + public Type getType() { + return null; + } + + @Nullable + @Override + public Schema getSchema() { + return Schema.of( + Field.of("Foo", LegacySQLTypeName.STRING), + Field.of("Bar", LegacySQLTypeName.STRING)); + } + + @Override + public Builder toBuilder() { + return null; + } + }; + when(mockBigqueryTable.getDefinition()).thenReturn(definition); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + try { + compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor()); + fail("should fail"); + } catch (IllegalArgumentException expected) { + assertEquals( + "User schema doesn't have expected field number with BigQuery table schema, expected: 2 actual: 1", + expected.getMessage()); + } + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testBadTableName() throws Exception { + try { + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check("blah", FooType.getDescriptor()); + fail("should fail"); + } catch (IllegalArgumentException expected) { + assertEquals("Invalid table name: blah", expected.getMessage()); + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 11bbd66010..38394a7479 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ 
b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -38,6 +38,7 @@ import com.google.cloud.bigquery.storage.v1alpha2.Storage.*; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; +import com.google.protobuf.Timestamp; import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.util.Arrays; @@ -53,6 +54,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Duration; +import org.threeten.bp.Instant; @RunWith(JUnit4.class) public class StreamWriterTest { @@ -75,6 +77,14 @@ public void setUp() throws Exception { channelProvider = serviceHelper.createChannelProvider(); fakeExecutor = new FakeScheduledExecutorService(); testBigQueryWrite.setExecutor(fakeExecutor); + Instant time = Instant.now(); + Timestamp timestamp = + Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); + // Add enough GetWriteStream response. + for (int i = 0; i < 4; i++) { + testBigQueryWrite.addResponse( + Stream.WriteStream.newBuilder().setName(TEST_STREAM).setCreateTime(timestamp).build()); + } } @After @@ -123,6 +133,12 @@ private ApiFuture sendTestMessage(StreamWriter writer, Strin return writer.append(createAppendRequest(messages, -1)); } + @Test + public void testTableName() throws Exception { + StreamWriter writer = getTestStreamWriterBuilder().build(); + assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString()); + } + @Test public void testAppendByDuration() throws Exception { StreamWriter writer = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java new file mode 100644 index 0000000000..47ad647e66 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java @@ -0,0 +1,285 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.cloud.bigquery.storage.test.Test.*; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; +import org.junit.*; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.threeten.bp.Instant; +import org.threeten.bp.temporal.ChronoUnit; + +@RunWith(JUnit4.class) +public class WriterCacheTest { + private static final Logger LOG = Logger.getLogger(StreamWriterTest.class.getName()); + + private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; + private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2"; + private static final String TEST_STREAM_3 = "projects/p/datasets/d/tables/t/streams/s3"; + private static final String TEST_STREAM_4 = "projects/p/datasets/d/tables/t/streams/s4"; + private static final String TEST_TABLE_2 = "projects/p/datasets/d/tables/t2"; + private static final String TEST_STREAM_21 = "projects/p/datasets/d/tables/t2/streams/s1"; + private static final String TEST_TABLE_3 = "projects/p/datasets/d/tables/t3"; + private static final String TEST_STREAM_31 = "projects/p/datasets/d/tables/t3/streams/s1"; + + private static MockBigQueryWrite mockBigQueryWrite; + private static MockServiceHelper serviceHelper; + @Mock private static SchemaCompact mockSchemaCheck; + private BigQueryWriteClient client; + private LocalChannelProvider channelProvider; + + @BeforeClass + public static void startStaticServer() { + mockBigQueryWrite = new MockBigQueryWrite(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.asList(mockBigQueryWrite)); + serviceHelper.start(); + } + + @AfterClass + public static void stopServer() { + serviceHelper.stop(); + } + + @Before + public void setUp() throws IOException { + serviceHelper.reset(); + channelProvider = serviceHelper.createChannelProvider(); + BigQueryWriteSettings settings = + BigQueryWriteSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build(); + client = BigQueryWriteClient.create(settings); + MockitoAnnotations.initMocks(this); + } + + /** Response mocks for create a new writer */ + void WriterCreationResponseMock(String testStreamName) { + // Response from CreateWriteStream + Stream.WriteStream expectedResponse = + Stream.WriteStream.newBuilder().setName(testStreamName).build(); + mockBigQueryWrite.addResponse(expectedResponse); + + // Response from GetWriteStream + Instant time = Instant.now(); + Timestamp timestamp = + Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); + Stream.WriteStream expectedResponse2 = + Stream.WriteStream.newBuilder() + 
.setName(testStreamName) + .setType(Stream.WriteStream.Type.COMMITTED) + .setCreateTime(timestamp) + .build(); + mockBigQueryWrite.addResponse(expectedResponse2); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + @Test + public void testRejectBadTableName() throws Exception { + WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck); + try { + cache.getTableWriter("abc", FooType.getDescriptor()); + fail(); + } catch (IllegalArgumentException expected) { + assertEquals(expected.getMessage(), "Invalid table name: abc"); + } + } + + @Test + public void testCreateNewWriter() throws Exception { + WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck); + WriterCreationResponseMock(TEST_STREAM); + StreamWriter writer = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()); + verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor()); + List actualRequests = mockBigQueryWrite.getRequests(); + assertEquals(2, actualRequests.size()); + assertEquals( + TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); + assertEquals( + Stream.WriteStream.Type.COMMITTED, + ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType()); + assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); + + assertEquals(TEST_TABLE, writer.getTableNameString()); + assertEquals(TEST_STREAM, writer.getStreamNameString()); + assertEquals(1, cache.cachedTableCount()); + } + + @Test + public void testWriterExpired() throws Exception { + WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck); + // Response from CreateWriteStream + Stream.WriteStream expectedResponse = + Stream.WriteStream.newBuilder().setName(TEST_STREAM).build(); + mockBigQueryWrite.addResponse(expectedResponse); + + // Response from GetWriteStream + Instant time = Instant.now().minus(2, ChronoUnit.DAYS); + Timestamp timestamp = + Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); + Stream.WriteStream expectedResponse2 = + Stream.WriteStream.newBuilder() + .setName(TEST_STREAM) + .setType(Stream.WriteStream.Type.COMMITTED) + .setCreateTime(timestamp) + .build(); + mockBigQueryWrite.addResponse(expectedResponse2); + + try { + StreamWriter writer = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()); + fail("Should fail"); + } catch (IllegalStateException e) { + assertEquals( + "Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s", + e.getMessage()); + } + } + + @Test + public void testWriterWithNewSchema() throws Exception { + WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck); + WriterCreationResponseMock(TEST_STREAM); + WriterCreationResponseMock(TEST_STREAM_2); + StreamWriter writer1 = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()); + verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor()); + + StreamWriter writer2 = cache.getTableWriter(TEST_TABLE, AllSupportedTypes.getDescriptor()); + verify(mockSchemaCheck, times(1)).check(TEST_TABLE, AllSupportedTypes.getDescriptor()); + + List actualRequests = mockBigQueryWrite.getRequests(); + assertEquals(4, actualRequests.size()); + assertEquals( + TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent()); + assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName()); + assertEquals( + TEST_TABLE, 
+        ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
+    assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
+    assertEquals(TEST_STREAM, writer1.getStreamNameString());
+    assertEquals(TEST_STREAM_2, writer2.getStreamNameString());
+    assertEquals(1, cache.cachedTableCount());
+
+    // Still able to get the FooType writer.
+    StreamWriter writer3 = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+    verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor());
+    assertEquals(TEST_STREAM, writer3.getStreamNameString());
+
+    // Create a writer with yet another new schema.
+    writerCreationResponseMock(TEST_STREAM_3);
+    writerCreationResponseMock(TEST_STREAM_4);
+    StreamWriter writer4 = cache.getTableWriter(TEST_TABLE, NestedType.getDescriptor());
+    verify(mockSchemaCheck, times(1)).check(TEST_TABLE, NestedType.getDescriptor());
+
+    // This would cause a new stream to be created since the old entry is evicted.
+    StreamWriter writer5 = cache.getTableWriter(TEST_TABLE, AllSupportedTypes.getDescriptor());
+    verify(mockSchemaCheck, times(2)).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
+    assertEquals(TEST_STREAM_3, writer4.getStreamNameString());
+    assertEquals(TEST_STREAM_4, writer5.getStreamNameString());
+    assertEquals(1, cache.cachedTableCount());
+  }
+
+  @Test
+  public void testWriterWithDifferentTable() throws Exception {
+    WriterCache cache = WriterCache.getTestInstance(client, 2, mockSchemaCheck);
+    writerCreationResponseMock(TEST_STREAM);
+    writerCreationResponseMock(TEST_STREAM_21);
+    StreamWriter writer1 = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+    StreamWriter writer2 = cache.getTableWriter(TEST_TABLE_2, FooType.getDescriptor());
+    verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor());
+    verify(mockSchemaCheck, times(1)).check(TEST_TABLE_2, FooType.getDescriptor());
+
+    List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
+    assertEquals(4, actualRequests.size());
+    assertEquals(
+        TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
+    assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
+    assertEquals(
+        TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
+    assertEquals(
+        TEST_STREAM_21, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
+    assertEquals(TEST_STREAM, writer1.getStreamNameString());
+    assertEquals(TEST_STREAM_21, writer2.getStreamNameString());
+    assertEquals(2, cache.cachedTableCount());
+
+    // Still able to get the FooType writer.
+    StreamWriter writer3 = cache.getTableWriter(TEST_TABLE_2, FooType.getDescriptor());
+    verify(mockSchemaCheck, times(1)).check(TEST_TABLE_2, FooType.getDescriptor());
+    assertEquals(TEST_STREAM_21, writer3.getStreamNameString());
+
+    // Create a writer for a third table.
+    writerCreationResponseMock(TEST_STREAM_31);
+    writerCreationResponseMock(TEST_STREAM);
+    StreamWriter writer4 = cache.getTableWriter(TEST_TABLE_3, NestedType.getDescriptor());
+    verify(mockSchemaCheck, times(1)).check(TEST_TABLE_3, NestedType.getDescriptor());
+
+    // This would cause a new stream to be created since the old entry is evicted.
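+    // (The cache above was created with a capacity of 2 tables, so adding the
+    // TEST_TABLE_3 writer evicted the TEST_TABLE entry; fetching TEST_TABLE again
+    // therefore creates a fresh stream and re-runs the schema check.)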
+    StreamWriter writer5 = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+    verify(mockSchemaCheck, times(2)).check(TEST_TABLE, FooType.getDescriptor());
+
+    assertEquals(TEST_STREAM_31, writer4.getStreamNameString());
+    assertEquals(TEST_STREAM, writer5.getStreamNameString());
+    assertEquals(2, cache.cachedTableCount());
+  }
+
+  @Test
+  public void testConcurrentAccess() throws Exception {
+    final WriterCache cache = WriterCache.getTestInstance(client, 2, mockSchemaCheck);
+    // Make sure that getting the same table writer from multiple threads causes the
+    // stream to be created only once.
+    writerCreationResponseMock(TEST_STREAM);
+    ExecutorService executor = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      executor.execute(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                assertTrue(cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()) != null);
+              } catch (IOException | InterruptedException e) {
+                fail(e.getMessage());
+              }
+            }
+          });
+    }
+    // Wait for all lookups to finish before the mock service is reset.
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
+  }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
index ba07e2b5b9..04c831ccc6 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
@@ -17,23 +17,24 @@
 package com.google.cloud.bigquery.storage.v1alpha2.it;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import com.google.api.core.ApiFuture;
 import com.google.cloud.ServiceOptions;
 import com.google.cloud.bigquery.*;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.storage.test.Test.*;
-import com.google.cloud.bigquery.storage.v1alpha2.BigQueryWriteClient;
-import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto;
-import com.google.cloud.bigquery.storage.v1alpha2.ProtoSchemaConverter;
+import com.google.cloud.bigquery.storage.v1alpha2.*;
 import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
 import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
-import com.google.cloud.bigquery.storage.v1alpha2.StreamWriter;
 import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
 import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.ExecutionException;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.logging.Logger;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -225,18 +226,30 @@ public void testComplicateSchemaWithPendingStream()
             .setOffset(Int64Value.of(1L))
             .build());
     assertEquals(1, response2.get().getOffset());
-    }
-
-    // Nothing showed up since rows are not committed.
-    TableResult result =
-        bigquery.listTableData(
-            tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
-    Iterator<FieldValueList> iter = result.getValues().iterator();
-    assertEquals(false, iter.hasNext());
+      // Nothing showed up since rows are not committed.
+      TableResult result =
+          bigquery.listTableData(
+              tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+      Iterator<FieldValueList> iter = result.getValues().iterator();
+      assertFalse(iter.hasNext());
 
-    FinalizeWriteStreamResponse finalizeResponse =
-        client.finalizeWriteStream(
-            FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
+      FinalizeWriteStreamResponse finalizeResponse =
+          client.finalizeWriteStream(
+              FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
+
+      ApiFuture<AppendRowsResponse> response3 =
+          streamWriter.append(
+              createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"})
+                  .setOffset(Int64Value.of(1L))
+                  .build());
+      try {
+        assertEquals(2, response3.get().getOffset());
+        fail("Append to finalized stream should fail.");
+      } catch (Exception expected) {
+        // The exception thrown is not stable. Opened a bug to fix it.
+      }
+    }
     // Finalize row count is not populated.
     // assertEquals(1, finalizeResponse.getRowCount());
     BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
@@ -246,23 +259,18 @@ public void testComplicateSchemaWithPendingStream()
             .addWriteStreams(writeStream.getName())
             .build());
     assertEquals(true, batchCommitWriteStreamsResponse.hasCommitTime());
-
-    LOG.info("Waiting for termination");
-    // The awaitTermination always returns false.
-    // assertEquals(true, streamWriter.awaitTermination(10, TimeUnit.SECONDS));
-
-    // Data showed up.
-    result =
-        bigquery.listTableData(
-            tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
-    iter = result.getValues().iterator();
+    // After the batch commit, the rows are visible to queries.
+    TableResult queryResult =
+        bigquery.query(
+            QueryJobConfiguration.newBuilder("SELECT * from " + DATASET + '.' + TABLE2).build());
+    Iterator<FieldValueList> queryIter = queryResult.getValues().iterator();
+    assertTrue(queryIter.hasNext());
     assertEquals(
         "[FieldValue{attribute=REPEATED, value=[FieldValue{attribute=PRIMITIVE, value=aaa}, FieldValue{attribute=PRIMITIVE, value=aaa}]}]",
-        iter.next().get(1).getRepeatedValue().toString());
+        queryIter.next().get(1).getRepeatedValue().toString());
     assertEquals(
         "[FieldValue{attribute=REPEATED, value=[FieldValue{attribute=PRIMITIVE, value=bbb}, FieldValue{attribute=PRIMITIVE, value=bbb}]}]",
-        iter.next().get(1).getRepeatedValue().toString());
-    assertEquals(false, iter.hasNext());
+        queryIter.next().get(1).getRepeatedValue().toString());
+    assertFalse(queryIter.hasNext());
   }
 
@@ -275,7 +283,6 @@ public void testStreamError() throws IOException, InterruptedException, Executio
                 WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
             .build());
     try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
-
       AppendRowsRequest request =
           createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build();
       request
@@ -290,18 +297,15 @@ public void testStreamError() throws IOException, InterruptedException, Executio
       ApiFuture<AppendRowsResponse> response2 =
           streamWriter.append(
               createAppendRequest(writeStream.getName(), new String[] {"aaa"})
-                  .setWriteStream("blah")
+                  .setOffset(Int64Value.of(100L))
                  .build());
       try {
         response2.get().getOffset();
         Assert.fail("Should fail");
       } catch (ExecutionException e) {
         assertEquals(
-            true,
-            e.getCause()
-                .getMessage()
-                .startsWith(
-                    "INVALID_ARGUMENT: Stream name `blah` in the request doesn't match the one already specified"));
+            "OUT_OF_RANGE: The offset is beyond stream, expected offset 1, received 100",
+            e.getCause().getMessage());
       }
 
       // We can keep sending requests on the same stream.
       ApiFuture<AppendRowsResponse> response3 =
@@ -341,4 +345,51 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec
       assertEquals(1L, response.get().getOffset());
     }
   }
+
+  class CallAppend implements Runnable {
+    List<ApiFuture<Long>> resultList;
+    List<Message> messages;
+
+    CallAppend(List<ApiFuture<Long>> resultList, List<Message> messages) {
+      this.resultList = resultList;
+      this.messages = messages;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("size: " + resultList.size());
+        resultList.add(DirectWriter.append(tableId, messages));
+      } catch (Exception e) {
+        fail("Unexpected Exception: " + e.toString());
+      }
+    }
+  }
+
+  @Test
+  public void testDirectWrite() throws IOException, InterruptedException, ExecutionException {
+    final FooType fa = FooType.newBuilder().setFoo("aaa").build();
+    final FooType fb = FooType.newBuilder().setFoo("bbb").build();
+    // Each append writes two rows, so the batches start at even offsets 0, 2, ..., 18.
+    Set<Long> expectedOffset = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      expectedOffset.add(Long.valueOf(i * 2));
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(10);
+    List<Future<Long>> responses = new ArrayList<>();
+    Callable<Long> callable =
+        new Callable<Long>() {
+          @Override
+          public Long call() throws IOException, InterruptedException, ExecutionException {
+            ApiFuture<Long> result = DirectWriter.append(tableId, Arrays.asList(fa, fb));
+            return result.get();
+          }
+        };
+    for (int i = 0; i < 10; i++) {
+      responses.add(executor.submit(callable));
+    }
+    for (Future<Long> response : responses) {
+      assertTrue(expectedOffset.remove(response.get()));
+    }
+    assertTrue(expectedOffset.isEmpty());
+  }
 }
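
Reviewer note, not part of the patch: testDirectWrite above is the clearest picture of the new public entry point, so here is a minimal sketch of the intended call pattern. The table name is a placeholder, FooType is the test-only proto used throughout this patch (a real caller would use its own generated message type), and the only API assumed is the DirectWriter.append(tableName, protoRows) call exercised by the tests, whose ApiFuture<Long> resolves to the offset at which the batch was appended.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1alpha2.DirectWriter;
import java.util.Arrays;

public class DirectWriterExample {
  public static void main(String[] args) throws Exception {
    // Placeholder table; its schema must be compatible with the proto being written
    // (the new SchemaCompact check runs when the cached writer is first created).
    String tableName = "projects/my-project/datasets/my_dataset/tables/my_table";
    FooType row1 = FooType.newBuilder().setFoo("aaa").build();
    FooType row2 = FooType.newBuilder().setFoo("bbb").build();
    // DirectWriter looks up (or creates) a cached StreamWriter for the table and
    // appends the whole batch; the future completes with the batch's starting offset.
    ApiFuture<Long> appendFuture = DirectWriter.append(tableName, Arrays.asList(row1, row2));
    System.out.println("Rows appended starting at offset " + appendFuture.get());
  }
}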