Skip to content

Commit

Permalink
examples: add example integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Feb 28, 2020
1 parent 80d023a commit c411f6c
Show file tree
Hide file tree
Showing 6 changed files with 457 additions and 31 deletions.
Expand Up @@ -476,20 +476,23 @@ private CreateListCallback(

@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
CursorState state;
try {
while ((state = resultSet.tryNext()) == CursorState.OK) {
builder.add(transformer.apply(resultSet));
while (true) {
switch (resultSet.tryNext()) {
case DONE:
future.set(builder.build());
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
builder.add(transformer.apply(resultSet));
break;
}
}
} catch (SpannerException e) {
future.setException(e);
return CallbackResponse.DONE;
}
if (state == CursorState.DONE) {
future.set(builder.build());
} catch (Throwable t) {
future.setException(t);
return CallbackResponse.DONE;
}
return CallbackResponse.CONTINUE;
}
}

Expand Down
Expand Up @@ -278,6 +278,37 @@ public interface DatabaseClient {
*/
TransactionManager transactionManager();

/**
* Returns an asynchronous transaction runner for executing a single logical transaction with
* retries. The returned runner can only be used once.
*
* <p>Example of a read write transaction.
*
* <pre> <code>
* Executor executor = Executors.newSingleThreadExecutor();
* final long singerId = my_singer_id;
* AsyncRunner runner = client.runAsync();
* ApiFuture<Long> rowCount =
* runner.runAsync(
* new AsyncWork<Long>() {
* @Override
* public ApiFuture<Long> doWorkAsync(TransactionContext txn) {
* String column = "FirstName";
* Struct row =
* txn.readRow("Singers", Key.of(singerId), Collections.singleton("Name"));
* String name = row.getString("Name");
* return txn.executeUpdateAsync(
* Statement.newBuilder("UPDATE Singers SET Name=@name WHERE SingerId=@id")
* .bind("id")
* .to(singerId)
* .bind("name")
* .to(name.toUpperCase())
* .build());
* }
* },
* executor);
* </code></pre>
*/
AsyncRunner runAsync();

/**
Expand Down
Expand Up @@ -219,8 +219,8 @@ private AutoClosingReadContext(
}

T getReadContextDelegate() {
if (readContextDelegate == null) {
synchronized (lock) {
synchronized (lock) {
if (readContextDelegate == null) {
while (true) {
try {
this.readContextDelegate = readContextDelegateSupplier.apply(this.session);
Expand Down
Expand Up @@ -108,22 +108,17 @@ public class MockSpannerTestUtil {
.setMetadata(READ_KEY_VALUE_METADATA)
.build();
static final com.google.spanner.v1.ResultSet READ_MULTIPLE_KEY_VALUE_RESULTSET =
com.google.spanner.v1.ResultSet.newBuilder()
.addRows(
ListValue.newBuilder()
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("k1").build())
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("v1").build())
.build())
.addRows(
ListValue.newBuilder()
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("k2").build())
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("v2").build())
.build())
.addRows(
ListValue.newBuilder()
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("k3").build())
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("v3").build())
.build())
.setMetadata(READ_KEY_VALUE_METADATA)
.build();
generateKeyValueResultSet(1, 3);

static com.google.spanner.v1.ResultSet generateKeyValueResultSet(int beginRow, int endRow) {
com.google.spanner.v1.ResultSet.Builder builder = com.google.spanner.v1.ResultSet.newBuilder();
for (int row = beginRow; row <= endRow; row++) {
builder.addRows(
ListValue.newBuilder()
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("k" + row).build())
.addValues(com.google.protobuf.Value.newBuilder().setStringValue("v" + row).build())
.build());
}
return builder.setMetadata(READ_KEY_VALUE_METADATA).build();
}
}
Expand Up @@ -20,19 +20,26 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.AsyncResultSet.CallbackResponse;
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.inprocess.InProcessServerBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -95,7 +102,8 @@ public void before() {
.setProjectId(TEST_PROJECT)
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(SessionPoolOptions.newBuilder().setFailOnSessionLeak().build())
.setSessionPoolOption(
SessionPoolOptions.newBuilder().setFailOnSessionLeak().setMinSessions(0).build())
.build()
.getService();
client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
Expand Down Expand Up @@ -267,4 +275,62 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
Thread.sleep(10L);
assertThat(clientImpl.pool.getNumberOfSessionsInUse()).isEqualTo(0);
}

@Test
public void readOnlyTransaction() throws Exception {
Statement statement1 =
Statement.of("SELECT * FROM TestTable WHERE Key IN ('k10', 'k11', 'k12')");
Statement statement2 = Statement.of("SELECT * FROM TestTable WHERE Key IN ('k1', 'k2', 'k3");
mockSpanner.putStatementResult(
StatementResult.query(statement1, generateKeyValueResultSet(10, 12)));
mockSpanner.putStatementResult(
StatementResult.query(statement2, generateKeyValueResultSet(1, 3)));

ApiFuture<ImmutableList<String>> values1;
ApiFuture<ImmutableList<String>> values2;
try (ReadOnlyTransaction tx = client.readOnlyTransaction()) {
try (AsyncResultSet rs = tx.executeQueryAsync(statement1)) {
values1 =
rs.toListAsync(
new Function<StructReader, String>() {
@Override
public String apply(StructReader input) {
return input.getString("Value");
}
},
executor);
}
try (AsyncResultSet rs = tx.executeQueryAsync(statement2)) {
values2 =
rs.toListAsync(
new Function<StructReader, String>() {
@Override
public String apply(StructReader input) {
return input.getString("Value");
}
},
executor);
}
}
ApiFuture<Iterable<String>> allValues =
ApiFutures.transform(
ApiFutures.allAsList(Arrays.asList(values1, values2)),
new ApiFunction<List<ImmutableList<String>>, Iterable<String>>() {
@Override
public Iterable<String> apply(List<ImmutableList<String>> input) {
return Iterables.mergeSorted(
input,
new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// Return in numerical order (i.e. without the preceding 'v').
return Integer.valueOf(o1.substring(1))
.compareTo(Integer.valueOf(o2.substring(1)));
}
});
}
},
executor);
assertThat(allValues.get()).containsExactly("v1", "v2", "v3", "v10", "v11", "v12");
}
}

0 comments on commit c411f6c

Please sign in to comment.