Skip to content

Commit

Permalink
feat: add a Flush API to enable finer grained data commit needs for d…
Browse files Browse the repository at this point in the history
…ataflow. (#272)

fix: add resource definition for Table/ReadStream/WriteStream message
fix: add proper resource_reference for messages
chore: update copyright

committer: @xiaozhenliugg
PiperOrigin-RevId: 311188524

Source-Author: Google APIs <noreply@google.com>
Source-Date: Tue May 12 13:14:37 2020 -0700
Source-Repo: googleapis/googleapis
Source-Sha: bf17ae5fd93929beb44ac4c6b04f5088c3ee4a02
Source-Link: googleapis/googleapis@bf17ae5
  • Loading branch information
yoshi-automation committed May 13, 2020
1 parent 9c9471a commit b1c827f
Show file tree
Hide file tree
Showing 11 changed files with 1,911 additions and 126 deletions.
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.cloud.bigquery.storage.v1alpha2.stub.BigQueryWriteStub;
Expand Down Expand Up @@ -591,6 +593,108 @@ public final BatchCommitWriteStreamsResponse batchCommitWriteStreams(
return stub.batchCommitWriteStreamsCallable();
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsResponse response = bigQueryWriteClient.flushRows(writeStream);
* }
* </code></pre>
*
* @param writeStream Required. The stream that is the target of the flush operation.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(WriteStreamName writeStream) {
FlushRowsRequest request =
FlushRowsRequest.newBuilder()
.setWriteStream(writeStream == null ? null : writeStream.toString())
.build();
return flushRows(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsResponse response = bigQueryWriteClient.flushRows(writeStream.toString());
* }
* </code></pre>
*
* @param writeStream Required. The stream that is the target of the flush operation.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(String writeStream) {
FlushRowsRequest request = FlushRowsRequest.newBuilder().setWriteStream(writeStream).build();
return flushRows(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsRequest request = FlushRowsRequest.newBuilder()
* .setWriteStream(writeStream.toString())
* .build();
* FlushRowsResponse response = bigQueryWriteClient.flushRows(request);
* }
* </code></pre>
*
* @param request The request object containing all of the parameters for the API call.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final FlushRowsResponse flushRows(FlushRowsRequest request) {
return flushRowsCallable().call(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD
/**
* Flushes rows to a BUFFERED stream. If users are appending rows to BUFFERED stream, flush
* operation is required in order for the rows to become available for reading. A Flush operation
* flushes up to any previously flushed offset in a BUFFERED stream, to the offset specified in
* the request.
*
* <p>Sample code:
*
* <pre><code>
* try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
* WriteStreamName writeStream = WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]");
* FlushRowsRequest request = FlushRowsRequest.newBuilder()
* .setWriteStream(writeStream.toString())
* .build();
* ApiFuture&lt;FlushRowsResponse&gt; future = bigQueryWriteClient.flushRowsCallable().futureCall(request);
* // Do something
* FlushRowsResponse response = future.get();
* }
* </code></pre>
*/
public final UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
return stub.flushRowsCallable();
}

@Override
public final void close() {
stub.close();
Expand Down
Expand Up @@ -33,6 +33,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.cloud.bigquery.storage.v1alpha2.stub.BigQueryWriteStubSettings;
Expand Down Expand Up @@ -102,6 +104,11 @@ public UnaryCallSettings<GetWriteStreamRequest, WriteStream> getWriteStreamSetti
return ((BigQueryWriteStubSettings) getStubSettings()).batchCommitWriteStreamsSettings();
}

/** Returns the object with the settings used for calls to flushRows. */
public UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return ((BigQueryWriteStubSettings) getStubSettings()).flushRowsSettings();
}

public static final BigQueryWriteSettings create(BigQueryWriteStubSettings stub)
throws IOException {
return new BigQueryWriteSettings.Builder(stub.toBuilder()).build();
Expand Down Expand Up @@ -229,6 +236,11 @@ public UnaryCallSettings.Builder<GetWriteStreamRequest, WriteStream> getWriteStr
return getStubSettingsBuilder().batchCommitWriteStreamsSettings();
}

/** Returns the builder for the settings used for calls to flushRows. */
public UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return getStubSettingsBuilder().flushRowsSettings();
}

@Override
public BigQueryWriteSettings build() throws IOException {
return new BigQueryWriteSettings(this);
Expand Down
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import javax.annotation.Generated;
Expand Down Expand Up @@ -62,6 +64,10 @@ public UnaryCallable<GetWriteStreamRequest, WriteStream> getWriteStreamCallable(
throw new UnsupportedOperationException("Not implemented: batchCommitWriteStreamsCallable()");
}

public UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
throw new UnsupportedOperationException("Not implemented: flushRowsCallable()");
}

@Override
public abstract void close();
}
Expand Up @@ -38,6 +38,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -99,6 +101,7 @@ public class BigQueryWriteStubSettings extends StubSettings<BigQueryWriteStubSet
finalizeWriteStreamSettings;
private final UnaryCallSettings<BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsSettings;
private final UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings;

/** Returns the object with the settings used for calls to createWriteStream. */
public UnaryCallSettings<CreateWriteStreamRequest, WriteStream> createWriteStreamSettings() {
Expand Down Expand Up @@ -127,6 +130,11 @@ public UnaryCallSettings<GetWriteStreamRequest, WriteStream> getWriteStreamSetti
return batchCommitWriteStreamsSettings;
}

/** Returns the object with the settings used for calls to flushRows. */
public UnaryCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return flushRowsSettings;
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
public BigQueryWriteStub createStub() throws IOException {
if (getTransportChannelProvider()
Expand Down Expand Up @@ -201,6 +209,7 @@ protected BigQueryWriteStubSettings(Builder settingsBuilder) throws IOException
getWriteStreamSettings = settingsBuilder.getWriteStreamSettings().build();
finalizeWriteStreamSettings = settingsBuilder.finalizeWriteStreamSettings().build();
batchCommitWriteStreamsSettings = settingsBuilder.batchCommitWriteStreamsSettings().build();
flushRowsSettings = settingsBuilder.flushRowsSettings().build();
}

/** Builder for BigQueryWriteStubSettings. */
Expand All @@ -218,6 +227,7 @@ public static class Builder extends StubSettings.Builder<BigQueryWriteStubSettin
private final UnaryCallSettings.Builder<
BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsSettings;
private final UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings;

private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;
Expand Down Expand Up @@ -270,12 +280,15 @@ protected Builder(ClientContext clientContext) {

batchCommitWriteStreamsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

flushRowsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
createWriteStreamSettings,
getWriteStreamSettings,
finalizeWriteStreamSettings,
batchCommitWriteStreamsSettings);
batchCommitWriteStreamsSettings,
flushRowsSettings);

initDefaults(this);
}
Expand Down Expand Up @@ -311,6 +324,11 @@ private static Builder initDefaults(Builder builder) {
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("default"));

builder
.flushRowsSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("default"));

return builder;
}

Expand All @@ -322,13 +340,15 @@ protected Builder(BigQueryWriteStubSettings settings) {
getWriteStreamSettings = settings.getWriteStreamSettings.toBuilder();
finalizeWriteStreamSettings = settings.finalizeWriteStreamSettings.toBuilder();
batchCommitWriteStreamsSettings = settings.batchCommitWriteStreamsSettings.toBuilder();
flushRowsSettings = settings.flushRowsSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
createWriteStreamSettings,
getWriteStreamSettings,
finalizeWriteStreamSettings,
batchCommitWriteStreamsSettings);
batchCommitWriteStreamsSettings,
flushRowsSettings);
}

// NEXT_MAJOR_VER: remove 'throws Exception'
Expand Down Expand Up @@ -377,6 +397,11 @@ public UnaryCallSettings.Builder<GetWriteStreamRequest, WriteStream> getWriteStr
return batchCommitWriteStreamsSettings;
}

/** Returns the builder for the settings used for calls to flushRows. */
public UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings() {
return flushRowsSettings;
}

@Override
public BigQueryWriteStubSettings build() throws IOException {
return new BigQueryWriteStubSettings(this);
Expand Down
Expand Up @@ -31,6 +31,8 @@
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -103,6 +105,14 @@ public class GrpcBigQueryWriteStub extends BigQueryWriteStub {
.setResponseMarshaller(
ProtoUtils.marshaller(BatchCommitWriteStreamsResponse.getDefaultInstance()))
.build();
private static final MethodDescriptor<FlushRowsRequest, FlushRowsResponse>
flushRowsMethodDescriptor =
MethodDescriptor.<FlushRowsRequest, FlushRowsResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.cloud.bigquery.storage.v1alpha2.BigQueryWrite/FlushRows")
.setRequestMarshaller(ProtoUtils.marshaller(FlushRowsRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(FlushRowsResponse.getDefaultInstance()))
.build();

private final BackgroundResource backgroundResources;

Expand All @@ -113,6 +123,7 @@ public class GrpcBigQueryWriteStub extends BigQueryWriteStub {
finalizeWriteStreamCallable;
private final UnaryCallable<BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse>
batchCommitWriteStreamsCallable;
private final UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable;

private final GrpcStubCallableFactory callableFactory;

Expand Down Expand Up @@ -212,6 +223,19 @@ public Map<String, String> extract(BatchCommitWriteStreamsRequest request) {
}
})
.build();
GrpcCallSettings<FlushRowsRequest, FlushRowsResponse> flushRowsTransportSettings =
GrpcCallSettings.<FlushRowsRequest, FlushRowsResponse>newBuilder()
.setMethodDescriptor(flushRowsMethodDescriptor)
.setParamsExtractor(
new RequestParamsExtractor<FlushRowsRequest>() {
@Override
public Map<String, String> extract(FlushRowsRequest request) {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("write_stream", String.valueOf(request.getWriteStream()));
return params.build();
}
})
.build();

this.createWriteStreamCallable =
callableFactory.createUnaryCallable(
Expand All @@ -234,6 +258,9 @@ public Map<String, String> extract(BatchCommitWriteStreamsRequest request) {
batchCommitWriteStreamsTransportSettings,
settings.batchCommitWriteStreamsSettings(),
clientContext);
this.flushRowsCallable =
callableFactory.createUnaryCallable(
flushRowsTransportSettings, settings.flushRowsSettings(), clientContext);

backgroundResources = new BackgroundResourceAggregation(clientContext.getBackgroundResources());
}
Expand All @@ -260,6 +287,10 @@ public UnaryCallable<GetWriteStreamRequest, WriteStream> getWriteStreamCallable(
return batchCommitWriteStreamsCallable;
}

public UnaryCallable<FlushRowsRequest, FlushRowsResponse> flushRowsCallable() {
return flushRowsCallable;
}

@Override
public final void close() {
shutdown();
Expand Down

0 comments on commit b1c827f

Please sign in to comment.