New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: async connection API #392
Changes from all commits
ec4cc30
dd27157
2ae9a11
5d28817
b565560
2446111
321d23c
c753e7c
0a80a85
79308b6
8d5a618
2d531c5
1ce6795
a33a31f
9111798
2711d47
895b024
3b8d29e
fdc64a3
c6ff32d
3096ef6
0395413
bb130b1
35210df
2e84dc1
2500e42
24bc392
d671278
a258579
2878257
81b70c3
5c75bee
f1a40ee
a541398
51ef086
ded2101
5989efb
c50f1ed
c5fba07
3e775b1
b6e175a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,8 @@ | |
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet; | ||
import com.google.common.base.Function; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.base.Supplier; | ||
import com.google.common.base.Suppliers; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.util.concurrent.ListeningScheduledExecutorService; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
|
@@ -88,8 +90,8 @@ private State(boolean shouldStop) { | |
|
||
private final BlockingDeque<Struct> buffer; | ||
private Struct currentRow; | ||
/** The underlying synchronous {@link ResultSet} that is producing the rows. */ | ||
private final ResultSet delegateResultSet; | ||
/** Supplies the underlying synchronous {@link ResultSet} that will be producing the rows. */ | ||
private final Supplier<ResultSet> delegateResultSet; | ||
|
||
/** | ||
* Any exception that occurs while executing the query and iterating over the result set will be | ||
|
@@ -144,6 +146,11 @@ private State(boolean shouldStop) { | |
private volatile CountDownLatch consumingLatch = new CountDownLatch(0); | ||
|
||
AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate, int bufferSize) { | ||
this(executorProvider, Suppliers.ofInstance(Preconditions.checkNotNull(delegate)), bufferSize); | ||
} | ||
|
||
AsyncResultSetImpl( | ||
ExecutorProvider executorProvider, Supplier<ResultSet> delegate, int bufferSize) { | ||
super(delegate); | ||
this.executorProvider = Preconditions.checkNotNull(executorProvider); | ||
this.delegateResultSet = Preconditions.checkNotNull(delegate); | ||
|
@@ -165,7 +172,7 @@ public void close() { | |
return; | ||
} | ||
if (state == State.INITIALIZED || state == State.SYNC) { | ||
delegateResultSet.close(); | ||
delegateResultSet.get().close(); | ||
} | ||
this.closed = true; | ||
} | ||
|
@@ -228,7 +235,7 @@ public CursorState tryNext() throws SpannerException { | |
|
||
private void closeDelegateResultSet() { | ||
try { | ||
delegateResultSet.close(); | ||
delegateResultSet.get().close(); | ||
} catch (Throwable t) { | ||
log.log(Level.FINE, "Ignoring error from closing delegate result set", t); | ||
} | ||
|
@@ -261,7 +268,7 @@ public void run() { | |
// we'll keep the cancelled state. | ||
return; | ||
} | ||
executionException = SpannerExceptionFactory.newSpannerException(e); | ||
executionException = SpannerExceptionFactory.asSpannerException(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returns the same exception instance if |
||
cursorReturnedDoneOrException = true; | ||
} | ||
return; | ||
|
@@ -325,10 +332,10 @@ public Void call() throws Exception { | |
boolean stop = false; | ||
boolean hasNext = false; | ||
try { | ||
hasNext = delegateResultSet.next(); | ||
hasNext = delegateResultSet.get().next(); | ||
} catch (Throwable e) { | ||
synchronized (monitor) { | ||
executionException = SpannerExceptionFactory.newSpannerException(e); | ||
executionException = SpannerExceptionFactory.asSpannerException(e); | ||
} | ||
} | ||
try { | ||
|
@@ -357,13 +364,13 @@ public Void call() throws Exception { | |
} | ||
} | ||
if (!stop) { | ||
buffer.put(delegateResultSet.getCurrentRowAsStruct()); | ||
buffer.put(delegateResultSet.get().getCurrentRowAsStruct()); | ||
startCallbackIfNecessary(); | ||
hasNext = delegateResultSet.next(); | ||
hasNext = delegateResultSet.get().next(); | ||
} | ||
} catch (Throwable e) { | ||
synchronized (monitor) { | ||
executionException = SpannerExceptionFactory.newSpannerException(e); | ||
executionException = SpannerExceptionFactory.asSpannerException(e); | ||
stop = true; | ||
} | ||
} | ||
|
@@ -544,9 +551,9 @@ public <T> List<T> toList(Function<StructReader, T> transformer) throws SpannerE | |
try { | ||
return future.get(); | ||
} catch (ExecutionException e) { | ||
throw SpannerExceptionFactory.newSpannerException(e.getCause()); | ||
throw SpannerExceptionFactory.asSpannerException(e.getCause()); | ||
} catch (Throwable e) { | ||
throw SpannerExceptionFactory.newSpannerException(e); | ||
throw SpannerExceptionFactory.asSpannerException(e); | ||
} | ||
} | ||
|
||
|
@@ -558,14 +565,14 @@ public boolean next() throws SpannerException { | |
"Cannot call next() on a result set with a callback."); | ||
this.state = State.SYNC; | ||
} | ||
boolean res = delegateResultSet.next(); | ||
currentRow = res ? delegateResultSet.getCurrentRowAsStruct() : null; | ||
boolean res = delegateResultSet.get().next(); | ||
currentRow = res ? delegateResultSet.get().getCurrentRowAsStruct() : null; | ||
return res; | ||
} | ||
|
||
@Override | ||
public ResultSetStats getStats() { | ||
return delegateResultSet.getStats(); | ||
return delegateResultSet.get().getStats(); | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,7 +89,7 @@ static ErrorCode valueOf(String name, ErrorCode defaultValue) { | |
/** | ||
* Returns the error code corresponding to a gRPC status, or {@code UNKNOWN} if not recognized. | ||
*/ | ||
static ErrorCode fromGrpcStatus(Status status) { | ||
public static ErrorCode fromGrpcStatus(Status status) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed to public to be accessible from the |
||
ErrorCode code = errorByRpcCode.get(status.getCode().value()); | ||
return code == null ? UNKNOWN : code; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,14 +16,17 @@ | |
|
||
package com.google.cloud.spanner; | ||
|
||
import com.google.api.core.ApiFuture; | ||
import com.google.api.gax.core.ExecutorProvider; | ||
import com.google.api.gax.core.InstantiatingExecutorProvider; | ||
import com.google.cloud.ByteArray; | ||
import com.google.cloud.Date; | ||
import com.google.cloud.Timestamp; | ||
import com.google.cloud.spanner.Options.QueryOption; | ||
import com.google.cloud.spanner.Type.Code; | ||
import com.google.cloud.spanner.Type.StructField; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.base.Supplier; | ||
import com.google.common.collect.Lists; | ||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import com.google.spanner.v1.ResultSetStats; | ||
|
@@ -65,8 +68,41 @@ public static AsyncResultSet toAsyncResultSet(ResultSet delegate) { | |
* ExecutorProvider}. | ||
*/ | ||
public static AsyncResultSet toAsyncResultSet( | ||
ResultSet delegate, ExecutorProvider executorProvider) { | ||
return new AsyncResultSetImpl(executorProvider, delegate, 100); | ||
ResultSet delegate, ExecutorProvider executorProvider, QueryOption... options) { | ||
Options readOptions = Options.fromQueryOptions(options); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Allow specifying a buffer size instead of always using a default size. |
||
final int bufferRows = | ||
readOptions.hasBufferRows() | ||
? readOptions.bufferRows() | ||
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; | ||
return new AsyncResultSetImpl(executorProvider, delegate, bufferRows); | ||
} | ||
|
||
/** | ||
* Converts the {@link ResultSet} that will be returned by the given {@link ApiFuture} to an | ||
* {@link AsyncResultSet} using the given {@link ExecutorProvider}. | ||
*/ | ||
public static AsyncResultSet toAsyncResultSet( | ||
ApiFuture<ResultSet> delegate, ExecutorProvider executorProvider, QueryOption... options) { | ||
Options readOptions = Options.fromQueryOptions(options); | ||
final int bufferRows = | ||
readOptions.hasBufferRows() | ||
? readOptions.bufferRows() | ||
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; | ||
return new AsyncResultSetImpl( | ||
executorProvider, new FutureResultSetSupplier(delegate), bufferRows); | ||
} | ||
|
||
private static class FutureResultSetSupplier implements Supplier<ResultSet> { | ||
final ApiFuture<ResultSet> delegate; | ||
|
||
FutureResultSetSupplier(ApiFuture<ResultSet> delegate) { | ||
this.delegate = Preconditions.checkNotNull(delegate); | ||
} | ||
|
||
@Override | ||
public ResultSet get() { | ||
return SpannerApiFutures.get(delegate); | ||
} | ||
} | ||
|
||
private static class PrePopulatedResultSet implements ResultSet { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Copyright 2020 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.spanner; | ||
|
||
import com.google.api.core.ApiFuture; | ||
import com.google.common.base.Preconditions; | ||
import java.util.concurrent.CancellationException; | ||
import java.util.concurrent.ExecutionException; | ||
|
||
public class SpannerApiFutures { | ||
public static <T> T get(ApiFuture<T> future) throws SpannerException { | ||
return getOrNull(Preconditions.checkNotNull(future)); | ||
} | ||
|
||
public static <T> T getOrNull(ApiFuture<T> future) throws SpannerException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Convenience method that will catch and wrap common exceptions. |
||
try { | ||
return future == null ? null : future.get(); | ||
} catch (ExecutionException e) { | ||
if (e.getCause() instanceof SpannerException) { | ||
throw (SpannerException) e.getCause(); | ||
} | ||
throw SpannerExceptionFactory.newSpannerException(e.getCause()); | ||
} catch (InterruptedException e) { | ||
throw SpannerExceptionFactory.propagateInterrupt(e); | ||
} catch (CancellationException e) { | ||
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
executeQueryAsync
method will produce aFuture<ResultSet>
that should be used as the underlying delegate of anAsyncResultSet
.