feat: add JsonWriterCache.java and use JsonWriterCache in DirectWriter to allow JSON writes #489

Merged (5 commits, Aug 16, 2020). Changes shown from 4 commits.
DirectWriter.java
@@ -33,11 +33,12 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import org.json.JSONArray;

/**
* Writer that helps users write data to BigQuery. This is a simplified version of the Write
* API. For users writing with a COMMITTED stream who don't care about row deduplication, it is
* recommended to use this Writer.
recommended to use this Writer. The DirectWriter can be used to write both JSON and protobuf data.
*
* <pre>{@code
* DataProto data;
@@ -50,7 +51,9 @@
public class DirectWriter {
private static final Logger LOG = Logger.getLogger(DirectWriter.class.getName());
private static WriterCache cache = null;
private static JsonWriterCache jsonCache = null;
private static Lock cacheLock = new ReentrantLock();
private static Lock jsonCacheLock = new ReentrantLock();

/**
* Append rows to the given table.
@@ -103,10 +106,53 @@ public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
MoreExecutors.directExecutor());
}

/**
* Appends JSON rows to the given table.
*
* @param tableName table name in the form of "projects/{pName}/datasets/{dName}/tables/{tName}"
* @param json a JSONArray of rows to append
* @return a future that contains the offset at which the append happened. The append has only
*     actually happened once the future returns a valid offset.
* @throws IOException if the underlying write client cannot be created
* @throws InterruptedException if the append is interrupted
* @throws InvalidArgumentException if the JSONArray is empty
* @throws Descriptors.DescriptorValidationException if a writer cannot be built for the table
*/
public static ApiFuture<Long> append(String tableName, JSONArray json)
throws IOException, InterruptedException, InvalidArgumentException,
Descriptors.DescriptorValidationException {
Preconditions.checkNotNull(tableName, "TableName is null.");
Preconditions.checkNotNull(json, "JSONArray is null.");

if (json.length() == 0) {
throw new InvalidArgumentException(
new Exception("Empty JSONArrays are not allowed"),
GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT),
false);
}
jsonCacheLock.lock();
try {
if (jsonCache == null) {
jsonCache = JsonWriterCache.getInstance();
}
} finally {
jsonCacheLock.unlock();
}
JsonStreamWriter writer = jsonCache.getTableWriter(tableName);
return ApiFutures.<Storage.AppendRowsResponse, Long>transform(
writer.append(json, /* offset = */ -1, /* allowUnknownFields = */ false),
new ApiFunction<Storage.AppendRowsResponse, Long>() {
@Override
public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
return Long.valueOf(appendRowsResponse.getOffset());
}
},
MoreExecutors.directExecutor());
}

@VisibleForTesting
public static void testSetStub(
BigQueryWriteClient stub, int maxTableEntry, SchemaCompatibility schemaCheck) {
cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
jsonCache = JsonWriterCache.getTestInstance(stub, maxTableEntry);
}

/** Clears the underlying cache and all the transport connections. */
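For reference, a minimal usage sketch of the new JSON append path added above. The table path and the "name" column are hypothetical, and error handling is elided; this is a sketch, not the library's documented example.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1alpha2.DirectWriter;
import org.json.JSONArray;
import org.json.JSONObject;

public class DirectWriterJsonExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical table path; replace with a real table.
    String tableName = "projects/my-project/datasets/my_dataset/tables/my_table";

    // Build one JSON row; assumes the table has a STRING column named "name".
    JSONObject row = new JSONObject();
    row.put("name", "Alice");
    JSONArray rows = new JSONArray();
    rows.put(row);

    // The append has only actually happened once the future returns a valid offset.
    ApiFuture<Long> offsetFuture = DirectWriter.append(tableName, rows);
    System.out.println("Appended at offset: " + offsetFuture.get());
  }
}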
JsonStreamWriter.java
@@ -260,6 +260,11 @@ public void close() {
this.streamWriter.close();
}

/** Returns whether the underlying stream has expired. */
public Boolean expired() {
return this.streamWriter.expired();
}

private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
private JsonStreamWriter jsonStreamWriter;
/**
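As a hedged illustration of the new expired() accessor, a caller-side check; the helper name is an assumption, and JsonWriterCache below applies the same logic inline when deciding whether to reuse a cached writer.

// Sketch only: a cached writer is reusable while its stream is still alive.
static boolean reusable(JsonStreamWriter writer) {
  return writer != null && !writer.expired();
}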
JsonWriterCache.java (new file)
@@ -0,0 +1,147 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.Descriptors;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A cache of JsonStreamWriters that can be looked up by table name. Once the maximum number of
 * cached tables is exceeded, entries are evicted on a least-recently-used basis. Code sample:
 * JsonWriterCache cache = JsonWriterCache.getInstance();
 * JsonStreamWriter writer = cache.getTableWriter(tableName);
 * // Use the writer...
 */
public class JsonWriterCache {
private static final Logger LOG = Logger.getLogger(JsonWriterCache.class.getName());

private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
private static Pattern tablePattern = Pattern.compile(tablePatternString);

private static JsonWriterCache instance;
private Cache<String, JsonStreamWriter> jsonWriterCache;

// Maximum number of tables to hold in the cache; once the maximum is exceeded, entries are
// evicted on a least-recently-used basis.
private static final int MAX_TABLE_ENTRY = 100;
private static final int MAX_WRITERS_PER_TABLE = 1;

private final BigQueryWriteClient stub;

private JsonWriterCache(BigQueryWriteClient stub, int maxTableEntry) {
this.stub = stub;
jsonWriterCache =
CacheBuilder.newBuilder().maximumSize(maxTableEntry).<String, JsonStreamWriter>build();
}

public static synchronized JsonWriterCache getInstance() throws IOException {
if (instance == null) {
BigQueryWriteSettings stubSettings = BigQueryWriteSettings.newBuilder().build();
BigQueryWriteClient stub = BigQueryWriteClient.create(stubSettings);
instance = new JsonWriterCache(stub, MAX_TABLE_ENTRY);
}
return instance;
}

/** Returns a cache with custom stub used by test. */
@VisibleForTesting
public static JsonWriterCache getTestInstance(BigQueryWriteClient stub, int maxTableEntry) {
Preconditions.checkNotNull(stub, "Stub is null.");
return new JsonWriterCache(stub, maxTableEntry);
}

private Stream.WriteStream createNewWriteStream(String tableName) {
Stream.WriteStream stream =
Stream.WriteStream.newBuilder().setType(Stream.WriteStream.Type.COMMITTED).build();
stream =
stub.createWriteStream(
Storage.CreateWriteStreamRequest.newBuilder()
.setParent(tableName)
.setWriteStream(stream)
.build());
LOG.info("Created write stream: " + stream.getName());
return stream;
}

JsonStreamWriter createNewWriter(Stream.WriteStream writeStream)
throws IllegalArgumentException, IOException, InterruptedException,
Descriptors.DescriptorValidationException {
return JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setChannelProvider(stub.getSettings().getTransportChannelProvider())
.setCredentialsProvider(stub.getSettings().getCredentialsProvider())
.setExecutorProvider(stub.getSettings().getExecutorProvider())
.build();
}

/**
 * Gets a writer for the given table, creating a new one if the cached writer has expired.
 *
 * @param tableName table name in the form of "projects/{pName}/datasets/{dName}/tables/{tName}"
 * @return a JsonStreamWriter for the table
 * @throws IllegalArgumentException if the table name is not of the expected form
 * @throws IOException if a write stream cannot be created
 * @throws InterruptedException if writer creation is interrupted
 * @throws Descriptors.DescriptorValidationException if a writer cannot be built from the table schema
 */
public JsonStreamWriter getTableWriter(String tableName)
throws IllegalArgumentException, IOException, InterruptedException,
Descriptors.DescriptorValidationException {
Preconditions.checkNotNull(tableName, "TableName is null.");
Matcher matcher = tablePattern.matcher(tableName);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid table name: " + tableName);
}

Stream.WriteStream writeStream = null;
JsonStreamWriter writer = null;

synchronized (this) {
writer = jsonWriterCache.getIfPresent(tableName);
if (writer != null) {
if (!writer.expired()) {
return writer;
} else {
writer.close();
}
}
writeStream = createNewWriteStream(tableName);
writer = createNewWriter(writeStream);
jsonWriterCache.put(tableName, writer);
}
return writer;
}

/** Clears the cache and closes all the writers in the cache. */
public void clear() {
synchronized (this) {
ConcurrentMap<String, JsonStreamWriter> map = jsonWriterCache.asMap();
for (JsonStreamWriter writer : map.values()) {
writer.close();
}
// invalidateAll() discards the entries; cleanUp() would only perform
// pending maintenance without actually clearing the cache.
jsonWriterCache.invalidateAll();
}
}

@VisibleForTesting
public long cachedTableCount() {
// Synchronize on the same monitor used by getTableWriter() and clear().
synchronized (this) {
return jsonWriterCache.size();
}
}
}
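Finally, a minimal usage sketch tying the new cache together; the table path is hypothetical and exception handling is elided.

import com.google.cloud.bigquery.storage.v1alpha2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1alpha2.JsonWriterCache;

public class JsonWriterCacheExample {
  public static void main(String[] args) throws Exception {
    JsonWriterCache cache = JsonWriterCache.getInstance();
    // Repeated lookups for the same table reuse the cached writer until it
    // expires or is evicted (least-recently-used, 100 tables by default).
    JsonStreamWriter writer =
        cache.getTableWriter("projects/my-project/datasets/my_dataset/tables/my_table");
    // ... append JSON rows with the writer ...
    cache.clear(); // closes all cached writers
  }
}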