feat: better default stream support in client library #750

Merged
merged 20 commits on Dec 30, 2020
4 changes: 0 additions & 4 deletions .github/generated-files-bot.yml
@@ -5,7 +5,3 @@ externalManifests:
- type: json
file: '.github/readme/synth.metadata/synth.metadata'
jsonpath: '$.generatedFiles[*]'
ignoreAuthors:
- 'renovate-bot'
- 'yoshi-automation'
- 'release-please[bot]'
com/google/cloud/bigquery/storage/v1beta2/BQV2ToBQStorageConverter.java (new file)
@@ -0,0 +1,85 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1beta2;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.collect.ImmutableMap;

/** Converts schema structures from the BigQuery v2 API to the BigQuery Storage API. */
public class BQV2ToBQStorageConverter {
private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
ImmutableMap.of(
Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
.put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
.put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
.put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
.put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
.put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
.put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
.put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
.put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
.put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
.put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
.put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
.put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
.build();

/**
* Converts a BigQuery v2 Table Schema to a BigQuery Storage API Table Schema.
*
* @param schema the BigQuery v2 Table Schema
* @return the BigQuery Storage API Table Schema
*/
public static TableSchema ConvertTableSchema(Schema schema) {
TableSchema.Builder result = TableSchema.newBuilder();
for (int i = 0; i < schema.getFields().size(); i++) {
result.addFields(i, ConvertFieldSchema(schema.getFields().get(i)));
}
return result.build();
}

/**
* Converts a BigQuery v2 Field Schema to a BigQuery Storage API Field Schema.
*
* @param field the BigQuery v2 Field Schema
* @return the BigQuery Storage API Field Schema
*/
public static TableFieldSchema ConvertFieldSchema(Field field) {
TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
if (field.getMode() == null) {
field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
}
result.setMode(BQTableSchemaModeMap.get(field.getMode()));
result.setName(field.getName());
result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
if (field.getDescription() != null) {
result.setDescription(field.getDescription());
}
if (field.getSubFields() != null) {
for (int i = 0; i < field.getSubFields().size(); i++) {
result.addFields(i, ConvertFieldSchema(field.getSubFields().get(i)));
}
}
return result.build();
}
}
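This new converter gives callers a bridge from the v2 client library's Schema type to the Storage API's TableSchema. A minimal usage sketch, assuming the schema of an existing table is fetched through the v2 client (the dataset and table names are placeholders, not part of this PR):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;

// Fetch the v2 schema of an existing table.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Schema v2Schema =
    bigquery.getTable(TableId.of("my_dataset", "my_table")).getDefinition().getSchema();

// Convert it to the Storage API representation consumed by JsonStreamWriter.
TableSchema storageSchema = BQV2ToBQStorageConverter.ConvertTableSchema(v2Schema);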
com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java
@@ -21,14 +21,14 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.Schema;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.json.JSONArray;
@@ -62,21 +62,15 @@ public class JsonStreamWriter implements AutoCloseable {
private JsonStreamWriter(Builder builder)
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
Matcher matcher = streamPattern.matcher(builder.streamName);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid stream name: " + builder.streamName);
}

this.streamName = builder.streamName;
this.client = builder.client;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);

StreamWriter.Builder streamWriterBuilder;
if (this.client == null) {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName);
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
} else {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client);
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
}
setStreamWriterSettings(
streamWriterBuilder,
@@ -85,9 +79,12 @@ private void setStreamWriterSettings(
builder.batchingSettings,
builder.retrySettings,
builder.executorProvider,
builder.endpoint);
builder.endpoint,
builder.createDefaultStream);
this.streamWriter = streamWriterBuilder.build();
this.streamName = this.streamWriter.getStreamNameString();
}

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
@@ -130,12 +127,12 @@ public ApiFuture<AppendRowsResponse> append(
synchronized (this) {
data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
data.setRows(rowsBuilder.build());
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
if (offset >= 0) {
request.setOffset(Int64Value.of(offset));
}
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(
AppendRowsRequest.newBuilder()
.setProtoRows(data.build())
.setOffset(Int64Value.of(offset))
.build());
this.streamWriter.append(request.build());
return appendResponseFuture;
}
}
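With this change, the offset is attached to the request only when the caller passes a non-negative value, which matters for the default stream, where explicit offsets are not used. A sketch of the calling pattern (the exact append overload is an assumption; this hunk only shows the method body):

// Sketch only: assumes an append overload taking a JSONArray and a long offset.
JSONArray rows = new JSONArray();
rows.put(new JSONObject().put("col1", "value"));

// Explicitly created stream: pass the next expected offset.
ApiFuture<AppendRowsResponse> withOffset = writer.append(rows, /* offset= */ 0L);

// Default stream: pass a negative offset so no offset field is set on the request.
ApiFuture<AppendRowsResponse> unordered = writer.append(rows, /* offset= */ -1L);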
@@ -183,7 +180,8 @@ private void setStreamWriterSettings(
@Nullable BatchingSettings batchingSettings,
@Nullable RetrySettings retrySettings,
@Nullable ExecutorProvider executorProvider,
@Nullable String endpoint) {
@Nullable String endpoint,
Boolean createDefaultStream) {
if (channelProvider != null) {
builder.setChannelProvider(channelProvider);
}
@@ -202,6 +200,9 @@
if (endpoint != null) {
builder.setEndpoint(endpoint);
}
if (createDefaultStream) {
builder.createDefaultStream();
}
JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
new JsonStreamWriterOnSchemaUpdateRunnable();
jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
@@ -221,34 +222,53 @@ void setTableSchema(TableSchema tableSchema) {
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
*
* @param streamName name of the stream that must follow
* @param streamOrTableName name of the stream that must follow
"projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+", or, if it is a default stream
(createDefaultStream is true on the builder), then the name here should be a table name
"projects/[^/]+/datasets/[^/]+/tables/[^/]+"
* @param tableSchema The schema of the table when the stream was created, which is passed back
* through {@code WriteStream}
* @return Builder
*/
public static Builder newBuilder(String streamOrTableName, TableSchema tableSchema) {
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
return new Builder(streamOrTableName, tableSchema, null);
}

/**
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
*
* @param streamOrTableName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
* @param tableSchema The schema of the table when the stream was created, which is passed back
* through {@code WriteStream}
* @return Builder
*/
public static Builder newBuilder(String streamName, TableSchema tableSchema) {
Preconditions.checkNotNull(streamName, "StreamName is null.");
public static Builder newBuilder(String streamOrTableName, Schema tableSchema) {
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
return new Builder(streamName, tableSchema, null);
return new Builder(
streamOrTableName, BQV2ToBQStorageConverter.ConvertTableSchema(tableSchema), null);
}

/**
* newBuilder that constructs a JsonStreamWriter builder.
*
* @param streamName name of the stream that must follow
* @param streamOrTableName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
* @param tableSchema The schema of the table when the stream was created, which is passed back
* through {@code WriteStream}
* @param client the BigQueryWriteClient used by the writer
* @return Builder
*/
public static Builder newBuilder(
String streamName, TableSchema tableSchema, BigQueryWriteClient client) {
Preconditions.checkNotNull(streamName, "StreamName is null.");
String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkNotNull(client, "BigQuery client is null.");
return new Builder(streamName, tableSchema, client);
return new Builder(streamOrTableName, tableSchema, client);
}

/** Closes the underlying StreamWriter. */
@@ -291,7 +311,7 @@ public void run() {
}

public static final class Builder {
private String streamName;
private String streamOrTableName;
private BigQueryWriteClient client;
private TableSchema tableSchema;

@@ -301,17 +321,19 @@ public static final class Builder {
private RetrySettings retrySettings;
private ExecutorProvider executorProvider;
private String endpoint;
private boolean createDefaultStream = false;

/**
* Constructor for JsonStreamWriter's Builder
*
* @param streamName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
* @param streamOrTableName name of the stream that must follow
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/_default"
* @param tableSchema schema used to convert Json to proto messages.
* @param client the BigQueryWriteClient, or null to let StreamWriter create one
*/
private Builder(String streamName, TableSchema tableSchema, BigQueryWriteClient client) {
this.streamName = streamName;
private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
this.streamOrTableName = streamOrTableName;
this.tableSchema = tableSchema;
this.client = client;
}
@@ -375,6 +397,16 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return this;
}

/**
* Configures the writer to write to the table's default stream.
*
* @return Builder
*/
public Builder createDefaultStream() {
this.createDefaultStream = true;
return this;
}

/**
* Setter for the underlying StreamWriter's Endpoint.
*