fix: Add unit test for concurrent issues we worried about, and fix some locking issues #854

Merged · 40 commits · Mar 2, 2021
@@ -50,12 +50,14 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;
import org.threeten.bp.Duration;

/**
@@ -100,25 +102,34 @@ public class StreamWriter implements AutoCloseable {

private final Lock messagesBatchLock;
private final Lock appendAndRefreshAppendLock;

@GuardedBy("appendAndRefreshAppendLock")
private final MessagesBatch messagesBatch;

// Indicates whether a non-recoverable exception has occurred on the stream.
private final Lock exceptionLock;
private Throwable streamException;
private AtomicReference<Throwable> streamException;

private BackgroundResource backgroundResources;
private List<BackgroundResource> backgroundResourceList;

private BigQueryWriteClient stub;
BidiStreamingCallable<AppendRowsRequest, AppendRowsResponse> bidiStreamingCallable;

@GuardedBy("appendAndRefreshAppendLock")
ClientStream<AppendRowsRequest> clientStream;

private final AppendResponseObserver responseObserver;

private final ScheduledExecutorService executor;

private final AtomicBoolean shutdown;
@GuardedBy("appendAndRefreshAppendLock")
private boolean shutdown;

private final Waiter messagesWaiter;
private final AtomicBoolean activeAlarm;

@GuardedBy("appendAndRefreshAppendLock")
private boolean activeAlarm;

private ScheduledFuture<?> currentAlarmFuture;

private Integer currentRetries = 0;
@@ -160,9 +171,8 @@ private StreamWriter(Builder builder)
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
messagesBatchLock = new ReentrantLock();
appendAndRefreshAppendLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
this.exceptionLock = new ReentrantLock();
this.streamException = null;
activeAlarm = false;
this.streamException = new AtomicReference<Throwable>(null);

executor = builder.executorProvider.getExecutor();
backgroundResourceList = new ArrayList<>();
@@ -185,7 +195,7 @@ private StreamWriter(Builder builder)
stub = builder.client;
}
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
shutdown = new AtomicBoolean(false);
shutdown = false;
if (builder.onSchemaUpdateRunnable != null) {
this.onSchemaUpdateRunnable = builder.onSchemaUpdateRunnable;
this.onSchemaUpdateRunnable.setStreamWriter(this);
@@ -216,14 +226,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
return this.onSchemaUpdateRunnable;
}

private void setException(Throwable t) {
exceptionLock.lock();
if (this.streamException == null) {
this.streamException = t;
}
exceptionLock.unlock();
}

/**
* Schedules the writing of a message. The write of the message may occur immediately or be
* delayed based on the writer batching options.
@@ -253,27 +255,27 @@ private void setException(Throwable t) {
*/
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
appendAndRefreshAppendLock.lock();
Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
Preconditions.checkNotNull(message, "Message is null.");
final AppendRequestAndFutureResponse outstandingAppend =
new AppendRequestAndFutureResponse(message);
List<InflightBatch> batchesToSend;
messagesBatchLock.lock();

try {
Preconditions.checkState(!shutdown, "Cannot append on a shut-down writer.");
Preconditions.checkNotNull(message, "Message is null.");
Preconditions.checkState(streamException.get() == null, "Stream already failed.");
final AppendRequestAndFutureResponse outstandingAppend =
new AppendRequestAndFutureResponse(message);
List<InflightBatch> batchesToSend;
batchesToSend = messagesBatch.add(outstandingAppend);
// Set up the next duration-based delivery alarm if there are messages batched.
setupAlarm();
if (!batchesToSend.isEmpty()) {
for (final InflightBatch batch : batchesToSend) {
LOG.fine("Scheduling a batch for immediate sending.");
LOG.fine("Scheduling a batch for immediate sending");
writeBatch(batch);
}
}
return outstandingAppend.appendResult;
} finally {
messagesBatchLock.unlock();
appendAndRefreshAppendLock.unlock();
}
return outstandingAppend.appendResult;
}

/**
@@ -285,9 +287,10 @@ public void refreshAppend() throws InterruptedException {
throw new UnimplementedException(null, GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), false);
}

@GuardedBy("appendAndRefreshAppendLock")
private void setupAlarm() {
if (!messagesBatch.isEmpty()) {
if (!activeAlarm.getAndSet(true)) {
if (!activeAlarm) {
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
LOG.log(Level.FINE, "Setting up alarm for the next {0} ms.", delayThresholdMs);
currentAlarmFuture =
@@ -296,12 +299,12 @@ private void setupAlarm() {
@Override
public void run() {
LOG.fine("Sending messages based on schedule");
activeAlarm.getAndSet(false);
messagesBatchLock.lock();
appendAndRefreshAppendLock.lock();
activeAlarm = false;
try {
writeBatch(messagesBatch.popBatch());
} finally {
messagesBatchLock.unlock();
appendAndRefreshAppendLock.unlock();
}
}
},
@@ -310,9 +313,8 @@ public void run() {
}
} else if (currentAlarmFuture != null) {
LOG.log(Level.FINER, "Cancelling alarm, no more messages");
if (activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
currentAlarmFuture.cancel(false);
activeAlarm = false;
}
}

@@ -321,27 +323,41 @@ public void run() {
* wait for the send operations to complete. To wait for messages to send, call {@code get} on the
* futures returned from {@code append}.
*/
@GuardedBy("appendAndRefreshAppendLock")
public void writeAllOutstanding() {
InflightBatch unorderedOutstandingBatch = null;
messagesBatchLock.lock();
try {
if (!messagesBatch.isEmpty()) {
writeBatch(messagesBatch.popBatch());
}
messagesBatch.reset();
} finally {
messagesBatchLock.unlock();
if (!messagesBatch.isEmpty()) {
writeBatch(messagesBatch.popBatch());
}
messagesBatch.reset();
}

@GuardedBy("appendAndRefreshAppendLock")
private void writeBatch(final InflightBatch inflightBatch) {
if (inflightBatch != null) {
AppendRowsRequest request = inflightBatch.getMergedRequest();
try {
appendAndRefreshAppendLock.unlock();
messagesWaiter.acquire(inflightBatch.getByteSize());
appendAndRefreshAppendLock.lock();
if (shutdown || streamException.get() != null) {
appendAndRefreshAppendLock.unlock();
messagesWaiter.release(inflightBatch.getByteSize());
appendAndRefreshAppendLock.lock();
inflightBatch.onFailure(
new AbortedException(
shutdown
? "Stream closed, abort append."
: "Stream has previous errors, abort append.",
null,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
return;
}
responseObserver.addInflightBatch(inflightBatch);
clientStream.send(request);
} catch (FlowController.FlowControlException ex) {
appendAndRefreshAppendLock.lock();
inflightBatch.onFailure(ex);
}
}
@@ -447,9 +463,6 @@ private void onFailure(Throwable t) {
// Error has been set already.
LOG.warning("Ignore " + t.toString() + " since error has already been set");
return;
} else {
LOG.info("Setting " + t.toString() + " on response");
this.streamWriter.setException(t);
}

for (AppendRequestAndFutureResponse request : inflightRequests) {
@@ -511,26 +524,68 @@ public RetrySettings getRetrySettings() {
* pending messages are lost.
*/
protected void shutdown() {
if (shutdown.getAndSet(true)) {
LOG.fine("Already shutdown.");
return;
}
LOG.fine("Shutdown called on writer");
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
currentAlarmFuture.cancel(false);
}
writeAllOutstanding();
appendAndRefreshAppendLock.lock();
try {
synchronized (messagesWaiter) {
if (shutdown) {
LOG.fine("Already shutdown.");
return;
}
shutdown = true;
LOG.info("Shutdown called on writer: " + streamName);
if (currentAlarmFuture != null && activeAlarm) {
currentAlarmFuture.cancel(false);
activeAlarm = false;
}
// Wait for the current inflight requests to drain.
try {
appendAndRefreshAppendLock.unlock();
messagesWaiter.waitComplete(0);
} catch (InterruptedException e) {
LOG.warning("Failed to wait for messages to return " + e.toString());
}
} catch (InterruptedException e) {
LOG.warning("Failed to wait for messages to return " + e.toString());
}
if (clientStream.isSendReady()) {
clientStream.closeSend();
appendAndRefreshAppendLock.lock();
// Try to send out what's left in the batch.
if (!messagesBatch.isEmpty()) {
InflightBatch inflightBatch = messagesBatch.popBatch();
AppendRowsRequest request = inflightBatch.getMergedRequest();
if (streamException.get() != null) {
inflightBatch.onFailure(
new AbortedException(
shutdown
? "Stream closed, abort append."
: "Stream has previous errors, abort append.",
null,
GrpcStatusCode.of(Status.Code.ABORTED),
true));
} else {
try {
appendAndRefreshAppendLock.unlock();
messagesWaiter.acquire(inflightBatch.getByteSize());
appendAndRefreshAppendLock.lock();
responseObserver.addInflightBatch(inflightBatch);
clientStream.send(request);
} catch (FlowController.FlowControlException ex) {
appendAndRefreshAppendLock.lock();
LOG.warning(
"Unexpected flow control exception when sending batch leftover: " + ex.toString());
}
}
}
// Close the stream.
try {
appendAndRefreshAppendLock.unlock();
messagesWaiter.waitComplete(0);
} catch (InterruptedException e) {
LOG.warning("Failed to wait for messages to return " + e.toString());
}
appendAndRefreshAppendLock.lock();
if (clientStream.isSendReady()) {
clientStream.closeSend();
}
backgroundResources.shutdown();
} finally {
appendAndRefreshAppendLock.unlock();
}
backgroundResources.shutdown();
}

/**
@@ -815,11 +870,12 @@ public void onStart(StreamController controller) {
}

private void abortInflightRequests(Throwable t) {
LOG.fine("Aborting all inflight requests");
synchronized (this.inflightBatches) {
boolean first_error = true;
while (!this.inflightBatches.isEmpty()) {
InflightBatch inflightBatch = this.inflightBatches.poll();
if (first_error) {
if (first_error || t.getCause().getClass() == AbortedException.class) {
inflightBatch.onFailure(t);
first_error = false;
} else {
@@ -894,7 +950,8 @@ public void onComplete() {

@Override
public void onError(Throwable t) {
LOG.fine("OnError called");
LOG.info("OnError called: " + t.toString());
streamWriter.streamException.set(t);
abortInflightRequests(t);
}
};
@@ -917,6 +974,7 @@ private MessagesBatch(
}

// Get all the messages out in a batch.
@GuardedBy("appendAndRefreshAppendLock")
private InflightBatch popBatch() {
InflightBatch batch =
new InflightBatch(
@@ -958,6 +1016,7 @@ private long getMaxBatchBytes() {
// The returned message batch could contain the previous batch of messages plus the current
// message, if the message is too large.
@GuardedBy("appendAndRefreshAppendLock")
private List<InflightBatch> add(AppendRequestAndFutureResponse outstandingAppend) {
List<InflightBatch> batchesToSend = new ArrayList<>();
// Check if the next message makes the current batch exceed the max batch byte size.
@@ -978,7 +1037,6 @@ && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) {
|| getMessagesCount() == batchingSettings.getElementCountThreshold()) {
batchesToSend.add(popBatch());
}

return batchesToSend;
}
}
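The StreamWriter hunks above make one consistent locking change: batch state, the alarm flag, and the shutdown flag are all guarded by the single appendAndRefreshAppendLock (now documented with @GuardedBy), the exceptionLock/setException pair is replaced by an AtomicReference<Throwable>, and writeBatch()/shutdown() release the lock around the blocking messagesWaiter.acquire()/waitComplete() calls so a full flow-control window cannot deadlock against the alarm or shutdown paths. The sketch below shows that unlock-around-blocking-call shape in isolation; it is an illustrative standalone class, not the library code, and the names (PermitLimitedSender, permits) are made up.

// Sketch only: illustrates the unlock-before-blocking / re-lock-after pattern used in
// writeBatch() and shutdown() above. Hypothetical names; the real writer uses
// appendAndRefreshAppendLock and the flow-control Waiter (messagesWaiter).
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PermitLimitedSender {
  private final Lock stateLock = new ReentrantLock();            // stands in for appendAndRefreshAppendLock
  private final Semaphore permits = new Semaphore(2);            // stands in for the flow-control Waiter
  private final AtomicReference<Throwable> streamException = new AtomicReference<>(null);
  private boolean shutdown = false;                              // guarded by stateLock

  public void send(String batch) throws InterruptedException {
    stateLock.lock();
    boolean locked = true;
    try {
      if (shutdown || streamException.get() != null) {
        throw new IllegalStateException("Writer is closed or has already failed.");
      }
      // Release the lock while blocking on flow control so other lock holders
      // (the alarm task, shutdown) can still make progress and release permits.
      stateLock.unlock();
      locked = false;
      permits.acquire();
      stateLock.lock();
      locked = true;
      // State may have changed while the lock was not held; re-check before sending.
      if (shutdown || streamException.get() != null) {
        permits.release();
        throw new IllegalStateException("Writer was closed while waiting for flow control.");
      }
      System.out.println("sending " + batch);
    } finally {
      if (locked) {
        stateLock.unlock();
      }
    }
  }
}

The re-check after re-acquiring the lock matters because shutdown() or onError() may have run while the permit was being awaited; the diff above does the same re-check of shutdown and streamException before clientStream.send(request).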
@@ -66,6 +66,7 @@ private void notifyNextAcquires() {

public synchronized void release(long messageSize) throws IllegalStateException {
lock.lock();
LOG.fine("release: " + pendingCount + " to " + (pendingCount - 1));
--pendingCount;
if (pendingCount < 0) {
throw new IllegalStateException("pendingCount cannot be less than 0");
@@ -82,6 +83,7 @@ public synchronized void release(long messageSize) throws IllegalStateException
public void acquire(long messageSize) throws FlowController.FlowControlException {
lock.lock();
try {
LOG.fine("acquire " + pendingCount + " to " + (pendingCount + 1));
if (pendingCount >= countLimit
&& behavior == FlowController.LimitExceededBehavior.ThrowException) {
throw new FlowController.MaxOutstandingElementCountReachedException(countLimit);
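The two hunks above touch the flow-control Waiter that messagesWaiter refers to; they only add FINE-level logging of the pendingCount transitions in release() and acquire(). For orientation, a minimal count-limited waiter with the same acquire/release/waitComplete shape might look like the sketch below. This is an assumed, simplified model, not the library class: the real Waiter also tracks bytes and honors a configurable LimitExceededBehavior.

// Hypothetical sketch of a count-limited waiter: acquire() throws once the limit is
// reached, release() decrements and wakes blocked waiters. Not the library code.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CountLimitedWaiter {
  private final Lock lock = new ReentrantLock();
  private final Condition drained = lock.newCondition();
  private final long countLimit;
  private long pendingCount = 0;

  public CountLimitedWaiter(long countLimit) {
    this.countLimit = countLimit;
  }

  public void acquire() {
    lock.lock();
    try {
      if (pendingCount >= countLimit) {
        throw new IllegalStateException("Outstanding element count limit reached: " + countLimit);
      }
      pendingCount++;
    } finally {
      lock.unlock();
    }
  }

  public void release() {
    lock.lock();
    try {
      if (pendingCount <= 0) {
        throw new IllegalStateException("pendingCount cannot be less than 0");
      }
      pendingCount--;
      if (pendingCount == 0) {
        drained.signalAll();
      }
    } finally {
      lock.unlock();
    }
  }

  // Blocks until every acquired permit has been released, mirroring waitComplete(0).
  public void waitComplete() throws InterruptedException {
    lock.lock();
    try {
      while (pendingCount > 0) {
        drained.await();
      }
    } finally {
      lock.unlock();
    }
  }
}

shutdown() above relies on exactly this kind of waitComplete() call to drain inflight appends before closing the stream.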
@@ -39,6 +39,10 @@ public List<AbstractMessage> getRequests() {
return new LinkedList<AbstractMessage>(serviceImpl.getCapturedRequests());
}

public void waitForResponseScheduled() throws InterruptedException {
serviceImpl.waitForResponseScheduled();
}

public List<AppendRowsRequest> getAppendRequests() {
return serviceImpl.getCapturedRequests();
}
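The final hunk adds waitForResponseScheduled() to the FakeBigQueryWrite test fake, so the new concurrency tests can block until the fake server has actually scheduled a response before triggering the next racing action (an append, a shutdown, or an injected error). A self-contained sketch of that style of test/fake synchronization is shown below; the class and method names are hypothetical and not the repository's test code.

// Hypothetical test-fake synchronization helper, not the repository's FakeBigQueryWrite.
// The fake releases a permit each time it schedules a response; the test blocks on
// waitForResponseScheduled() to order its next step after that point deterministically.
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class FakeServerSync {
  private final Semaphore responseScheduled = new Semaphore(0);

  // Called by the fake service implementation when it schedules a response.
  public void onResponseScheduled() {
    responseScheduled.release();
  }

  // Called by the test; waits until at least one response has been scheduled.
  public void waitForResponseScheduled() throws InterruptedException {
    if (!responseScheduled.tryAcquire(1, TimeUnit.MINUTES)) {
      throw new IllegalStateException("Timed out waiting for the fake server to schedule a response.");
    }
  }
}

Blocking on a permit like this, rather than sleeping for a fixed interval, is what makes a test of the append/shutdown races above deterministic.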