fix: Add unit test for concurrent issues we worried about, and fix some locking issues (#854)

yirutang committed Mar 2, 2021 · 1 parent c087677 · commit 0870797
Showing 5 changed files with 723 additions and 87 deletions.
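At a high level, the diff replaces the writer's separate exceptionLock and AtomicBoolean flags with a single appendAndRefreshAppendLock guarding writer state (documented with @GuardedBy), plus an AtomicReference for the stream's terminal error. Below is a minimal sketch of that pattern for orientation; the class and field names are hypothetical, not the library's code.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;

class WriterStateSketch {
  private final Lock stateLock = new ReentrantLock();

  @GuardedBy("stateLock")
  private boolean shutdown = false;

  // Set from the response observer's onError without taking stateLock.
  private final AtomicReference<Throwable> streamException = new AtomicReference<>(null);

  void append(String message) {
    stateLock.lock();
    try {
      if (shutdown) {
        throw new IllegalStateException("Cannot append on a shut-down writer.");
      }
      Throwable t = streamException.get();
      if (t != null) {
        throw new IllegalStateException("Stream already failed.", t);
      }
      // ... batch the message under the lock ...
    } finally {
      stateLock.unlock();
    }
  }

  // Called from the transport thread on a fatal stream error.
  void onStreamError(Throwable t) {
    streamException.compareAndSet(null, t); // keep only the first failure
  }
}
```

The compareAndSet keeps only the first error, mirroring the removed setException method, while letting the transport callback record failures without contending for the writer lock.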
@@ -50,12 +50,14 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

/**
@@ -100,25 +102,34 @@ public class StreamWriter implements AutoCloseable {

private final Lock messagesBatchLock;
private final Lock appendAndRefreshAppendLock;

@GuardedBy("appendAndRefreshAppendLock")
private final MessagesBatch messagesBatch;

// Indicates whether a non-recoverable exception has occurred on the stream.
private final Lock exceptionLock;
private Throwable streamException;
private AtomicReference<Throwable> streamException;

private BackgroundResource backgroundResources;
private List<BackgroundResource> backgroundResourceList;

private BigQueryWriteClient stub;
BidiStreamingCallable<AppendRowsRequest, AppendRowsResponse> bidiStreamingCallable;

@GuardedBy("appendAndRefreshAppendLock")
ClientStream<AppendRowsRequest> clientStream;

private final AppendResponseObserver responseObserver;

private final ScheduledExecutorService executor;

private final AtomicBoolean shutdown;
@GuardedBy("appendAndRefreshAppendLock")
private boolean shutdown;

private final Waiter messagesWaiter;
private final AtomicBoolean activeAlarm;

@GuardedBy("appendAndRefreshAppendLock")
private boolean activeAlarm;

private ScheduledFuture<?> currentAlarmFuture;

private Integer currentRetries = 0;
@@ -160,9 +171,8 @@ private StreamWriter(Builder builder)
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
messagesBatchLock = new ReentrantLock();
appendAndRefreshAppendLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
this.exceptionLock = new ReentrantLock();
this.streamException = null;
activeAlarm = false;
this.streamException = new AtomicReference<Throwable>(null);

executor = builder.executorProvider.getExecutor();
backgroundResourceList = new ArrayList<>();
@@ -185,7 +195,7 @@ private StreamWriter(Builder builder)
stub = builder.client;
}
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
shutdown = new AtomicBoolean(false);
shutdown = false;
if (builder.onSchemaUpdateRunnable != null) {
this.onSchemaUpdateRunnable = builder.onSchemaUpdateRunnable;
this.onSchemaUpdateRunnable.setStreamWriter(this);
@@ -216,14 +226,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
return this.onSchemaUpdateRunnable;
}

private void setException(Throwable t) {
exceptionLock.lock();
if (this.streamException == null) {
this.streamException = t;
}
exceptionLock.unlock();
}

/**
* Schedules the writing of a message. The write of the message may occur immediately or be
* delayed based on the writer batching options.
@@ -253,27 +255,27 @@ private void setException(Throwable t) {
*/
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
appendAndRefreshAppendLock.lock();
Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
Preconditions.checkNotNull(message, "Message is null.");
final AppendRequestAndFutureResponse outstandingAppend =
new AppendRequestAndFutureResponse(message);
List<InflightBatch> batchesToSend;
messagesBatchLock.lock();

try {
Preconditions.checkState(!shutdown, "Cannot append on a shut-down writer.");
Preconditions.checkNotNull(message, "Message is null.");
Preconditions.checkState(streamException.get() == null, "Stream already failed.");
final AppendRequestAndFutureResponse outstandingAppend =
new AppendRequestAndFutureResponse(message);
List<InflightBatch> batchesToSend;
batchesToSend = messagesBatch.add(outstandingAppend);
// Setup the next duration based delivery alarm if there are messages batched.
setupAlarm();
if (!batchesToSend.isEmpty()) {
for (final InflightBatch batch : batchesToSend) {
LOG.fine("Scheduling a batch for immediate sending.");
LOG.fine("Scheduling a batch for immediate sending");
writeBatch(batch);
}
}
return outstandingAppend.appendResult;
} finally {
messagesBatchLock.unlock();
appendAndRefreshAppendLock.unlock();
}
return outstandingAppend.appendResult;
}
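For context, here is a hypothetical caller of append (not code from this commit; the package path, method signatures used, and names are assumptions):

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.StreamWriter;

class AppendUsageSketch {
  // The writer construction and row payload are assumed to exist elsewhere.
  static AppendRowsResponse appendOnce(StreamWriter writer, AppendRowsRequest request)
      throws Exception {
    // append() returns immediately; the actual send may be batched and delayed.
    ApiFuture<AppendRowsResponse> future = writer.append(request);
    return future.get(); // blocks until the server acks or the stream fails
  }
}
```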

/**
Expand All @@ -285,9 +287,10 @@ public void refreshAppend() throws InterruptedException {
throw new UnimplementedException(null, GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), false);
}

@GuardedBy("appendAndRefreshAppendLock")
private void setupAlarm() {
if (!messagesBatch.isEmpty()) {
if (!activeAlarm.getAndSet(true)) {
if (!activeAlarm) {
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
LOG.log(Level.FINE, "Setting up alarm for the next {0} ms.", delayThresholdMs);
currentAlarmFuture =
@@ -296,12 +299,12 @@ private void setupAlarm() {
@Override
public void run() {
LOG.fine("Sending messages based on schedule");
activeAlarm.getAndSet(false);
messagesBatchLock.lock();
appendAndRefreshAppendLock.lock();
activeAlarm = false;
try {
writeBatch(messagesBatch.popBatch());
} finally {
messagesBatchLock.unlock();
appendAndRefreshAppendLock.unlock();
}
}
},
@@ -310,9 +313,8 @@ public void run() {
}
} else if (currentAlarmFuture != null) {
LOG.log(Level.FINER, "Cancelling alarm, no more messages");
if (activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
currentAlarmFuture.cancel(false);
activeAlarm = false;
}
}
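The alarm rework above flips activeAlarm under the same lock that guards the batch, and the scheduled flush takes that lock too, instead of the old AtomicBoolean handshake. A self-contained sketch of the idiom, with hypothetical names and synchronized standing in for the writer lock:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class FlushAlarmSketch {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final Object lock = new Object();
  private ScheduledFuture<?> alarm;    // guarded by lock
  private boolean alarmActive = false; // guarded by lock

  // Called under lock after a message is batched, mirroring setupAlarm above.
  void armFlushAlarm(long delayMs, Runnable flush) {
    synchronized (lock) {
      if (!alarmActive) {
        alarmActive = true;
        alarm =
            executor.schedule(
                () -> {
                  synchronized (lock) {
                    alarmActive = false;
                    flush.run(); // flush under the same lock as append
                  }
                },
                delayMs,
                TimeUnit.MILLISECONDS);
      }
    }
  }
}
```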

@@ -321,27 +323,41 @@ public void run() {
* wait for the send operations to complete. To wait for messages to send, call {@code get} on the
* futures returned from {@code append}.
*/
@GuardedBy("appendAndRefreshAppendLock")
public void writeAllOutstanding() {
InflightBatch unorderedOutstandingBatch = null;
messagesBatchLock.lock();
try {
if (!messagesBatch.isEmpty()) {
writeBatch(messagesBatch.popBatch());
}
messagesBatch.reset();
} finally {
messagesBatchLock.unlock();
if (!messagesBatch.isEmpty()) {
writeBatch(messagesBatch.popBatch());
}
messagesBatch.reset();
}

@GuardedBy("appendAndRefreshAppendLock")
private void writeBatch(final InflightBatch inflightBatch) {
if (inflightBatch != null) {
AppendRowsRequest request = inflightBatch.getMergedRequest();
try {
appendAndRefreshAppendLock.unlock();
messagesWaiter.acquire(inflightBatch.getByteSize());
appendAndRefreshAppendLock.lock();
if (shutdown || streamException.get() != null) {
appendAndRefreshAppendLock.unlock();
messagesWaiter.release(inflightBatch.getByteSize());
appendAndRefreshAppendLock.lock();
inflightBatch.onFailure(
new AbortedException(
shutdown
? "Stream closed, abort append."
: "Stream has previous errors, abort append.",
null,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
return;
}
responseObserver.addInflightBatch(inflightBatch);
clientStream.send(request);
} catch (FlowController.FlowControlException ex) {
appendAndRefreshAppendLock.lock();
inflightBatch.onFailure(ex);
}
}
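writeBatch now releases the writer lock around the blocking messagesWaiter.acquire call, re-acquires it, and re-checks shutdown and streamException before sending, returning the reserved bytes if the writer was closed in the meantime. A minimal sketch of that unlock-wait-relock idiom, with a Semaphore standing in for the library's Waiter and hypothetical names throughout:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class FlowControlSketch {
  private final Lock stateLock = new ReentrantLock();
  private final Semaphore inflightPermits = new Semaphore(100); // stand-in for the Waiter
  private boolean shutdown = false; // guarded by stateLock

  // Caller is assumed to hold stateLock on entry, as writeBatch does above.
  void sendWithFlowControl(Runnable send, int permits) throws InterruptedException {
    stateLock.unlock();               // don't stall other threads while we wait
    inflightPermits.acquire(permits); // may block until inflight bytes drain
    stateLock.lock();                 // re-acquire before reading guarded state
    if (shutdown) {
      inflightPermits.release(permits); // hand back the unused permits
      return;                           // abort: writer closed while we waited
    }
    send.run(); // still under the lock, matching the code above
  }
}
```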
@@ -447,9 +463,6 @@ private void onFailure(Throwable t) {
// Error has been set already.
LOG.warning("Ignore " + t.toString() + " since error has already been set");
return;
} else {
LOG.info("Setting " + t.toString() + " on response");
this.streamWriter.setException(t);
}

for (AppendRequestAndFutureResponse request : inflightRequests) {
@@ -511,26 +524,68 @@ public RetrySettings getRetrySettings() {
* pending messages are lost.
*/
protected void shutdown() {
if (shutdown.getAndSet(true)) {
LOG.fine("Already shutdown.");
return;
}
LOG.fine("Shutdown called on writer");
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
writeAllOutstanding();
appendAndRefreshAppendLock.lock();
try {
synchronized (messagesWaiter) {
if (shutdown) {
LOG.fine("Already shutdown.");
return;
}
shutdown = true;
LOG.info("Shutdown called on writer: " + streamName);
if (currentAlarmFuture != null && activeAlarm) {
currentAlarmFuture.cancel(false);
activeAlarm = false;
}
// Wait for current inflight to drain.
try {
appendAndRefreshAppendLock.unlock();
messagesWaiter.waitComplete(0);
} catch (InterruptedException e) {
LOG.warning("Failed to wait for messages to return " + e.toString());
}
} catch (InterruptedException e) {
LOG.warning("Failed to wait for messages to return " + e.toString());
}
if (clientStream.isSendReady()) {
clientStream.closeSend();
appendAndRefreshAppendLock.lock();
// Try to send out what's left in batch.
if (!messagesBatch.isEmpty()) {
InflightBatch inflightBatch = messagesBatch.popBatch();
AppendRowsRequest request = inflightBatch.getMergedRequest();
if (streamException.get() != null) {
inflightBatch.onFailure(
new AbortedException(
shutdown
? "Stream closed, abort append."
: "Stream has previous errors, abort append.",
null,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
} else {
try {
appendAndRefreshAppendLock.unlock();
messagesWaiter.acquire(inflightBatch.getByteSize());
appendAndRefreshAppendLock.lock();
responseObserver.addInflightBatch(inflightBatch);
clientStream.send(request);
} catch (FlowController.FlowControlException ex) {
appendAndRefreshAppendLock.lock();
LOG.warning(
"Unexpected flow control exception when sending batch leftover: " + ex.toString());
}
}
}
// Close the stream.
try {
appendAndRefreshAppendLock.unlock();
messagesWaiter.waitComplete(0);
} catch (InterruptedException e) {
LOG.warning("Failed to wait for messages to return " + e.toString());
}
appendAndRefreshAppendLock.lock();
if (clientStream.isSendReady()) {
clientStream.closeSend();
}
backgroundResources.shutdown();
} finally {
appendAndRefreshAppendLock.unlock();
}
backgroundResources.shutdown();
}
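Condensed, the new shutdown ordering is: mark the writer shut down, cancel the flush alarm, flush or fail the leftover batch, drain inflight appends without holding the lock, then half-close the stream and release resources. A sketch of that ordering with hypothetical no-op helpers (not the library's code):

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShutdownOrderSketch {
  private final Lock stateLock = new ReentrantLock();
  private boolean shutdown = false; // guarded by stateLock

  void shutdown() {
    stateLock.lock();
    try {
      if (shutdown) {
        return; // idempotent: a second call is a no-op
      }
      shutdown = true;
      cancelFlushAlarm();         // stop the delay-threshold timer
      flushOrFailLeftoverBatch(); // send the last batch, or abort it on a failed stream
      stateLock.unlock();
      awaitInflightDrain();       // block without holding the writer lock
      stateLock.lock();
      closeSendStream();          // half-close the gRPC stream if it is send-ready
      releaseBackgroundResources();
    } finally {
      stateLock.unlock();
    }
  }

  // No-op stand-ins for the real steps above.
  private void cancelFlushAlarm() {}
  private void flushOrFailLeftoverBatch() {}
  private void awaitInflightDrain() {}
  private void closeSendStream() {}
  private void releaseBackgroundResources() {}
}
```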

/**
@@ -815,11 +870,12 @@ public void onStart(StreamController controller) {
}

private void abortInflightRequests(Throwable t) {
LOG.fine("Aborting all inflight requests");
synchronized (this.inflightBatches) {
boolean first_error = true;
while (!this.inflightBatches.isEmpty()) {
InflightBatch inflightBatch = this.inflightBatches.poll();
if (first_error) {
if (first_error || t.getCause().getClass() == AbortedException.class) {
inflightBatch.onFailure(t);
first_error = false;
} else {
Expand Down Expand Up @@ -894,7 +950,8 @@ public void onComplete() {

@Override
public void onError(Throwable t) {
LOG.fine("OnError called");
LOG.info("OnError called: " + t.toString());
streamWriter.streamException.set(t);
abortInflightRequests(t);
}
};
@@ -917,6 +974,7 @@ private MessagesBatch(
}

// Get all the messages out in a batch.
@GuardedBy("appendAndRefreshAppendLock")
private InflightBatch popBatch() {
InflightBatch batch =
new InflightBatch(
@@ -958,6 +1016,7 @@ private long getMaxBatchBytes() {
// The returned message batch could contain the previous batch of messages plus the current
// message, if the current message is too large to fit in the existing batch.
@GuardedBy("appendAndRefreshAppendLock")
private List<InflightBatch> add(AppendRequestAndFutureResponse outstandingAppend) {
List<InflightBatch> batchesToSend = new ArrayList<>();
// Check if the next message makes the current batch exceed the max batch byte size.
@@ -978,7 +1037,6 @@ && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) {
|| getMessagesCount() == batchingSettings.getElementCountThreshold()) {
batchesToSend.add(popBatch());
}

return batchesToSend;
}
}
@@ -66,6 +66,7 @@ private void notifyNextAcquires() {

public synchronized void release(long messageSize) throws IllegalStateException {
lock.lock();
LOG.fine("release: " + pendingCount + " to " + (pendingCount - 1));
--pendingCount;
if (pendingCount < 0) {
throw new IllegalStateException("pendingCount cannot be less than 0");
@@ -82,6 +83,7 @@ public synchronized void release(long messageSize) throws IllegalStateException
public void acquire(long messageSize) throws FlowController.FlowControlException {
lock.lock();
try {
LOG.fine("acquire " + pendingCount + " to " + (pendingCount + 1));
if (pendingCount >= countLimit
&& behavior == FlowController.LimitExceededBehavior.ThrowException) {
throw new FlowController.MaxOutstandingElementCountReachedException(countLimit);
@@ -39,6 +39,10 @@ public List<AbstractMessage> getRequests() {
return new LinkedList<AbstractMessage>(serviceImpl.getCapturedRequests());
}

public void waitForResponseScheduled() throws InterruptedException {
serviceImpl.waitForResponseScheduled();
}

public List<AppendRowsRequest> getAppendRequests() {
return serviceImpl.getCapturedRequests();
}
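The new waitForResponseScheduled() hook lets a test pause until the mock server has queued, but not yet delivered, a response, which makes the shutdown-versus-inflight-append race reproducible. A hypothetical test shape (fixture setup elided; package paths and field types are assumptions, and this is not the actual test added by the commit):

```java
import static org.junit.Assert.assertNotNull;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.StreamWriter;
import org.junit.Test;

public class StreamWriterRaceTest {
  // Assumed fixture fields, initialized in an elided @Before method.
  private StreamWriter writer;
  private AppendRowsRequest request;
  private MockBigQueryWrite mockBigQueryWrite;

  @Test
  public void appendRacesWithShutdown() throws Exception {
    ApiFuture<AppendRowsResponse> future = writer.append(request);
    // Block until the mock has scheduled (but not yet sent) its response, so
    // the shutdown below overlaps a genuinely inflight append.
    mockBigQueryWrite.waitForResponseScheduled();
    writer.shutdown(); // callable here assuming the test shares the writer's package
    // The inflight append should still complete rather than being dropped.
    assertNotNull(future.get());
  }
}
```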
