Skip to content

Commit

Permalink
bug fixed for DruidDataSource.close.
Browse files Browse the repository at this point in the history
  • Loading branch information
wenshao committed Aug 21, 2016
1 parent c9e9aab commit 35d00ec
Showing 1 changed file with 57 additions and 62 deletions.
119 changes: 57 additions & 62 deletions src/main/java/com/alibaba/druid/pool/DruidDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,7 @@
* @author ljw [ljw2083@alibaba-inc.com]
* @author wenshao [szujobs@hotmail.com]
*/
public class DruidDataSource extends DruidAbstractDataSource
implements DruidDataSourceMBean
, ManagedDataSource
, Referenceable
, Closeable
, Cloneable
, ConnectionPoolDataSource
, MBeanRegistration {
public class DruidDataSource extends DruidAbstractDataSource implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource, MBeanRegistration {

private final static Log LOG = LogFactory.getLog(DruidDataSource.class);

Expand Down Expand Up @@ -167,7 +160,7 @@ public class DruidDataSource extends DruidAbstractDataSource
public static ThreadLocal<Long> waitNanosLocal = new ThreadLocal<Long>();

private boolean logDifferentThread = true;

public DruidDataSource(){
this(false);
}
Expand Down Expand Up @@ -590,7 +583,7 @@ public void init() throws SQLException {
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl();
}

for (Filter filter : filters) {
filter.init(this);
}
Expand Down Expand Up @@ -627,7 +620,7 @@ public void init() throws SQLException {
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}

if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
Expand Down Expand Up @@ -942,13 +935,13 @@ private void db2ValidationQueryCheck() {
private void initValidConnectionChecker() {
String realDriverClassName = driver.getClass().getName();
if (realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER) //
|| realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER_6)) {
|| realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER_6)) {
this.validConnectionChecker = new MySqlValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)) {
this.validConnectionChecker = new OracleValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER)
||realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4)
||realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) {
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) {
this.validConnectionChecker = new MSSQLValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)) {
this.validConnectionChecker = new PGValidConnectionChecker();
Expand All @@ -966,7 +959,7 @@ private void initExceptionSorter() {

String realDriverClassName = driver.getClass().getName();
if (realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER) //
|| realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER_6)) {
|| realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER_6)) {
this.exceptionSorter = new MySqlExceptionSorter();
} else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)) {
this.exceptionSorter = new OracleExceptionSorter();
Expand All @@ -978,7 +971,7 @@ private void initExceptionSorter() {

} else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)) {
this.exceptionSorter = new PGExceptionSorter();

} else if (realDriverClassName.equals("com.alibaba.druid.mock.MockDriver")) {
this.exceptionSorter = new MockExceptionSorter();
} else if (realDriverClassName.contains("DB2")) {
Expand Down Expand Up @@ -1172,9 +1165,9 @@ private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLExce

StringBuilder buf = new StringBuilder();
buf.append("wait millis ")//
.append(waitNanos / (1000 * 1000))//
.append(", active " + activeCount)//
.append(", maxActive " + maxActive)//
.append(waitNanos / (1000 * 1000))//
.append(", active " + activeCount)//
.append(", maxActive " + maxActive)//
;

List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
Expand Down Expand Up @@ -1241,12 +1234,12 @@ public void handleConnectionException(DruidPooledConnection pooledConnection, Th
requireDiscard = true;
}
}

if (requireDiscard) {
this.discardConnection(holder.getConnection());
holder.setDiscard(true);
}

LOG.error("discard connection", sqlEx);
}

Expand Down Expand Up @@ -1309,7 +1302,7 @@ protected void recycle(DruidPooledConnection pooledConnection) throws SQLExcepti
} else {
holder.reset();
}

if (holder.isDiscard()) {
return;
}
Expand Down Expand Up @@ -1349,7 +1342,7 @@ protected void recycle(DruidPooledConnection pooledConnection) throws SQLExcepti
} finally {
lock.unlock();
}

if (!result) {
JdbcUtils.close(holder.getConnection());
LOG.info("connection recyle failed.");
Expand Down Expand Up @@ -1414,21 +1407,21 @@ public void close() {
}

for (int i = 0; i < poolingCount; ++i) {
try {
DruidConnectionHolder connHolder = connections[i];
DruidConnectionHolder connHolder = connections[i];

for (PreparedStatementHolder stmtHolder : connHolder.getStatementPool().getMap().values()) {
connHolder.getStatementPool().closeRemovedStatement(stmtHolder);
}
connHolder.getStatementPool().getMap().clear();
for (PreparedStatementHolder stmtHolder : connHolder.getStatementPool().getMap().values()) {
connHolder.getStatementPool().closeRemovedStatement(stmtHolder);
}
connHolder.getStatementPool().getMap().clear();

Connection physicalConnection = connHolder.getConnection();
Connection physicalConnection = connHolder.getConnection();
try {
physicalConnection.close();
connections[i] = null;
destroyCount.incrementAndGet();
} catch (Exception ex) {
LOG.warn("close connection error", ex);
}
connections[i] = null;
destroyCount.incrementAndGet();
}
poolingCount = 0;
unregisterMbean();
Expand Down Expand Up @@ -1492,7 +1485,7 @@ boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
if (poolingCount >= maxActive) {
return false;
}

e.setLastActiveTimeMillis(lastActiveTimeMillis);
connections[poolingCount] = e;
incrementPoolingCount();
Expand All @@ -1504,19 +1497,19 @@ boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {

notEmpty.signal();
notEmptySignalCount++;

return true;
}

DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
while (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection

if (failFast && failContinuous.get()) {
throw new DataSourceNotAvailableException(createError);
}

notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
Expand All @@ -1527,7 +1520,7 @@ DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
notEmptyWaitThreadCount--;
}
notEmptyWaitCount++;

if (!enable) {
connectErrorCount.incrementAndGet();
throw new DataSourceDisableException();
Expand All @@ -1552,7 +1545,7 @@ private DruidConnectionHolder pollLast(long nanos) throws InterruptedException,
for (;;) {
if (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection

if (failFast && failContinuous.get()) {
throw new DataSourceNotAvailableException(createError);
}
Expand Down Expand Up @@ -1600,7 +1593,7 @@ private DruidConnectionHolder pollLast(long nanos) throws InterruptedException,
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;

long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);

Expand Down Expand Up @@ -1843,7 +1836,7 @@ protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
} finally {
lock.unlock();
}

return true;
}

Expand All @@ -1855,26 +1848,26 @@ public class CreateConnectionTask implements Runnable {
public void run() {
runInternal();
}

private void runInternal() {
for (;;) {

// addLast
lock.lock();
try {
boolean emptyWait = true;

if (createError != null && poolingCount == 0) {
emptyWait = false;
}

if (emptyWait) {
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount) {
createTaskCount--;
return;
}

// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive) {
createTaskCount--;
Expand All @@ -1884,7 +1877,7 @@ private void runInternal() {
} finally {
lock.unlock();
}

PhysicalConnectionInfo physicalConnection = null;

try {
Expand All @@ -1905,7 +1898,7 @@ private void runInternal() {
lock.unlock();
}
}

if (breakAfterAcquireFailure) {
lock.lock();
try {
Expand Down Expand Up @@ -1973,17 +1966,17 @@ public void run() {

try {
boolean emptyWait = true;

if (createError != null && poolingCount == 0) {
emptyWait = false;
}

if (emptyWait) {
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount) {
empty.await();
}

// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive) {
empty.await();
Expand All @@ -2005,7 +1998,8 @@ public void run() {
connection = createPhysicalConnection();
setFailContinuous(false);
} catch (SQLException e) {
LOG.error("create connection error, url: " + jdbcUrl + ", errorCode " + e.getErrorCode() + ", state " + e.getSQLState(), e);
LOG.error("create connection error, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);

errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
Expand All @@ -2019,7 +2013,7 @@ public void run() {
lock.unlock();
}
}

if (breakAfterAcquireFailure) {
break;
}
Expand Down Expand Up @@ -2162,7 +2156,7 @@ public int removeAbandoned() {
continue;
}
}

JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
Expand All @@ -2183,7 +2177,8 @@ public int removeAbandoned() {
buf.append("\n");
}

buf.append("ownerThread current state is "+pooledConnection.getOwnerThread().getState() + ", current stackTrace\n");
buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState()
+ ", current stackTrace\n");
trace = pooledConnection.getOwnerThread().getStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
Expand Down Expand Up @@ -2275,13 +2270,13 @@ public void shrink(boolean checkTime) {
continue;
}
}

long idleMillis = currentTimeMillis - connection.getLastActiveTimeMillis();

if (idleMillis < minEvictableIdleTimeMillis) {
break;
}

if (checkTime && i < checkCount) {
evictList.add(connection);
} else if (idleMillis > maxEvictableIdleTimeMillis) {
Expand Down Expand Up @@ -2916,7 +2911,7 @@ private void emptySignal() {
CreateConnectionTask task = new CreateConnectionTask();
createScheduler.submit(task);
}

@Override
public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception {
if (server != null) {
Expand All @@ -2933,19 +2928,19 @@ public ObjectName preRegister(MBeanServer server, ObjectName name) throws Except

@Override
public void postRegister(Boolean registrationDone) {

}

@Override
public void preDeregister() throws Exception {

}

@Override
public void postDeregister() {

}

public boolean isClosed() {
return this.closed;
}
Expand Down

0 comments on commit 35d00ec

Please sign in to comment.