Skip to content

Commit

Permalink
Merge pull request #50 from kazabubu/master
Browse files Browse the repository at this point in the history
Added query timeout support
  • Loading branch information
davidmoten committed Nov 21, 2019
2 parents 643e641 + de677ed commit 6d3dc93
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 30 deletions.
18 changes: 9 additions & 9 deletions rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Select.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@ private Select() {
private static final Logger log = LoggerFactory.getLogger(Select.class);

static <T> Flowable<T> create(Single<Connection> connections,
Flowable<List<Object>> parameterGroups, String sql, int fetchSize,
Function<? super ResultSet, ? extends T> mapper, boolean eagerDispose) {
Flowable<List<Object>> parameterGroups, String sql, int fetchSize,
Function<? super ResultSet, ? extends T> mapper, boolean eagerDispose, int queryTimeoutSec) {
return connections //
.toFlowable() //
.flatMap(con -> create(con, sql, parameterGroups, fetchSize, mapper, eagerDispose));
.flatMap(con -> create(con, sql, parameterGroups, fetchSize, mapper, eagerDispose, queryTimeoutSec));
}

static <T> Flowable<T> create(Connection con, String sql,
Flowable<List<Object>> parameterGroups, int fetchSize,
Function<? super ResultSet, T> mapper, boolean eagerDispose) {
Flowable<List<Object>> parameterGroups, int fetchSize,
Function<? super ResultSet, T> mapper, boolean eagerDispose, int queryTimeoutSec) {
log.debug("Select.create called with con={}", con);
Callable<NamedPreparedStatement> initialState = () -> Util.prepare(con, fetchSize, sql);
Callable<NamedPreparedStatement> initialState = () -> Util.prepare(con, fetchSize, sql, queryTimeoutSec);
Function<NamedPreparedStatement, Flowable<T>> observableFactory = ps -> parameterGroups
.flatMap(parameters -> create(ps.ps, parameters, mapper, ps.names, sql, fetchSize),
.flatMap(parameters -> create(ps.ps, parameters, mapper, ps.names, sql, fetchSize, queryTimeoutSec),
true, 1);
Consumer<NamedPreparedStatement> disposer = Util::closePreparedStatementAndConnection;
return Flowable.using(initialState, observableFactory, disposer, eagerDispose);
}

private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Object> parameters,
Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize) {
Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize, int queryTimeoutSec) {
log.debug("parameters={}", parameters);
log.debug("names={}", names);

Expand All @@ -56,7 +56,7 @@ private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Objec
if (hasCollection) {
// create a new prepared statement with the collection ? substituted with
// ?s to match the size of the collection parameter
ps2 = Util.prepare(ps.getConnection(), fetchSize, sql, params);
ps2 = Util.prepare(ps.getConnection(), fetchSize, sql, params, queryTimeoutSec);
// now wrap the rs to auto close ps2 because it is single use (the next
// collection parameter may have a different ordinality so we need to build
// a new PreparedStatement with a different number of question marks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public final class SelectBuilder extends ParametersBuilder<SelectBuilder>
private final Database db;

int fetchSize = 0; // default
int queryTimeoutSec = Util.QUERY_TIMEOUT_NOT_SET; //default
private Flowable<?> dependsOn;

SelectBuilder(String sql, Single<Connection> connection, Database db) {
Expand All @@ -43,6 +44,12 @@ public SelectBuilder fetchSize(int size) {
return this;
}

public SelectBuilder queryTimeoutSec(int timeoutSec) {
Preconditions.checkArgument(timeoutSec >= 0);
this.queryTimeoutSec = timeoutSec;
return this;
}

public TransactedSelectBuilder transacted() {
return new TransactedSelectBuilder(this, db);
}
Expand All @@ -55,7 +62,7 @@ public TransactedSelectBuilder transactedValuesOnly() {
public <T> Flowable<T> get(@Nonnull ResultSetMapper<? extends T> mapper) {
Preconditions.checkNotNull(mapper, "mapper cannot be null");
Flowable<List<Object>> pg = super.parameterGroupsToFlowable();
Flowable<T> f = Select.<T>create(connection, pg, sql, fetchSize, mapper, true);
Flowable<T> f = Select.<T>create(connection, pg, sql, fetchSize, mapper, true, queryTimeoutSec);
if (dependsOn != null) {
return dependsOn.ignoreElements().andThen(f);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ private static <T> Flowable<Tx<T>> createFlowable(SelectAutomappedBuilder<T> sb,
sb.selectBuilder.sql, //
sb.selectBuilder.fetchSize, //
Util.autoMap(sb.cls), //
false) //
false, //
sb.selectBuilder.queryTimeoutSec) //
.materialize() //
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
.doOnNext(tx -> {
Expand All @@ -107,4 +108,4 @@ private static <T> Flowable<Tx<T>> createFlowable(SelectAutomappedBuilder<T> sb,
});
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ private static <T> Flowable<Tx<T>> createFlowable(SelectBuilder sb,
sb.sql, //
sb.fetchSize, //
mapper, //
false) //
false, //
sb.queryTimeoutSec) //
.materialize() //
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
.doOnNext(tx -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private static Flowable<Tx<Integer>> createFlowable(UpdateBuilder ub, Database d
ub.parameterGroupsToFlowable(), //
ub.sql, //
ub.batchSize, //
false) //
false, //
ub.queryTimeoutSec) //
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
.doOnNext(tx -> {
t[0] = ((TxImpl<Integer>) tx);
Expand Down
12 changes: 6 additions & 6 deletions rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Update.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ private Update() {
}

static Flowable<Notification<Integer>> create(Single<Connection> connection,
Flowable<List<Object>> parameterGroups, String sql, int batchSize,
boolean eagerDispose) {
Flowable<List<Object>> parameterGroups, String sql, int batchSize,
boolean eagerDispose, int queryTimeoutSec) {
return connection //
.toFlowable() //
.flatMap(con -> create(con, sql, parameterGroups, batchSize, eagerDispose), true,
.flatMap(con -> create(con, sql, parameterGroups, batchSize, eagerDispose, queryTimeoutSec), true,
1);
}

private static Flowable<Notification<Integer>> create(Connection con, String sql,
Flowable<List<Object>> parameterGroups, int batchSize, boolean eagerDispose) {
Flowable<List<Object>> parameterGroups, int batchSize, boolean eagerDispose, int queryTimeoutSec) {
log.debug("Update.create {}", sql);
Callable<NamedPreparedStatement> resourceFactory = () -> Util.prepare(con, sql);
Callable<NamedPreparedStatement> resourceFactory = () -> Util.prepare(con, sql, queryTimeoutSec);
final Function<NamedPreparedStatement, Flowable<Notification<Integer>>> flowableFactory;
if (batchSize == 0) {
flowableFactory = ps -> parameterGroups //
Expand Down Expand Up @@ -101,7 +101,7 @@ private static Single<Integer> create(NamedPreparedStatement ps, List<Parameter>
if (hasCollection) {
// create a new prepared statement with the collection ? substituted with
// ?s to match the size of the collection parameter
ps2 = Util.prepare(ps.ps.getConnection(), 0, sql, params);
ps2 = Util.prepare(ps.ps.getConnection(), 0, sql, params, ps.ps.getQueryTimeout());
} else {
ps2 = ps.ps;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public final class UpdateBuilder extends ParametersBuilder<UpdateBuilder> implem
private final Database db;
Flowable<?> dependsOn;
int batchSize = DEFAULT_BATCH_SIZE;
int queryTimeoutSec = Util.QUERY_TIMEOUT_NOT_SET;

UpdateBuilder(String sql, Single<Connection> connections, Database db) {
super(sql);
Expand All @@ -40,6 +41,12 @@ public UpdateBuilder batchSize(int batchSize) {
return this;
}

public UpdateBuilder queryTimeoutSec(int queryTimeoutSec) {
Preconditions.checkArgument(queryTimeoutSec >= 0);
this.queryTimeoutSec = queryTimeoutSec;
return this;
}

/**
* Returns a builder used to specify how to process the generated keys
* {@link ResultSet}. Not all jdbc drivers support this functionality and
Expand All @@ -56,7 +63,7 @@ public ReturnGeneratedKeysBuilder returnGeneratedKeys() {

public Flowable<Integer> counts() {
return startWithDependency(
Update.create(connections, super.parameterGroupsToFlowable(), sql, batchSize, true).dematerialize());
Update.create(connections, super.parameterGroupsToFlowable(), sql, batchSize, true, queryTimeoutSec).dematerialize());
}

<T> Flowable<T> startWithDependency(@Nonnull Flowable<T> f) {
Expand Down
23 changes: 14 additions & 9 deletions rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public enum Util {
;

private static final Logger log = LoggerFactory.getLogger(Util.class);
public static final int QUERY_TIMEOUT_NOT_SET = -1;

/**
* Sets parameters for the {@link PreparedStatement}.
Expand Down Expand Up @@ -287,39 +288,43 @@ static void closeCallableStatementAndConnection(NamedCallableStatement stmt) {
closePreparedStatementAndConnection(stmt.stmt);
}

static NamedPreparedStatement prepare(Connection con, String sql) throws SQLException {
return prepare(con, 0, sql);
static NamedPreparedStatement prepare(Connection con, String sql, int queryTimeoutSec) throws SQLException {
return prepare(con, 0, sql, queryTimeoutSec);
}

static NamedPreparedStatement prepare(Connection con, int fetchSize, String sql) throws SQLException {
static NamedPreparedStatement prepare(Connection con, int fetchSize, String sql, int queryTimeoutSec) throws SQLException {
// TODO can we parse SqlInfo through because already calculated by
// builder?
SqlInfo info = SqlInfo.parse(sql);
log.debug("preparing statement: {}", sql);
return prepare(con, fetchSize, info);
return prepare(con, fetchSize, info, queryTimeoutSec);
}

static PreparedStatement prepare(Connection connection, int fetchSize, String sql, List<Parameter> parameters)
static PreparedStatement prepare(Connection connection, int fetchSize, String sql, List<Parameter> parameters, int queryTimeoutSec)
throws SQLException {
// should only get here when parameters contains a collection
SqlInfo info = SqlInfo.parse(sql, parameters);
log.debug("preparing statement: {}", info.sql());
return createPreparedStatement(connection, fetchSize, info);
return createPreparedStatement(connection, fetchSize, info, queryTimeoutSec);
}

private static NamedPreparedStatement prepare(Connection con, int fetchSize, SqlInfo info) throws SQLException {
PreparedStatement ps = createPreparedStatement(con, fetchSize, info);
private static NamedPreparedStatement prepare(Connection con, int fetchSize, SqlInfo info, int queryTimeoutSec) throws SQLException {
PreparedStatement ps = createPreparedStatement(con, fetchSize, info, queryTimeoutSec);
return new NamedPreparedStatement(ps, info.names());
}

private static PreparedStatement createPreparedStatement(Connection con, int fetchSize, SqlInfo info)
private static PreparedStatement createPreparedStatement(Connection con, int fetchSize, SqlInfo info, int queryTimeoutSec)
throws SQLException {
PreparedStatement ps = null;
try {
ps = con.prepareStatement(info.sql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
if (fetchSize > 0) {
ps.setFetchSize(fetchSize);
}

if (queryTimeoutSec != QUERY_TIMEOUT_NOT_SET) {
ps.setQueryTimeout(queryTimeoutSec);
}
} catch (RuntimeException | SQLException e) {
if (ps != null) {
ps.close();
Expand Down

0 comments on commit 6d3dc93

Please sign in to comment.