diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java
new file mode 100644
index 0000000000..295638f74f
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.api.core.*;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.InvalidArgumentException;
+import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows;
+import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import io.grpc.Status;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Logger;
+
+/**
+ * Writer that can help user to write data to BigQuery. This is a simplified version of the Write
+ * API. For users writing with COMMITTED stream and don't care about row deduplication, it is
+ * recommended to use this Writer.
+ *
+ *
{@code
+ * DataProto data;
+ * ApiFuture response = DirectWriter.append("projects/pid/datasets/did/tables/tid", Arrays.asList(data1));
+ * }
+ *
+ * {@link DirectWriter} will use the credentials set on the channel, which uses application
+ * default credentials through {@link GoogleCredentials#getApplicationDefault} by default.
+ */
+public class DirectWriter {
+ private static final Logger LOG = Logger.getLogger(DirectWriter.class.getName());
+ private static WriterCache cache = null;
+ private static Lock cacheLock = new ReentrantLock();
+
+ /**
+ * Append rows to the given table.
+ *
+ * @param tableName table name in the form of "projects/{pName}/datasets/{dName}/tables/{tName}"
+ * @param protoRows rows in proto buffer format.
+ * @return A future that contains the offset at which the append happened. Only when the future
+ * returns with valid offset, then the append actually happened.
+ * @throws IOException, InterruptedException, InvalidArgumentException
+ */
+ public static ApiFuture append(String tableName, List protoRows)
+ throws IOException, InterruptedException, InvalidArgumentException {
+ if (protoRows.isEmpty()) {
+ throw new InvalidArgumentException(
+ new Exception("Empty rows are not allowed"),
+ GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT),
+ false);
+ }
+ try {
+ cacheLock.lock();
+ if (cache == null) {
+ cache = WriterCache.getInstance();
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+
+ StreamWriter writer = cache.getTableWriter(tableName, protoRows.get(0).getDescriptorForType());
+ ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
+ Descriptors.Descriptor descriptor = null;
+ for (Message protoRow : protoRows) {
+ rowsBuilder.addSerializedRows(protoRow.toByteString());
+ }
+
+ AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
+ data.setWriterSchema(ProtoSchemaConverter.convert(protoRows.get(0).getDescriptorForType()));
+ data.setRows(rowsBuilder.build());
+
+ return ApiFutures.transform(
+ writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build()),
+ new ApiFunction() {
+ @Override
+ public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
+ return Long.valueOf(appendRowsResponse.getOffset());
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ @VisibleForTesting
+ public static void testSetStub(
+ BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) {
+ cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java
new file mode 100644
index 0000000000..3f17f44951
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A class that checks the schema compatibility between user schema in proto descriptor and Bigquery
+ * table schema. If this check is passed, then user can write to BigQuery table using the user
+ * schema, otherwise the write will fail.
+ *
+ * The implementation as of now is not complete, which measn, if this check passed, there is
+ * still a possbility of writing will fail.
+ */
+public class SchemaCompact {
+ private BigQuery bigquery;
+ private static SchemaCompact compact;
+ private static String tablePatternString = "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)";
+ private static Pattern tablePattern = Pattern.compile(tablePatternString);
+
+ private SchemaCompact(BigQuery bigquery) {
+ this.bigquery = bigquery;
+ }
+
+ /**
+ * Gets a singleton {code SchemaCompact} object.
+ *
+ * @return
+ */
+ public static SchemaCompact getInstance() {
+ if (compact == null) {
+ RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
+ compact = new SchemaCompact(bigqueryHelper.getOptions().getService());
+ }
+ return compact;
+ }
+
+ /**
+ * Gets a {code SchemaCompact} object with custom BigQuery stub.
+ *
+ * @param bigquery
+ * @return
+ */
+ @VisibleForTesting
+ public static SchemaCompact getInstance(BigQuery bigquery) {
+ return new SchemaCompact(bigquery);
+ }
+
+ private TableId getTableId(String tableName) {
+ Matcher matcher = tablePattern.matcher(tableName);
+ if (!matcher.matches() || matcher.groupCount() != 3) {
+ throw new IllegalArgumentException("Invalid table name: " + tableName);
+ }
+ return TableId.of(matcher.group(1), matcher.group(2), matcher.group(3));
+ }
+
+ /**
+ * Checks if the userSchema is compatible with the table's current schema for writing. The current
+ * implementatoin is not complete. If the check failed, the write couldn't succeed.
+ *
+ * @param tableName The name of the table to write to.
+ * @param userSchema The schema user uses to append data.
+ * @throws IllegalArgumentException the check failed.
+ */
+ public void check(String tableName, Descriptors.Descriptor userSchema)
+ throws IllegalArgumentException {
+ Table table = bigquery.getTable(getTableId(tableName));
+ Schema schema = table.getDefinition().getSchema();
+ // TODO: We only have very limited check here. More checks to be added.
+ if (schema.getFields().size() != userSchema.getFields().size()) {
+ throw new IllegalArgumentException(
+ "User schema doesn't have expected field number with BigQuery table schema, expected: "
+ + schema.getFields().size()
+ + " actual: "
+ + userSchema.getFields().size());
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
index 5b8a07177b..78e23458ab 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
@@ -46,11 +46,18 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.threeten.bp.Duration;
+import org.threeten.bp.Instant;
/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
+ *
This is to be used to managed streaming write when you are working with PENDING streams or
+ * want to explicitly manage offset. In that most common cases when writing with COMMITTED stream
+ * without offset, please use a simpler writer {@code DirectWriter}.
+ *
*
A {@link StreamWrier} provides built-in capabilities to: handle batching of messages;
* controlling memory utilization (through flow control); automatic connection re-establishment and
* request cleanup (only keeps write schema on first request in the stream).
@@ -68,7 +75,13 @@
public class StreamWriter implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName());
+ private static String streamPatternString =
+ "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
+
+ private static Pattern streamPattern = Pattern.compile(streamPatternString);
+
private final String streamName;
+ private final String tableName;
private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
@@ -92,6 +105,9 @@ public class StreamWriter implements AutoCloseable {
private final AtomicBoolean activeAlarm;
private ScheduledFuture> currentAlarmFuture;
+ private Instant createTime;
+ private Duration streamTTL = Duration.ofDays(1);
+
private Integer currentRetries = 0;
/** The maximum size of one request. Defined by the API. */
@@ -104,12 +120,18 @@ public static long getApiMaxInflightRequests() {
return 5000L;
}
- private StreamWriter(Builder builder) throws IOException {
+ private StreamWriter(Builder builder)
+ throws IllegalArgumentException, IOException, InterruptedException {
+ Matcher matcher = streamPattern.matcher(builder.streamName);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Invalid stream name: " + builder.streamName);
+ }
streamName = builder.streamName;
+ tableName = matcher.group(1);
this.batchingSettings = builder.batchingSettings;
this.retrySettings = builder.retrySettings;
- this.messagesBatch = new MessagesBatch(batchingSettings);
+ this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName);
messagesBatchLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
executor = builder.executorProvider.getExecutor();
@@ -129,6 +151,19 @@ private StreamWriter(Builder builder) throws IOException {
.build();
shutdown = new AtomicBoolean(false);
refreshAppend();
+ Stream.WriteStream stream =
+ stub.getWriteStream(Storage.GetWriteStreamRequest.newBuilder().setName(streamName).build());
+ createTime =
+ Instant.ofEpochSecond(
+ stream.getCreateTime().getSeconds(), stream.getCreateTime().getNanos());
+ if (stream.getType() == Stream.WriteStream.Type.PENDING && stream.hasCommitTime()) {
+ throw new IllegalStateException(
+ "Cannot write to a stream that is already committed: " + streamName);
+ }
+ if (createTime.plus(streamTTL).compareTo(Instant.now()) < 0) {
+ throw new IllegalStateException(
+ "Cannot write to a stream that is already expired: " + streamName);
+ }
}
/** Stream name we are writing to. */
@@ -136,6 +171,16 @@ public String getStreamNameString() {
return streamName;
}
+ /** Table name we are writing to. */
+ public String getTableNameString() {
+ return tableName;
+ }
+
+ /** Returns if a stream has expired. */
+ public Boolean expired() {
+ return createTime.plus(streamTTL).compareTo(Instant.now()) < 0;
+ }
+
/**
* Schedules the writing of a message. The write of the message may occur immediately or be
* delayed based on the writer batching options.
@@ -192,11 +237,13 @@ public ApiFuture append(AppendRowsRequest message) {
*
* @throws IOException
*/
- private void refreshAppend() throws IOException {
+ public void refreshAppend() throws IOException, InterruptedException {
synchronized (this) {
Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
if (stub != null) {
+ clientStream.closeSend();
stub.shutdown();
+ stub.awaitTermination(1, TimeUnit.MINUTES);
}
backgroundResourceList.remove(stub);
stub = BigQueryWriteClient.create(stubSettings);
@@ -212,13 +259,14 @@ private void refreshAppend() throws IOException {
}
} catch (InterruptedException expected) {
}
+ LOG.info("Write Stream " + streamName + " connection established");
}
private void setupAlarm() {
if (!messagesBatch.isEmpty()) {
if (!activeAlarm.getAndSet(true)) {
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
- LOG.log(Level.INFO, "Setting up alarm for the next {0} ms.", delayThresholdMs);
+ LOG.log(Level.FINE, "Setting up alarm for the next {0} ms.", delayThresholdMs);
currentAlarmFuture =
executor.schedule(
new Runnable() {
@@ -281,6 +329,7 @@ private void writeBatch(final InflightBatch inflightBatch) {
/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
+ LOG.info("Closing stream writer");
shutdown();
try {
awaitTermination(1, TimeUnit.MINUTES);
@@ -300,10 +349,12 @@ private static final class InflightBatch {
int batchSizeBytes;
long expectedOffset;
Boolean attachSchema;
+ String streamName;
InflightBatch(
List inflightRequests,
int batchSizeBytes,
+ String streamName,
Boolean attachSchema) {
this.inflightRequests = inflightRequests;
this.offsetList = new ArrayList(inflightRequests.size());
@@ -319,6 +370,7 @@ private static final class InflightBatch {
creationTime = System.currentTimeMillis();
this.batchSizeBytes = batchSizeBytes;
this.attachSchema = attachSchema;
+ this.streamName = streamName;
}
int count() {
@@ -345,15 +397,18 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException {
}
AppendRowsRequest.ProtoData.Builder data =
inflightRequests.get(0).message.getProtoRows().toBuilder().setRows(rowsBuilder.build());
+ AppendRowsRequest.Builder requestBuilder = inflightRequests.get(0).message.toBuilder();
if (!attachSchema) {
data.clearWriterSchema();
+ requestBuilder.clearWriteStream();
} else {
if (!data.hasWriterSchema()) {
throw new IllegalStateException(
"The first message on the connection must have writer schema set");
}
+ requestBuilder.setWriteStream(streamName);
}
- return inflightRequests.get(0).message.toBuilder().setProtoRows(data.build()).build();
+ return requestBuilder.setProtoRows(data.build()).build();
}
private void onFailure(Throwable t) {
@@ -453,13 +508,8 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
* WriteStream response = bigQueryWriteClient.createWriteStream(request);
* stream = response.getName();
* }
- * WriteStream writer = WriteStream.newBuilder(stream).build();
- * try {
- * // ...
- * } finally {
- * // When finished with the writer, make sure to shutdown to free up resources.
- * writer.shutdown();
- * writer.awaitTermination(1, TimeUnit.MINUTES);
+ * try (WriteStream writer = WriteStream.newBuilder(stream).build()) {
+ * //...
* }
* }
*/
@@ -467,7 +517,7 @@ public static Builder newBuilder(String streamName) {
return new Builder(streamName);
}
- /** A builder of {@link Publisher}s. */
+ /** A builder of {@link StreamWriter}s. */
public static final class Builder {
static final Duration MIN_TOTAL_TIMEOUT = Duration.ofSeconds(10);
static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10);
@@ -475,7 +525,7 @@ public static final class Builder {
// Meaningful defaults.
static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L;
static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 100 * 1024L; // 100 kB
- static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(1);
+ static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(10);
static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS =
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
@@ -515,9 +565,6 @@ public static final class Builder {
private TransportChannelProvider channelProvider =
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
- private HeaderProvider headerProvider = new NoHeaderProvider();
- private HeaderProvider internalHeaderProvider =
- BigQueryWriteSettings.defaultApiClientHeaderProviderBuilder().build();
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
@@ -647,7 +694,7 @@ public Builder setEndpoint(String endpoint) {
}
/** Builds the {@code StreamWriter}. */
- public StreamWriter build() throws IOException {
+ public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException {
return new StreamWriter(this);
}
}
@@ -785,7 +832,7 @@ public void onError(Throwable t) {
try {
// Establish a new connection.
streamWriter.refreshAppend();
- } catch (IOException e) {
+ } catch (IOException | InterruptedException e) {
LOG.info("Failed to establish a new connection");
}
}
@@ -805,15 +852,18 @@ private static class MessagesBatch {
private int batchedBytes;
private final BatchingSettings batchingSettings;
private Boolean attachSchema = true;
+ private final String streamName;
- private MessagesBatch(BatchingSettings batchingSettings) {
+ private MessagesBatch(BatchingSettings batchingSettings, String streamName) {
this.batchingSettings = batchingSettings;
+ this.streamName = streamName;
reset();
}
// Get all the messages out in a batch.
private InflightBatch popBatch() {
- InflightBatch batch = new InflightBatch(messages, batchedBytes, this.attachSchema);
+ InflightBatch batch =
+ new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema);
this.attachSchema = false;
reset();
return batch;
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
new file mode 100644
index 0000000000..9b7cb1fd18
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.protobuf.Descriptors.Descriptor;
+import java.io.IOException;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A cache of StreamWriters that can be looked up by Table Name. The entries will expire after 5
+ * minutes if not used. Code sample: WriterCache cache = WriterCache.getInstance(); StreamWriter
+ * writer = cache.getWriter(); // Use... cache.returnWriter(writer);
+ */
+public class WriterCache {
+ private static final Logger LOG = Logger.getLogger(WriterCache.class.getName());
+
+ private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
+ private static Pattern tablePattern = Pattern.compile(tablePatternString);
+
+ private static WriterCache instance;
+ private Cache> writerCache;
+
+ // Maximum number of tables to hold in the cache, once the maxium exceeded, the cache will be
+ // evicted based on least recent used.
+ private static final int MAX_TABLE_ENTRY = 100;
+ private static final int MAX_WRITERS_PER_TABLE = 2;
+
+ private final BigQueryWriteClient stub;
+ private final SchemaCompact compact;
+
+ private WriterCache(BigQueryWriteClient stub, int maxTableEntry, SchemaCompact compact) {
+ this.stub = stub;
+ this.compact = compact;
+ writerCache =
+ CacheBuilder.newBuilder()
+ .maximumSize(maxTableEntry)
+ .>build();
+ }
+
+ public static WriterCache getInstance() throws IOException {
+ if (instance == null) {
+ BigQueryWriteSettings stubSettings = BigQueryWriteSettings.newBuilder().build();
+ BigQueryWriteClient stub = BigQueryWriteClient.create(stubSettings);
+ instance = new WriterCache(stub, MAX_TABLE_ENTRY, SchemaCompact.getInstance());
+ }
+ return instance;
+ }
+
+ /** Returns a cache with custom stub used by test. */
+ @VisibleForTesting
+ public static WriterCache getTestInstance(
+ BigQueryWriteClient stub, int maxTableEntry, SchemaCompact compact) {
+ return new WriterCache(stub, maxTableEntry, compact);
+ }
+
+ /** Returns an entry with {@code StreamWriter} and expiration time in millis. */
+ private String CreateNewStream(String tableName) {
+ Stream.WriteStream stream =
+ Stream.WriteStream.newBuilder().setType(Stream.WriteStream.Type.COMMITTED).build();
+ stream =
+ stub.createWriteStream(
+ Storage.CreateWriteStreamRequest.newBuilder()
+ .setParent(tableName)
+ .setWriteStream(stream)
+ .build());
+ LOG.info("Created write stream:" + stream.getName());
+ return stream.getName();
+ }
+
+ StreamWriter CreateNewWriter(String streamName)
+ throws IllegalArgumentException, IOException, InterruptedException {
+ return StreamWriter.newBuilder(streamName)
+ .setChannelProvider(stub.getSettings().getTransportChannelProvider())
+ .setCredentialsProvider(stub.getSettings().getCredentialsProvider())
+ .setExecutorProvider(stub.getSettings().getExecutorProvider())
+ .build();
+ }
+ /**
+ * Gets a writer for a given table with a given user schema from global cache.
+ *
+ * @param tableName
+ * @param userSchema
+ * @return
+ * @throws Exception
+ */
+ public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
+ throws IllegalArgumentException, IOException, InterruptedException {
+ Matcher matcher = tablePattern.matcher(tableName);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Invalid table name: " + tableName);
+ }
+
+ String streamName = null;
+ Boolean streamExpired = false;
+ StreamWriter writer = null;
+ Cache tableEntry = null;
+
+ synchronized (this) {
+ tableEntry = writerCache.getIfPresent(tableName);
+ if (tableEntry != null) {
+ writer = tableEntry.getIfPresent(userSchema);
+ if (writer != null) {
+ if (!writer.expired()) {
+ return writer;
+ } else {
+ writer.close();
+ }
+ }
+ compact.check(tableName, userSchema);
+ streamName = CreateNewStream(tableName);
+ writer = CreateNewWriter(streamName);
+ tableEntry.put(userSchema, writer);
+ } else {
+ compact.check(tableName, userSchema);
+ streamName = CreateNewStream(tableName);
+ tableEntry =
+ CacheBuilder.newBuilder()
+ .maximumSize(MAX_WRITERS_PER_TABLE)
+ .build();
+ writer = CreateNewWriter(streamName);
+ tableEntry.put(userSchema, writer);
+ writerCache.put(tableName, tableEntry);
+ }
+ }
+
+ return writer;
+ }
+
+ @VisibleForTesting
+ public long cachedTableCount() {
+ synchronized (writerCache) {
+ return writerCache.size();
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
new file mode 100644
index 0000000000..f6a0948802
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.testing.LocalChannelProvider;
+import com.google.api.gax.grpc.testing.MockGrpcService;
+import com.google.api.gax.grpc.testing.MockServiceHelper;
+import com.google.cloud.bigquery.storage.test.Test.*;
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.Timestamp;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.threeten.bp.Instant;
+
+@RunWith(JUnit4.class)
+public class DirectWriterTest {
+ private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
+ private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
+ private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2";
+
+ private static MockBigQueryWrite mockBigQueryWrite;
+ private static MockServiceHelper serviceHelper;
+ private BigQueryWriteClient client;
+ private LocalChannelProvider channelProvider;
+
+ @Mock private static SchemaCompact schemaCheck;
+
+ @BeforeClass
+ public static void startStaticServer() {
+ mockBigQueryWrite = new MockBigQueryWrite();
+ serviceHelper =
+ new MockServiceHelper(
+ UUID.randomUUID().toString(), Arrays.asList(mockBigQueryWrite));
+ serviceHelper.start();
+ }
+
+ @AfterClass
+ public static void stopServer() {
+ serviceHelper.stop();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ serviceHelper.reset();
+ channelProvider = serviceHelper.createChannelProvider();
+ BigQueryWriteSettings settings =
+ BigQueryWriteSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .build();
+ client = BigQueryWriteClient.create(settings);
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ client.close();
+ }
+
+ /** Response mocks for create a new writer */
+ void WriterCreationResponseMock(String testStreamName, List responseOffsets) {
+ // Response from CreateWriteStream
+ Stream.WriteStream expectedResponse =
+ Stream.WriteStream.newBuilder().setName(testStreamName).build();
+ mockBigQueryWrite.addResponse(expectedResponse);
+
+ // Response from GetWriteStream
+ Instant time = Instant.now();
+ Timestamp timestamp =
+ Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
+ Stream.WriteStream expectedResponse2 =
+ Stream.WriteStream.newBuilder()
+ .setName(testStreamName)
+ .setType(Stream.WriteStream.Type.COMMITTED)
+ .setCreateTime(timestamp)
+ .build();
+ mockBigQueryWrite.addResponse(expectedResponse2);
+
+ for (Long offset : responseOffsets) {
+ Storage.AppendRowsResponse response =
+ Storage.AppendRowsResponse.newBuilder().setOffset(offset).build();
+ mockBigQueryWrite.addResponse(response);
+ }
+ }
+
+ @Test
+ public void testWriteSuccess() throws Exception {
+ DirectWriter.testSetStub(client, 10, schemaCheck);
+ FooType m1 = FooType.newBuilder().setFoo("m1").build();
+ FooType m2 = FooType.newBuilder().setFoo("m2").build();
+
+ WriterCreationResponseMock(TEST_STREAM, Arrays.asList(Long.valueOf(0L)));
+ ApiFuture ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m1, m2));
+ verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
+ assertEquals(Long.valueOf(0L), ret.get());
+ List actualRequests = mockBigQueryWrite.getRequests();
+ Assert.assertEquals(3, actualRequests.size());
+ assertEquals(
+ TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
+ assertEquals(
+ Stream.WriteStream.Type.COMMITTED,
+ ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
+ assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
+
+ Storage.AppendRowsRequest.ProtoData.Builder dataBuilder =
+ Storage.AppendRowsRequest.ProtoData.newBuilder();
+ dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()));
+ dataBuilder.setRows(
+ ProtoBufProto.ProtoRows.newBuilder()
+ .addSerializedRows(m1.toByteString())
+ .addSerializedRows(m2.toByteString())
+ .build());
+ Storage.AppendRowsRequest expectRequest =
+ Storage.AppendRowsRequest.newBuilder()
+ .setWriteStream(TEST_STREAM)
+ .setProtoRows(dataBuilder.build())
+ .build();
+ assertEquals(expectRequest.toString(), actualRequests.get(2).toString());
+
+ Storage.AppendRowsResponse response =
+ Storage.AppendRowsResponse.newBuilder().setOffset(2).build();
+ mockBigQueryWrite.addResponse(response);
+ // Append again, write stream name and schema are cleared.
+ ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m1));
+ assertEquals(Long.valueOf(2L), ret.get());
+ dataBuilder = Storage.AppendRowsRequest.ProtoData.newBuilder();
+ dataBuilder.setRows(
+ ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(m1.toByteString()).build());
+ expectRequest =
+ Storage.AppendRowsRequest.newBuilder().setProtoRows(dataBuilder.build()).build();
+ assertEquals(expectRequest.toString(), actualRequests.get(3).toString());
+
+ // Write with a different schema.
+ WriterCreationResponseMock(TEST_STREAM_2, Arrays.asList(Long.valueOf(0L)));
+ AllSupportedTypes m3 = AllSupportedTypes.newBuilder().setStringValue("s").build();
+ ret = DirectWriter.append(TEST_TABLE, Arrays.asList(m3));
+ verify(schemaCheck).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
+ assertEquals(Long.valueOf(0L), ret.get());
+ dataBuilder = Storage.AppendRowsRequest.ProtoData.newBuilder();
+ dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(AllSupportedTypes.getDescriptor()));
+ dataBuilder.setRows(
+ ProtoBufProto.ProtoRows.newBuilder().addSerializedRows(m3.toByteString()).build());
+ expectRequest =
+ Storage.AppendRowsRequest.newBuilder()
+ .setWriteStream(TEST_STREAM_2)
+ .setProtoRows(dataBuilder.build())
+ .build();
+ Assert.assertEquals(7, actualRequests.size());
+ assertEquals(
+ TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getParent());
+ assertEquals(
+ Stream.WriteStream.Type.COMMITTED,
+ ((Storage.CreateWriteStreamRequest) actualRequests.get(4)).getWriteStream().getType());
+ assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(5)).getName());
+ assertEquals(expectRequest.toString(), actualRequests.get(6).toString());
+ }
+
+ @Test
+ public void testWriteBadTableName() throws Exception {
+ DirectWriter.testSetStub(client, 10, schemaCheck);
+ FooType m1 = FooType.newBuilder().setFoo("m1").build();
+ FooType m2 = FooType.newBuilder().setFoo("m2").build();
+
+ try {
+ ApiFuture ret = DirectWriter.append("abc", Arrays.asList(m1, m2));
+ fail("should fail");
+ } catch (IllegalArgumentException expected) {
+ assertEquals("Invalid table name: abc", expected.getMessage());
+ }
+ }
+
+ @Test
+ public void testConcurrentAccess() throws Exception {
+ WriterCache cache = WriterCache.getTestInstance(client, 2, schemaCheck);
+ final FooType m1 = FooType.newBuilder().setFoo("m1").build();
+ final FooType m2 = FooType.newBuilder().setFoo("m2").build();
+ final List expectedOffset =
+ Arrays.asList(
+ Long.valueOf(0L),
+ Long.valueOf(2L),
+ Long.valueOf(4L),
+ Long.valueOf(8L),
+ Long.valueOf(10L));
+ // Make sure getting the same table writer in multiple thread only cause create to be called
+ // once.
+ WriterCreationResponseMock(TEST_STREAM, expectedOffset);
+ ExecutorService executor = Executors.newFixedThreadPool(5);
+ for (int i = 0; i < 5; i++) {
+ executor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ApiFuture result =
+ DirectWriter.append(TEST_TABLE, Arrays.asList(m1, m2));
+ assertTrue(expectedOffset.remove(result.get()));
+ } catch (IOException | InterruptedException | ExecutionException e) {
+ fail(e.getMessage());
+ }
+ }
+ });
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java
index 5298e80ae4..f1350ce7e7 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java
@@ -44,10 +44,16 @@ public List getAppendRequests() {
return serviceImpl.getCapturedRequests();
}
+ public List getWriteStreamRequests() {
+ return serviceImpl.getCapturedWriteRequests();
+ }
+
@Override
public void addResponse(AbstractMessage response) {
if (response instanceof AppendRowsResponse) {
serviceImpl.addResponse((AppendRowsResponse) response);
+ } else if (response instanceof Stream.WriteStream) {
+ serviceImpl.addWriteStreamResponse((Stream.WriteStream) response);
} else {
throw new IllegalStateException("Unsupported service");
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java
index aa3f7e734d..0a3aa2e622 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java
@@ -36,7 +36,11 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private static final Logger LOG = Logger.getLogger(FakeBigQueryWriteImpl.class.getName());
private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue writeRequests =
+ new LinkedBlockingQueue<>();
private final LinkedBlockingQueue responses = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue writeResponses =
+ new LinkedBlockingQueue<>();
private final AtomicInteger nextMessageId = new AtomicInteger(1);
private boolean autoPublishResponse;
private ScheduledExecutorService executor = null;
@@ -78,6 +82,21 @@ public String toString() {
}
}
+ @Override
+ public void getWriteStream(
+ GetWriteStreamRequest request, StreamObserver responseObserver) {
+ Object response = writeResponses.remove();
+ if (response instanceof Stream.WriteStream) {
+ writeRequests.add(request);
+ responseObserver.onNext((Stream.WriteStream) response);
+ responseObserver.onCompleted();
+ } else if (response instanceof Exception) {
+ responseObserver.onError((Exception) response);
+ } else {
+ responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
+ }
+ }
+
@Override
public StreamObserver appendRows(
final StreamObserver responseObserver) {
@@ -149,6 +168,11 @@ public FakeBigQueryWriteImpl addResponse(AppendRowsResponse.Builder appendRespon
return addResponse(appendResponseBuilder.build());
}
+ public FakeBigQueryWriteImpl addWriteStreamResponse(Stream.WriteStream response) {
+ writeResponses.add(response);
+ return this;
+ }
+
public FakeBigQueryWriteImpl addConnectionError(Throwable error) {
responses.add(new Response(error));
return this;
@@ -158,6 +182,10 @@ public List getCapturedRequests() {
return new ArrayList(requests);
}
+ public List getCapturedWriteRequests() {
+ return new ArrayList(writeRequests);
+ }
+
public void reset() {
requests.clear();
responses.clear();
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java
index 32e15e28d0..a82a3dbdb3 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java
@@ -83,11 +83,12 @@ public void createWriteStream(
@Override
public StreamObserver appendRows(
final StreamObserver responseObserver) {
- final Object response = responses.remove();
StreamObserver requestObserver =
new StreamObserver() {
@Override
public void onNext(AppendRowsRequest value) {
+ requests.add(value);
+ final Object response = responses.remove();
if (response instanceof AppendRowsResponse) {
responseObserver.onNext((AppendRowsResponse) response);
} else if (response instanceof Exception) {
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java
new file mode 100644
index 0000000000..205d814968
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2016 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import com.google.cloud.bigquery.*;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.storage.test.Test.FooType;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(JUnit4.class)
+public class SchemaCompactTest {
+ @Mock private BigQuery mockBigquery;
+ @Mock private Table mockBigqueryTable;
+
+ @Before
+ public void setUp() throws IOException {
+ MockitoAnnotations.initMocks(this);
+ when(mockBigquery.getTable(any(TableId.class))).thenReturn(mockBigqueryTable);
+ }
+
+ @After
+ public void tearDown() {
+ verifyNoMoreInteractions(mockBigquery);
+ verifyNoMoreInteractions(mockBigqueryTable);
+ }
+
+ @Test
+ public void testSuccess() throws Exception {
+ TableDefinition definition =
+ new TableDefinition() {
+ @Override
+ public Type getType() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Schema getSchema() {
+ return Schema.of(Field.of("Foo", LegacySQLTypeName.STRING));
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return null;
+ }
+ };
+ when(mockBigqueryTable.getDefinition()).thenReturn(definition);
+ SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+ compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor());
+ verify(mockBigquery, times(1)).getTable(any(TableId.class));
+ verify(mockBigqueryTable, times(1)).getDefinition();
+ }
+
+ @Test
+ public void testFailed() throws Exception {
+ TableDefinition definition =
+ new TableDefinition() {
+ @Override
+ public Type getType() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Schema getSchema() {
+ return Schema.of(
+ Field.of("Foo", LegacySQLTypeName.STRING),
+ Field.of("Bar", LegacySQLTypeName.STRING));
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return null;
+ }
+ };
+ when(mockBigqueryTable.getDefinition()).thenReturn(definition);
+ SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+ try {
+ compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor());
+ fail("should fail");
+ } catch (IllegalArgumentException expected) {
+ assertEquals(
+ "User schema doesn't have expected field number with BigQuery table schema, expected: 2 actual: 1",
+ expected.getMessage());
+ }
+ verify(mockBigquery, times(1)).getTable(any(TableId.class));
+ verify(mockBigqueryTable, times(1)).getDefinition();
+ }
+
+ @Test
+ public void testBadTableName() throws Exception {
+ try {
+ SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+ compact.check("blah", FooType.getDescriptor());
+ fail("should fail");
+ } catch (IllegalArgumentException expected) {
+ assertEquals("Invalid table name: blah", expected.getMessage());
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
index 11bbd66010..38394a7479 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
@@ -38,6 +38,7 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
+import com.google.protobuf.Timestamp;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Arrays;
@@ -53,6 +54,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;
+import org.threeten.bp.Instant;
@RunWith(JUnit4.class)
public class StreamWriterTest {
@@ -75,6 +77,14 @@ public void setUp() throws Exception {
channelProvider = serviceHelper.createChannelProvider();
fakeExecutor = new FakeScheduledExecutorService();
testBigQueryWrite.setExecutor(fakeExecutor);
+ Instant time = Instant.now();
+ Timestamp timestamp =
+ Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
+ // Add enough GetWriteStream response.
+ for (int i = 0; i < 4; i++) {
+ testBigQueryWrite.addResponse(
+ Stream.WriteStream.newBuilder().setName(TEST_STREAM).setCreateTime(timestamp).build());
+ }
}
@After
@@ -123,6 +133,12 @@ private ApiFuture sendTestMessage(StreamWriter writer, Strin
return writer.append(createAppendRequest(messages, -1));
}
+ @Test
+ public void testTableName() throws Exception {
+ StreamWriter writer = getTestStreamWriterBuilder().build();
+ assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
+ }
+
@Test
public void testAppendByDuration() throws Exception {
StreamWriter writer =
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
new file mode 100644
index 0000000000..47ad647e66
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.testing.LocalChannelProvider;
+import com.google.api.gax.grpc.testing.MockGrpcService;
+import com.google.api.gax.grpc.testing.MockServiceHelper;
+import com.google.cloud.bigquery.storage.test.Test.*;
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.Timestamp;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.threeten.bp.Instant;
+import org.threeten.bp.temporal.ChronoUnit;
+
+@RunWith(JUnit4.class)
+public class WriterCacheTest {
+ private static final Logger LOG = Logger.getLogger(StreamWriterTest.class.getName());
+
+ private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
+ private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
+ private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s2";
+ private static final String TEST_STREAM_3 = "projects/p/datasets/d/tables/t/streams/s3";
+ private static final String TEST_STREAM_4 = "projects/p/datasets/d/tables/t/streams/s4";
+ private static final String TEST_TABLE_2 = "projects/p/datasets/d/tables/t2";
+ private static final String TEST_STREAM_21 = "projects/p/datasets/d/tables/t2/streams/s1";
+ private static final String TEST_TABLE_3 = "projects/p/datasets/d/tables/t3";
+ private static final String TEST_STREAM_31 = "projects/p/datasets/d/tables/t3/streams/s1";
+
+ private static MockBigQueryWrite mockBigQueryWrite;
+ private static MockServiceHelper serviceHelper;
+ @Mock private static SchemaCompact mockSchemaCheck;
+ private BigQueryWriteClient client;
+ private LocalChannelProvider channelProvider;
+
+ @BeforeClass
+ public static void startStaticServer() {
+ mockBigQueryWrite = new MockBigQueryWrite();
+ serviceHelper =
+ new MockServiceHelper(
+ UUID.randomUUID().toString(), Arrays.asList(mockBigQueryWrite));
+ serviceHelper.start();
+ }
+
+ @AfterClass
+ public static void stopServer() {
+ serviceHelper.stop();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ serviceHelper.reset();
+ channelProvider = serviceHelper.createChannelProvider();
+ BigQueryWriteSettings settings =
+ BigQueryWriteSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .build();
+ client = BigQueryWriteClient.create(settings);
+ MockitoAnnotations.initMocks(this);
+ }
+
+ /** Response mocks for create a new writer */
+ void WriterCreationResponseMock(String testStreamName) {
+ // Response from CreateWriteStream
+ Stream.WriteStream expectedResponse =
+ Stream.WriteStream.newBuilder().setName(testStreamName).build();
+ mockBigQueryWrite.addResponse(expectedResponse);
+
+ // Response from GetWriteStream
+ Instant time = Instant.now();
+ Timestamp timestamp =
+ Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
+ Stream.WriteStream expectedResponse2 =
+ Stream.WriteStream.newBuilder()
+ .setName(testStreamName)
+ .setType(Stream.WriteStream.Type.COMMITTED)
+ .setCreateTime(timestamp)
+ .build();
+ mockBigQueryWrite.addResponse(expectedResponse2);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ client.close();
+ }
+
+ @Test
+ public void testRejectBadTableName() throws Exception {
+ WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck);
+ try {
+ cache.getTableWriter("abc", FooType.getDescriptor());
+ fail();
+ } catch (IllegalArgumentException expected) {
+ assertEquals(expected.getMessage(), "Invalid table name: abc");
+ }
+ }
+
+ @Test
+ public void testCreateNewWriter() throws Exception {
+ WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck);
+ WriterCreationResponseMock(TEST_STREAM);
+ StreamWriter writer = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor());
+ List actualRequests = mockBigQueryWrite.getRequests();
+ assertEquals(2, actualRequests.size());
+ assertEquals(
+ TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
+ assertEquals(
+ Stream.WriteStream.Type.COMMITTED,
+ ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getWriteStream().getType());
+ assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
+
+ assertEquals(TEST_TABLE, writer.getTableNameString());
+ assertEquals(TEST_STREAM, writer.getStreamNameString());
+ assertEquals(1, cache.cachedTableCount());
+ }
+
+ @Test
+ public void testWriterExpired() throws Exception {
+ WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck);
+ // Response from CreateWriteStream
+ Stream.WriteStream expectedResponse =
+ Stream.WriteStream.newBuilder().setName(TEST_STREAM).build();
+ mockBigQueryWrite.addResponse(expectedResponse);
+
+ // Response from GetWriteStream
+ Instant time = Instant.now().minus(2, ChronoUnit.DAYS);
+ Timestamp timestamp =
+ Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
+ Stream.WriteStream expectedResponse2 =
+ Stream.WriteStream.newBuilder()
+ .setName(TEST_STREAM)
+ .setType(Stream.WriteStream.Type.COMMITTED)
+ .setCreateTime(timestamp)
+ .build();
+ mockBigQueryWrite.addResponse(expectedResponse2);
+
+ try {
+ StreamWriter writer = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+ fail("Should fail");
+ } catch (IllegalStateException e) {
+ assertEquals(
+ "Cannot write to a stream that is already expired: projects/p/datasets/d/tables/t/streams/s",
+ e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWriterWithNewSchema() throws Exception {
+ WriterCache cache = WriterCache.getTestInstance(client, 10, mockSchemaCheck);
+ WriterCreationResponseMock(TEST_STREAM);
+ WriterCreationResponseMock(TEST_STREAM_2);
+ StreamWriter writer1 = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor());
+
+ StreamWriter writer2 = cache.getTableWriter(TEST_TABLE, AllSupportedTypes.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
+
+ List actualRequests = mockBigQueryWrite.getRequests();
+ assertEquals(4, actualRequests.size());
+ assertEquals(
+ TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
+ assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
+ assertEquals(
+ TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
+ assertEquals(TEST_STREAM_2, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
+ assertEquals(TEST_STREAM, writer1.getStreamNameString());
+ assertEquals(TEST_STREAM_2, writer2.getStreamNameString());
+ assertEquals(1, cache.cachedTableCount());
+
+ // Still able to get the FooType writer.
+ StreamWriter writer3 = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor());
+ assertEquals(TEST_STREAM, writer3.getStreamNameString());
+
+ // Create a writer with a even new schema.
+ WriterCreationResponseMock(TEST_STREAM_3);
+ WriterCreationResponseMock(TEST_STREAM_4);
+ StreamWriter writer4 = cache.getTableWriter(TEST_TABLE, NestedType.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE, NestedType.getDescriptor());
+
+ LOG.info("blah");
+ // This would cause a new stream to be created since the old entry is evicted.
+ StreamWriter writer5 = cache.getTableWriter(TEST_TABLE, AllSupportedTypes.getDescriptor());
+ verify(mockSchemaCheck, times(2)).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
+ assertEquals(TEST_STREAM_3, writer4.getStreamNameString());
+ assertEquals(TEST_STREAM_4, writer5.getStreamNameString());
+ assertEquals(1, cache.cachedTableCount());
+ }
+
+ @Test
+ public void testWriterWithDifferentTable() throws Exception {
+ WriterCache cache = WriterCache.getTestInstance(client, 2, mockSchemaCheck);
+ WriterCreationResponseMock(TEST_STREAM);
+ WriterCreationResponseMock(TEST_STREAM_21);
+ StreamWriter writer1 = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+ StreamWriter writer2 = cache.getTableWriter(TEST_TABLE_2, FooType.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE, FooType.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE_2, FooType.getDescriptor());
+
+ List actualRequests = mockBigQueryWrite.getRequests();
+ assertEquals(4, actualRequests.size());
+ assertEquals(
+ TEST_TABLE, ((Storage.CreateWriteStreamRequest) actualRequests.get(0)).getParent());
+ assertEquals(TEST_STREAM, ((Storage.GetWriteStreamRequest) actualRequests.get(1)).getName());
+ assertEquals(
+ TEST_TABLE_2, ((Storage.CreateWriteStreamRequest) actualRequests.get(2)).getParent());
+ Assert.assertEquals(
+ TEST_STREAM_21, ((Storage.GetWriteStreamRequest) actualRequests.get(3)).getName());
+ assertEquals(TEST_STREAM, writer1.getStreamNameString());
+ assertEquals(TEST_STREAM_21, writer2.getStreamNameString());
+ assertEquals(2, cache.cachedTableCount());
+
+ // Still able to get the FooType writer.
+ StreamWriter writer3 = cache.getTableWriter(TEST_TABLE_2, FooType.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE_2, FooType.getDescriptor());
+ Assert.assertEquals(TEST_STREAM_21, writer3.getStreamNameString());
+
+ // Create a writer with a even new schema.
+ WriterCreationResponseMock(TEST_STREAM_31);
+ WriterCreationResponseMock(TEST_STREAM);
+ StreamWriter writer4 = cache.getTableWriter(TEST_TABLE_3, NestedType.getDescriptor());
+ verify(mockSchemaCheck, times(1)).check(TEST_TABLE_3, NestedType.getDescriptor());
+
+ // This would cause a new stream to be created since the old entry is evicted.
+ StreamWriter writer5 = cache.getTableWriter(TEST_TABLE, FooType.getDescriptor());
+ verify(mockSchemaCheck, times(2)).check(TEST_TABLE, FooType.getDescriptor());
+
+ assertEquals(TEST_STREAM_31, writer4.getStreamNameString());
+ assertEquals(TEST_STREAM, writer5.getStreamNameString());
+ assertEquals(2, cache.cachedTableCount());
+ }
+
+ @Test
+ public void testConcurrentAccess() throws Exception {
+ final WriterCache cache = WriterCache.getTestInstance(client, 2, mockSchemaCheck);
+ // Make sure getting the same table writer in multiple thread only cause create to be called
+ // once.
+ WriterCreationResponseMock(TEST_STREAM);
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ executor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(cache.getTableWriter(TEST_TABLE, FooType.getDescriptor()) != null);
+ } catch (IOException | InterruptedException e) {
+ fail(e.getMessage());
+ }
+ }
+ });
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
index ba07e2b5b9..04c831ccc6 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
@@ -17,23 +17,24 @@
package com.google.cloud.bigquery.storage.v1alpha2.it;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import com.google.api.core.ApiFuture;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.storage.test.Test.*;
-import com.google.cloud.bigquery.storage.v1alpha2.BigQueryWriteClient;
-import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto;
-import com.google.cloud.bigquery.storage.v1alpha2.ProtoSchemaConverter;
+import com.google.cloud.bigquery.storage.v1alpha2.*;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
-import com.google.cloud.bigquery.storage.v1alpha2.StreamWriter;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.ExecutionException;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -225,18 +226,30 @@ public void testComplicateSchemaWithPendingStream()
.setOffset(Int64Value.of(1L))
.build());
assertEquals(1, response2.get().getOffset());
- }
- // Nothing showed up since rows are not committed.
- TableResult result =
- bigquery.listTableData(
- tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
- Iterator iter = result.getValues().iterator();
- assertEquals(false, iter.hasNext());
+ // Nothing showed up since rows are not committed.
+ TableResult result =
+ bigquery.listTableData(
+ tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+ Iterator iter = result.getValues().iterator();
+ assertEquals(false, iter.hasNext());
- FinalizeWriteStreamResponse finalizeResponse =
- client.finalizeWriteStream(
- FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
+ FinalizeWriteStreamResponse finalizeResponse =
+ client.finalizeWriteStream(
+ FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
+
+ ApiFuture response3 =
+ streamWriter.append(
+ createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"})
+ .setOffset(Int64Value.of(1L))
+ .build());
+ try {
+ assertEquals(2, response3.get().getOffset());
+ fail("Append to finalized stream should fail.");
+ } catch (Exception expected) {
+ // The exception thrown is not stable. Opened a bug to fix it.
+ }
+ }
// Finalize row count is not populated.
// assertEquals(1, finalizeResponse.getRowCount());
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
@@ -246,23 +259,18 @@ public void testComplicateSchemaWithPendingStream()
.addWriteStreams(writeStream.getName())
.build());
assertEquals(true, batchCommitWriteStreamsResponse.hasCommitTime());
-
- LOG.info("Waiting for termination");
- // The awaitTermination always returns false.
- // assertEquals(true, streamWriter.awaitTermination(10, TimeUnit.SECONDS));
-
- // Data showed up.
- result =
- bigquery.listTableData(
- tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
- iter = result.getValues().iterator();
+ TableResult queryResult =
+ bigquery.query(
+ QueryJobConfiguration.newBuilder("SELECT * from " + DATASET + '.' + TABLE2).build());
+ Iterator queryIter = queryResult.getValues().iterator();
+ assertTrue(queryIter.hasNext());
assertEquals(
"[FieldValue{attribute=REPEATED, value=[FieldValue{attribute=PRIMITIVE, value=aaa}, FieldValue{attribute=PRIMITIVE, value=aaa}]}]",
- iter.next().get(1).getRepeatedValue().toString());
+ queryIter.next().get(1).getRepeatedValue().toString());
assertEquals(
"[FieldValue{attribute=REPEATED, value=[FieldValue{attribute=PRIMITIVE, value=bbb}, FieldValue{attribute=PRIMITIVE, value=bbb}]}]",
- iter.next().get(1).getRepeatedValue().toString());
- assertEquals(false, iter.hasNext());
+ queryIter.next().get(1).getRepeatedValue().toString());
+ assertFalse(queryIter.hasNext());
}
@Test
@@ -275,7 +283,6 @@ public void testStreamError() throws IOException, InterruptedException, Executio
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
-
AppendRowsRequest request =
createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build();
request
@@ -290,18 +297,15 @@ public void testStreamError() throws IOException, InterruptedException, Executio
ApiFuture response2 =
streamWriter.append(
createAppendRequest(writeStream.getName(), new String[] {"aaa"})
- .setWriteStream("blah")
+ .setOffset(Int64Value.of(100L))
.build());
try {
response2.get().getOffset();
Assert.fail("Should fail");
} catch (ExecutionException e) {
assertEquals(
- true,
- e.getCause()
- .getMessage()
- .startsWith(
- "INVALID_ARGUMENT: Stream name `blah` in the request doesn't match the one already specified"));
+ "OUT_OF_RANGE: The offset is beyond stream, expected offset 1, received 100",
+ e.getCause().getMessage());
}
// We can keep sending requests on the same stream.
ApiFuture response3 =
@@ -341,4 +345,51 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec
assertEquals(1L, response.get().getOffset());
}
}
+
+ class CallAppend implements Runnable {
+ List> resultList;
+ List messages;
+
+ CallAppend(List> resultList, List messages) {
+ this.resultList = resultList;
+ this.messages = messages;
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.info("size: " + resultList.size());
+ resultList.add(DirectWriter.append(tableId, messages));
+ } catch (Exception e) {
+ fail("Unexpected Exception: " + e.toString());
+ }
+ }
+ }
+
+ @Test
+ public void testDirectWrite() throws IOException, InterruptedException, ExecutionException {
+ final FooType fa = FooType.newBuilder().setFoo("aaa").build();
+ final FooType fb = FooType.newBuilder().setFoo("bbb").build();
+ Set expectedOffset = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ expectedOffset.add(Long.valueOf(i * 2));
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ List> responses = new ArrayList<>();
+ Callable callable =
+ new Callable() {
+ @Override
+ public Long call() throws IOException, InterruptedException, ExecutionException {
+ ApiFuture result = DirectWriter.append(tableId, Arrays.asList(fa, fb));
+ return result.get();
+ }
+ };
+ for (int i = 0; i < 10; i++) {
+ responses.add(executor.submit(callable));
+ }
+ for (Future response : responses) {
+ assertTrue(expectedOffset.remove(response.get()));
+ }
+ assertTrue(expectedOffset.isEmpty());
+ }
}