From fb1fdc4bda549bf457e9ac08a9b020544ec50504 Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Fri, 28 Feb 2020 21:54:49 +0100 Subject: [PATCH] tests: fix flaky tests --- .../google/cloud/spanner/AsyncRunnerTest.java | 9 +-- .../google/cloud/spanner/ReadAsyncTest.java | 59 ++++++++++++------- .../cloud/spanner/it/ITAsyncExamplesTest.java | 43 +++++++++----- 3 files changed, 71 insertions(+), 40 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java index d8267dcb4e..b20cd3b7a8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java @@ -258,14 +258,15 @@ public void asyncRunnerCommitAborted() throws Exception { new AsyncWork() { @Override public ApiFuture doWorkAsync(TransactionContext txn) { - ApiFuture updateCount = txn.executeUpdateAsync(UPDATE_STATEMENT); - if (attempt.incrementAndGet() == 1) { - mockSpanner.abortTransaction(txn); - } else { + if (attempt.get() > 0) { // Set the result of the update statement back to 1 row. mockSpanner.putStatementResult( StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); } + ApiFuture updateCount = txn.executeUpdateAsync(UPDATE_STATEMENT); + if (attempt.incrementAndGet() == 1) { + mockSpanner.abortTransaction(txn); + } return updateCount; } }, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java index 53e5041891..944a7d70f5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java @@ -42,9 +42,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Deque; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -88,7 +90,7 @@ public static void setup() throws Exception { .build() .start(); channelProvider = LocalChannelProvider.create(uniqueName); - executor = Executors.newSingleThreadExecutor(); + executor = Executors.newScheduledThreadPool(8); } @AfterClass @@ -350,72 +352,85 @@ public void pauseResume() throws Exception { StatementResult.query( evenStatement, generateKeyValueResultSet(ImmutableSet.of(2, 4, 6, 8, 10)))); + final Object lock = new Object(); final SettableApiFuture evenFinished = SettableApiFuture.create(); final SettableApiFuture unevenFinished = SettableApiFuture.create(); - final List allValues = new LinkedList<>(); + final CountDownLatch unevenReturnedFirstRow = new CountDownLatch(1); + final Deque allValues = new ConcurrentLinkedDeque<>(); try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement); AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) { - evenRs.setCallback( + unevenRs.setCallback( executor, new ReadyCallback() { - private boolean firstRow = true; - @Override public CallbackResponse cursorReady(AsyncResultSet resultSet) { - if (firstRow) { - // Make sure the uneven result set returns the first result. - firstRow = false; - return CallbackResponse.PAUSE; - } try { while (true) { switch (resultSet.tryNext()) { case DONE: - evenFinished.set(true); + unevenFinished.set(true); return CallbackResponse.DONE; case NOT_READY: return CallbackResponse.CONTINUE; case OK: - allValues.add(resultSet.getString("Value")); + synchronized (lock) { + allValues.add(resultSet.getString("Value")); + } + unevenReturnedFirstRow.countDown(); return CallbackResponse.PAUSE; } } } catch (Throwable t) { - evenFinished.setException(t); + unevenFinished.setException(t); return CallbackResponse.DONE; - } finally { - unevenRs.resume(); } } }); - - unevenRs.setCallback( + evenRs.setCallback( executor, new ReadyCallback() { @Override public CallbackResponse cursorReady(AsyncResultSet resultSet) { try { + // Make sure the uneven result set has returned the first before we start the even + // results. + unevenReturnedFirstRow.await(); while (true) { switch (resultSet.tryNext()) { case DONE: - unevenFinished.set(true); + evenFinished.set(true); return CallbackResponse.DONE; case NOT_READY: return CallbackResponse.CONTINUE; case OK: - allValues.add(resultSet.getString("Value")); + synchronized (lock) { + allValues.add(resultSet.getString("Value")); + } return CallbackResponse.PAUSE; } } } catch (Throwable t) { - unevenFinished.setException(t); + evenFinished.setException(t); return CallbackResponse.DONE; - } finally { - evenRs.resume(); } } }); + while (!(evenFinished.isDone() && unevenFinished.isDone())) { + synchronized (lock) { + if (allValues.peekLast() != null) { + if (Integer.valueOf(allValues.peekLast().substring(1)) % 2 == 1) { + evenRs.resume(); + } else { + unevenRs.resume(); + } + } + if (allValues.size() == 10) { + unevenRs.resume(); + evenRs.resume(); + } + } + } } } assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get()) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java index 1f9b0dd81d..c328f16ef9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Deque; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -343,9 +344,11 @@ public void pauseResume() throws Exception { Statement.of( "SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 0 ORDER BY CAST(SUBSTR(Key, 2) AS INT64)"); + final Object lock = new Object(); final SettableApiFuture evenFinished = SettableApiFuture.create(); final SettableApiFuture unevenFinished = SettableApiFuture.create(); - final List allValues = new LinkedList<>(); + final CountDownLatch evenReturnedFirstRow = new CountDownLatch(1); + final Deque allValues = new LinkedList<>(); try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement); AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) { @@ -363,15 +366,16 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) { case NOT_READY: return CallbackResponse.CONTINUE; case OK: - allValues.add(resultSet.getString("StringValue")); + synchronized (lock) { + allValues.add(resultSet.getString("StringValue")); + } + evenReturnedFirstRow.countDown(); return CallbackResponse.PAUSE; } } } catch (Throwable t) { evenFinished.setException(t); return CallbackResponse.DONE; - } finally { - unevenRs.resume(); } } }); @@ -379,16 +383,12 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) { unevenRs.setCallback( executor, new ReadyCallback() { - private boolean firstRow = true; - @Override public CallbackResponse cursorReady(AsyncResultSet resultSet) { - if (firstRow) { - // Make sure the even result set returns the first result. - firstRow = false; - return CallbackResponse.PAUSE; - } try { + // Make sure the even result set has returned the first before we start the uneven + // results. + evenReturnedFirstRow.await(); while (true) { switch (resultSet.tryNext()) { case DONE: @@ -397,18 +397,33 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) { case NOT_READY: return CallbackResponse.CONTINUE; case OK: - allValues.add(resultSet.getString("StringValue")); + synchronized (lock) { + allValues.add(resultSet.getString("StringValue")); + } return CallbackResponse.PAUSE; } } } catch (Throwable t) { unevenFinished.setException(t); return CallbackResponse.DONE; - } finally { - evenRs.resume(); } } }); + while (!(evenFinished.isDone() && unevenFinished.isDone())) { + synchronized (lock) { + if (allValues.peekLast() != null) { + if (Integer.valueOf(allValues.peekLast().substring(1)) % 2 == 1) { + evenRs.resume(); + } else { + unevenRs.resume(); + } + } + if (allValues.size() == 15) { + unevenRs.resume(); + evenRs.resume(); + } + } + } } } assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get())