Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: several StreamWriter issues #213

Merged
merged 3 commits into from Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -85,7 +85,7 @@ public class StreamWriter implements AutoCloseable {

private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
private final BigQueryWriteSettings stubSettings;
private BigQueryWriteSettings stubSettings;

private final Lock messagesBatchLock;
private final MessagesBatch messagesBatch;
Expand Down Expand Up @@ -142,13 +142,21 @@ private StreamWriter(Builder builder)
messagesWaiter = new Waiter(this.batchingSettings.getFlowControlSettings());
responseObserver = new AppendResponseObserver(this);

stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setExecutorProvider(builder.executorProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
.build();
if (builder.client == null) {
stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setExecutorProvider(builder.executorProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
.build();
stub = BigQueryWriteClient.create(stubSettings);
backgroundResourceList.add(stub);
} else {
stub = builder.client;
}
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);

shutdown = new AtomicBoolean(false);
refreshAppend();
Stream.WriteStream stream =
Expand Down Expand Up @@ -240,15 +248,10 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
public void refreshAppend() throws IOException, InterruptedException {
synchronized (this) {
Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
if (stub != null) {
// There could be a moment, stub is not yet initialized.
if (clientStream != null) {
clientStream.closeSend();
stub.shutdown();
stub.awaitTermination(1, TimeUnit.MINUTES);
}
backgroundResourceList.remove(stub);
stub = BigQueryWriteClient.create(stubSettings);
backgroundResourceList.add(stub);
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
messagesBatch.resetAttachSchema();
bidiStreamingCallable = stub.appendRowsCallable();
clientStream = bidiStreamingCallable.splitCall(responseObserver);
Expand Down Expand Up @@ -314,14 +317,12 @@ public void writeAllOutstanding() {
private void writeBatch(final InflightBatch inflightBatch) {
if (inflightBatch != null) {
AppendRowsRequest request = inflightBatch.getMergedRequest();
messagesWaiter.waitOnElementCount();
messagesWaiter.waitOnSizeLimit(inflightBatch.getByteSize());
responseObserver.addInflightBatch(inflightBatch);
clientStream.send(request);

synchronized (messagesWaiter) {
messagesWaiter.incrementPendingCount(1);
messagesWaiter.incrementPendingSize(inflightBatch.getByteSize());
try {
messagesWaiter.acquire(inflightBatch.getByteSize());
responseObserver.addInflightBatch(inflightBatch);
clientStream.send(request);
} catch (FlowController.FlowControlException ex) {
inflightBatch.onFailure(ex);
}
}
}
Expand All @@ -346,14 +347,14 @@ private static final class InflightBatch {
final ArrayList<Long> offsetList;
final long creationTime;
int attempt;
int batchSizeBytes;
long batchSizeBytes;
long expectedOffset;
Boolean attachSchema;
String streamName;

InflightBatch(
List<AppendRequestAndFutureResponse> inflightRequests,
int batchSizeBytes,
long batchSizeBytes,
String streamName,
Boolean attachSchema) {
this.inflightRequests = inflightRequests;
Expand All @@ -377,7 +378,7 @@ int count() {
return inflightRequests.size();
}

int getByteSize() {
long getByteSize() {
return this.batchSizeBytes;
}

Expand Down Expand Up @@ -478,7 +479,9 @@ public void shutdown() {
currentAlarmFuture.cancel(false);
}
writeAllOutstanding();
messagesWaiter.waitComplete();
synchronized (messagesWaiter) {
messagesWaiter.waitComplete();
}
if (clientStream.isSendReady()) {
clientStream.closeSend();
}
Expand All @@ -496,7 +499,7 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
}

/**
* Constructs a new {@link Builder} using the given topic.
* Constructs a new {@link Builder} using the given stream.
*
* <p>Example of creating a {@code WriteStream}.
*
Expand All @@ -514,7 +517,15 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
* }</pre>
*/
public static Builder newBuilder(String streamName) {
return new Builder(streamName);
return new Builder(streamName, null);
}

/**
* Constructs a new {@link Builder} using the given stream and an existing BigQueryWriteClient.
*/
public static Builder newBuilder(String streamName, BigQueryWriteClient client) {
Preconditions.checkArgument(client != null);
return new Builder(streamName, client);
}

/** A builder of {@link StreamWriter}s. */
Expand All @@ -523,9 +534,6 @@ public static final class Builder {
static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10);

// Meaningful defaults.
static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L;
static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 100 * 1024L; // 100 kB
static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(10);
static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS =
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
Expand All @@ -534,9 +542,9 @@ public static final class Builder {
.build();
public static final BatchingSettings DEFAULT_BATCHING_SETTINGS =
BatchingSettings.newBuilder()
.setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
.setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
.setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
.setDelayThreshold(Duration.ofMillis(10))
.setRequestByteThreshold(100 * 1024L) // 100 kb
.setElementCountThreshold(100L)
.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS)
.build();
public static final RetrySettings DEFAULT_RETRY_SETTINGS =
Expand All @@ -555,6 +563,8 @@ public static final class Builder {
private String streamName;
private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();

private BigQueryWriteClient client = null;

// Batching options
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;

Expand All @@ -569,8 +579,9 @@ public static final class Builder {
private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();

private Builder(String stream) {
private Builder(String stream, BigQueryWriteClient client) {
this.streamName = Preconditions.checkNotNull(stream);
this.client = client;
}

/**
Expand Down Expand Up @@ -771,11 +782,7 @@ public void onResponse(AppendRowsResponse response) {
inflightBatch.onSuccess(response);
}
} finally {
synchronized (streamWriter.messagesWaiter) {
streamWriter.messagesWaiter.incrementPendingCount(-1);
streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
streamWriter.messagesWaiter.notifyAll();
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
}

Expand Down Expand Up @@ -805,11 +812,11 @@ public void onError(Throwable t) {
&& !streamWriter.shutdown.get()) {
streamWriter.refreshAppend();
// Currently there is a bug that it took reconnected stream 5 seconds to pick up
// stream count. So wait at least 5 seconds before sending a new request.
// stream count. So wait at least 7 seconds before sending a new request.
Thread.sleep(
Math.min(
streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(),
Duration.ofSeconds(5).toMillis()));
Duration.ofSeconds(7).toMillis()));
streamWriter.writeBatch(inflightBatch);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
Expand Down Expand Up @@ -837,19 +844,15 @@ public void onError(Throwable t) {
}
}
} finally {
synchronized (streamWriter.messagesWaiter) {
streamWriter.messagesWaiter.incrementPendingCount(-1);
streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
streamWriter.messagesWaiter.notifyAll();
}
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
}
};

// This class controls how many messages are going to be sent out in a batch.
private static class MessagesBatch {
private List<AppendRequestAndFutureResponse> messages;
private int batchedBytes;
private long batchedBytes;
private final BatchingSettings batchingSettings;
private Boolean attachSchema = true;
private final String streamName;
Expand Down Expand Up @@ -882,7 +885,7 @@ private boolean isEmpty() {
return messages.isEmpty();
}

private int getBatchedBytes() {
private long getBatchedBytes() {
return batchedBytes;
}

Expand Down