From 81f9ecc800e8e4236dae7915b38e44b89a239361 Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Fri, 28 Feb 2020 19:34:58 +0100 Subject: [PATCH] examples: add more examples --- .../cloud/spanner/MockSpannerTestUtil.java | 7 +- .../google/cloud/spanner/ReadAsyncTest.java | 143 +++++++++++++++++- .../cloud/spanner/it/ITAsyncExamplesTest.java | 138 +++++++++++++++++ 3 files changed, 283 insertions(+), 5 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java index 0a8f680f99..fe85ef7c90 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import com.google.cloud.spanner.Type.StructField; +import com.google.common.collect.ContiguousSet; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; @@ -108,11 +109,11 @@ public class MockSpannerTestUtil { .setMetadata(READ_KEY_VALUE_METADATA) .build(); static final com.google.spanner.v1.ResultSet READ_MULTIPLE_KEY_VALUE_RESULTSET = - generateKeyValueResultSet(1, 3); + generateKeyValueResultSet(ContiguousSet.closed(1, 3)); - static com.google.spanner.v1.ResultSet generateKeyValueResultSet(int beginRow, int endRow) { + static com.google.spanner.v1.ResultSet generateKeyValueResultSet(Iterable rows) { com.google.spanner.v1.ResultSet.Builder builder = com.google.spanner.v1.ResultSet.newBuilder(); - for (int row = beginRow; row <= endRow; row++) { + for (Integer row : rows) { builder.addRows( ListValue.newBuilder() .addValues(com.google.protobuf.Value.newBuilder().setStringValue("k" + row).build()) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java index f324905ef5..53e5041891 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java @@ -31,7 +31,9 @@ import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.common.base.Function; +import com.google.common.collect.ContiguousSet; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Server; @@ -40,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -282,9 +285,9 @@ public void readOnlyTransaction() throws Exception { Statement.of("SELECT * FROM TestTable WHERE Key IN ('k10', 'k11', 'k12')"); Statement statement2 = Statement.of("SELECT * FROM TestTable WHERE Key IN ('k1', 'k2', 'k3"); mockSpanner.putStatementResult( - StatementResult.query(statement1, generateKeyValueResultSet(10, 12))); + StatementResult.query(statement1, generateKeyValueResultSet(ContiguousSet.closed(10, 12)))); mockSpanner.putStatementResult( - StatementResult.query(statement2, generateKeyValueResultSet(1, 3))); + StatementResult.query(statement2, generateKeyValueResultSet(ContiguousSet.closed(1, 3)))); ApiFuture> values1; ApiFuture> values2; @@ -333,4 +336,140 @@ public int compare(String o1, String o2) { executor); assertThat(allValues.get()).containsExactly("v1", "v2", "v3", "v10", "v11", "v12"); } + + @Test + public void pauseResume() throws Exception { + Statement unevenStatement = + Statement.of("SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 1"); + Statement evenStatement = + Statement.of("SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 0"); + mockSpanner.putStatementResult( + StatementResult.query( + unevenStatement, generateKeyValueResultSet(ImmutableSet.of(1, 3, 5, 7, 9)))); + mockSpanner.putStatementResult( + StatementResult.query( + evenStatement, generateKeyValueResultSet(ImmutableSet.of(2, 4, 6, 8, 10)))); + + final SettableApiFuture evenFinished = SettableApiFuture.create(); + final SettableApiFuture unevenFinished = SettableApiFuture.create(); + final List allValues = new LinkedList<>(); + try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { + try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement); + AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) { + evenRs.setCallback( + executor, + new ReadyCallback() { + private boolean firstRow = true; + + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + if (firstRow) { + // Make sure the uneven result set returns the first result. + firstRow = false; + return CallbackResponse.PAUSE; + } + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + evenFinished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + allValues.add(resultSet.getString("Value")); + return CallbackResponse.PAUSE; + } + } + } catch (Throwable t) { + evenFinished.setException(t); + return CallbackResponse.DONE; + } finally { + unevenRs.resume(); + } + } + }); + + unevenRs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + unevenFinished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + allValues.add(resultSet.getString("Value")); + return CallbackResponse.PAUSE; + } + } + } catch (Throwable t) { + unevenFinished.setException(t); + return CallbackResponse.DONE; + } finally { + evenRs.resume(); + } + } + }); + } + } + assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get()) + .containsExactly(Boolean.TRUE, Boolean.TRUE); + assertThat(allValues) + .containsExactly("v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10"); + } + + @Test + public void cancel() throws Exception { + final List values = new LinkedList<>(); + final SettableApiFuture finished = SettableApiFuture.create(); + final CountDownLatch receivedFirstRow = new CountDownLatch(1); + final CountDownLatch cancelled = new CountDownLatch(1); + try (AsyncResultSet rs = + client.singleUse().readAsync(READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES)) { + rs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + finished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + values.add(resultSet.getString("Value")); + receivedFirstRow.countDown(); + cancelled.await(); + break; + } + } + } catch (Throwable t) { + finished.setException(t); + return CallbackResponse.DONE; + } + } + }); + receivedFirstRow.await(); + rs.cancel(); + } + cancelled.countDown(); + try { + finished.get(); + fail("missing expected exception"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(SpannerException.class); + SpannerException se = (SpannerException) e.getCause(); + assertThat(se.getErrorCode()).isEqualTo(ErrorCode.CANCELLED); + assertThat(values).containsExactly("v1"); + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java index 7c80632ed2..1f9b0dd81d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner.it; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; @@ -29,12 +30,14 @@ import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.IntegrationTest; import com.google.cloud.spanner.IntegrationTestEnv; import com.google.cloud.spanner.Key; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.ReadOnlyTransaction; +import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.StructReader; @@ -47,6 +50,8 @@ import java.util.Comparator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.junit.AfterClass; @@ -328,4 +333,137 @@ public int compare(String o1, String o2) { executor); assertThat(allValues.get()).containsExactly("v1", "v2", "v3", "v10", "v11", "v12"); } + + @Test + public void pauseResume() throws Exception { + Statement unevenStatement = + Statement.of( + "SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 1 ORDER BY CAST(SUBSTR(Key, 2) AS INT64)"); + Statement evenStatement = + Statement.of( + "SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 0 ORDER BY CAST(SUBSTR(Key, 2) AS INT64)"); + + final SettableApiFuture evenFinished = SettableApiFuture.create(); + final SettableApiFuture unevenFinished = SettableApiFuture.create(); + final List allValues = new LinkedList<>(); + try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { + try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement); + AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) { + evenRs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + evenFinished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + allValues.add(resultSet.getString("StringValue")); + return CallbackResponse.PAUSE; + } + } + } catch (Throwable t) { + evenFinished.setException(t); + return CallbackResponse.DONE; + } finally { + unevenRs.resume(); + } + } + }); + + unevenRs.setCallback( + executor, + new ReadyCallback() { + private boolean firstRow = true; + + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + if (firstRow) { + // Make sure the even result set returns the first result. + firstRow = false; + return CallbackResponse.PAUSE; + } + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + unevenFinished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + allValues.add(resultSet.getString("StringValue")); + return CallbackResponse.PAUSE; + } + } + } catch (Throwable t) { + unevenFinished.setException(t); + return CallbackResponse.DONE; + } finally { + evenRs.resume(); + } + } + }); + } + } + assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get()) + .containsExactly(Boolean.TRUE, Boolean.TRUE); + assertThat(allValues) + .containsExactly( + "v0", "v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10", "v11", "v12", "v13", + "v14"); + } + + @Test + public void cancel() throws Exception { + final List values = new LinkedList<>(); + final SettableApiFuture finished = SettableApiFuture.create(); + final CountDownLatch receivedFirstRow = new CountDownLatch(1); + final CountDownLatch cancelled = new CountDownLatch(1); + try (AsyncResultSet rs = client.singleUse().readAsync(TABLE_NAME, KeySet.all(), ALL_COLUMNS)) { + rs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + finished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + values.add(resultSet.getString("StringValue")); + receivedFirstRow.countDown(); + cancelled.await(); + break; + } + } + } catch (Throwable t) { + finished.setException(t); + return CallbackResponse.DONE; + } + } + }); + receivedFirstRow.await(); + rs.cancel(); + } + cancelled.countDown(); + try { + finished.get(); + fail("missing expected exception"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(SpannerException.class); + SpannerException se = (SpannerException) e.getCause(); + assertThat(se.getErrorCode()).isEqualTo(ErrorCode.CANCELLED); + assertThat(values).containsExactly("v0"); + } + } }