Skip to content

Commit

Permalink
examples: add more examples
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Feb 28, 2020
1 parent c411f6c commit 81f9ecc
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 5 deletions.
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.cloud.spanner.Type.StructField;
import com.google.common.collect.ContiguousSet;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
Expand Down Expand Up @@ -108,11 +109,11 @@ public class MockSpannerTestUtil {
.setMetadata(READ_KEY_VALUE_METADATA)
.build();
static final com.google.spanner.v1.ResultSet READ_MULTIPLE_KEY_VALUE_RESULTSET =
generateKeyValueResultSet(1, 3);
generateKeyValueResultSet(ContiguousSet.closed(1, 3));

static com.google.spanner.v1.ResultSet generateKeyValueResultSet(int beginRow, int endRow) {
static com.google.spanner.v1.ResultSet generateKeyValueResultSet(Iterable<Integer> rows) {
com.google.spanner.v1.ResultSet.Builder builder = com.google.spanner.v1.ResultSet.newBuilder();
for (int row = beginRow; row <= endRow; row++) {
for (Integer row : rows) {
builder.addRows(
ListValue.newBuilder()
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("k" + row).build())
Expand Down
Expand Up @@ -31,7 +31,9 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.common.base.Function;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Server;
Expand All @@ -40,6 +42,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -282,9 +285,9 @@ public void readOnlyTransaction() throws Exception {
Statement.of("SELECT * FROM TestTable WHERE Key IN ('k10', 'k11', 'k12')");
Statement statement2 = Statement.of("SELECT * FROM TestTable WHERE Key IN ('k1', 'k2', 'k3");
mockSpanner.putStatementResult(
StatementResult.query(statement1, generateKeyValueResultSet(10, 12)));
StatementResult.query(statement1, generateKeyValueResultSet(ContiguousSet.closed(10, 12))));
mockSpanner.putStatementResult(
StatementResult.query(statement2, generateKeyValueResultSet(1, 3)));
StatementResult.query(statement2, generateKeyValueResultSet(ContiguousSet.closed(1, 3))));

ApiFuture<ImmutableList<String>> values1;
ApiFuture<ImmutableList<String>> values2;
Expand Down Expand Up @@ -333,4 +336,140 @@ public int compare(String o1, String o2) {
executor);
assertThat(allValues.get()).containsExactly("v1", "v2", "v3", "v10", "v11", "v12");
}

@Test
public void pauseResume() throws Exception {
Statement unevenStatement =
Statement.of("SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 1");
Statement evenStatement =
Statement.of("SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 0");
mockSpanner.putStatementResult(
StatementResult.query(
unevenStatement, generateKeyValueResultSet(ImmutableSet.of(1, 3, 5, 7, 9))));
mockSpanner.putStatementResult(
StatementResult.query(
evenStatement, generateKeyValueResultSet(ImmutableSet.of(2, 4, 6, 8, 10))));

final SettableApiFuture<Boolean> evenFinished = SettableApiFuture.create();
final SettableApiFuture<Boolean> unevenFinished = SettableApiFuture.create();
final List<String> allValues = new LinkedList<>();
try (ReadOnlyTransaction tx = client.readOnlyTransaction()) {
try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement);
AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) {
evenRs.setCallback(
executor,
new ReadyCallback() {
private boolean firstRow = true;

@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
if (firstRow) {
// Make sure the uneven result set returns the first result.
firstRow = false;
return CallbackResponse.PAUSE;
}
try {
while (true) {
switch (resultSet.tryNext()) {
case DONE:
evenFinished.set(true);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
allValues.add(resultSet.getString("Value"));
return CallbackResponse.PAUSE;
}
}
} catch (Throwable t) {
evenFinished.setException(t);
return CallbackResponse.DONE;
} finally {
unevenRs.resume();
}
}
});

unevenRs.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
try {
while (true) {
switch (resultSet.tryNext()) {
case DONE:
unevenFinished.set(true);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
allValues.add(resultSet.getString("Value"));
return CallbackResponse.PAUSE;
}
}
} catch (Throwable t) {
unevenFinished.setException(t);
return CallbackResponse.DONE;
} finally {
evenRs.resume();
}
}
});
}
}
assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get())
.containsExactly(Boolean.TRUE, Boolean.TRUE);
assertThat(allValues)
.containsExactly("v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10");
}

@Test
public void cancel() throws Exception {
final List<String> values = new LinkedList<>();
final SettableApiFuture<Boolean> finished = SettableApiFuture.create();
final CountDownLatch receivedFirstRow = new CountDownLatch(1);
final CountDownLatch cancelled = new CountDownLatch(1);
try (AsyncResultSet rs =
client.singleUse().readAsync(READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES)) {
rs.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
try {
while (true) {
switch (resultSet.tryNext()) {
case DONE:
finished.set(true);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
values.add(resultSet.getString("Value"));
receivedFirstRow.countDown();
cancelled.await();
break;
}
}
} catch (Throwable t) {
finished.setException(t);
return CallbackResponse.DONE;
}
}
});
receivedFirstRow.await();
rs.cancel();
}
cancelled.countDown();
try {
finished.get();
fail("missing expected exception");
} catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(SpannerException.class);
SpannerException se = (SpannerException) e.getCause();
assertThat(se.getErrorCode()).isEqualTo(ErrorCode.CANCELLED);
assertThat(values).containsExactly("v1");
}
}
}
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner.it;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
Expand All @@ -29,12 +30,14 @@
import com.google.cloud.spanner.AsyncRunner.AsyncWork;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.IntegrationTest;
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.StructReader;
Expand All @@ -47,6 +50,8 @@
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.AfterClass;
Expand Down Expand Up @@ -328,4 +333,137 @@ public int compare(String o1, String o2) {
executor);
assertThat(allValues.get()).containsExactly("v1", "v2", "v3", "v10", "v11", "v12");
}

@Test
public void pauseResume() throws Exception {
Statement unevenStatement =
Statement.of(
"SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 1 ORDER BY CAST(SUBSTR(Key, 2) AS INT64)");
Statement evenStatement =
Statement.of(
"SELECT * FROM TestTable WHERE MOD(CAST(SUBSTR(Key, 2) AS INT64), 2) = 0 ORDER BY CAST(SUBSTR(Key, 2) AS INT64)");

final SettableApiFuture<Boolean> evenFinished = SettableApiFuture.create();
final SettableApiFuture<Boolean> unevenFinished = SettableApiFuture.create();
final List<String> allValues = new LinkedList<>();
try (ReadOnlyTransaction tx = client.readOnlyTransaction()) {
try (AsyncResultSet evenRs = tx.executeQueryAsync(evenStatement);
AsyncResultSet unevenRs = tx.executeQueryAsync(unevenStatement)) {
evenRs.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
try {
while (true) {
switch (resultSet.tryNext()) {
case DONE:
evenFinished.set(true);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
allValues.add(resultSet.getString("StringValue"));
return CallbackResponse.PAUSE;
}
}
} catch (Throwable t) {
evenFinished.setException(t);
return CallbackResponse.DONE;
} finally {
unevenRs.resume();
}
}
});

unevenRs.setCallback(
executor,
new ReadyCallback() {
private boolean firstRow = true;

@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
if (firstRow) {
// Make sure the even result set returns the first result.
firstRow = false;
return CallbackResponse.PAUSE;
}
try {
while (true) {
switch (resultSet.tryNext()) {
case DONE:
unevenFinished.set(true);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
allValues.add(resultSet.getString("StringValue"));
return CallbackResponse.PAUSE;
}
}
} catch (Throwable t) {
unevenFinished.setException(t);
return CallbackResponse.DONE;
} finally {
evenRs.resume();
}
}
});
}
}
assertThat(ApiFutures.allAsList(Arrays.asList(evenFinished, unevenFinished)).get())
.containsExactly(Boolean.TRUE, Boolean.TRUE);
assertThat(allValues)
.containsExactly(
"v0", "v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10", "v11", "v12", "v13",
"v14");
}

@Test
public void cancel() throws Exception {
final List<String> values = new LinkedList<>();
final SettableApiFuture<Boolean> finished = SettableApiFuture.create();
final CountDownLatch receivedFirstRow = new CountDownLatch(1);
final CountDownLatch cancelled = new CountDownLatch(1);
try (AsyncResultSet rs = client.singleUse().readAsync(TABLE_NAME, KeySet.all(), ALL_COLUMNS)) {
rs.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
try {
while (true) {
switch (resultSet.tryNext()) {
case DONE:
finished.set(true);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
values.add(resultSet.getString("StringValue"));
receivedFirstRow.countDown();
cancelled.await();
break;
}
}
} catch (Throwable t) {
finished.setException(t);
return CallbackResponse.DONE;
}
}
});
receivedFirstRow.await();
rs.cancel();
}
cancelled.countDown();
try {
finished.get();
fail("missing expected exception");
} catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(SpannerException.class);
SpannerException se = (SpannerException) e.getCause();
assertThat(se.getErrorCode()).isEqualTo(ErrorCode.CANCELLED);
assertThat(values).containsExactly("v0");
}
}
}

0 comments on commit 81f9ecc

Please sign in to comment.