Skip to content

Commit

Permalink
feat: inline begin transaction (#325)
Browse files Browse the repository at this point in the history
* feat: inline begin tx with first statement

* feat: support inlining BeginTransaction

* fix: invalid dml statement can still return tx id

* bench: add benchmarks for inline begin

* feat: add inline begin for async runner

* test: add additional tests and ITs

* test: add tests for error during tx

* test: use statement with same error code on emulator

* test: skip test on emulator

* test: constraint error causes transaction to be invalidated

* fix: retry transaction if first statements fails and had BeginTransaction option

* fix: handle aborted exceptions

* test: add additional tests for corner cases

* feat: use single-use tx for idem-potent mutations

* fix: remove check for idempotent mutations

* chore: remove commented code

* feat!: remove session pool preparing (#515)

* feat: remove session pool preparing

* fix: fix integration tests

* test: fix malformed retry loop in test case

* fix: review comments

* chore: run formatter

* test: fix integration test that relied on data from other test case
  • Loading branch information
olavloite committed Oct 23, 2020
1 parent 659719d commit d08d3de
Show file tree
Hide file tree
Showing 37 changed files with 3,105 additions and 1,953 deletions.
Expand Up @@ -633,7 +633,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, request.hasTransaction() && request.getTransaction().hasBegin());
}

/**
Expand Down Expand Up @@ -672,14 +673,20 @@ public void close() {
}
}

/**
* Returns the {@link TransactionSelector} that should be used for a statement that is executed on
* this read context. This could be a reference to an existing transaction ID, or it could be a
* BeginTransaction option that should be included with the statement.
*/
@Nullable
abstract TransactionSelector getTransactionSelector();

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction) {}

@Override
public void onError(SpannerException e) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
Expand Down Expand Up @@ -740,7 +747,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
GrpcResultSet resultSet = new GrpcResultSet(stream, this);
GrpcResultSet resultSet =
new GrpcResultSet(stream, this, selector != null && selector.hasBegin());
return resultSet;
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ interface Listener {
void onTransactionMetadata(Transaction transaction) throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e);
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
Expand All @@ -91,14 +91,17 @@ interface Listener {
static class GrpcResultSet extends AbstractResultSet<List<Object>> {
private final GrpcValueIterator iterator;
private final Listener listener;
private final boolean beginTransaction;
private GrpcStruct currRow;
private SpannerException error;
private ResultSetStats statistics;
private boolean closed;

GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, boolean beginTransaction) {
this.iterator = new GrpcValueIterator(iterator);
this.listener = listener;
this.beginTransaction = beginTransaction;
}

@Override
Expand Down Expand Up @@ -127,7 +130,7 @@ public boolean next() throws SpannerException {
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e);
throw yieldError(e, beginTransaction && currRow == null);
}
}

Expand All @@ -149,9 +152,9 @@ public Type getType() {
return currRow.getType();
}

private SpannerException yieldError(SpannerException e) {
private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e);
listener.onError(e, beginTransaction);
throw e;
}
}
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand All @@ -27,6 +28,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
Expand Down Expand Up @@ -76,14 +78,19 @@ public TransactionContextFutureImpl beginAsync() {
return begin;
}

private ApiFuture<TransactionContext> internalBeginAsync(boolean setActive) {
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction();
if (setActive) {
if (firstAttempt) {
session.setActive(this);
}
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
final ApiFuture<Void> fut = txn.ensureTxnAsync();
final ApiFuture<Void> fut;
if (firstAttempt) {
fut = ApiFutures.immediateFuture(null);
} else {
fut = txn.ensureTxnAsync();
}
ApiFutures.addCallback(
fut,
new ApiFutureCallback<Void>() {
Expand Down Expand Up @@ -149,7 +156,15 @@ public ApiFuture<Void> rollbackAsync() {
txnState == TransactionState.STARTED,
"rollback can only be called if the transaction is in progress");
try {
return txn.rollbackAsync();
return ApiFutures.transformAsync(
txn.rollbackAsync(),
new ApiAsyncFunction<Empty, Void>() {
@Override
public ApiFuture<Void> apply(Empty input) throws Exception {
return ApiFutures.immediateFuture(null);
}
},
MoreExecutors.directExecutor());
} finally {
txnState = TransactionState.ROLLED_BACK;
}
Expand Down
Expand Up @@ -34,11 +34,6 @@ class DatabaseClientImpl implements DatabaseClient {
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private static final Tracer tracer = Tracing.getTracer();

private enum SessionMode {
READ,
READ_WRITE
}

@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;

Expand All @@ -53,21 +48,15 @@ private enum SessionMode {
}

@VisibleForTesting
PooledSessionFuture getReadSession() {
return pool.getReadSession();
}

@VisibleForTesting
PooledSessionFuture getReadWriteSession() {
return pool.getReadWriteSession();
PooledSessionFuture getSession() {
return pool.getSession();
}

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
Expand All @@ -94,7 +83,6 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
Expand All @@ -120,7 +108,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUse();
return getSession().singleUse();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -131,7 +119,7 @@ public ReadContext singleUse() {
public ReadContext singleUse(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUse(bound);
return getSession().singleUse(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -142,7 +130,7 @@ public ReadContext singleUse(TimestampBound bound) {
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUseReadOnlyTransaction();
return getSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -153,7 +141,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUseReadOnlyTransaction(bound);
return getSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -164,7 +152,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
public ReadOnlyTransaction readOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().readOnlyTransaction();
return getSession().readOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -175,7 +163,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().readOnlyTransaction(bound);
return getSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -186,9 +174,9 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
public TransactionRunner readWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().readWriteTransaction();
return getSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
TraceUtil.endSpanWithFailure(span, e);
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
Expand All @@ -199,7 +187,7 @@ public TransactionRunner readWriteTransaction() {
public TransactionManager transactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManager();
return getSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -210,7 +198,7 @@ public TransactionManager transactionManager() {
public AsyncRunner runAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().runAsync();
return getSession().runAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -221,7 +209,7 @@ public AsyncRunner runAsync() {
public AsyncTransactionManager transactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManagerAsync();
return getSession().transactionManagerAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -232,10 +220,7 @@ public AsyncTransactionManager transactionManagerAsync() {
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
// A partitioned update transaction does not need a prepared write session, as the transaction
// object will start a new transaction with specific options anyway.
return runWithSessionRetry(
SessionMode.READ,
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
Expand All @@ -248,17 +233,13 @@ public Long apply(Session session) {
}
}

private <T> T runWithSessionRetry(SessionMode mode, Function<Session, T> callable) {
PooledSessionFuture session =
mode == SessionMode.READ_WRITE ? getReadWriteSession() : getReadSession();
private <T> T runWithSessionRetry(Function<Session, T> callable) {
PooledSessionFuture session = getSession();
while (true) {
try {
return callable.apply(session);
} catch (SessionNotFoundException e) {
session =
mode == SessionMode.READ_WRITE
? pool.replaceReadWriteSession(e, session)
: pool.replaceReadSession(e, session);
session = pool.replaceSession(e, session);
}
}
}
Expand Down
Expand Up @@ -36,9 +36,22 @@ class MetricRegistryConstants {
private static final LabelValue UNSET_LABEL = LabelValue.create(null);

static final LabelValue NUM_IN_USE_SESSIONS = LabelValue.create("num_in_use_sessions");

/**
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
* This metric will therefore always be zero and may be removed in the future.
*/
@Deprecated
static final LabelValue NUM_SESSIONS_BEING_PREPARED =
LabelValue.create("num_sessions_being_prepared");

static final LabelValue NUM_READ_SESSIONS = LabelValue.create("num_read_sessions");

/**
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
* This metric will therefore always be zero and may be removed in the future.
*/
@Deprecated
static final LabelValue NUM_WRITE_SESSIONS = LabelValue.create("num_write_prepared_sessions");

static final ImmutableList<LabelKey> SPANNER_LABEL_KEYS =
Expand Down

0 comments on commit d08d3de

Please sign in to comment.