feat: Direct writer #165

Merged 6 commits on Apr 16, 2020
DirectWriter.java
@@ -0,0 +1,105 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Message;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

/**
 * Writer that helps users write data to BigQuery. This is a simplified version of the Write
 * API. Users writing with a COMMITTED stream who don't care about row deduplication are
 * recommended to use this Writer.
 *
 * <pre>{@code
 * DataProto data;
 * ApiFuture<Long> response = DirectWriter.<DataProto>append("projects/pid/datasets/did/tables/tid", Arrays.asList(data));
 * }</pre>
 *
 * <p>{@link DirectWriter} will use the credentials set on the channel, which uses application
 * default credentials through {@link GoogleCredentials#getApplicationDefault} by default.
 */
public class DirectWriter {
  private static final Logger LOG = Logger.getLogger(DirectWriter.class.getName());
  private static WriterCache cache = null;
  private static Lock cacheLock = new ReentrantLock();

  /**
   * Append rows to the given table.
   *
   * @param tableName table name in the form of "projects/{pName}/datasets/{dName}/tables/{tName}"
   * @param protoRows rows in proto buffer format.
   * @return A future that contains the offset at which the append happened. The append has
   *     actually happened only once the future returns a valid offset.
   * @throws IOException if the writer cache cannot be created.
   * @throws InterruptedException if fetching the table writer is interrupted.
   * @throws InvalidArgumentException if {@code protoRows} is empty.
   */
  public static <T extends Message> ApiFuture<Long> append(String tableName, List<T> protoRows)
      throws IOException, InterruptedException, InvalidArgumentException {
    if (protoRows.isEmpty()) {
      throw new InvalidArgumentException(
          new Exception("Empty rows are not allowed"),
          GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT),
          false);
    }
    // Lazily initialize the shared writer cache under a lock.
    try {
      cacheLock.lock();
      if (cache == null) {
        cache = WriterCache.getInstance();
      }
    } finally {
      cacheLock.unlock();
    }

    StreamWriter writer = cache.getTableWriter(tableName, protoRows.get(0).getDescriptorForType());
    ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
    for (Message protoRow : protoRows) {
      rowsBuilder.addSerializedRows(protoRow.toByteString());
    }

    AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
    data.setWriterSchema(ProtoSchemaConverter.convert(protoRows.get(0).getDescriptorForType()));
    data.setRows(rowsBuilder.build());

    return ApiFutures.<Storage.AppendRowsResponse, Long>transform(
        writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build()),
        new ApiFunction<Storage.AppendRowsResponse, Long>() {
          @Override
          public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
            return Long.valueOf(appendRowsResponse.getOffset());
          }
        },
        MoreExecutors.directExecutor());
  }

  @VisibleForTesting
  public static void testSetStub(
      BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) {
    cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
  }
}
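
For anyone trying the new API, here is a minimal sketch of a caller. FooType is a hypothetical protobuf-generated message type whose fields mirror the target table's schema; the project, dataset, and table names are placeholders, and application default credentials are assumed to be configured.

import com.google.api.core.ApiFuture;
import java.util.Arrays;

public class DirectWriterExample {
  public static void main(String[] args) throws Exception {
    // Build one row as a protobuf message (hypothetical generated type).
    FooType row = FooType.newBuilder().setName("a").build();

    // Append a batch of rows; the future resolves to the offset at which
    // the append happened on the COMMITTED stream.
    ApiFuture<Long> response =
        DirectWriter.<FooType>append(
            "projects/my-project/datasets/my_dataset/tables/my_table",
            Arrays.asList(row));
    System.out.println("Appended at offset: " + response.get());
  }
}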
SchemaCompact.java
@@ -0,0 +1,99 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A class that checks the schema compatibility between a user schema, given as a proto
 * descriptor, and a BigQuery table schema. If this check passes, the user can write to the
 * BigQuery table using the user schema; otherwise the write will fail.
 *
 * <p>The implementation as of now is not complete, which means that even if this check passes,
 * the write may still fail.
 */
public class SchemaCompact {
  private BigQuery bigquery;
  private static SchemaCompact compact;
  private static String tablePatternString = "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)";
  private static Pattern tablePattern = Pattern.compile(tablePatternString);

  private SchemaCompact(BigQuery bigquery) {
    this.bigquery = bigquery;
  }

  /**
   * Gets a singleton {@code SchemaCompact} object.
   *
   * @return the singleton {@code SchemaCompact} instance.
   */
  public static SchemaCompact getInstance() {
    if (compact == null) {
      RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
      compact = new SchemaCompact(bigqueryHelper.getOptions().getService());
    }
    return compact;
  }

  /**
   * Gets a {@code SchemaCompact} object with a custom BigQuery stub.
   *
   * @param bigquery the BigQuery client to use.
   * @return a {@code SchemaCompact} instance backed by the given client.
   */
  @VisibleForTesting
  public static SchemaCompact getInstance(BigQuery bigquery) {
    return new SchemaCompact(bigquery);
  }

  private TableId getTableId(String tableName) {
    Matcher matcher = tablePattern.matcher(tableName);
    if (!matcher.matches() || matcher.groupCount() != 3) {
      throw new IllegalArgumentException("Invalid table name: " + tableName);
    }
    return TableId.of(matcher.group(1), matcher.group(2), matcher.group(3));
  }

  /**
   * Checks if the userSchema is compatible with the table's current schema for writing. The
   * current implementation is not complete; if the check fails, the write cannot succeed.
   *
   * @param tableName The name of the table to write to.
   * @param userSchema The schema the user uses to append data.
   * @throws IllegalArgumentException if the check fails.
   */
  public void check(String tableName, Descriptors.Descriptor userSchema)
      throws IllegalArgumentException {
    Table table = bigquery.getTable(getTableId(tableName));
    Schema schema = table.getDefinition().getSchema();
    // TODO: We only have a very limited check here. More checks to be added.
    if (schema.getFields().size() != userSchema.getFields().size()) {
      throw new IllegalArgumentException(
          "User schema does not have the same field count as the BigQuery table schema, expected: "
              + schema.getFields().size()
              + " actual: "
              + userSchema.getFields().size());
    }
  }
}
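
As a possible direction for the TODO above, a field-by-field name check could be layered on top of the size check. This is only a sketch, not part of this PR; it assumes BigQuery column names should match proto field names case-insensitively, which may not be the intended semantics.

  // Hypothetical extension: verify every BigQuery column has a same-named
  // field in the user's proto descriptor.
  private void checkFieldNames(Schema schema, Descriptors.Descriptor userSchema) {
    for (com.google.cloud.bigquery.Field field : schema.getFields()) {
      // Assumption: proto field names are lower-cased versions of the
      // BigQuery column names.
      if (userSchema.findFieldByName(field.getName().toLowerCase()) == null) {
        throw new IllegalArgumentException(
            "User schema is missing field: " + field.getName());
      }
    }
  }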