Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
snower committed Mar 12, 2018
1 parent dd8ce76 commit 7558ae9
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 76 deletions.
6 changes: 5 additions & 1 deletion run-tests
Expand Up @@ -2,7 +2,11 @@

export LOG_LEVEL=debug

exec nosetests \
for file in ./tests/*.py
do
echo $file
exec nosetests $file \
--logging-format='%(asctime)s [%(name)s] %(levelname)-6s %(message)s' \
--with-coverage \
--cover-package=tormysql
done
5 changes: 1 addition & 4 deletions tests/__init__.py
Expand Up @@ -30,7 +30,4 @@ def setUp(self):
def tearDown(self):
if not self.pool.closed:
self.pool.close()
super(BaseTestCase, self).tearDown()

def get_new_ioloop(self):
return IOLoop.current()
super(BaseTestCase, self).tearDown()
14 changes: 4 additions & 10 deletions tests/test_asyncio.py
Expand Up @@ -8,14 +8,17 @@
from tormysql.platform import use_asyncio
use_asyncio()
except:
pass
asyncio = None

from tornado.ioloop import IOLoop
from tormysql.cursor import SSCursor
from tormysql.helpers import ConnectionPool
from tornado.testing import AsyncTestCase
from tornado.testing import gen_test
from tornado.test.util import unittest
from tormysql.util import py3

@unittest.skipIf(asyncio is None, "asyncio module not present")
class TestAsyncioCase(AsyncTestCase):
PARAMS = dict(
host=os.getenv("MYSQL_HOST", "127.0.0.1"),
Expand All @@ -41,15 +44,6 @@ def tearDown(self):
super(TestAsyncioCase, self).tearDown()
self.pool.close()

def get_new_ioloop(self):
try:
import asyncio
from tornado.platform.asyncio import AsyncIOMainLoop
AsyncIOMainLoop().install()
return IOLoop.current()
except:
return IOLoop.current()

if py3:
exec("""
@gen_test
Expand Down
15 changes: 11 additions & 4 deletions tests/test_charset.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python
# encoding: utf-8

from tornado import gen
from tornado.testing import gen_test
from . import BaseTestCase

Expand All @@ -12,8 +14,8 @@ class TestLocale(BaseTestCase):
'utf8mb4', 'cp1251', 'cp1256', 'cp1257', 'cp932'
]

@gen_test
def test1(self):
@gen.coroutine
def _execute_test1(self):
for charset in self.CHARSETS:
with (yield self.pool.Connection()) as connection:
yield connection.set_charset(charset)
Expand All @@ -23,8 +25,13 @@ def test1(self):
data = cursor.fetchone()
self.assertEqual(data[1], charset)

@gen_test
def test_escape(self):
@gen.coroutine
def _execute_test_escape(self):
with (yield self.pool.Connection()) as connection:
s = connection.escape(r'"')
self.assertEqual(s, '\'\\"\'')

@gen_test
def test(self):
yield self._execute_test1()
yield self._execute_test_escape()
19 changes: 11 additions & 8 deletions tests/test_helpers.py
Expand Up @@ -3,6 +3,7 @@
# create by: snower

import os
from tornado import gen
from tornado.ioloop import IOLoop
from tormysql.helpers import ConnectionPool
from tornado.testing import AsyncTestCase
Expand Down Expand Up @@ -33,21 +34,23 @@ def tearDown(self):
super(TestHelpersCase, self).tearDown()
self.pool.close()

def get_new_ioloop(self):
return IOLoop.current()

@gen_test
def test_execute(self):
@gen.coroutine
def _execute_test_execute(self):
sql = "select 1 as test"
cursor = yield self.pool.execute(sql)
result = cursor.fetchone()
self.assertTrue(result == (1,))

@gen_test
def test_tx(self):
@gen.coroutine
def _execute_test_tx(self):
sql = "select 1 as test"
tx = yield self.pool.begin()
cursor = yield tx.execute(sql)
yield tx.commit()
result = cursor.fetchone()
self.assertTrue(result == (1,))
self.assertTrue(result == (1,))

@gen_test
def test(self):
yield self._execute_test_execute()
yield self._execute_test_tx()
59 changes: 35 additions & 24 deletions tests/test_unstable_connection.py
@@ -1,6 +1,7 @@
# encoding: utf-8
import socket
import os
from tornado import gen
from tornado.ioloop import IOLoop
from tormysql.pool import ConnectionNotFoundError
from pymysql import OperationalError
Expand All @@ -19,28 +20,20 @@ def setUp(self):
s = socket.socket()
s.bind(('127.0.0.1', 0))

_, self.port = s.getsockname()
self.host, self.port = self.PARAMS['host'], self.PARAMS['port']
_, self.pport = s.getsockname()
s.close()

def init_proxy(self):
self.proxy = ProxyServer(
self.PARAMS['host'],
self.PARAMS['port'],
self.host,
self.port,
session_factory=SessionFactory(),
io_loop=self.io_loop,
)

self.proxy.listen(self.port)
self.PARAMS['port'] = self.port
self.PARAMS['port'] = self.pport
self.PARAMS['host'] = '127.0.0.1'

def get_new_ioloop(self):
try:
import asyncio
from tornado.platform.asyncio import AsyncIOMainLoop
AsyncIOMainLoop().install()
return IOLoop.current()
except:
return IOLoop.current()
self.proxy.listen(self.pport)

def _close_proxy_sessions(self):
for sock in self.proxy.SessionsList:
Expand All @@ -50,26 +43,32 @@ def _close_proxy_sessions(self):
pass

def tearDown(self):
super(BaseTestCase, self).tearDown()
try:
self.proxy.stop()
except:
pass
super(BaseTestCase, self).tearDown()

@gen_test
def test_connection_closing(self):
@gen.coroutine
def _execute_test_connection_closing(self):
self.init_proxy()
connection = yield Connection(**self.PARAMS)
cursor = connection.cursor()
self._close_proxy_sessions()
try:
yield cursor.execute('SELECT 1')
yield cursor.close()
except OperationalError:
pass
else:
raise AssertionError("Unexpected normal situation")
self.proxy.stop()

@gen_test
def test_connection_closed(self):
@gen.coroutine
def _execute_test_connection_closed(self):
self.init_proxy()
conn = yield Connection(**self.PARAMS)
yield conn.close()
self.proxy.stop()

try:
Expand All @@ -79,15 +78,18 @@ def test_connection_closed(self):
else:
raise AssertionError("Unexpected normal situation")

@gen_test
def test_remote_closing(self):
@gen.coroutine
def _execute_test_remote_closing(self):
self.init_proxy()
pool = ConnectionPool(
max_connections=int(os.getenv("MYSQL_POOL", "5")),
idle_seconds=7200,
**self.PARAMS
)

try:
conn = yield pool.Connection()
yield conn.do_close()
self.proxy.stop()
yield pool.Connection()
except OperationalError:
Expand All @@ -97,8 +99,9 @@ def test_remote_closing(self):
finally:
yield pool.close()

@gen_test
def test_pool_closing(self):
@gen.coroutine
def _execute_test_pool_closing(self):
self.init_proxy()
pool = ConnectionPool(
max_connections=int(os.getenv("MYSQL_POOL", "5")),
idle_seconds=7200,
Expand All @@ -115,3 +118,11 @@ def test_pool_closing(self):
raise AssertionError("Unexpected normal situation")
finally:
yield pool.close()
self.proxy.stop()

@gen_test
def test(self):
yield self._execute_test_connection_closing()
yield self._execute_test_connection_closed()
yield self._execute_test_remote_closing()
yield self._execute_test_pool_closing()
15 changes: 11 additions & 4 deletions tests/test_warnings.py
@@ -1,19 +1,21 @@
# encoding: utf-8

import uuid
from tornado import gen
from tornado.testing import gen_test
from . import BaseTestCase


class TestWarnings(BaseTestCase):
@gen_test
def test0(self):
@gen.coroutine
def _execute_test0(self):
connection = yield self.pool.Connection()
warnings = yield connection.show_warnings()
connection.close()
self.assertEqual(warnings, (), "No warnings")

@gen_test
def test1(self):
@gen.coroutine
def _execute_test1(self):
name = uuid.uuid4().hex
sql = 'DROP TABLE IF EXISTS test_{name}'.format(name=name)
with (yield self.pool.Connection()) as connection:
Expand All @@ -22,3 +24,8 @@ def test1(self):

warnings = yield connection.show_warnings()
self.assertTrue(name in warnings[0][2])

@gen_test
def test(self):
yield self._execute_test0()
yield self._execute_test1()
51 changes: 33 additions & 18 deletions tests/test_with_with.py
@@ -1,42 +1,57 @@
#!/usr/bin/env python
# encoding: utf-8

import os
from tornado import gen
from tornado.testing import gen_test
from tormysql.cursor import SSDictCursor
from . import BaseTestCase

from tormysql import ConnectionPool

class TestWithWith(BaseTestCase):
@gen_test
def test1(self):
@gen.coroutine
def _execute_test1(self):
sql = "select * from test limit 1"
with (yield self.pool.Connection()) as connection:
with connection.cursor() as cursor:
yield cursor.execute(sql)
datas = cursor.fetchall()
self.assertTrue(bool(datas))

yield self.pool.close()

class TestAsyncCursor(BaseTestCase):
PARAMS = dict(
host=os.getenv("MYSQL_HOST", "127.0.0.1"),
user=os.getenv("MYSQL_USER", "root"),
passwd=os.getenv("MYSQL_PASSWD", ""),
db=os.getenv("MYSQL_DB", "test"),
charset=os.getenv("MYSQL_CHARSET", "utf8"),
no_delay=True,
sql_mode="REAL_AS_FLOAT",
init_command="SET max_join_size=DEFAULT",
cursorclass=SSDictCursor
)
def init_params(self):
self.PARAMS = dict(
host=os.getenv("MYSQL_HOST", "127.0.0.1"),
user=os.getenv("MYSQL_USER", "root"),
passwd=os.getenv("MYSQL_PASSWD", ""),
db=os.getenv("MYSQL_DB", "test"),
charset=os.getenv("MYSQL_CHARSET", "utf8"),
no_delay=True,
sql_mode="REAL_AS_FLOAT",
init_command="SET max_join_size=DEFAULT",
cursorclass=SSDictCursor
)

@gen_test
def test1(self):
self.pool = ConnectionPool(
max_connections=int(os.getenv("MYSQL_POOL", 5)),
idle_seconds=7200,
**self.PARAMS
)

@gen.coroutine
def _execute_test2(self):
self.init_params()
sql = "select 1 as test"
with (yield self.pool.Connection()) as connection:
cursor = connection.cursor()
yield cursor.execute(sql)
result = yield cursor.fetchone()
yield cursor.close()
self.assertTrue('test' in result)
self.assertEqual(result['test'], 1)
self.assertEqual(result['test'], 1)

@gen_test
def test(self):
yield self._execute_test1()
yield self._execute_test2()
6 changes: 4 additions & 2 deletions tormysql/client.py
Expand Up @@ -48,7 +48,7 @@ def set_close_callback(self, callback):
self._close_callback = callback

def close(self):
if self._connection._closed:
if self._closed:
return
return async_call_method(self._connection.close)

Expand Down Expand Up @@ -102,7 +102,9 @@ def __getattr__(self, name):
return getattr(self._connection, name)

def __del__(self):
self.close()
try:
self.close()
except: pass

def __enter__(self):
return self.cursor()
Expand Down

0 comments on commit 7558ae9

Please sign in to comment.