Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: inline begin transaction #325

Merged
merged 23 commits into from Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9fb15fe
feat: inline begin tx with first statement
olavloite Jul 8, 2020
6b8d9dc
feat: support inlining BeginTransaction
olavloite Jul 8, 2020
59ff2aa
fix: invalid dml statement can still return tx id
olavloite Jul 8, 2020
4d4446f
bench: add benchmarks for inline begin
olavloite Jul 16, 2020
48a5db5
feat: add inline begin for async runner
olavloite Jul 17, 2020
261c911
test: add additional tests and ITs
olavloite Jul 17, 2020
af50669
test: add tests for error during tx
olavloite Jul 30, 2020
f30334e
test: use statement with same error code on emulator
olavloite Jul 30, 2020
a4d2e76
test: skip test on emulator
olavloite Jul 30, 2020
1817afa
test: constraint error causes transaction to be invalidated
olavloite Jul 30, 2020
61cc207
fix: retry transaction if first statements fails and had BeginTransac…
olavloite Jul 31, 2020
3bdff48
fix: handle aborted exceptions
olavloite Jul 31, 2020
057839f
Merge branch 'master' into inline-begin-tx
olavloite Sep 16, 2020
b3148a0
test: add additional tests for corner cases
olavloite Sep 16, 2020
f508bdb
feat: use single-use tx for idem-potent mutations
olavloite Sep 16, 2020
d9e938f
fix: remove check for idempotent mutations
olavloite Sep 17, 2020
8a28f61
Merge branch 'master' into inline-begin-tx
olavloite Sep 28, 2020
bec71d7
Merge branch 'master' into inline-begin-tx
olavloite Oct 5, 2020
07346f0
chore: remove commented code
olavloite Oct 6, 2020
2768f69
feat!: remove session pool preparing (#515)
olavloite Oct 21, 2020
b816a66
Merge branch 'master' into inline-begin-tx
olavloite Oct 21, 2020
24ea415
chore: run formatter
olavloite Oct 21, 2020
28277ff
test: fix integration test that relied on data from other test case
olavloite Oct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -633,7 +633,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, request.hasTransaction() && request.getTransaction().hasBegin());
}

/**
Expand Down Expand Up @@ -672,14 +673,20 @@ public void close() {
}
}

/**
* Returns the {@link TransactionSelector} that should be used for a statement that is executed on
* this read context. This could be a reference to an existing transaction ID, or it could be a
* BeginTransaction option that should be included with the statement.
*/
@Nullable
abstract TransactionSelector getTransactionSelector();

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction) {}

@Override
public void onError(SpannerException e) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
Expand Down Expand Up @@ -740,7 +747,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
GrpcResultSet resultSet = new GrpcResultSet(stream, this);
GrpcResultSet resultSet =
new GrpcResultSet(stream, this, selector != null && selector.hasBegin());
return resultSet;
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ interface Listener {
void onTransactionMetadata(Transaction transaction) throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e);
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
Expand All @@ -91,14 +91,17 @@ interface Listener {
static class GrpcResultSet extends AbstractResultSet<List<Object>> {
private final GrpcValueIterator iterator;
private final Listener listener;
private final boolean beginTransaction;
private GrpcStruct currRow;
private SpannerException error;
private ResultSetStats statistics;
private boolean closed;

GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, boolean beginTransaction) {
this.iterator = new GrpcValueIterator(iterator);
this.listener = listener;
this.beginTransaction = beginTransaction;
}

@Override
Expand Down Expand Up @@ -127,7 +130,7 @@ public boolean next() throws SpannerException {
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e);
throw yieldError(e, beginTransaction && currRow == null);
}
}

Expand All @@ -149,9 +152,9 @@ public Type getType() {
return currRow.getType();
}

private SpannerException yieldError(SpannerException e) {
private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e);
listener.onError(e, beginTransaction);
throw e;
}
}
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand All @@ -26,6 +27,7 @@
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
Expand All @@ -37,14 +39,16 @@ final class AsyncTransactionManagerImpl

private final SessionImpl session;
private Span span;
private final boolean inlineBegin;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span) {
AsyncTransactionManagerImpl(SessionImpl session, Span span, boolean inlineBegin) {
this.session = session;
this.span = span;
this.inlineBegin = inlineBegin;
}

@Override
Expand Down Expand Up @@ -72,7 +76,12 @@ private ApiFuture<TransactionContext> internalBeginAsync(boolean setActive) {
session.setActive(this);
}
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
final ApiFuture<Void> fut = txn.ensureTxnAsync();
final ApiFuture<Void> fut;
if (inlineBegin) {
fut = ApiFutures.immediateFuture(null);
} else {
fut = txn.ensureTxnAsync();
}
ApiFutures.addCallback(
fut,
new ApiFutureCallback<Void>() {
Expand Down Expand Up @@ -138,7 +147,15 @@ public ApiFuture<Void> rollbackAsync() {
txnState == TransactionState.STARTED,
"rollback can only be called if the transaction is in progress");
try {
return txn.rollbackAsync();
return ApiFutures.transformAsync(
txn.rollbackAsync(),
new ApiAsyncFunction<Empty, Void>() {
@Override
public ApiFuture<Void> apply(Empty input) throws Exception {
return ApiFutures.immediateFuture(null);
}
},
MoreExecutors.directExecutor());
} finally {
txnState = TransactionState.ROLLED_BACK;
}
Expand Down
Expand Up @@ -29,6 +29,8 @@

class DatabaseClientImpl implements DatabaseClient {
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN =
"CloudSpanner.ReadWriteTransactionWithInlineBegin";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private static final Tracer tracer = Tracing.getTracer();
Expand All @@ -40,15 +42,17 @@ private enum SessionMode {

@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
private final boolean inlineBeginReadWriteTransactions;

@VisibleForTesting
DatabaseClientImpl(SessionPool pool) {
this("", pool);
DatabaseClientImpl(SessionPool pool, boolean inlineBeginReadWriteTransactions) {
this("", pool, inlineBeginReadWriteTransactions);
}

DatabaseClientImpl(String clientId, SessionPool pool) {
DatabaseClientImpl(String clientId, SessionPool pool, boolean inlineBeginReadWriteTransactions) {
this.clientId = clientId;
this.pool = pool;
this.inlineBeginReadWriteTransactions = inlineBeginReadWriteTransactions;
}

@VisibleForTesting
Expand Down Expand Up @@ -169,6 +173,12 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {

@Override
public TransactionRunner readWriteTransaction() {
return inlineBeginReadWriteTransactions
? inlinedReadWriteTransaction()
: preparedReadWriteTransaction();
}

private TransactionRunner preparedReadWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().readWriteTransaction();
Expand All @@ -180,8 +190,25 @@ public TransactionRunner readWriteTransaction() {
}
}

private TransactionRunner inlinedReadWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
// An inlined read/write transaction does not need a write-prepared session.
return getReadSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public TransactionManager transactionManager() {
return inlineBeginReadWriteTransactions
? inlinedTransactionManager()
: preparedTransactionManager();
}

private TransactionManager preparedTransactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManager();
Expand All @@ -191,8 +218,23 @@ public TransactionManager transactionManager() {
}
}

private TransactionManager inlinedTransactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
// An inlined read/write transaction does not need a write-prepared session.
return getReadSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncRunner runAsync() {
return inlineBeginReadWriteTransactions ? inlinedRunAsync() : preparedRunAsync();
}

private AsyncRunner preparedRunAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().runAsync();
Expand All @@ -202,8 +244,24 @@ public AsyncRunner runAsync() {
}
}

private AsyncRunner inlinedRunAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().runAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncTransactionManager transactionManagerAsync() {
return inlineBeginReadWriteTransactions
? inlinedTransactionManagerAsync()
: preparedTransactionManagerAsync();
}

private AsyncTransactionManager preparedTransactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManagerAsync();
Expand All @@ -213,6 +271,16 @@ public AsyncTransactionManager transactionManagerAsync() {
}
}

private AsyncTransactionManager inlinedTransactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().transactionManagerAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
Expand Down
Expand Up @@ -228,24 +228,34 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
@Override
public TransactionRunner readWriteTransaction() {
return setActive(
new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks()));
new TransactionRunnerImpl(
this,
spanner.getRpc(),
spanner.getDefaultPrefetchChunks(),
spanner.getOptions().isInlineBeginForReadWriteTransaction()));
}

@Override
public AsyncRunner runAsync() {
return new AsyncRunnerImpl(
setActive(
new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks())));
new TransactionRunnerImpl(
this,
spanner.getRpc(),
spanner.getDefaultPrefetchChunks(),
spanner.getOptions().isInlineBeginForReadWriteTransaction())));
}

@Override
public TransactionManager transactionManager() {
return new TransactionManagerImpl(this, currentSpan);
return new TransactionManagerImpl(
this, currentSpan, spanner.getOptions().isInlineBeginForReadWriteTransaction());
}

@Override
public AsyncTransactionManagerImpl transactionManagerAsync() {
return new AsyncTransactionManagerImpl(this, currentSpan);
return new AsyncTransactionManagerImpl(
this, currentSpan, spanner.getOptions().isInlineBeginForReadWriteTransaction());
}

@Override
Expand Down
Expand Up @@ -233,7 +233,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {

@VisibleForTesting
DatabaseClientImpl createDatabaseClient(String clientId, SessionPool pool) {
return new DatabaseClientImpl(clientId, pool);
return new DatabaseClientImpl(
clientId, pool, getOptions().isInlineBeginForReadWriteTransaction());
}

@Override
Expand Down