diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 72d537f11a..d202f5d5eb 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -46,9 +46,7 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; @@ -56,7 +54,6 @@ @RunWith(Parameterized.class) public class RetryOnInvalidatedSessionTest { - @Rule public ExpectedException expected = ExpectedException.none(); @Parameter(0) public boolean failOnInvalidatedSession; @@ -206,7 +203,7 @@ private static void initReadWriteSessionPool() throws InterruptedException { // Wait for at least one read/write session to be ready. Stopwatch watch = Stopwatch.createStarted(); while (((DatabaseClientImpl) client).pool.getNumberOfAvailableWritePreparedSessions() == 0) { - if (watch.elapsed(TimeUnit.MILLISECONDS) > 1000L) { + if (watch.elapsed(TimeUnit.SECONDS) > 5L) { fail("No read/write sessions prepared"); } Thread.sleep(5L); @@ -222,8 +219,8 @@ private static void invalidateSessionPool(DatabaseClient client, int minSessions // Wait for all sessions to have been created, and then delete them. Stopwatch watch = Stopwatch.createStarted(); while (((DatabaseClientImpl) client).pool.totalSessions() < minSessions) { - if (watch.elapsed(TimeUnit.MILLISECONDS) > 1000L) { - fail("MinSessions not created"); + if (watch.elapsed(TimeUnit.SECONDS) > 5L) { + fail(String.format("Failed to create MinSessions=%d", minSessions)); } Thread.sleep(5L); } @@ -237,28 +234,27 @@ private static void invalidateSessionPool(DatabaseClient client, int minSessions @Test public void singleUseSelect() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); - // This call will receive an invalidated session that will be replaced on the first call to - // rs.next(). - int count = 0; - try (ReadContext context = client.singleUse()) { - try (ResultSet rs = context.executeQuery(SELECT1AND2)) { - while (rs.next()) { - count++; + try { + // This call will receive an invalidated session that will be replaced on the first call to + // rs.next(). + int count = 0; + try (ReadContext context = client.singleUse()) { + try (ResultSet rs = context.executeQuery(SELECT1AND2)) { + while (rs.next()) { + count++; + } } } + assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(count).isEqualTo(2); } @Test public void singleUseRead() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); int count = 0; try (ReadContext context = client.singleUse()) { @@ -268,14 +264,14 @@ public void singleUseRead() throws InterruptedException { } } assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void singleUseReadUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); int count = 0; try (ReadContext context = client.singleUse()) { @@ -286,38 +282,38 @@ public void singleUseReadUsingIndex() throws InterruptedException { } } assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void singleUseReadRow() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); try (ReadContext context = client.singleUse()) { Struct row = context.readRow("FOO", Key.of(), Arrays.asList("BAR")); assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void singleUseReadRowUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); try (ReadContext context = client.singleUse()) { Struct row = context.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR")); assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void singleUseReadOnlyTransactionSelect() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); int count = 0; try (ReadContext context = client.singleUseReadOnlyTransaction()) { @@ -326,15 +322,15 @@ public void singleUseReadOnlyTransactionSelect() throws InterruptedException { count++; } } + assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(count).isEqualTo(2); } @Test public void singleUseReadOnlyTransactionRead() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); int count = 0; try (ReadContext context = client.singleUseReadOnlyTransaction()) { @@ -344,14 +340,14 @@ public void singleUseReadOnlyTransactionRead() throws InterruptedException { } } assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void singlUseReadOnlyTransactionReadUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); int count = 0; try (ReadContext context = client.singleUseReadOnlyTransaction()) { @@ -362,38 +358,38 @@ public void singlUseReadOnlyTransactionReadUsingIndex() throws InterruptedExcept } } assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void singleUseReadOnlyTransactionReadRow() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); try (ReadContext context = client.singleUseReadOnlyTransaction()) { Struct row = context.readRow("FOO", Key.of(), Arrays.asList("BAR")); assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void singleUseReadOnlyTransactionReadRowUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); try (ReadContext context = client.singleUseReadOnlyTransaction()) { Struct row = context.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR")); assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void readOnlyTransactionSelect() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); int count = 0; try (ReadContext context = client.readOnlyTransaction()) { @@ -403,14 +399,14 @@ public void readOnlyTransactionSelect() throws InterruptedException { } } assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void readOnlyTransactionRead() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); int count = 0; try (ReadContext context = client.readOnlyTransaction()) { @@ -420,14 +416,14 @@ public void readOnlyTransactionRead() throws InterruptedException { } } assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void readOnlyTransactionReadUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); int count = 0; try (ReadContext context = client.readOnlyTransaction()) { @@ -438,30 +434,33 @@ public void readOnlyTransactionReadUsingIndex() throws InterruptedException { } } assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void readOnlyTransactionReadRow() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); try (ReadContext context = client.readOnlyTransaction()) { Struct row = context.readRow("FOO", Key.of(), Arrays.asList("BAR")); assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @Test public void readOnlyTransactionReadRowUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } invalidateSessionPool(); try (ReadContext context = client.readOnlyTransaction()) { Struct row = context.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR")); assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @@ -589,337 +588,367 @@ public Integer run(TransactionContext transaction) throws Exception { @Test public void readWriteTransactionSelect() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - TransactionRunner runner = client.readWriteTransaction(); - int count = - runner.run( - new TransactionCallable() { - @Override - public Integer run(TransactionContext transaction) throws Exception { - int count = 0; - try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { - while (rs.next()) { - count++; + try { + TransactionRunner runner = client.readWriteTransaction(); + int count = + runner.run( + new TransactionCallable() { + @Override + public Integer run(TransactionContext transaction) throws Exception { + int count = 0; + try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { + while (rs.next()) { + count++; + } } + return count; } - return count; - } - }); - assertThat(count).isEqualTo(2); + }); + assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionRead() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - TransactionRunner runner = client.readWriteTransaction(); - int count = - runner.run( - new TransactionCallable() { - @Override - public Integer run(TransactionContext transaction) throws Exception { - int count = 0; - try (ResultSet rs = transaction.read("FOO", KeySet.all(), Arrays.asList("BAR"))) { - while (rs.next()) { - count++; + try { + TransactionRunner runner = client.readWriteTransaction(); + int count = + runner.run( + new TransactionCallable() { + @Override + public Integer run(TransactionContext transaction) throws Exception { + int count = 0; + try (ResultSet rs = transaction.read("FOO", KeySet.all(), Arrays.asList("BAR"))) { + while (rs.next()) { + count++; + } } + return count; } - return count; - } - }); - assertThat(count).isEqualTo(2); + }); + assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionReadUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - TransactionRunner runner = client.readWriteTransaction(); - int count = - runner.run( - new TransactionCallable() { - @Override - public Integer run(TransactionContext transaction) throws Exception { - int count = 0; - try (ResultSet rs = - transaction.readUsingIndex("FOO", "IDX", KeySet.all(), Arrays.asList("BAR"))) { - while (rs.next()) { - count++; + try { + TransactionRunner runner = client.readWriteTransaction(); + int count = + runner.run( + new TransactionCallable() { + @Override + public Integer run(TransactionContext transaction) throws Exception { + int count = 0; + try (ResultSet rs = + transaction.readUsingIndex( + "FOO", "IDX", KeySet.all(), Arrays.asList("BAR"))) { + while (rs.next()) { + count++; + } } + return count; } - return count; - } - }); - assertThat(count).isEqualTo(2); + }); + assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionReadRow() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - TransactionRunner runner = client.readWriteTransaction(); - Struct row = - runner.run( - new TransactionCallable() { - @Override - public Struct run(TransactionContext transaction) throws Exception { - return transaction.readRow("FOO", Key.of(), Arrays.asList("BAR")); - } - }); - assertThat(row.getLong(0)).isEqualTo(1L); + try { + TransactionRunner runner = client.readWriteTransaction(); + Struct row = + runner.run( + new TransactionCallable() { + @Override + public Struct run(TransactionContext transaction) throws Exception { + return transaction.readRow("FOO", Key.of(), Arrays.asList("BAR")); + } + }); + assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionReadRowUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - TransactionRunner runner = client.readWriteTransaction(); - Struct row = - runner.run( - new TransactionCallable() { - @Override - public Struct run(TransactionContext transaction) throws Exception { - return transaction.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR")); - } - }); - assertThat(row.getLong(0)).isEqualTo(1L); + try { + TransactionRunner runner = client.readWriteTransaction(); + Struct row = + runner.run( + new TransactionCallable() { + @Override + public Struct run(TransactionContext transaction) throws Exception { + return transaction.readRowUsingIndex( + "FOO", "IDX", Key.of(), Arrays.asList("BAR")); + } + }); + assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionUpdate() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - TransactionRunner runner = client.readWriteTransaction(); - long count = - runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); - assertThat(count).isEqualTo(UPDATE_COUNT); + try { + TransactionRunner runner = client.readWriteTransaction(); + long count = + runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(UPDATE_STATEMENT); + } + }); + assertThat(count).isEqualTo(UPDATE_COUNT); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionBatchUpdate() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - TransactionRunner runner = client.readWriteTransaction(); - long[] count = - runner.run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - return transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT)); - } - }); - assertThat(count.length).isEqualTo(1); - assertThat(count[0]).isEqualTo(UPDATE_COUNT); + try { + TransactionRunner runner = client.readWriteTransaction(); + long[] count = + runner.run( + new TransactionCallable() { + @Override + public long[] run(TransactionContext transaction) throws Exception { + return transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT)); + } + }); + assertThat(count.length).isEqualTo(1); + assertThat(count[0]).isEqualTo(UPDATE_COUNT); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionBuffer() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - TransactionRunner runner = client.readWriteTransaction(); - runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.buffer(Mutation.newInsertBuilder("FOO").set("BAR").to(1L).build()); - return null; - } - }); - assertThat(runner.getCommitTimestamp()).isNotNull(); + try { + TransactionRunner runner = client.readWriteTransaction(); + runner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.buffer(Mutation.newInsertBuilder("FOO").set("BAR").to(1L).build()); + return null; + } + }); + assertThat(runner.getCommitTimestamp()).isNotNull(); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionSelectInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - TransactionRunner runner = client.readWriteTransaction(); - int attempts = - runner.run( - new TransactionCallable() { - private int attempt = 0; - - @Override - public Integer run(TransactionContext transaction) throws Exception { - attempt++; - int count = 0; - try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { - while (rs.next()) { - count++; + try { + TransactionRunner runner = client.readWriteTransaction(); + int attempts = + runner.run( + new TransactionCallable() { + private int attempt = 0; + + @Override + public Integer run(TransactionContext transaction) throws Exception { + attempt++; + int count = 0; + try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { + while (rs.next()) { + count++; + } } - } - assertThat(count).isEqualTo(2); - if (attempt == 1) { - invalidateSessionPool(); - } - try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { - while (rs.next()) { - count++; + assertThat(count).isEqualTo(2); + if (attempt == 1) { + invalidateSessionPool(); + } + try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { + while (rs.next()) { + count++; + } } + return attempt; } - return attempt; - } - }); - assertThat(attempts).isGreaterThan(1); + }); + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionReadInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - TransactionRunner runner = client.readWriteTransaction(); - int attempts = - runner.run( - new TransactionCallable() { - private int attempt = 0; - - @Override - public Integer run(TransactionContext transaction) throws Exception { - attempt++; - int count = 0; - try (ResultSet rs = transaction.read("FOO", KeySet.all(), Arrays.asList("BAR"))) { - while (rs.next()) { - count++; + try { + TransactionRunner runner = client.readWriteTransaction(); + int attempts = + runner.run( + new TransactionCallable() { + private int attempt = 0; + + @Override + public Integer run(TransactionContext transaction) throws Exception { + attempt++; + int count = 0; + try (ResultSet rs = transaction.read("FOO", KeySet.all(), Arrays.asList("BAR"))) { + while (rs.next()) { + count++; + } } - } - assertThat(count).isEqualTo(2); - if (attempt == 1) { - invalidateSessionPool(); - } - try (ResultSet rs = transaction.read("FOO", KeySet.all(), Arrays.asList("BAR"))) { - while (rs.next()) { - count++; + assertThat(count).isEqualTo(2); + if (attempt == 1) { + invalidateSessionPool(); + } + try (ResultSet rs = transaction.read("FOO", KeySet.all(), Arrays.asList("BAR"))) { + while (rs.next()) { + count++; + } } + return attempt; } - return attempt; - } - }); - assertThat(attempts).isGreaterThan(1); + }); + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionReadUsingIndexInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - TransactionRunner runner = client.readWriteTransaction(); - int attempts = - runner.run( - new TransactionCallable() { - private int attempt = 0; - - @Override - public Integer run(TransactionContext transaction) throws Exception { - attempt++; - int count = 0; - try (ResultSet rs = - transaction.readUsingIndex("FOO", "IDX", KeySet.all(), Arrays.asList("BAR"))) { - while (rs.next()) { - count++; + try { + TransactionRunner runner = client.readWriteTransaction(); + int attempts = + runner.run( + new TransactionCallable() { + private int attempt = 0; + + @Override + public Integer run(TransactionContext transaction) throws Exception { + attempt++; + int count = 0; + try (ResultSet rs = + transaction.readUsingIndex( + "FOO", "IDX", KeySet.all(), Arrays.asList("BAR"))) { + while (rs.next()) { + count++; + } } - } - assertThat(count).isEqualTo(2); - if (attempt == 1) { - invalidateSessionPool(); - } - try (ResultSet rs = - transaction.readUsingIndex("FOO", "IDX", KeySet.all(), Arrays.asList("BAR"))) { - while (rs.next()) { - count++; + assertThat(count).isEqualTo(2); + if (attempt == 1) { + invalidateSessionPool(); } + try (ResultSet rs = + transaction.readUsingIndex( + "FOO", "IDX", KeySet.all(), Arrays.asList("BAR"))) { + while (rs.next()) { + count++; + } + } + return attempt; } - return attempt; - } - }); - assertThat(attempts).isGreaterThan(1); + }); + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionReadRowInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - TransactionRunner runner = client.readWriteTransaction(); - int attempts = - runner.run( - new TransactionCallable() { - private int attempt = 0; - - @Override - public Integer run(TransactionContext transaction) throws Exception { - attempt++; - Struct row = transaction.readRow("FOO", Key.of(), Arrays.asList("BAR")); - assertThat(row.getLong(0)).isEqualTo(1L); - if (attempt == 1) { - invalidateSessionPool(); + try { + TransactionRunner runner = client.readWriteTransaction(); + int attempts = + runner.run( + new TransactionCallable() { + private int attempt = 0; + + @Override + public Integer run(TransactionContext transaction) throws Exception { + attempt++; + Struct row = transaction.readRow("FOO", Key.of(), Arrays.asList("BAR")); + assertThat(row.getLong(0)).isEqualTo(1L); + if (attempt == 1) { + invalidateSessionPool(); + } + row = transaction.readRow("FOO", Key.of(), Arrays.asList("BAR")); + return attempt; } - row = transaction.readRow("FOO", Key.of(), Arrays.asList("BAR")); - return attempt; - } - }); - assertThat(attempts).isGreaterThan(1); + }); + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void readWriteTransactionReadRowUsingIndexInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - TransactionRunner runner = client.readWriteTransaction(); - int attempts = - runner.run( - new TransactionCallable() { - private int attempt = 0; - - @Override - public Integer run(TransactionContext transaction) throws Exception { - attempt++; - Struct row = - transaction.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR")); - assertThat(row.getLong(0)).isEqualTo(1L); - if (attempt == 1) { - invalidateSessionPool(); + try { + TransactionRunner runner = client.readWriteTransaction(); + int attempts = + runner.run( + new TransactionCallable() { + private int attempt = 0; + + @Override + public Integer run(TransactionContext transaction) throws Exception { + attempt++; + Struct row = + transaction.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR")); + assertThat(row.getLong(0)).isEqualTo(1L); + if (attempt == 1) { + invalidateSessionPool(); + } + row = transaction.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR")); + return attempt; } - row = transaction.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR")); - return attempt; - } - }); - assertThat(attempts).isGreaterThan(1); + }); + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } /** @@ -971,13 +1000,10 @@ public void transactionManagerReadOnlySessionInPool() throws InterruptedExceptio @SuppressWarnings("resource") @Test public void transactionManagerSelect() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - int count = 0; try (TransactionManager manager = client.transactionManager()) { + int count = 0; TransactionContext transaction = manager.begin(); while (true) { try { @@ -993,20 +1019,20 @@ public void transactionManagerSelect() throws InterruptedException { transaction = manager.resetForRetry(); } } + assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(count).isEqualTo(2); } @SuppressWarnings("resource") @Test public void transactionManagerRead() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - int count = 0; try (TransactionManager manager = client.transactionManager()) { + int count = 0; TransactionContext transaction = manager.begin(); while (true) { try { @@ -1022,20 +1048,20 @@ public void transactionManagerRead() throws InterruptedException { transaction = manager.resetForRetry(); } } + assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(count).isEqualTo(2); } @SuppressWarnings("resource") @Test public void transactionManagerReadUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - int count = 0; try (TransactionManager manager = client.transactionManager()) { + int count = 0; TransactionContext transaction = manager.begin(); while (true) { try { @@ -1052,20 +1078,20 @@ public void transactionManagerReadUsingIndex() throws InterruptedException { transaction = manager.resetForRetry(); } } + assertThat(count).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(count).isEqualTo(2); } @SuppressWarnings("resource") @Test public void transactionManagerReadRow() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - Struct row; try (TransactionManager manager = client.transactionManager()) { + Struct row; TransactionContext transaction = manager.begin(); while (true) { try { @@ -1077,20 +1103,20 @@ public void transactionManagerReadRow() throws InterruptedException { transaction = manager.resetForRetry(); } } + assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(row.getLong(0)).isEqualTo(1L); } @SuppressWarnings("resource") @Test public void transactionManagerReadRowUsingIndex() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - Struct row; try (TransactionManager manager = client.transactionManager()) { + Struct row; TransactionContext transaction = manager.begin(); while (true) { try { @@ -1102,20 +1128,20 @@ public void transactionManagerReadRowUsingIndex() throws InterruptedException { transaction = manager.resetForRetry(); } } + assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(row.getLong(0)).isEqualTo(1L); } @SuppressWarnings("resource") @Test public void transactionManagerUpdate() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - long count; try (TransactionManager manager = client.transactionManager()) { + long count; TransactionContext transaction = manager.begin(); while (true) { try { @@ -1127,20 +1153,20 @@ public void transactionManagerUpdate() throws InterruptedException { transaction = manager.resetForRetry(); } } + assertThat(count).isEqualTo(UPDATE_COUNT); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(count).isEqualTo(UPDATE_COUNT); } @SuppressWarnings("resource") @Test public void transactionManagerBatchUpdate() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - long[] count; try (TransactionManager manager = client.transactionManager()) { + long[] count; TransactionContext transaction = manager.begin(); while (true) { try { @@ -1152,17 +1178,17 @@ public void transactionManagerBatchUpdate() throws InterruptedException { transaction = manager.resetForRetry(); } } + assertThat(count.length).isEqualTo(1); + assertThat(count[0]).isEqualTo(UPDATE_COUNT); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(count.length).isEqualTo(1); - assertThat(count[0]).isEqualTo(UPDATE_COUNT); } @SuppressWarnings("resource") @Test public void transactionManagerBuffer() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); try (TransactionManager manager = client.transactionManager()) { @@ -1178,17 +1204,17 @@ public void transactionManagerBuffer() throws InterruptedException { } } assertThat(manager.getCommitTimestamp()).isNotNull(); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } } @SuppressWarnings("resource") @Test public void transactionManagerSelectInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - int attempts = 0; try (TransactionManager manager = client.transactionManager()) { + int attempts = 0; TransactionContext transaction = manager.begin(); while (true) { attempts++; @@ -1215,18 +1241,18 @@ public void transactionManagerSelectInvalidatedDuringTransaction() throws Interr transaction = manager.resetForRetry(); } } + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(attempts).isGreaterThan(1); } @SuppressWarnings("resource") @Test public void transactionManagerReadInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - int attempts = 0; try (TransactionManager manager = client.transactionManager()) { + int attempts = 0; TransactionContext transaction = manager.begin(); while (true) { attempts++; @@ -1253,19 +1279,19 @@ public void transactionManagerReadInvalidatedDuringTransaction() throws Interrup transaction = manager.resetForRetry(); } } + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(attempts).isGreaterThan(1); } @SuppressWarnings("resource") @Test public void transactionManagerReadUsingIndexInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - int attempts = 0; try (TransactionManager manager = client.transactionManager()) { + int attempts = 0; TransactionContext transaction = manager.begin(); while (true) { attempts++; @@ -1294,18 +1320,18 @@ public void transactionManagerReadUsingIndexInvalidatedDuringTransaction() transaction = manager.resetForRetry(); } } + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(attempts).isGreaterThan(1); } @SuppressWarnings("resource") @Test public void transactionManagerReadRowInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - int attempts = 0; try (TransactionManager manager = client.transactionManager()) { + int attempts = 0; TransactionContext transaction = manager.begin(); while (true) { attempts++; @@ -1323,19 +1349,19 @@ public void transactionManagerReadRowInvalidatedDuringTransaction() throws Inter transaction = manager.resetForRetry(); } } + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(attempts).isGreaterThan(1); } @SuppressWarnings("resource") @Test public void transactionManagerReadRowUsingIndexInvalidatedDuringTransaction() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } - int attempts = 0; try (TransactionManager manager = client.transactionManager()) { + int attempts = 0; TransactionContext transaction = manager.begin(); while (true) { attempts++; @@ -1353,40 +1379,49 @@ public void transactionManagerReadRowUsingIndexInvalidatedDuringTransaction() transaction = manager.resetForRetry(); } } + assertThat(attempts).isGreaterThan(1); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); } - assertThat(attempts).isGreaterThan(1); } @Test public void partitionedDml() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - assertThat(client.executePartitionedUpdate(UPDATE_STATEMENT)).isEqualTo(UPDATE_COUNT); + try { + assertThat(client.executePartitionedUpdate(UPDATE_STATEMENT)).isEqualTo(UPDATE_COUNT); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void write() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - Timestamp timestamp = client.write(Arrays.asList(Mutation.delete("FOO", KeySet.all()))); - assertThat(timestamp).isNotNull(); + try { + Timestamp timestamp = client.write(Arrays.asList(Mutation.delete("FOO", KeySet.all()))); + assertThat(timestamp).isNotNull(); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } @Test public void writeAtLeastOnce() throws InterruptedException { - if (failOnInvalidatedSession) { - expected.expect(SessionNotFoundException.class); - } initReadWriteSessionPool(); invalidateSessionPool(); - Timestamp timestamp = - client.writeAtLeastOnce(Arrays.asList(Mutation.delete("FOO", KeySet.all()))); - assertThat(timestamp).isNotNull(); + try { + Timestamp timestamp = + client.writeAtLeastOnce(Arrays.asList(Mutation.delete("FOO", KeySet.all()))); + assertThat(timestamp).isNotNull(); + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } } }