Skip to content

Commit

Permalink
fix: fix span test cases after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Feb 26, 2020
1 parent 768b4ae commit 987b77f
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 51 deletions.
42 changes: 42 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -11,4 +11,46 @@
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* asyncDeleteSession(*)</method>
</difference>

<!-- Add Async API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>* runAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* executeQueryAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readRowAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readUsingIndexAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readRowUsingIndexAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>* executeUpdateAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* executeQueryAsync(*)</method>
</difference>
</differences>
Expand Up @@ -47,7 +47,6 @@
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -274,7 +273,7 @@ void initTransaction() {
final SessionImpl session;
final SpannerRpc rpc;
final ExecutorProvider executorProvider;
final Span span;
Span span;
private final int defaultPrefetchChunks;

@GuardedBy("lock")
Expand All @@ -296,24 +295,14 @@ void initTransaction() {
SpannerRpc rpc,
ExecutorProvider executorProvider,
int defaultPrefetchChunks) {
this(
session,
rpc,
executorProvider,
defaultPrefetchChunks,
Tracing.getTracer().getCurrentSpan());
}

private AbstractReadContext(
SessionImpl session,
SpannerRpc rpc,
ExecutorProvider executorProvider,
int defaultPrefetchChunks,
Span span) {
this.session = session;
this.rpc = rpc;
this.executorProvider = executorProvider;
this.defaultPrefetchChunks = defaultPrefetchChunks;
}

@Override
public void setSpan(Span span) {
this.span = span;
}

Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.TransactionSelector;
import io.opencensus.trace.Tracing;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -70,6 +71,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
setSpan(Tracing.getTracer().getCurrentSpan());
initTransaction();
}

Expand All @@ -84,6 +86,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
spanner.getOptions().getPrefetchChunks());
this.sessionName = session.getName();
this.options = session.getOptions();
setSpan(Tracing.getTracer().getCurrentSpan());
}

@Override
Expand Down
Expand Up @@ -27,6 +27,7 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import io.opencensus.trace.Span;
import java.util.Map;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -101,4 +102,7 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
public void invalidate() {
isValid = false;
}

@Override
public void setSpan(Span span) {}
}
Expand Up @@ -75,13 +75,16 @@ static void throwIfTransactionsPending() {
static interface SessionTransaction {
/** Invalidates the transaction, generally because a new one has been started on the session. */
void invalidate();
/** Registers the current span on the transaction. */
void setSpan(Span span);
}

private final SpannerImpl spanner;
private final String name;
private SessionTransaction activeTransaction;
private ByteString readyTransactionId;
private final Map<SpannerRpc.Option, ?> options;
private Span currentSpan;

SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
this.spanner = spanner;
Expand All @@ -98,6 +101,10 @@ public String getName() {
return options;
}

void setCurrentSpan(Span span) {
currentSpan = span;
}

@Override
public long executePartitionedUpdate(Statement stmt) {
setActive(null);
Expand Down Expand Up @@ -266,7 +273,8 @@ TransactionContextImpl newTransaction() {
readyTransactionId,
spanner.getRpc(),
spanner.getAsyncExecutorProvider(),
spanner.getDefaultPrefetchChunks());
spanner.getDefaultPrefetchChunks(),
currentSpan);
return txn;
}

Expand All @@ -278,11 +286,14 @@ <T extends SessionTransaction> T setActive(@Nullable T ctx) {
}
activeTransaction = ctx;
readyTransactionId = null;
if (activeTransaction != null) {
activeTransaction.setSpan(currentSpan);
}
return ctx;
}

@Override
public TransactionManager transactionManager() {
return new TransactionManagerImpl(this);
return new TransactionManagerImpl(this, currentSpan);
}
}
Expand Up @@ -1117,7 +1117,7 @@ public PooledSession get() {
try {
PooledSession res = super.get();
synchronized (lock) {
res.markBusy();
res.markBusy(span);
span.addAnnotation(sessionAnnotation(res));
incrementNumSessionsInUse();
checkedOutSessions.add(this);
Expand Down Expand Up @@ -1282,7 +1282,8 @@ private void keepAlive() {
}
}

private void markBusy() {
private void markBusy(Span span) {
this.delegate.setCurrentSpan(span);
this.state = SessionState.BUSY;
}

Expand Down
Expand Up @@ -29,14 +29,19 @@ final class TransactionManagerImpl implements TransactionManager, SessionTransac
private static final Tracer tracer = Tracing.getTracer();

private final SessionImpl session;
private final Span span;
private Span span;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;

TransactionManagerImpl(SessionImpl session) {
TransactionManagerImpl(SessionImpl session, Span span) {
this.session = session;
this.span = Tracing.getTracer().getCurrentSpan();
this.span = span;
}

@Override
public void setSpan(Span span) {
this.span = span;
}

@Override
Expand Down
Expand Up @@ -81,9 +81,11 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans
@Nullable ByteString transactionId,
SpannerRpc rpc,
ExecutorProvider executorProvider,
int defaultPrefetchChunks) {
int defaultPrefetchChunks,
Span span) {
super(session, rpc, executorProvider, defaultPrefetchChunks);
this.transactionId = transactionId;
this.span = span;
}

void ensureTxn() {
Expand Down Expand Up @@ -320,7 +322,7 @@ public long[] batchUpdate(Iterable<Statement> statements) {

private boolean blockNestedTxn = true;
private final SessionImpl session;
private final Span span;
private Span span;
private TransactionContextImpl txn;
private volatile boolean isValid = true;

Expand All @@ -332,10 +334,14 @@ public TransactionRunner allowNestedTransaction() {

TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) {
this.session = session;
this.span = Tracing.getTracer().getCurrentSpan();
this.txn = session.newTransaction();
}

@Override
public void setSpan(Span span) {
this.span = span;
}

@Nullable
@Override
public <T> T run(TransactionCallable<T> callable) {
Expand Down
Expand Up @@ -40,6 +40,7 @@
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.Transaction;
import io.opencensus.trace.Span;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Calendar;
Expand Down Expand Up @@ -111,6 +112,7 @@ public void setUp() {
Mockito.when(rpc.commit(Mockito.any(CommitRequest.class), Mockito.any(Map.class)))
.thenReturn(commitResponse);
session = spanner.getSessionClient(db).createSession();
((SessionImpl) session).setCurrentSpan(mock(Span.class));
// We expect the same options, "options", on all calls on "session".
options = optionsCaptor.getValue();
}
Expand Down
Expand Up @@ -60,6 +60,7 @@
import com.google.spanner.v1.RollbackRequest;
import io.opencensus.metrics.LabelValue;
import io.opencensus.metrics.MetricRegistry;
import io.opencensus.trace.Span;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -1209,13 +1210,19 @@ public void testSessionNotFoundReadWriteTransaction() {
hasPreparedTransaction ? ByteString.copyFromUtf8("test-txn") : null;
final TransactionContextImpl closedTransactionContext =
new TransactionContextImpl(
closedSession, preparedTransactionId, rpc, mock(ExecutorProvider.class), 10);
closedSession,
preparedTransactionId,
rpc,
mock(ExecutorProvider.class),
10,
mock(Span.class));
when(closedSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(closedSession.newTransaction()).thenReturn(closedTransactionContext);
when(closedSession.beginTransaction()).thenThrow(sessionNotFound);
TransactionRunnerImpl closedTransactionRunner =
new TransactionRunnerImpl(closedSession, rpc, 10);
closedTransactionRunner.setSpan(mock(Span.class));
when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner);

final SessionImpl openSession = mock(SessionImpl.class);
Expand All @@ -1228,6 +1235,7 @@ public void testSessionNotFoundReadWriteTransaction() {
when(openSession.beginTransaction()).thenReturn(ByteString.copyFromUtf8("open-txn"));
TransactionRunnerImpl openTransactionRunner =
new TransactionRunnerImpl(openSession, mock(SpannerRpc.class), 10);
openTransactionRunner.setSpan(mock(Span.class));
when(openSession.readWriteTransaction()).thenReturn(openTransactionRunner);

ResultSet openResultSet = mock(ResultSet.class);
Expand Down
Expand Up @@ -305,27 +305,29 @@ public Void run(TransactionContext transaction) throws Exception {

@Test
public void transactionRunnerWithError() {
TransactionRunner runner = client.readWriteTransaction();
try {
runner.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
transaction.executeUpdate(INVALID_UPDATE_STATEMENT);
return null;
}
});
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT);
}
for (int i = 0; i < 1000; i++) {
TransactionRunner runner = client.readWriteTransaction();
try {
runner.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
transaction.executeUpdate(INVALID_UPDATE_STATEMENT);
return null;
}
});
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT);
}

Map<String, Boolean> spans = failOnOverkillTraceComponent.getSpans();
assertThat(spans.size()).isEqualTo(5);
assertThat(spans).containsEntry("CloudSpanner.ReadWriteTransaction", true);
assertThat(spans).containsEntry("CloudSpannerOperation.BatchCreateSessions", true);
assertThat(spans).containsEntry("SessionPool.WaitForSession", true);
assertThat(spans).containsEntry("CloudSpannerOperation.BatchCreateSessionsRequest", true);
assertThat(spans).containsEntry("CloudSpannerOperation.BeginTransaction", true);
Map<String, Boolean> spans = failOnOverkillTraceComponent.getSpans();
assertThat(spans.size()).isEqualTo(5);
assertThat(spans).containsEntry("CloudSpanner.ReadWriteTransaction", true);
assertThat(spans).containsEntry("CloudSpannerOperation.BatchCreateSessions", true);
assertThat(spans).containsEntry("SessionPool.WaitForSession", true);
assertThat(spans).containsEntry("CloudSpannerOperation.BatchCreateSessionsRequest", true);
assertThat(spans).containsEntry("CloudSpannerOperation.BeginTransaction", true);
}
}
}
Expand Up @@ -27,6 +27,7 @@
import com.google.rpc.Status;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import io.opencensus.trace.Span;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -61,7 +62,12 @@ private void batchDml(int status) {
.thenReturn(response);
try (TransactionContextImpl impl =
new TransactionContextImpl(
session, ByteString.copyFromUtf8("test"), rpc, mock(ExecutorProvider.class), 10)) {
session,
ByteString.copyFromUtf8("test"),
rpc,
mock(ExecutorProvider.class),
10,
mock(Span.class))) {
impl.batchUpdate(Arrays.asList(statement));
}
}
Expand Down

0 comments on commit 987b77f

Please sign in to comment.