Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add schema update back to json writer #905

Merged
merged 7 commits into from Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -28,4 +28,19 @@
<differenceType>7002</differenceType>
<method>void flushAll(long)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<differenceType>7002</differenceType>
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setBatchingSettings(com.google.api.gax.batching.BatchingSettings)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<differenceType>7002</differenceType>
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setExecutorProvider(com.google.api.gax.core.ExecutorProvider)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<differenceType>7002</differenceType>
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder setRetrySettings(com.google.api.gax.retrying.RetrySettings)</method>
</difference>
</differences>
Expand Up @@ -17,9 +17,8 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.Schema;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -51,6 +50,7 @@ public class JsonStreamWriter implements AutoCloseable {
private BigQueryWriteClient client;
private String streamName;
private StreamWriter streamWriter;
private StreamWriter.Builder streamWriterBuilder;
private Descriptor descriptor;
private TableSchema tableSchema;

Expand All @@ -66,20 +66,16 @@ private JsonStreamWriter(Builder builder)
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);

StreamWriter.Builder streamWriterBuilder;
if (this.client == null) {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
} else {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
}
setStreamWriterSettings(
streamWriterBuilder,
builder.channelProvider,
builder.credentialsProvider,
builder.batchingSettings,
builder.retrySettings,
builder.executorProvider,
builder.endpoint,
builder.flowControlSettings,
builder.createDefaultStream);
this.streamWriter = streamWriterBuilder.build();
this.streamName = this.streamWriter.getStreamNameString();
Expand Down Expand Up @@ -134,17 +130,17 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
}

/**
* Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then calling
* refreshAppend(), and finally setting the descriptor. All of these actions need to be performed
* atomically to avoid having synchronization issues with append(). Flushing all rows first is
* necessary since if there are rows remaining when the connection refreshes, it will send out the
* old writer schema instead of the new one.
* Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then
* recreating the stream writer, and finally setting the descriptor. All of these actions need to
* be performed atomically to avoid synchronization issues with append(). Flushing all rows
* first is necessary since if there are rows remaining when the connection refreshes, it will
* send out the old writer schema instead of the new one.
*/
void refreshConnection()
throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
synchronized (this) {
this.streamWriter.writeAllOutstanding();
this.streamWriter.refreshAppend();
this.streamWriter.shutdown();
this.streamWriter = streamWriterBuilder.build();
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema);
}
Expand All @@ -170,39 +166,37 @@ public Descriptor getDescriptor() {

/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
StreamWriter.Builder builder,
@Nullable TransportChannelProvider channelProvider,
@Nullable CredentialsProvider credentialsProvider,
@Nullable BatchingSettings batchingSettings,
@Nullable RetrySettings retrySettings,
@Nullable ExecutorProvider executorProvider,
@Nullable String endpoint,
@Nullable FlowControlSettings flowControlSettings,
Boolean createDefaultStream) {
if (channelProvider != null) {
builder.setChannelProvider(channelProvider);
streamWriterBuilder.setChannelProvider(channelProvider);
}
if (credentialsProvider != null) {
builder.setCredentialsProvider(credentialsProvider);
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
}
if (batchingSettings != null) {
builder.setBatchingSettings(batchingSettings);
}
if (retrySettings != null) {
builder.setRetrySettings(retrySettings);
}
if (executorProvider != null) {
builder.setExecutorProvider(executorProvider);
BatchingSettings.Builder batchSettingBuilder =
BatchingSettings.newBuilder()
.setElementCountThreshold(1L)
.setRequestByteThreshold(4 * 1024 * 1024L);
if (flowControlSettings != null) {
streamWriterBuilder.setBatchingSettings(
batchSettingBuilder.setFlowControlSettings(flowControlSettings).build());
} else {
streamWriterBuilder.setBatchingSettings(batchSettingBuilder.build());
}
if (endpoint != null) {
builder.setEndpoint(endpoint);
streamWriterBuilder.setEndpoint(endpoint);
}
if (createDefaultStream) {
builder.createDefaultStream();
streamWriterBuilder.createDefaultStream();
}
JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
new JsonStreamWriterOnSchemaUpdateRunnable();
jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
builder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
streamWriterBuilder.setOnSchemaUpdateRunnable(jsonStreamWriterOnSchemaUpdateRunnable);
}

/**
Expand Down Expand Up @@ -313,9 +307,7 @@ public static final class Builder {

private TransportChannelProvider channelProvider;
private CredentialsProvider credentialsProvider;
private BatchingSettings batchingSettings;
private RetrySettings retrySettings;
private ExecutorProvider executorProvider;
private FlowControlSettings flowControlSettings;
private String endpoint;
private boolean createDefaultStream = false;

Expand Down Expand Up @@ -359,37 +351,15 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
}

/**
* Setter for the underlying StreamWriter's BatchingSettings.
*
* @param batchingSettings
* @return Builder
*/
public Builder setBatchingSettings(BatchingSettings batchingSettings) {
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "BatchingSettings is null.");
return this;
}

/**
* Setter for the underlying StreamWriter's RetrySettings.
*
* @param retrySettings
* @return Builder
*/
public Builder setRetrySettings(RetrySettings retrySettings) {
this.retrySettings = Preconditions.checkNotNull(retrySettings, "RetrySettings is null.");
return this;
}

/**
* Setter for the underlying StreamWriter's ExecutorProvider.
* Setter for the underlying StreamWriter's FlowControlSettings.
*
* @param executorProvider
* @param flowControlSettings
* @return Builder
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider =
Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
  // checkNotNull returns its argument, so one call both validates and assigns;
  // a separate up-front check with the same message would be redundant.
  this.flowControlSettings =
      Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null.");
  return this;
}

Expand Down
Expand Up @@ -265,7 +265,9 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
List<InflightBatch> batchesToSend;
batchesToSend = messagesBatch.add(outstandingAppend);
// Set up the next duration-based delivery alarm if there are messages batched.
setupAlarm();
if (batchingSettings.getDelayThreshold() != null) {
setupAlarm();
}
if (!batchesToSend.isEmpty()) {
for (final InflightBatch batch : batchesToSend) {
LOG.fine("Scheduling a batch for immediate sending");
Expand Down Expand Up @@ -738,58 +740,31 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) {
if (batchingSettings.getRequestByteThreshold() > getApiMaxRequestBytes()) {
builder.setRequestByteThreshold(getApiMaxRequestBytes());
}
Preconditions.checkNotNull(batchingSettings.getDelayThreshold());
Preconditions.checkArgument(batchingSettings.getDelayThreshold().toMillis() > 0);
LOG.info("here" + batchingSettings.getFlowControlSettings());
if (batchingSettings.getFlowControlSettings() == null) {
builder.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS);
} else {

if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() == null) {
builder.setFlowControlSettings(
batchingSettings
.getFlowControlSettings()
.toBuilder()
.setMaxOutstandingElementCount(
DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount())
.build());
} else {
Preconditions.checkArgument(
batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() > 0);
if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount()
> getApiMaxInflightRequests()) {
builder.setFlowControlSettings(
batchingSettings
.getFlowControlSettings()
.toBuilder()
.setMaxOutstandingElementCount(getApiMaxInflightRequests())
.build());
}
Long elementCount =
batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount();
if (elementCount == null || elementCount > getApiMaxInflightRequests()) {
elementCount = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount();
}
if (batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() == null) {
builder.setFlowControlSettings(
batchingSettings
.getFlowControlSettings()
.toBuilder()
.setMaxOutstandingRequestBytes(
DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes())
.build());
} else {
Preconditions.checkArgument(
batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() > 0);
Long elementSize =
batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes();
if (elementSize == null || elementSize < 0) {
elementSize = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes();
}
if (batchingSettings.getFlowControlSettings().getLimitExceededBehavior() == null) {
builder.setFlowControlSettings(
batchingSettings
.getFlowControlSettings()
.toBuilder()
.setLimitExceededBehavior(
DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior())
.build());
} else {
Preconditions.checkArgument(
batchingSettings.getFlowControlSettings().getLimitExceededBehavior()
!= FlowController.LimitExceededBehavior.Ignore);
FlowController.LimitExceededBehavior behavior =
batchingSettings.getFlowControlSettings().getLimitExceededBehavior();
if (behavior == null || behavior == FlowController.LimitExceededBehavior.Ignore) {
behavior = DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior();
}
builder.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(elementCount)
.setMaxOutstandingRequestBytes(elementSize)
.setLimitExceededBehavior(behavior)
.build());
}
this.batchingSettings = builder.build();
return this;
Expand Down