Skip to content

Commit

Permalink
connection pool add debug_connection_used, it will print connection l…
Browse files Browse the repository at this point in the history
…ast query sql when connection not release to poll
  • Loading branch information
snower committed Feb 16, 2017
1 parent 462f81f commit 5d6be50
Showing 1 changed file with 42 additions and 3 deletions.
45 changes: 42 additions & 3 deletions tormysql/pool.py
Expand Up @@ -11,6 +11,7 @@
from collections import deque
from tornado.concurrent import Future
from tornado.ioloop import IOLoop
from pymysql._compat import text_type
from .client import Client


Expand Down Expand Up @@ -65,11 +66,43 @@ def do_close(self):
return super(Connection, self).close()


class RecordQueryConnection(Connection):
def __init__(self, *args, **kwargs):
super(RecordQueryConnection, self).__init__(*args, **kwargs)

self._last_query_sql = ""

def connect(self):
future = super(RecordQueryConnection, self).connect()

origin_query = self._connection.query
def query(sql, unbuffered=False):
self._last_query_sql = sql
return origin_query(sql, unbuffered)
self._connection.query = query

return future

def query(self, sql, unbuffered=False):
self._last_query_sql = sql
return super(RecordQueryConnection, self).query(sql, unbuffered)

def get_last_query_sql(self):
if isinstance(self._last_query_sql, text_type):
return self._last_query_sql.encode("utf-8")
return self._last_query_sql


class ConnectionPool(object):
def __init__(self, *args, **kwargs):
self._max_connections = kwargs.pop("max_connections") if "max_connections" in kwargs else 32
self._idle_seconds = kwargs.pop("idle_seconds") if "idle_seconds" in kwargs else 7200
self._wait_connection_timeout = kwargs.pop("wait_connection_timeout") if "wait_connection_timeout" in kwargs else 8
self._debug_connection_used = kwargs.pop("debug_connection_used") if "debug_connection_used" in kwargs else False
if self._debug_connection_used:
self._connection_cls = RecordQueryConnection
else:
self._connection_cls = Connection
self._args = args
self._kwargs = kwargs
self._connections = deque(maxlen = self._max_connections)
Expand Down Expand Up @@ -109,7 +142,7 @@ def connection_connected_callback(self, future, connection_future):
IOLoop.current().add_callback(self.init_connection, wait_future)

def init_connection(self, future):
connection = Connection(self, *self._args, **self._kwargs)
connection = self._connection_cls(self, *self._args, **self._kwargs)
connection.set_close_callback(self.connection_close_callback)
connection_future = connection.connect()
self._connections_count += 1
Expand Down Expand Up @@ -265,9 +298,15 @@ def check_idle_connections(self):
for connection in self._used_connections.values():
if now - connection.used_time > (self._wait_connection_timeout * 4) ** 2:
connection.do_close()
logging.error("Connection used timeout close, used time %.2fs %s %s.", now - connection.used_time, connection, self)
if self._debug_connection_used:
logging.error("Connection used timeout close, used time %.2fs %s %s.\n%s", now - connection.used_time, connection, self, connection.get_last_query_sql())
else:
logging.error("Connection used timeout close, used time %.2fs %s %s.", now - connection.used_time, connection, self)
elif now - connection.used_time > self._wait_connection_timeout ** 2 * 2:
logging.warning("Connection maybe not release, used time %.2fs %s %s.", now - connection.used_time, connection, self)
if self._debug_connection_used:
logging.warning("Connection maybe not release, used time %.2fs %s %s.\n%s", now - connection.used_time, connection, self, connection.get_last_query_sql())
else:
logging.warning("Connection maybe not release, used time %.2fs %s %s.", now - connection.used_time, connection, self)

next_check_time = now + self._idle_seconds
for connection in tuple(self._connections):
Expand Down

0 comments on commit 5d6be50

Please sign in to comment.