Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: inline begin transaction #325

Merged
merged 23 commits into from Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9fb15fe
feat: inline begin tx with first statement
olavloite Jul 8, 2020
6b8d9dc
feat: support inlining BeginTransaction
olavloite Jul 8, 2020
59ff2aa
fix: invalid dml statement can still return tx id
olavloite Jul 8, 2020
4d4446f
bench: add benchmarks for inline begin
olavloite Jul 16, 2020
48a5db5
feat: add inline begin for async runner
olavloite Jul 17, 2020
261c911
test: add additional tests and ITs
olavloite Jul 17, 2020
af50669
test: add tests for error during tx
olavloite Jul 30, 2020
f30334e
test: use statement with same error code on emulator
olavloite Jul 30, 2020
a4d2e76
test: skip test on emulator
olavloite Jul 30, 2020
1817afa
test: constraint error causes transaction to be invalidated
olavloite Jul 30, 2020
61cc207
fix: retry transaction if first statements fails and had BeginTransac…
olavloite Jul 31, 2020
3bdff48
fix: handle aborted exceptions
olavloite Jul 31, 2020
057839f
Merge branch 'master' into inline-begin-tx
olavloite Sep 16, 2020
b3148a0
test: add additional tests for corner cases
olavloite Sep 16, 2020
f508bdb
feat: use single-use tx for idem-potent mutations
olavloite Sep 16, 2020
d9e938f
fix: remove check for idempotent mutations
olavloite Sep 17, 2020
8a28f61
Merge branch 'master' into inline-begin-tx
olavloite Sep 28, 2020
bec71d7
Merge branch 'master' into inline-begin-tx
olavloite Oct 5, 2020
07346f0
chore: remove commented code
olavloite Oct 6, 2020
2768f69
feat!: remove session pool preparing (#515)
olavloite Oct 21, 2020
b816a66
Merge branch 'master' into inline-begin-tx
olavloite Oct 21, 2020
24ea415
chore: run formatter
olavloite Oct 21, 2020
28277ff
test: fix integration test that relied on data from other test case
olavloite Oct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -633,7 +633,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, request.hasTransaction() && request.getTransaction().hasBegin());
}

/**
Expand Down Expand Up @@ -672,14 +673,20 @@ public void close() {
}
}

/**
* Returns the {@link TransactionSelector} that should be used for a statement that is executed on
* this read context. This could be a reference to an existing transaction ID, or it could be a
* BeginTransaction option that should be included with the statement.
*/
@Nullable
abstract TransactionSelector getTransactionSelector();

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction) {}

@Override
public void onError(SpannerException e) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
Expand Down Expand Up @@ -740,7 +747,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
GrpcResultSet resultSet = new GrpcResultSet(stream, this);
GrpcResultSet resultSet =
new GrpcResultSet(stream, this, selector != null && selector.hasBegin());
return resultSet;
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ interface Listener {
void onTransactionMetadata(Transaction transaction) throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e);
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
Expand All @@ -91,14 +91,17 @@ interface Listener {
static class GrpcResultSet extends AbstractResultSet<List<Object>> {
private final GrpcValueIterator iterator;
private final Listener listener;
private final boolean beginTransaction;
private GrpcStruct currRow;
private SpannerException error;
private ResultSetStats statistics;
private boolean closed;

GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, boolean beginTransaction) {
this.iterator = new GrpcValueIterator(iterator);
this.listener = listener;
this.beginTransaction = beginTransaction;
}

@Override
Expand Down Expand Up @@ -127,7 +130,7 @@ public boolean next() throws SpannerException {
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e);
throw yieldError(e, beginTransaction && currRow == null);
}
}

Expand All @@ -149,9 +152,9 @@ public Type getType() {
return currRow.getType();
}

private SpannerException yieldError(SpannerException e) {
private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e);
listener.onError(e, beginTransaction);
throw e;
}
}
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand All @@ -27,6 +28,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
Expand Down Expand Up @@ -76,14 +78,19 @@ public TransactionContextFutureImpl beginAsync() {
return begin;
}

private ApiFuture<TransactionContext> internalBeginAsync(boolean setActive) {
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction();
if (setActive) {
if (firstAttempt) {
session.setActive(this);
}
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
final ApiFuture<Void> fut = txn.ensureTxnAsync();
final ApiFuture<Void> fut;
if (firstAttempt) {
fut = ApiFutures.immediateFuture(null);
} else {
fut = txn.ensureTxnAsync();
}
ApiFutures.addCallback(
fut,
new ApiFutureCallback<Void>() {
Expand Down Expand Up @@ -149,7 +156,15 @@ public ApiFuture<Void> rollbackAsync() {
txnState == TransactionState.STARTED,
"rollback can only be called if the transaction is in progress");
try {
return txn.rollbackAsync();
return ApiFutures.transformAsync(
txn.rollbackAsync(),
new ApiAsyncFunction<Empty, Void>() {
@Override
public ApiFuture<Void> apply(Empty input) throws Exception {
return ApiFutures.immediateFuture(null);
}
},
MoreExecutors.directExecutor());
} finally {
txnState = TransactionState.ROLLED_BACK;
}
Expand Down
Expand Up @@ -33,11 +33,6 @@ class DatabaseClientImpl implements DatabaseClient {
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private static final Tracer tracer = Tracing.getTracer();

private enum SessionMode {
READ,
READ_WRITE
}

@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;

Expand All @@ -52,21 +47,15 @@ private enum SessionMode {
}

@VisibleForTesting
PooledSessionFuture getReadSession() {
return pool.getReadSession();
}

@VisibleForTesting
PooledSessionFuture getReadWriteSession() {
return pool.getReadWriteSession();
PooledSessionFuture getSession() {
return pool.getSession();
}

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
Expand All @@ -86,7 +75,6 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
Expand All @@ -105,7 +93,7 @@ public Timestamp apply(Session session) {
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUse();
return getSession().singleUse();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -116,7 +104,7 @@ public ReadContext singleUse() {
public ReadContext singleUse(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUse(bound);
return getSession().singleUse(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -127,7 +115,7 @@ public ReadContext singleUse(TimestampBound bound) {
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUseReadOnlyTransaction();
return getSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -138,7 +126,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUseReadOnlyTransaction(bound);
return getSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -149,7 +137,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
public ReadOnlyTransaction readOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().readOnlyTransaction();
return getSession().readOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -160,7 +148,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().readOnlyTransaction(bound);
return getSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -171,9 +159,9 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
public TransactionRunner readWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().readWriteTransaction();
return getSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
TraceUtil.endSpanWithFailure(span, e);
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
Expand All @@ -184,7 +172,7 @@ public TransactionRunner readWriteTransaction() {
public TransactionManager transactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManager();
return getSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -195,7 +183,7 @@ public TransactionManager transactionManager() {
public AsyncRunner runAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().runAsync();
return getSession().runAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -206,7 +194,7 @@ public AsyncRunner runAsync() {
public AsyncTransactionManager transactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManagerAsync();
return getSession().transactionManagerAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -217,10 +205,7 @@ public AsyncTransactionManager transactionManagerAsync() {
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
// A partitioned update transaction does not need a prepared write session, as the transaction
// object will start a new transaction with specific options anyway.
return runWithSessionRetry(
SessionMode.READ,
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
Expand All @@ -233,17 +218,13 @@ public Long apply(Session session) {
}
}

private <T> T runWithSessionRetry(SessionMode mode, Function<Session, T> callable) {
PooledSessionFuture session =
mode == SessionMode.READ_WRITE ? getReadWriteSession() : getReadSession();
private <T> T runWithSessionRetry(Function<Session, T> callable) {
PooledSessionFuture session = getSession();
while (true) {
try {
return callable.apply(session);
} catch (SessionNotFoundException e) {
session =
mode == SessionMode.READ_WRITE
? pool.replaceReadWriteSession(e, session)
: pool.replaceReadSession(e, session);
session = pool.replaceSession(e, session);
}
}
}
Expand Down
Expand Up @@ -36,9 +36,22 @@ class MetricRegistryConstants {
private static final LabelValue UNSET_LABEL = LabelValue.create(null);

static final LabelValue NUM_IN_USE_SESSIONS = LabelValue.create("num_in_use_sessions");

/**
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
* This metric will therefore always be zero and may be removed in the future.
*/
@Deprecated
static final LabelValue NUM_SESSIONS_BEING_PREPARED =
LabelValue.create("num_sessions_being_prepared");

static final LabelValue NUM_READ_SESSIONS = LabelValue.create("num_read_sessions");

/**
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
* This metric will therefore always be zero and may be removed in the future.
*/
@Deprecated
static final LabelValue NUM_WRITE_SESSIONS = LabelValue.create("num_write_prepared_sessions");

static final ImmutableList<LabelKey> SPANNER_LABEL_KEYS =
Expand Down