Skip to content

Commit

Permalink
refactor: move connection api to Spanner client lib (#76)
Browse files Browse the repository at this point in the history
* refactor: move connection api to Spanner client lib

* fix: add ignored dependencies

* fix: add new dependencies

* fix: remove JDBC-specific interface

* fix: remove jdbc references from comments

* fix: mark API as internal

* feat: include QueryOptions change

* tests: add missing test files + parallelize ITs

* tests: add comment to test class

* tests: clean up test case

* tests: increase exec time to prevent flakiness
  • Loading branch information
olavloite committed Apr 4, 2020
1 parent 10ae0a7 commit d617fb6
Show file tree
Hide file tree
Showing 134 changed files with 56,588 additions and 5 deletions.
16 changes: 15 additions & 1 deletion google-cloud-spanner/pom.xml
Expand Up @@ -111,7 +111,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredDependencies>io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations</ignoredDependencies>
<ignoredDependencies>io.grpc:grpc-protobuf-lite,org.hamcrest:hamcrest,org.hamcrest:hamcrest-core,com.google.errorprone:error_prone_annotations,com.google.api.grpc:grpc-google-cloud-spanner-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1,com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1</ignoredDependencies>
</configuration>
</plugin>
</plugins>
Expand Down Expand Up @@ -228,6 +228,20 @@
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Expand Up @@ -16,14 +16,15 @@

package com.google.cloud.spanner;

import com.google.cloud.spanner.connection.Connection;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Exception thrown by a {@link CloudSpannerJdbcConnection} when a database operation detects that a
* transaction has aborted and an internal retry failed because of a concurrent modification. This
* type of error has its own subclass since it is often necessary to handle this specific kind of
* aborted exceptions differently to other types of errors.
* Exception thrown by a {@link Connection} when a database operation detects that a transaction has
* aborted and an internal retry failed because of a concurrent modification. This type of error has
* its own subclass since it is often necessary to handle this specific kind of aborted exceptions
* differently to other types of errors.
*/
public class AbortedDueToConcurrentModificationException extends AbortedException {
private static final long serialVersionUID = 7600146169922053323L;
Expand Down
@@ -0,0 +1,162 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.cloud.spanner.connection.StatementParser.ParsedStatement;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;

/** Base for all {@link Connection}-based transactions and batches. */
abstract class AbstractBaseUnitOfWork implements UnitOfWork {
private final StatementExecutor statementExecutor;
private final StatementTimeout statementTimeout;

/**
* The {@link Future} that monitors the result of the statement currently being executed for this
* unit of work.
*/
@GuardedBy("this")
private Future<?> currentlyRunningStatementFuture = null;

enum InterceptorsUsage {
INVOKE_INTERCEPTORS,
IGNORE_INTERCEPTORS;
}

abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUnitOfWork> {
private StatementExecutor statementExecutor;
private StatementTimeout statementTimeout = new StatementTimeout();

Builder() {}

@SuppressWarnings("unchecked")
B self() {
return (B) this;
}

B withStatementExecutor(StatementExecutor executor) {
Preconditions.checkNotNull(executor);
this.statementExecutor = executor;
return self();
}

B setStatementTimeout(StatementTimeout timeout) {
Preconditions.checkNotNull(timeout);
this.statementTimeout = timeout;
return self();
}

abstract T build();
}

AbstractBaseUnitOfWork(Builder<?, ?> builder) {
Preconditions.checkState(builder.statementExecutor != null, "No statement executor specified");
this.statementExecutor = builder.statementExecutor;
this.statementTimeout = builder.statementTimeout;
}

StatementExecutor getStatementExecutor() {
return statementExecutor;
}

StatementTimeout getStatementTimeout() {
return statementTimeout;
}

@Override
public void cancel() {
synchronized (this) {
if (currentlyRunningStatementFuture != null
&& !currentlyRunningStatementFuture.isDone()
&& !currentlyRunningStatementFuture.isCancelled()) {
currentlyRunningStatementFuture.cancel(true);
}
}
}

<T> T asyncExecuteStatement(ParsedStatement statement, Callable<T> callable) {
return asyncExecuteStatement(statement, callable, InterceptorsUsage.INVOKE_INTERCEPTORS);
}

<T> T asyncExecuteStatement(
ParsedStatement statement, Callable<T> callable, InterceptorsUsage interceptorUsage) {
Preconditions.checkNotNull(statement);
Preconditions.checkNotNull(callable);

if (interceptorUsage == InterceptorsUsage.INVOKE_INTERCEPTORS) {
statementExecutor.invokeInterceptors(
statement, StatementExecutionStep.EXECUTE_STATEMENT, this);
}
Future<T> future = statementExecutor.submit(callable);
synchronized (this) {
this.currentlyRunningStatementFuture = future;
}
T res;
try {
if (statementTimeout.hasTimeout()) {
TimeUnit unit = statementTimeout.getAppropriateTimeUnit();
res = future.get(statementTimeout.getTimeoutValue(unit), unit);
} else {
res = future.get();
}
} catch (TimeoutException e) {
// statement timed out, cancel the execution
future.cancel(true);
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED,
"Statement execution timeout occurred for " + statement.getSqlWithoutComments(),
e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Set<Throwable> causes = new HashSet<>();
while (cause != null && !causes.contains(cause)) {
if (cause instanceof SpannerException) {
throw (SpannerException) cause;
}
causes.add(cause);
cause = cause.getCause();
}
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.UNKNOWN,
"Statement execution failed for " + statement.getSqlWithoutComments(),
e);
} catch (InterruptedException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.CANCELLED, "Statement execution was interrupted", e);
} catch (CancellationException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.CANCELLED, "Statement execution was cancelled", e);
} finally {
synchronized (this) {
this.currentlyRunningStatementFuture = null;
}
}
return res;
}
}
@@ -0,0 +1,96 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.connection.StatementParser.ParsedStatement;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;

/**
* Base class for {@link Connection}-based transactions that can be used for multiple read and
* read/write statements.
*/
abstract class AbstractMultiUseTransaction extends AbstractBaseUnitOfWork {

AbstractMultiUseTransaction(Builder<?, ? extends AbstractMultiUseTransaction> builder) {
super(builder);
}

@Override
public Type getType() {
return Type.TRANSACTION;
}

@Override
public boolean isActive() {
return getState().isActive();
}

/**
* Check that the current transaction actually has a valid underlying transaction. If not, the
* method will throw a {@link SpannerException}.
*/
abstract void checkValidTransaction();

/** Returns the {@link ReadContext} that can be used for queries on this transaction. */
abstract ReadContext getReadContext();

@Override
public ResultSet executeQuery(
final ParsedStatement statement,
final AnalyzeMode analyzeMode,
final QueryOption... options) {
Preconditions.checkArgument(statement.isQuery(), "Statement is not a query");
checkValidTransaction();
return asyncExecuteStatement(
statement,
new Callable<ResultSet>() {
@Override
public ResultSet call() throws Exception {
return DirectExecuteResultSet.ofResultSet(
internalExecuteQuery(statement, analyzeMode, options));
}
});
}

ResultSet internalExecuteQuery(
final ParsedStatement statement, AnalyzeMode analyzeMode, QueryOption... options) {
if (analyzeMode == AnalyzeMode.NONE) {
return getReadContext().executeQuery(statement.getStatement(), options);
}
return getReadContext()
.analyzeQuery(statement.getStatement(), analyzeMode.getQueryAnalyzeMode());
}

@Override
public long[] runBatch() {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Run batch is not supported for transactions");
}

@Override
public void abortBatch() {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Run batch is not supported for transactions");
}
}
@@ -0,0 +1,52 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;

/**
* {@link AnalyzeMode} indicates whether a query should be executed as a normal query (NONE),
* whether only a query plan should be returned, or whether the query should be profiled while
* executed.
*/
enum AnalyzeMode {
NONE(null),
PLAN(QueryAnalyzeMode.PLAN),
PROFILE(QueryAnalyzeMode.PROFILE);

private final QueryAnalyzeMode mode;

private AnalyzeMode(QueryAnalyzeMode mode) {
this.mode = mode;
}

QueryAnalyzeMode getQueryAnalyzeMode() {
return mode;
}

/** Translates from the Spanner client library QueryAnalyzeMode to {@link AnalyzeMode}. */
static AnalyzeMode of(QueryAnalyzeMode mode) {
switch (mode) {
case PLAN:
return AnalyzeMode.PLAN;
case PROFILE:
return AnalyzeMode.PROFILE;
default:
throw new IllegalArgumentException(mode + " is unknown");
}
}
}

0 comments on commit d617fb6

Please sign in to comment.