diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java index 06afee8b58..f9a117fccb 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java @@ -15,7 +15,9 @@ */ package com.google.cloud.bigquery.storage.v1alpha2; -import com.google.api.core.*; +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.InvalidArgumentException; import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java new file mode 100644 index 0000000000..ed8ee0f9fe --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriter.java @@ -0,0 +1,403 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows; +import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsResponse; +import com.google.common.base.Preconditions; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; +import java.io.IOException; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.json.JSONArray; +import org.json.JSONObject; + +/** + * A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is + * built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then + * calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter + * functions, but also provides an additional feature: schema update support, where if the BigQuery + * table schema is updated, users will be able to ingest data on the new schema after some time (in + * order of minutes). 
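+ *
+ * <p>A minimal usage sketch (mirroring the integration tests: {@code writeStream} is assumed to
+ * come from {@code BigQueryWriteClient.createWriteStream}, and checked exceptions are elided):
+ *
+ * <pre>{@code
+ * try (JsonStreamWriter writer =
+ *     JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
+ *   JSONObject row = new JSONObject();
+ *   row.put("foo", "aaa");
+ *   JSONArray jsonArr = new JSONArray();
+ *   jsonArr.put(row);
+ *   ApiFuture<AppendRowsResponse> response = writer.append(jsonArr, false);
+ *   long offset = response.get().getOffset();
+ * }
+ * }</pre>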
+ */ +public class JsonStreamWriter implements AutoCloseable { + private static String streamPatternString = + "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"; + private static Pattern streamPattern = Pattern.compile(streamPatternString); + private static final Logger LOG = Logger.getLogger(JsonStreamWriter.class.getName()); + + private BigQueryWriteClient client; + private String streamName; + private StreamWriter streamWriter; + private Descriptor descriptor; + private Table.TableSchema tableSchema; + + /** + * Constructs the JsonStreamWriter + * + * @param builder The Builder object for the JsonStreamWriter + */ + private JsonStreamWriter(Builder builder) + throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException, + InterruptedException { + Matcher matcher = streamPattern.matcher(builder.streamName); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid stream name: " + builder.streamName); + } + + this.streamName = builder.streamName; + this.client = builder.client; + this.descriptor = + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema); + + StreamWriter.Builder streamWriterBuilder; + if (this.client == null) { + streamWriterBuilder = StreamWriter.newBuilder(builder.streamName); + } else { + streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client); + } + setStreamWriterSettings( + streamWriterBuilder, + builder.channelProvider, + builder.credentialsProvider, + builder.batchingSettings, + builder.retrySettings, + builder.executorProvider, + builder.endpoint); + this.streamWriter = streamWriterBuilder.build(); + } + /** + * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON + * data to protobuf messages, then using StreamWriter's append() to write the data. If there is a + * schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform. + * + * @param jsonArr The JSON array that contains JSONObjects to be written + * @param allowUnknownFields if true, json data can have fields unknown to the BigQuery table. + * @return ApiFuture returns an AppendRowsResponse message wrapped in an + * ApiFuture + */ + public ApiFuture append(JSONArray jsonArr, boolean allowUnknownFields) { + return append(jsonArr, -1, allowUnknownFields); + } + + /** + * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON + * data to protobuf messages, then using StreamWriter's append() to write the data. If there is a + * schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform. + * + * @param jsonArr The JSON array that contains JSONObjects to be written + * @param offset Offset for deduplication + * @param allowUnknownFields if true, json data can have fields unknown to the BigQuery table. + * @return ApiFuture returns an AppendRowsResponse message wrapped in an + * ApiFuture + */ + public ApiFuture append( + JSONArray jsonArr, long offset, boolean allowUnknownFields) { + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + // Any error in convertJsonToProtoMessage will throw an + // IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing + // of JSON data. 
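+ // (For example, a JSON value that cannot be converted to the matching descriptor field type is
+ // expected to surface as an IllegalArgumentException before anything is sent to the server.)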
+ for (int i = 0; i < jsonArr.length(); i++) { + JSONObject json = jsonArr.getJSONObject(i); + Message protoMessage = + JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json, allowUnknownFields); + rowsBuilder.addSerializedRows(protoMessage.toByteString()); + } + AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder(); + // Need to make sure refreshConnection() finishes first before this can run + synchronized (this) { + data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor)); + data.setRows(rowsBuilder.build()); + final ApiFuture appendResponseFuture = + this.streamWriter.append( + AppendRowsRequest.newBuilder() + .setProtoRows(data.build()) + .setOffset(Int64Value.of(offset)) + .build()); + return appendResponseFuture; + } + } + + /** + * Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then calling + * refreshAppend(), and finally setting the descriptor. All of these actions need to be performed + * atomically to avoid having synchronization issues with append(). Flushing all rows first is + * necessary since if there are rows remaining when the connection refreshes, it will send out the + * old writer schema instead of the new one. + */ + void refreshConnection() + throws IOException, InterruptedException, Descriptors.DescriptorValidationException { + synchronized (this) { + this.streamWriter.writeAllOutstanding(); + this.streamWriter.refreshAppend(); + this.descriptor = + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema); + } + } + + /** + * Gets the stream name + * + * @return String + */ + public String getStreamName() { + return this.streamName; + } + + /** + * Gets the current descriptor + * + * @return Descriptor + */ + public Descriptor getDescriptor() { + return this.descriptor; + } + + /** Sets all StreamWriter settings. */ + private void setStreamWriterSettings( + StreamWriter.Builder builder, + @Nullable TransportChannelProvider channelProvider, + @Nullable CredentialsProvider credentialsProvider, + @Nullable BatchingSettings batchingSettings, + @Nullable RetrySettings retrySettings, + @Nullable ExecutorProvider executorProvider, + @Nullable String endpoint) { + if (channelProvider != null) { + builder.setChannelProvider(channelProvider); + } + if (credentialsProvider != null) { + builder.setCredentialsProvider(credentialsProvider); + } + if (batchingSettings != null) { + builder.setBatchingSettings(batchingSettings); + } + if (retrySettings != null) { + builder.setRetrySettings(retrySettings); + } + if (executorProvider != null) { + builder.setExecutorProvider(executorProvider); + } + if (endpoint != null) { + builder.setEndpoint(endpoint); + } + JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable = + new JsonStreamWriterOnSchemaUpdateRunnable(); + jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this); + builder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable); + } + + /** + * Setter for table schema. Used for schema updates. + * + * @param tableSchema + */ + void setTableSchema(Table.TableSchema tableSchema) { + this.tableSchema = tableSchema; + } + + /** + * newBuilder that constructs a JsonStreamWriter builder, with the BigQuery client initialized by + * the underlying StreamWriter by default.
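+ *
+ * <p>For example (a sketch; {@code tableSchema} is assumed to be the {@code Table.TableSchema}
+ * returned on the {@code WriteStream}, and checked exceptions are elided):
+ *
+ * <pre>{@code
+ * JsonStreamWriter writer =
+ *     JsonStreamWriter.newBuilder("projects/p/datasets/d/tables/t/streams/s", tableSchema)
+ *         .build();
+ * }</pre>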
+ * + * @param streamName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" + * @param tableSchema The schema of the table when the stream was created, which is passed back + * through {@code WriteStream} + * @return Builder + */ + public static Builder newBuilder(String streamName, Table.TableSchema tableSchema) { + Preconditions.checkNotNull(streamName, "StreamName is null."); + Preconditions.checkNotNull(tableSchema, "TableSchema is null."); + return new Builder(streamName, tableSchema, null); + } + + /** + * newBuilder that constructs a JsonStreamWriter builder. + * + * @param streamName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" + * @param tableSchema The schema of the table when the stream was created, which is passed back + * through {@code WriteStream} + * @param client + * @return Builder + */ + public static Builder newBuilder( + String streamName, Table.TableSchema tableSchema, BigQueryWriteClient client) { + Preconditions.checkNotNull(streamName, "StreamName is null."); + Preconditions.checkNotNull(tableSchema, "TableSchema is null."); + Preconditions.checkNotNull(client, "BigQuery client is null."); + return new Builder(streamName, tableSchema, client); + } + + /** Closes the underlying StreamWriter. */ + @Override + public void close() { + this.streamWriter.close(); + } + + private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable { + private JsonStreamWriter jsonStreamWriter; + /** + * Setter for the jsonStreamWriter + * + * @param jsonStreamWriter + */ + public void setJsonStreamWriter(JsonStreamWriter jsonStreamWriter) { + this.jsonStreamWriter = jsonStreamWriter; + } + + /** Getter for the jsonStreamWriter */ + public JsonStreamWriter getJsonStreamWriter() { + return this.jsonStreamWriter; + } + + @Override + public void run() { + this.getJsonStreamWriter().setTableSchema(this.getUpdatedSchema()); + try { + this.getJsonStreamWriter().refreshConnection(); + } catch (InterruptedException | IOException e) { + LOG.severe("StreamWriter failed to refresh upon schema update." + e); + return; + } catch (Descriptors.DescriptorValidationException e) { + LOG.severe( + "Schema update fail: updated schema could not be converted to a valid descriptor."); + return; + } + LOG.info("Successfully updated schema: " + this.getUpdatedSchema()); + } + } + + public static final class Builder { + private String streamName; + private BigQueryWriteClient client; + private Table.TableSchema tableSchema; + + private TransportChannelProvider channelProvider; + private CredentialsProvider credentialsProvider; + private BatchingSettings batchingSettings; + private RetrySettings retrySettings; + private ExecutorProvider executorProvider; + private String endpoint; + + /** + * Constructor for JsonStreamWriter's Builder + * + * @param streamName name of the stream that must follow + * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" + * @param tableSchema schema used to convert Json to proto messages. + * @param client + */ + private Builder(String streamName, Table.TableSchema tableSchema, BigQueryWriteClient client) { + this.streamName = streamName; + this.tableSchema = tableSchema; + this.client = client; + } + + /** + * Setter for the underlying StreamWriter's TransportChannelProvider. 
+ * + * @param channelProvider + * @return Builder + */ + public Builder setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = + Preconditions.checkNotNull(channelProvider, "ChannelProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's CredentialsProvider. + * + * @param credentialsProvider + * @return Builder + */ + public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = + Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's BatchingSettings. + * + * @param batchingSettings + * @return Builder + */ + public Builder setBatchingSettings(BatchingSettings batchingSettings) { + this.batchingSettings = + Preconditions.checkNotNull(batchingSettings, "BatchingSettings is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's RetrySettings. + * + * @param retrySettings + * @return Builder + */ + public Builder setRetrySettings(RetrySettings retrySettings) { + this.retrySettings = Preconditions.checkNotNull(retrySettings, "RetrySettings is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's ExecutorProvider. + * + * @param executorProvider + * @return Builder + */ + public Builder setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = + Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null."); + return this; + } + + /** + * Setter for the underlying StreamWriter's Endpoint. + * + * @param endpoint + * @return Builder + */ + public Builder setEndpoint(String endpoint) { + this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null."); + return this; + } + + /** + * Builds the JsonStreamWriter + * + * @return JsonStreamWriter + */ + public JsonStreamWriter build() + throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException, + InterruptedException { + return new JsonStreamWriter(this); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/OnSchemaUpdateRunnable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/OnSchemaUpdateRunnable.java new file mode 100644 index 0000000000..a8df165602 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/OnSchemaUpdateRunnable.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1alpha2; + +/** + * An abstract class that implements the Runnable interface and provides access to the current + * StreamWriter and updatedSchema. This runnable will only be called when an updated schema has been + * passed back through the AppendRowsResponse. Users should only implement the run() function.
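+ *
+ * <p>A sketch of a hypothetical implementation (the getters are package-private, so concrete
+ * subclasses live in this package, as JsonStreamWriter's own runnable does):
+ *
+ * <pre>{@code
+ * class LoggingOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
+ *   @Override
+ *   public void run() {
+ *     // React to the new schema; JsonStreamWriter, by comparison, refreshes its connection here.
+ *     System.out.println("Schema updated: " + getUpdatedSchema());
+ *   }
+ * }
+ * }</pre>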
+ */ +public abstract class OnSchemaUpdateRunnable implements Runnable { + private StreamWriter streamWriter; + private Table.TableSchema updatedSchema; + + /** + * Setter for the updatedSchema + * + * @param updatedSchema + */ + void setUpdatedSchema(Table.TableSchema updatedSchema) { + this.updatedSchema = updatedSchema; + } + + /** + * Setter for the streamWriter + * + * @param streamWriter + */ + void setStreamWriter(StreamWriter streamWriter) { + this.streamWriter = streamWriter; + } + + /** Getter for the updatedSchema */ + Table.TableSchema getUpdatedSchema() { + return this.updatedSchema; + } + + /** Getter for the streamWriter */ + StreamWriter getStreamWriter() { + return this.streamWriter; + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java index a623bffaad..6876d7e2e1 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java @@ -24,7 +24,8 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import io.grpc.Status; -import java.util.*; +import java.util.HashSet; +import java.util.Set; // A Converter class that turns a native protobuf::DescriptorProto to a self contained // protobuf::DescriptorProto diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index f3bd103d0b..214dff3814 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -29,7 +29,12 @@ import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.*; +import com.google.api.gax.rpc.AbortedException; +import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.ClientStream; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest; import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsResponse; @@ -37,7 +42,10 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -88,6 +96,7 @@ public class StreamWriter implements AutoCloseable { private BigQueryWriteSettings stubSettings; private final Lock messagesBatchLock; + private final Lock appendAndRefreshAppendLock; private final MessagesBatch messagesBatch; private BackgroundResource backgroundResources; @@ -110,6 +119,11 @@ public class StreamWriter implements AutoCloseable { private Integer currentRetries = 0; + // Used for 
schema updates + private OnSchemaUpdateRunnable onSchemaUpdateRunnable; + + private final int REFRESH_STREAM_WAIT_TIME = 7; + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -133,6 +147,7 @@ private StreamWriter(Builder builder) this.retrySettings = builder.retrySettings; this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName); messagesBatchLock = new ReentrantLock(); + appendAndRefreshAppendLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); backgroundResourceList = new ArrayList<>(); @@ -155,8 +170,12 @@ private StreamWriter(Builder builder) stub = builder.client; } backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); - shutdown = new AtomicBoolean(false); + if (builder.onSchemaUpdateRunnable != null) { + this.onSchemaUpdateRunnable = builder.onSchemaUpdateRunnable; + this.onSchemaUpdateRunnable.setStreamWriter(this); + } + refreshAppend(); Stream.WriteStream stream = stub.getWriteStream(Storage.GetWriteStreamRequest.newBuilder().setName(streamName).build()); @@ -183,6 +202,11 @@ public String getTableNameString() { return tableName; } + /** OnSchemaUpdateRunnable for this streamWriter. */ + OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() { + return this.onSchemaUpdateRunnable; + } + /** Returns if a stream has expired. */ public Boolean expired() { return createTime.plus(streamTTL).compareTo(Instant.now()) < 0; @@ -216,6 +240,7 @@ public Boolean expired() { * @return the message ID wrapped in a future. */ public ApiFuture append(AppendRowsRequest message) { + appendAndRefreshAppendLock.lock(); Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer."); Preconditions.checkNotNull(message, "Message is null."); final AppendRequestAndFutureResponse outstandingAppend = @@ -234,6 +259,7 @@ public ApiFuture append(AppendRowsRequest message) { } } finally { messagesBatchLock.unlock(); + appendAndRefreshAppendLock.unlock(); } return outstandingAppend.appendResult; @@ -268,26 +294,34 @@ public void flush(long offset) { * @throws IOException */ public void refreshAppend() throws IOException, InterruptedException { - synchronized (this) { - if (shutdown.get()) { - LOG.warning("Cannot refresh on a already shutdown writer."); - return; - } - // There could be a moment, stub is not yet initialized. - if (clientStream != null) { - LOG.info("Closing the stream " + streamName); - clientStream.closeSend(); - } - messagesBatch.resetAttachSchema(); - bidiStreamingCallable = stub.appendRowsCallable(); - clientStream = bidiStreamingCallable.splitCall(responseObserver); + appendAndRefreshAppendLock.lock(); + if (shutdown.get()) { + LOG.warning("Cannot refresh on an already shutdown writer."); + appendAndRefreshAppendLock.unlock(); + return; + } + // There could be a moment when the stub is not yet initialized. + if (clientStream != null) { + LOG.info("Closing the stream " + streamName); + clientStream.closeSend(); } + messagesBatch.resetAttachSchema(); + bidiStreamingCallable = stub.appendRowsCallable(); + clientStream = bidiStreamingCallable.splitCall(responseObserver); try { while (!clientStream.isSendReady()) { Thread.sleep(10); } } catch (InterruptedException expected) { } + // Currently there is a bug where it takes the reconnected stream 5 seconds to pick up the + // stream count. So wait at least 7 seconds before sending a new request.
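+ // The sleep below takes the larger of the configured initial retry delay and
+ // REFRESH_STREAM_WAIT_TIME, so a user-configured longer delay is still honored.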
+ Thread.sleep( + Math.max( + this.retrySettings.getInitialRetryDelay().toMillis(), + Duration.ofSeconds(REFRESH_STREAM_WAIT_TIME).toMillis())); + // Can only unlock here since need to sleep the full 7 seconds before stream can allow appends. + appendAndRefreshAppendLock.unlock(); LOG.info("Write Stream " + streamName + " connection established"); } @@ -620,6 +654,8 @@ public static final class Builder { private CredentialsProvider credentialsProvider = BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + private OnSchemaUpdateRunnable onSchemaUpdateRunnable; + private Builder(String stream, BigQueryWriteClient client) { this.streamName = Preconditions.checkNotNull(stream); this.client = client; @@ -743,6 +779,13 @@ public Builder setEndpoint(String endpoint) { return this; } + /** Gives the ability to set action on schema update. */ + public Builder setOnSchemaUpdateRunnable(OnSchemaUpdateRunnable onSchemaUpdateRunnable) { + this.onSchemaUpdateRunnable = + Preconditions.checkNotNull(onSchemaUpdateRunnable, "onSchemaUpdateRunnable is null."); + return this; + } + /** Builds the {@code StreamWriter}. */ public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException { return new StreamWriter(this); @@ -800,6 +843,13 @@ public void onResponse(AppendRowsResponse response) { if (response == null) { inflightBatch.onFailure(new IllegalStateException("Response is null")); } + if (response.hasUpdatedSchema()) { + if (streamWriter.getOnSchemaUpdateRunnable() != null) { + streamWriter.getOnSchemaUpdateRunnable().setUpdatedSchema(response.getUpdatedSchema()); + streamWriter.executor.schedule( + streamWriter.getOnSchemaUpdateRunnable(), 0L, TimeUnit.MILLISECONDS); + } + } // TODO: Deal with in stream errors. if (response.hasError()) { StatusRuntimeException exception = @@ -850,12 +900,6 @@ public void onError(Throwable t) { if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts() && !streamWriter.shutdown.get()) { streamWriter.refreshAppend(); - // Currently there is a bug that it took reconnected stream 5 seconds to pick up - // stream count. So wait at least 7 seconds before sending a new request. - Thread.sleep( - Math.min( - streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(), - Duration.ofSeconds(7).toMillis())); LOG.info("Resending requests on transient error:" + streamWriter.currentRetries); streamWriter.writeBatch(inflightBatch); synchronized (streamWriter.currentRetries) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java new file mode 100644 index 0000000000..4fcf38a4c4 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/JsonStreamWriterTest.java @@ -0,0 +1,957 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.api.core.*; +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.cloud.bigquery.storage.test.JsonTest.ComplexRoot; +import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; +import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType2; +import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsResponse; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import java.util.Arrays; +import java.util.HashSet; +import java.util.UUID; +import java.util.logging.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Instant; + +@RunWith(JUnit4.class) +public class JsonStreamWriterTest { + private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName()); + private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); + private static LocalChannelProvider channelProvider; + private FakeScheduledExecutorService fakeExecutor; + private FakeBigQueryWrite testBigQueryWrite; + private static MockServiceHelper serviceHelper; + + private final Table.TableFieldSchema FOO = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.STRING) + .setMode(Table.TableFieldSchema.Mode.NULLABLE) + .setName("foo") + .build(); + private final Table.TableSchema TABLE_SCHEMA = + Table.TableSchema.newBuilder().addFields(0, FOO).build(); + + private final Table.TableFieldSchema BAR = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.STRING) + .setMode(Table.TableFieldSchema.Mode.NULLABLE) + .setName("bar") + .build(); + private final Table.TableFieldSchema BAZ = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.STRING) + .setMode(Table.TableFieldSchema.Mode.NULLABLE) + .setName("baz") + .build(); + private final Table.TableSchema UPDATED_TABLE_SCHEMA = + Table.TableSchema.newBuilder().addFields(0, FOO).addFields(1, BAR).build(); + private final Table.TableSchema UPDATED_TABLE_SCHEMA_2 = + Table.TableSchema.newBuilder().addFields(0, FOO).addFields(1, BAR).addFields(2, BAZ).build(); + + private final Table.TableFieldSchema TEST_INT = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.INT64) + .setMode(Table.TableFieldSchema.Mode.NULLABLE) + .setName("test_int") + .build(); + private final Table.TableFieldSchema TEST_STRING = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.STRING) + .setMode(Table.TableFieldSchema.Mode.REPEATED) + .setName("test_string") + .build(); + private final Table.TableFieldSchema TEST_BYTES = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.BYTES) + 
.setMode(Table.TableFieldSchema.Mode.REQUIRED) + .setName("test_bytes") + .build(); + private final Table.TableFieldSchema TEST_BOOL = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.BOOL) + .setMode(Table.TableFieldSchema.Mode.NULLABLE) + .setName("test_bool") + .build(); + private final Table.TableFieldSchema TEST_DOUBLE = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.DOUBLE) + .setMode(Table.TableFieldSchema.Mode.REPEATED) + .setName("test_double") + .build(); + private final Table.TableFieldSchema TEST_DATE = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.DATE) + .setMode(Table.TableFieldSchema.Mode.REQUIRED) + .setName("test_date") + .build(); + private final Table.TableFieldSchema COMPLEXLVL2 = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.STRUCT) + .setMode(Table.TableFieldSchema.Mode.REQUIRED) + .addFields(0, TEST_INT) + .setName("complex_lvl2") + .build(); + private final Table.TableFieldSchema COMPLEXLVL1 = + Table.TableFieldSchema.newBuilder() + .setType(Table.TableFieldSchema.Type.STRUCT) + .setMode(Table.TableFieldSchema.Mode.REQUIRED) + .addFields(0, TEST_INT) + .addFields(1, COMPLEXLVL2) + .setName("complex_lvl1") + .build(); + private final Table.TableSchema COMPLEX_TABLE_SCHEMA = + Table.TableSchema.newBuilder() + .addFields(0, TEST_INT) + .addFields(1, TEST_STRING) + .addFields(2, TEST_BYTES) + .addFields(3, TEST_BOOL) + .addFields(4, TEST_DOUBLE) + .addFields(5, TEST_DATE) + .addFields(6, COMPLEXLVL1) + .addFields(7, COMPLEXLVL2) + .build(); + + @Before + public void setUp() throws Exception { + testBigQueryWrite = new FakeBigQueryWrite(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); + serviceHelper.start(); + channelProvider = serviceHelper.createChannelProvider(); + fakeExecutor = new FakeScheduledExecutorService(); + testBigQueryWrite.setExecutor(fakeExecutor); + Instant time = Instant.now(); + Timestamp timestamp = + Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); + // Add enough GetWriteStream response. 
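+ // (Each StreamWriter construction issues one GetWriteStream request; several responses are
+ // queued up front as headroom.)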
+ for (int i = 0; i < 4; i++) { + testBigQueryWrite.addResponse( + Stream.WriteStream.newBuilder().setName(TEST_STREAM).setCreateTime(timestamp).build()); + } + } + + @After + public void tearDown() throws Exception { + LOG.info("tearDown called"); + serviceHelper.stop(); + } + + private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder( + String testStream, Table.TableSchema BQTableSchema) { + return JsonStreamWriter.newBuilder(testStream, BQTableSchema) + .setChannelProvider(channelProvider) + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setCredentialsProvider(NoCredentialsProvider.create()); + } + + @Test + public void testTwoParamNewBuilder() throws Exception { + try { + getTestJsonStreamWriterBuilder(null, TABLE_SCHEMA); + } catch (NullPointerException e) { + assertEquals(e.getMessage(), "StreamName is null."); + } + + try { + getTestJsonStreamWriterBuilder(TEST_STREAM, null); + } catch (NullPointerException e) { + assertEquals(e.getMessage(), "TableSchema is null."); + } + + JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build(); + assertEquals(TEST_STREAM, writer.getStreamName()); + } + + @Test + public void testSingleAppendSimpleJson() throws Exception { + FooType expectedProto = FooType.newBuilder().setFoo("allen").build(); + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(0).build()); + + ApiFuture appendFuture = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + + assertEquals(0L, appendFuture.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRows(0), + expectedProto.toByteString()); + } + } + + @Test + public void testSingleAppendMultipleSimpleJson() throws Exception { + FooType expectedProto = FooType.newBuilder().setFoo("allen").build(); + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + JSONObject foo1 = new JSONObject(); + foo1.put("foo", "allen"); + JSONObject foo2 = new JSONObject(); + foo2.put("foo", "allen"); + JSONObject foo3 = new JSONObject(); + foo3.put("foo", "allen"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + jsonArr.put(foo1); + jsonArr.put(foo2); + jsonArr.put(foo3); + + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(0).build()); + + ApiFuture appendFuture = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + + assertEquals(0L, appendFuture.get().getOffset()); + assertEquals( + 4, + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + for (int i = 0; i < 4; i++) { + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRows(i), + expectedProto.toByteString()); + } + } + } + + @Test + public void testMultipleAppendSimpleJson() throws Exception { + FooType expectedProto = FooType.newBuilder().setFoo("allen").build(); + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + JSONArray jsonArr = new JSONArray(); + 
jsonArr.put(foo); + + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(1).build()); + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(2).build()); + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(3).build()); + ApiFuture appendFuture; + for (int i = 0; i < 4; i++) { + appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); + + assertEquals((long) i, appendFuture.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(i) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(i) + .getProtoRows() + .getRows() + .getSerializedRows(0), + expectedProto.toByteString()); + } + } + } + + @Test + public void testSingleAppendComplexJson() throws Exception { + ComplexRoot expectedProto = + ComplexRoot.newBuilder() + .setTestInt(1) + .addTestString("a") + .addTestString("b") + .addTestString("c") + .setTestBytes(ByteString.copyFrom("hello".getBytes())) + .setTestBool(true) + .addTestDouble(1.1) + .addTestDouble(2.2) + .addTestDouble(3.3) + .addTestDouble(4.4) + .setTestDate(1) + .setComplexLvl1( + com.google.cloud.bigquery.storage.test.JsonTest.ComplexLvl1.newBuilder() + .setTestInt(2) + .setComplexLvl2( + com.google.cloud.bigquery.storage.test.JsonTest.ComplexLvl2.newBuilder() + .setTestInt(3) + .build()) + .build()) + .setComplexLvl2( + com.google.cloud.bigquery.storage.test.JsonTest.ComplexLvl2.newBuilder() + .setTestInt(3) + .build()) + .build(); + JSONObject complex_lvl2 = new JSONObject(); + complex_lvl2.put("test_int", 3); + + JSONObject complex_lvl1 = new JSONObject(); + complex_lvl1.put("test_int", 2); + complex_lvl1.put("complex_lvl2", complex_lvl2); + + JSONObject json = new JSONObject(); + json.put("test_int", 1); + json.put("test_string", new JSONArray(new String[] {"a", "b", "c"})); + json.put("test_bytes", "hello"); + json.put("test_bool", true); + json.put("test_DOUBLe", new JSONArray(new Double[] {1.1, 2.2, 3.3, 4.4})); + json.put("test_date", 1); + json.put("complex_lvl1", complex_lvl1); + json.put("complex_lvl2", complex_lvl2); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(json); + + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, COMPLEX_TABLE_SCHEMA).build()) { + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(0).build()); + ApiFuture appendFuture = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + + assertEquals(0L, appendFuture.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRows(0), + expectedProto.toByteString()); + } + } + + @Test + public void testAppendMultipleSchemaUpdate() throws Exception { + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + // Add fake responses for FakeBigQueryWrite; the first two responses carry updated schemas.
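+ // When a response carries updatedSchema, the writer's OnSchemaUpdateRunnable is scheduled on
+ // the executor: it stores the new TableSchema, refreshes the connection, and rebuilds the
+ // proto descriptor. The polling loops below wait for that asynchronous refresh to land.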
+ testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder() + .setOffset(0) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .build()); + testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder() + .setOffset(1) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA_2) + .build()); + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(2).build()); + // First append + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + ApiFuture appendFuture1 = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + + int millis = 0; + while (millis <= 10000) { + if (writer.getDescriptor().getFields().size() == 2) { + break; + } + Thread.sleep(100); + millis += 100; + } + assertTrue(writer.getDescriptor().getFields().size() == 2); + assertEquals(0L, appendFuture1.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRows(0), + FooType.newBuilder().setFoo("allen").build().toByteString()); + + // Second append with updated schema. + JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("foo", "allen"); + updatedFoo.put("bar", "allen2"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + + ApiFuture appendFuture2 = + writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); + + millis = 0; + while (millis <= 10000) { + if (writer.getDescriptor().getFields().size() == 3) { + break; + } + Thread.sleep(100); + millis += 100; + } + assertTrue(writer.getDescriptor().getFields().size() == 3); + assertEquals(1L, appendFuture2.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(1) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(1) + .getProtoRows() + .getRows() + .getSerializedRows(0), + UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); + + // Third append with updated schema. + JSONObject updatedFoo2 = new JSONObject(); + updatedFoo2.put("foo", "allen"); + updatedFoo2.put("bar", "allen2"); + updatedFoo2.put("baz", "allen3"); + JSONArray updatedJsonArr2 = new JSONArray(); + updatedJsonArr2.put(updatedFoo2); + + ApiFuture appendFuture3 = + writer.append(updatedJsonArr2, -1, /* allowUnknownFields */ false); + + assertEquals(2L, appendFuture3.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(2) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(2) + .getProtoRows() + .getRows() + .getSerializedRows(0), + UpdatedFooType2.newBuilder() + .setFoo("allen") + .setBar("allen2") + .setBaz("allen3") + .build() + .toByteString()); + // Check if writer schemas were added in for all three connections.
+ assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); + assertTrue(testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()); + } + } + + @Test + public void testAppendAlreadyExistsException() throws Exception { + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder() + .setError(com.google.rpc.Status.newBuilder().setCode(6).build()) + .build()); + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture appendFuture = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + try { + appendFuture.get(); + } catch (Throwable t) { + assertEquals(t.getCause().getMessage(), "ALREADY_EXISTS: "); + } + } + } + + @Test + public void testAppendOutOfRangeException() throws Exception { + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder() + .setError(com.google.rpc.Status.newBuilder().setCode(11).build()) + .build()); + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture appendFuture = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + try { + appendFuture.get(); + } catch (Throwable t) { + assertEquals(t.getCause().getMessage(), "OUT_OF_RANGE: "); + } + } + } + + @Test + public void testAppendOutOfRangeAndUpdateSchema() throws Exception { + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { + testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder() + .setError(com.google.rpc.Status.newBuilder().setCode(11).build()) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .build()); + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(0).build()); + + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + ApiFuture appendFuture = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + try { + appendFuture.get(); + } catch (Throwable t) { + assertEquals(t.getCause().getMessage(), "OUT_OF_RANGE: "); + int millis = 0; + while (millis <= 10000) { + if (writer.getDescriptor().getFields().size() == 2) { + break; + } + Thread.sleep(100); + millis += 100; + } + assertTrue(writer.getDescriptor().getFields().size() == 2); + } + + JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("foo", "allen"); + updatedFoo.put("bar", "allen2"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + + ApiFuture appendFuture2 = + writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); + + assertEquals(0L, appendFuture2.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(1) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(1) + .getProtoRows() + .getRows() + .getSerializedRows(0), + UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); + + // Check if writer schemas were added in for both connections. 
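+ // (The writer schema is only attached to the first request on each connection, so seeing it
+ // on both requests shows the connection was refreshed in between.)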
+ assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); + } + } + + @Test + public void testSchemaUpdateWithNonemptyBatch() throws Exception { + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .build()) { + testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder() + .setOffset(0) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .build()); + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(2).build()); + testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(3).build()); + // First append + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + ApiFuture appendFuture1 = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + ApiFuture appendFuture2 = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + ApiFuture appendFuture3 = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + + assertEquals(0L, appendFuture1.get().getOffset()); + assertEquals(1L, appendFuture2.get().getOffset()); + assertEquals( + 2, + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRows(0), + FooType.newBuilder().setFoo("allen").build().toByteString()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(0) + .getProtoRows() + .getRows() + .getSerializedRows(1), + FooType.newBuilder().setFoo("allen").build().toByteString()); + + assertEquals(2L, appendFuture3.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(1) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(1) + .getProtoRows() + .getRows() + .getSerializedRows(0), + FooType.newBuilder().setFoo("allen").build().toByteString()); + + int millis = 0; + while (millis <= 10000) { + if (writer.getDescriptor().getFields().size() == 2) { + break; + } + Thread.sleep(100); + millis += 100; + } + assertTrue(writer.getDescriptor().getFields().size() == 2); + + // Second append with updated schema. 
+ JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("foo", "allen"); + updatedFoo.put("bar", "allen2"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + + ApiFuture appendFuture4 = + writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); + + assertEquals(3L, appendFuture4.get().getOffset()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(2) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(2) + .getProtoRows() + .getRows() + .getSerializedRows(0), + UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); + + assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); + assertTrue( + testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema() + || testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()); + } + } + + @Test + public void testMultiThreadAppendNoSchemaUpdate() throws Exception { + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .build()) + .build()) { + + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + final JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + final HashSet offset_sets = new HashSet(); + int thread_nums = 5; + Thread[] thread_arr = new Thread[thread_nums]; + for (int i = 0; i < thread_nums; i++) { + testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder().setOffset((long) i).build()); + offset_sets.add((long) i); + Thread t = + new Thread( + new Runnable() { + public void run() { + try { + ApiFuture appendFuture = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + AppendRowsResponse response = appendFuture.get(); + LOG.info("Processing complete, offset is " + response.getOffset()); + offset_sets.remove(response.getOffset()); + } catch (Exception e) { + LOG.severe("Thread execution failed: " + e.getMessage()); + } + } + }); + thread_arr[i] = t; + LOG.info("Starting thread " + i + "."); + t.start(); + } + + for (int i = 0; i < thread_nums; i++) { + thread_arr[i].join(); + } + assertTrue(offset_sets.size() == 0); + for (int i = 0; i < thread_nums; i++) { + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(i) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(i) + .getProtoRows() + .getRows() + .getSerializedRows(0), + FooType.newBuilder().setFoo("allen").build().toByteString()); + } + } + } + + @Test + public void testMultiThreadAppendWithSchemaUpdate() throws Exception { + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .build()) + .build()) { + JSONObject foo = new JSONObject(); + foo.put("foo", "allen"); + final JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + final HashSet offset_sets = new HashSet(); + int thread_nums = 5; + Thread[] thread_arr = new Thread[thread_nums]; + for (int i = 0; i < thread_nums; i++) { + if (i == 2) { + testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder() + .setOffset((long) i) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .build()); + } else { + testBigQueryWrite.addResponse( + 
Storage.AppendRowsResponse.newBuilder().setOffset((long) i).build()); + } + + offset_sets.add((long) i); + Thread t = + new Thread( + new Runnable() { + public void run() { + try { + ApiFuture appendFuture = + writer.append(jsonArr, -1, /* allowUnknownFields */ false); + AppendRowsResponse response = appendFuture.get(); + LOG.info("Processing complete, offset is " + response.getOffset()); + offset_sets.remove(response.getOffset()); + } catch (Exception e) { + LOG.severe("Thread execution failed: " + e.getMessage()); + } + } + }); + thread_arr[i] = t; + LOG.info("Starting thread " + i + "."); + t.start(); + } + + for (int i = 0; i < thread_nums; i++) { + thread_arr[i].join(); + } + assertTrue(offset_sets.size() == 0); + for (int i = 0; i < thread_nums; i++) { + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(i) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(i) + .getProtoRows() + .getRows() + .getSerializedRows(0), + FooType.newBuilder().setFoo("allen").build().toByteString()); + } + + int millis = 0; + while (millis <= 10000) { + if (writer.getDescriptor().getFields().size() == 2) { + break; + } + Thread.sleep(100); + millis += 100; + } + assertTrue(writer.getDescriptor().getFields().size() == 2); + + foo.put("bar", "allen2"); + final JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(foo); + + for (int i = thread_nums; i < thread_nums + 5; i++) { + testBigQueryWrite.addResponse( + Storage.AppendRowsResponse.newBuilder().setOffset((long) i).build()); + offset_sets.add((long) i); + Thread t = + new Thread( + new Runnable() { + public void run() { + try { + ApiFuture appendFuture = + writer.append(jsonArr2, -1, /* allowUnknownFields */ false); + AppendRowsResponse response = appendFuture.get(); + LOG.info("Processing complete, offset is " + response.getOffset()); + offset_sets.remove(response.getOffset()); + } catch (Exception e) { + LOG.severe("Thread execution failed: " + e.getMessage()); + } + } + }); + thread_arr[i - 5] = t; + LOG.info("Starting thread " + i + " with updated json data."); + t.start(); + } + + for (int i = 0; i < thread_nums; i++) { + thread_arr[i].join(); + } + assertTrue(offset_sets.size() == 0); + for (int i = 0; i < thread_nums; i++) { + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(i + 5) + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + testBigQueryWrite + .getAppendRequests() + .get(i + 5) + .getProtoRows() + .getRows() + .getSerializedRows(0), + UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString()); + } + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java index c64d55f53f..64e8a07f4f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java @@ -31,12 +31,15 @@ import com.google.cloud.bigquery.storage.v1alpha2.Storage.*; import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.protobuf.Descriptors; import com.google.protobuf.Int64Value; import 
com.google.protobuf.Message; import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.logging.Logger; +import org.json.JSONArray; +import org.json.JSONObject; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -192,16 +195,225 @@ public void testBatchWriteWithCommittedStream() createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build()); assertEquals(1, response1.get().getOffset()); assertEquals(3, response2.get().getOffset()); + + TableResult result = + bigquery.listTableData( + tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + assertEquals("aaa", iter.next().get(0).getStringValue()); + assertEquals("bbb", iter.next().get(0).getStringValue()); + assertEquals("ccc", iter.next().get(0).getStringValue()); + assertEquals("ddd", iter.next().get(0).getStringValue()); + assertEquals(false, iter.hasNext()); } + } - TableResult result = - bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); - Iterator iter = result.getValues().iterator(); - assertEquals("aaa", iter.next().get(0).getStringValue()); - assertEquals("bbb", iter.next().get(0).getStringValue()); - assertEquals("ccc", iter.next().get(0).getStringValue()); - assertEquals("ddd", iter.next().get(0).getStringValue()); - assertEquals(false, iter.hasNext()); + @Test + public void testJsonStreamWriterBatchWriteWithCommittedStream() + throws IOException, InterruptedException, ExecutionException, + Descriptors.DescriptorValidationException { + String tableName = "JsonTable"; + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING) + .build()))) + .build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setRequestByteThreshold(1024 * 1024L) // 1 Mb + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(2)) + .build()) + .build()) { + LOG.info("Sending one message"); + JSONObject foo = new JSONObject(); + foo.put("foo", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + ApiFuture response = + jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false); + assertEquals(0, response.get().getOffset()); + + LOG.info("Sending two more messages"); + JSONObject foo1 = new JSONObject(); + foo1.put("foo", "bbb"); + JSONObject foo2 = new JSONObject(); + foo2.put("foo", "ccc"); + JSONArray jsonArr1 = new JSONArray(); + jsonArr1.put(foo1); + jsonArr1.put(foo2); + + JSONObject foo3 = new JSONObject(); + foo3.put("foo", "ddd"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(foo3); + + ApiFuture response1 = + jsonStreamWriter.append(jsonArr1, -1, /* allowUnknownFields */ false); + ApiFuture response2 = + jsonStreamWriter.append(jsonArr2, -1, /* allowUnknownFields */ false); + assertEquals(1, response1.get().getOffset()); + assertEquals(3, 
+
+  @Test
+  public void testJsonStreamWriterSchemaUpdate()
+      throws IOException, InterruptedException, ExecutionException,
+          Descriptors.DescriptorValidationException {
+    String tableName = "SchemaUpdateTable";
+    TableInfo tableInfo =
+        TableInfo.newBuilder(
+                TableId.of(DATASET, tableName),
+                StandardTableDefinition.of(
+                    Schema.of(
+                        com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
+                            .build())))
+            .build();
+
+    bigquery.create(tableInfo);
+    TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(parent.toString())
+                .setWriteStream(
+                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+                .build());
+
+    try (JsonStreamWriter jsonStreamWriter =
+        JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setElementCountThreshold(1L)
+                    .build())
+            .build()) {
+      // 1). Send 1 row.
+      JSONObject foo = new JSONObject();
+      foo.put("foo", "aaa");
+      JSONArray jsonArr = new JSONArray();
+      jsonArr.put(foo);
+
+      ApiFuture<AppendRowsResponse> response =
+          jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false);
+      assertEquals(0, response.get().getOffset());
+      // 2). Update the table schema and wait until reloading the table returns the new schema.
+      try {
+        com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName);
+        Schema schema = table.getDefinition().getSchema();
+        FieldList fields = schema.getFields();
+        Field newField =
+            Field.newBuilder("bar", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
+
+        List<Field> fieldList = new ArrayList<Field>();
+        fieldList.add(fields.get(0));
+        fieldList.add(newField);
+        Schema newSchema = Schema.of(fieldList);
+        // Update the table with the new schema.
+        com.google.cloud.bigquery.Table updatedTable =
+            table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build();
+        updatedTable.update();
+        int millis = 0;
+        while (millis <= 10000) {
+          if (newSchema.equals(table.reload().getDefinition().getSchema())) {
+            break;
+          }
+          Thread.sleep(1000);
+          millis += 1000;
+        }
+        LOG.info(
+            "bar column successfully added to table in "
+                + millis
+                + " millis: "
+                + bigquery.getTable(DATASET, tableName).getDefinition().getSchema());
+      } catch (BigQueryException e) {
+        LOG.severe("bar column was not added. \n" + e.toString());
+      }
+      // 3). Keep sending rows until the response reports the updated schema.
+      JSONObject foo2 = new JSONObject();
+      foo2.put("foo", "bbb");
+      JSONArray jsonArr2 = new JSONArray();
+      jsonArr2.put(foo2);
+
+      int next = 0;
+      for (int i = 1; i < 100; i++) {
+        ApiFuture<AppendRowsResponse> response2 =
+            jsonStreamWriter.append(jsonArr2, -1, /* allowUnknownFields */ false);
+        assertEquals(i, response2.get().getOffset());
+        if (response2.get().hasUpdatedSchema()) {
+          next = i;
+          break;
+        } else {
+          Thread.sleep(1000);
+        }
+      }
+
+      // Wait (up to 10 s) for the writer to regenerate its internal descriptor.
+      int millis = 0;
+      while (millis <= 10000) {
+        if (jsonStreamWriter.getDescriptor().getFields().size() == 2) {
+          LOG.info("JsonStreamWriter successfully updated internal descriptor!");
+          break;
+        }
+        Thread.sleep(100);
+        millis += 100;
+      }
+      assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2);
+      // 4). Send rows with the updated schema.
+      JSONObject updatedFoo = new JSONObject();
+      updatedFoo.put("foo", "ccc");
+      updatedFoo.put("bar", "ddd");
+      JSONArray updatedJsonArr = new JSONArray();
+      updatedJsonArr.put(updatedFoo);
+      for (int i = 0; i < 10; i++) {
+        ApiFuture<AppendRowsResponse> response3 =
+            jsonStreamWriter.append(updatedJsonArr, -1, /* allowUnknownFields */ false);
+        assertEquals(next + 1 + i, response3.get().getOffset());
+      }
+
+      TableResult result3 =
+          bigquery.listTableData(
+              tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+      Iterator<FieldValueList> iter3 = result3.getValues().iterator();
+      assertEquals("aaa", iter3.next().get(0).getStringValue());
+      for (int j = 1; j <= next; j++) {
+        assertEquals("bbb", iter3.next().get(0).getStringValue());
+      }
+      for (int j = next + 1; j < next + 1 + 10; j++) {
+        FieldValueList temp = iter3.next();
+        assertEquals("ccc", temp.get(0).getStringValue());
+        assertEquals("ddd", temp.get(1).getStringValue());
+      }
+      assertEquals(false, iter3.hasNext());
+    }
   }
 
   @Test
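The schema-update dance in the test above has two distinct waits: the server signals the new schema via hasUpdatedSchema() on an AppendRowsResponse, and only afterwards does the writer swap in a regenerated proto descriptor. A condensed sketch of that two-phase wait, distilled from the test — the helper name, the two-field expectation, and the timeouts are all illustrative:

import com.google.cloud.bigquery.storage.v1alpha2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsResponse;
import org.json.JSONArray;

class SchemaUpdateSketch {
  // Phase 1: append until the server attaches the updated schema to a response.
  // Phase 2: wait for the writer to swap in the regenerated proto descriptor.
  static void awaitSchemaUpdate(JsonStreamWriter writer, JSONArray rows) throws Exception {
    for (int i = 0; i < 100; i++) {
      AppendRowsResponse resp = writer.append(rows, -1, /* allowUnknownFields */ false).get();
      if (resp.hasUpdatedSchema()) {
        break; // the backend now knows about the new column
      }
      Thread.sleep(1000L); // arbitrary poll interval
    }
    for (int waited = 0; waited <= 10000; waited += 100) {
      if (writer.getDescriptor().getFields().size() == 2) { // assumes the schema grew to 2 fields
        return; // descriptor refreshed; rows may now carry the new field
      }
      Thread.sleep(100L);
    }
    throw new IllegalStateException("descriptor was not refreshed within 10 s");
  }
}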
diff --git a/google-cloud-bigquerystorage/src/test/proto/test.proto b/google-cloud-bigquerystorage/src/test/proto/test.proto
index 2ed9136610..18c00aab5c 100644
--- a/google-cloud-bigquerystorage/src/test/proto/test.proto
+++ b/google-cloud-bigquerystorage/src/test/proto/test.proto
@@ -63,6 +63,17 @@ message FooType {
   optional string foo = 1;
 }
 
+message UpdatedFooType {
+  optional string foo = 1;
+  optional string bar = 2;
+}
+
+message UpdatedFooType2 {
+  optional string foo = 1;
+  optional string bar = 2;
+  optional string baz = 3;
+}
+
 message DuplicateType {
   optional TestEnum f1 = 1;
   optional TestEnum f2 = 2;
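The two new messages mirror the table schema before and after the bar column is added (UpdatedFooType2 anticipates a further update adding baz). A property worth noting when reasoning about the byte-level assertions in the unit test: known proto fields serialize in ascending field-number order, so appending a higher-numbered optional field only appends bytes to the wire encoding. A small sketch, assuming the classes protoc generates from test.proto above are on the classpath:

import com.google.protobuf.ByteString;

class ProtoPrefixSketch {
  public static void main(String[] args) {
    // The one-field FooType encoding is a byte prefix of the two-field
    // UpdatedFooType encoding, because field 2 serializes after field 1.
    ByteString before = FooType.newBuilder().setFoo("allen").build().toByteString();
    ByteString after =
        UpdatedFooType.newBuilder().setFoo("allen").setBar("allen2").build().toByteString();
    System.out.println(after.startsWith(before)); // prints: true
  }
}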