Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a flushAll() method that will flush all the inflight request and make sure all responses returned #492

Merged
merged 17 commits into from Aug 19, 2020
Expand Up @@ -99,6 +99,10 @@ public class StreamWriter implements AutoCloseable {
private final Lock appendAndRefreshAppendLock;
private final MessagesBatch messagesBatch;

// Indicates if a stream has some non recoverable exception happened.
private final Lock exceptionLock;
private Throwable streamException;

private BackgroundResource backgroundResources;
private List<BackgroundResource> backgroundResourceList;

Expand Down Expand Up @@ -145,10 +149,13 @@ private StreamWriter(Builder builder)

this.batchingSettings = builder.batchingSettings;
this.retrySettings = builder.retrySettings;
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName);
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
messagesBatchLock = new ReentrantLock();
appendAndRefreshAppendLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
this.exceptionLock = new ReentrantLock();
this.streamException = null;

executor = builder.executorProvider.getExecutor();
backgroundResourceList = new ArrayList<>();
if (builder.executorProvider.shouldAutoClose()) {
Expand Down Expand Up @@ -212,6 +219,14 @@ public Boolean expired() {
return createTime.plus(streamTTL).compareTo(Instant.now()) < 0;
}

private void setException(Throwable t) {
exceptionLock.lock();
if (this.streamException == null) {
this.streamException = t;
}
exceptionLock.unlock();
}

/**
* Schedules the writing of a message. The write of the message may occur immediately or be
* delayed based on the writer batching options.
Expand Down Expand Up @@ -265,6 +280,33 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
return outstandingAppend.appendResult;
}

/**
* This is the general flush method for asynchronise append operation. When you have outstanding
* append requests, calling flush will make sure all outstanding append requests completed and
* successful. Otherwise there will be an exception thrown.
*
* @throws Exception
*/
public void flushAll(long timeoutMillis) throws Exception {
appendAndRefreshAppendLock.lock();
try {
writeAllOutstanding();
synchronized (messagesWaiter) {
messagesWaiter.waitComplete(timeoutMillis);
yirutang marked this conversation as resolved.
Show resolved Hide resolved
}
} finally {
appendAndRefreshAppendLock.unlock();
}
exceptionLock.lock();
try {
if (streamException != null) {
throw new Exception(streamException);
}
} finally {
exceptionLock.unlock();
}
}

/**
* Flush the rows on a BUFFERED stream, up to the specified offset. After flush, rows will be
* available for read. If no exception is thrown, it means the flush happened.
Expand Down Expand Up @@ -411,14 +453,15 @@ private static final class InflightBatch {
private long expectedOffset;
private Boolean attachSchema;
private String streamName;

private final AtomicBoolean failed;
private final StreamWriter streamWriter;

InflightBatch(
List<AppendRequestAndFutureResponse> inflightRequests,
long batchSizeBytes,
String streamName,
Boolean attachSchema) {
Boolean attachSchema,
StreamWriter streamWriter) {
this.inflightRequests = inflightRequests;
this.offsetList = new ArrayList<Long>(inflightRequests.size());
for (AppendRequestAndFutureResponse request : inflightRequests) {
Expand All @@ -435,6 +478,7 @@ private static final class InflightBatch {
this.attachSchema = attachSchema;
this.streamName = streamName;
this.failed = new AtomicBoolean(false);
this.streamWriter = streamWriter;
}

int count() {
Expand Down Expand Up @@ -482,7 +526,9 @@ private void onFailure(Throwable t) {
return;
} else {
LOG.info("Setting " + t.toString() + " on response");
this.streamWriter.setException(t);
}

for (AppendRequestAndFutureResponse request : inflightRequests) {
request.appendResult.setException(t);
}
Expand Down Expand Up @@ -552,8 +598,12 @@ protected void shutdown() {
currentAlarmFuture.cancel(false);
}
writeAllOutstanding();
synchronized (messagesWaiter) {
messagesWaiter.waitComplete();
try {
synchronized (messagesWaiter) {
messagesWaiter.waitComplete(0);
}
} catch (InterruptedException e) {
LOG.warning("Failed to wait for messages to return " + e.toString());
}
if (clientStream.isSendReady()) {
clientStream.closeSend();
Expand Down Expand Up @@ -820,14 +870,14 @@ public void onStart(StreamController controller) {
private void abortInflightRequests(Throwable t) {
synchronized (this.inflightBatches) {
while (!this.inflightBatches.isEmpty()) {
this.inflightBatches
.poll()
.onFailure(
new AbortedException(
"Request aborted due to previous failures",
t,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
InflightBatch inflightBatch = this.inflightBatches.poll();
inflightBatch.onFailure(
new AbortedException(
"Request aborted due to previous failures",
t,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
}
}
Expand All @@ -850,13 +900,15 @@ public void onResponse(AppendRowsResponse response) {
streamWriter.getOnSchemaUpdateRunnable(), 0L, TimeUnit.MILLISECONDS);
}
}
// TODO: Deal with in stream errors.
// Currently there is nothing retryable. If the error is already exists, then ignore it.
if (response.hasError()) {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
inflightBatch.onFailure(exception);
if (response.getError().getCode() != 6 /* ALREADY_EXISTS */) {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
inflightBatch.onFailure(exception);
}
}
if (inflightBatch.getExpectedOffset() > 0
&& response.getOffset() != inflightBatch.getExpectedOffset()) {
Expand Down Expand Up @@ -907,30 +959,25 @@ public void onError(Throwable t) {
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} catch (IOException | InterruptedException e) {
LOG.info("Got exception while retrying.");
inflightBatch.onFailure(e);
abortInflightRequests(e);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
}
} else {
inflightBatch.onFailure(t);
abortInflightRequests(t);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries = 0;
}
try {
if (!streamWriter.shutdown.get()) {
// Establish a new connection.
streamWriter.refreshAppend();
}
} catch (IOException | InterruptedException e) {
LOG.info("Failed to establish a new connection");
}
}
} finally {
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
Expand All @@ -945,17 +992,21 @@ private static class MessagesBatch {
private final BatchingSettings batchingSettings;
private Boolean attachSchema = true;
private final String streamName;
private final StreamWriter streamWriter;

private MessagesBatch(BatchingSettings batchingSettings, String streamName) {
private MessagesBatch(
BatchingSettings batchingSettings, String streamName, StreamWriter streamWriter) {
this.batchingSettings = batchingSettings;
this.streamName = streamName;
this.streamWriter = streamWriter;
reset();
}

// Get all the messages out in a batch.
private InflightBatch popBatch() {
InflightBatch batch =
new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema);
new InflightBatch(
messages, batchedBytes, this.streamName, this.attachSchema, this.streamWriter);
this.attachSchema = false;
reset();
return batch;
Expand Down
Expand Up @@ -146,20 +146,22 @@ public void acquire(long messageSize) throws FlowController.FlowControlException
}
}

public synchronized void waitComplete() {
public synchronized void waitComplete(long timeoutMillis) throws InterruptedException {
long end = System.currentTimeMillis() + timeoutMillis;
lock.lock();
try {
while (pendingCount > 0) {
while (pendingCount > 0 && (timeoutMillis == 0 || end > System.currentTimeMillis())) {
lock.unlock();
try {
wait();
wait(timeoutMillis == 0 ? 0 : end - System.currentTimeMillis());
} catch (InterruptedException e) {
LOG.warning("Interrupted while waiting for completion");
throw e;
}
lock.lock();
}
} catch (Exception e) {
LOG.warning(e.toString());
if (pendingCount > 0) {
throw new InterruptedException("Wait timeout");
}
} finally {
lock.unlock();
}
Expand Down
Expand Up @@ -834,4 +834,67 @@ public void testExistingClient() throws Exception {
client.shutdown();
client.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testFlushAll() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100000))
.build())
.build();

testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertFalse(appendFuture3.isDone());
writer.flushAll(100000);

assertTrue(appendFuture3.isDone());

writer.close();
}

@Test
public void testFlushAllFailed() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100000))
.build())
.build();

testBigQueryWrite.addException(Status.DATA_LOSS.asException());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertFalse(appendFuture3.isDone());
try {
writer.flushAll(100000);
fail("Should have thrown an Exception");
} catch (Exception expected) {
if (expected.getCause() instanceof com.google.api.gax.rpc.DataLossException) {
LOG.info("got: " + expected.toString());
} else {
fail("Unexpected exception:" + expected.toString());
}
}

assertTrue(appendFuture3.isDone());

writer.close();
}
}