Skip to content

Commit

Permalink
tests: fix flaky tests
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Feb 28, 2020
1 parent 81f9ecc commit fb1fdc4
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 40 deletions.
Expand Up @@ -258,14 +258,15 @@ public void asyncRunnerCommitAborted() throws Exception {
new AsyncWork<Long>() {
@Override
public ApiFuture<Long> doWorkAsync(TransactionContext txn) {
ApiFuture<Long> updateCount = txn.executeUpdateAsync(UPDATE_STATEMENT);
if (attempt.incrementAndGet() == 1) {
mockSpanner.abortTransaction(txn);
} else {
if (attempt.get() > 0) {
// Set the result of the update statement back to 1 row.
mockSpanner.putStatementResult(
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
}
ApiFuture<Long> updateCount = txn.executeUpdateAsync(UPDATE_STATEMENT);
if (attempt.incrementAndGet() == 1) {
mockSpanner.abortTransaction(txn);
}
return updateCount;
}
},
Expand Down
Expand Up @@ -42,9 +42,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -88,7 +90,7 @@ public static void setup() throws Exception {
.build()
.start();
channelProvider = LocalChannelProvider.create(uniqueName);
executor = Executors.newSingleThreadExecutor();
executor = Executors.newScheduledThreadPool(8);
}

@AfterClass
Expand Down Expand Up @@ -350,72 +352,85 @@ public void pauseResume() throws Exception {
StatementResult.query(
evenStatement, generateKeyValueResultSet(ImmutableSet.of(2, 4, 6, 8, 10))));

final Object lock = new Object();
final SettableApiFuture<Boolean> evenFinished = SettableApiFuture.create();
final SettableApiFuture<Boolean> unevenFinished = SettableApiFuture.create();
final List<String> allValues = new LinkedList<>();
final CountDownLatch unevenReturnedFirstRow = new CountDownLatch(1);
final Deque<String> allValues = new ConcurrentLinkedDeque<>();
try (ReadOnlyTransaction tx = client.readOnlyTransaction()) {
try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement);
AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) {
evenRs.setCallback(
unevenRs.setCallback(
executor,
new ReadyCallback() {
private boolean firstRow = true;

@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
if (firstRow) {
// Make sure the uneven result set returns the first result.
firstRow = false;
return CallbackResponse.PAUSE;
}
try {
while (true) {
switch (resultSet.tryNext()) {
case DONE:
evenFinished.set(true);
unevenFinished.set(true);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
allValues.add(resultSet.getString("Value"));
synchronized (lock) {
allValues.add(resultSet.getString("Value"));
}
unevenReturnedFirstRow.countDown();
return CallbackResponse.PAUSE;
}
}
} catch (Throwable t) {
evenFinished.setException(t);
unevenFinished.setException(t);
return CallbackResponse.DONE;
} finally {
unevenRs.resume();
}
}
});

unevenRs.setCallback(
evenRs.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
try {
// Make sure the uneven result set has returned the first before we start the even
// results.
unevenReturnedFirstRow.await();
while (true) {
switch (resultSet.tryNext()) {
case DONE:
unevenFinished.set(true);
evenFinished.set(true);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
allValues.add(resultSet.getString("Value"));
synchronized (lock) {
allValues.add(resultSet.getString("Value"));
}
return CallbackResponse.PAUSE;
}
}
} catch (Throwable t) {
unevenFinished.setException(t);
evenFinished.setException(t);
return CallbackResponse.DONE;
} finally {
evenRs.resume();
}
}
});
while (!(evenFinished.isDone() && unevenFinished.isDone())) {
synchronized (lock) {
if (allValues.peekLast() != null) {
if (Integer.valueOf(allValues.peekLast().substring(1)) % 2 == 1) {
evenRs.resume();
} else {
unevenRs.resume();
}
}
if (allValues.size() == 10) {
unevenRs.resume();
evenRs.resume();
}
}
}
}
}
assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get())
Expand Down
Expand Up @@ -48,6 +48,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -343,9 +344,11 @@ public void pauseResume() throws Exception {
Statement.of(
"SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 0 ORDER BY CAST(SUBSTR(Key, 2) AS INT64)");

final Object lock = new Object();
final SettableApiFuture<Boolean> evenFinished = SettableApiFuture.create();
final SettableApiFuture<Boolean> unevenFinished = SettableApiFuture.create();
final List<String> allValues = new LinkedList<>();
final CountDownLatch evenReturnedFirstRow = new CountDownLatch(1);
final Deque<String> allValues = new LinkedList<>();
try (ReadOnlyTransaction tx = client.readOnlyTransaction()) {
try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement);
AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) {
Expand All @@ -363,32 +366,29 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
allValues.add(resultSet.getString("StringValue"));
synchronized (lock) {
allValues.add(resultSet.getString("StringValue"));
}
evenReturnedFirstRow.countDown();
return CallbackResponse.PAUSE;
}
}
} catch (Throwable t) {
evenFinished.setException(t);
return CallbackResponse.DONE;
} finally {
unevenRs.resume();
}
}
});

unevenRs.setCallback(
executor,
new ReadyCallback() {
private boolean firstRow = true;

@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
if (firstRow) {
// Make sure the even result set returns the first result.
firstRow = false;
return CallbackResponse.PAUSE;
}
try {
// Make sure the even result set has returned the first before we start the uneven
// results.
evenReturnedFirstRow.await();
while (true) {
switch (resultSet.tryNext()) {
case DONE:
Expand All @@ -397,18 +397,33 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
allValues.add(resultSet.getString("StringValue"));
synchronized (lock) {
allValues.add(resultSet.getString("StringValue"));
}
return CallbackResponse.PAUSE;
}
}
} catch (Throwable t) {
unevenFinished.setException(t);
return CallbackResponse.DONE;
} finally {
evenRs.resume();
}
}
});
while (!(evenFinished.isDone() && unevenFinished.isDone())) {
synchronized (lock) {
if (allValues.peekLast() != null) {
if (Integer.valueOf(allValues.peekLast().substring(1)) % 2 == 1) {
evenRs.resume();
} else {
unevenRs.resume();
}
}
if (allValues.size() == 15) {
unevenRs.resume();
evenRs.resume();
}
}
}
}
}
assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get())
Expand Down

0 comments on commit fb1fdc4

Please sign in to comment.