Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use DML batches in executemany() method #412

Merged
merged 25 commits into from Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions google/cloud/spanner_dbapi/connection.py
Expand Up @@ -63,6 +63,7 @@ def __init__(self, instance, database):

self.is_closed = False
self._autocommit = False
self._use_mutations = False
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
# indicator to know if the session pool used by
# this connection should be cleared on the
# connection close
Expand Down Expand Up @@ -121,6 +122,32 @@ def instance(self):
"""
return self._instance

@property
def use_mutations(self):
"""
Flag of the connection mode in which mutations
are used for massive DML statements.

Returns:
bool: True if mutations mode is enable, False otherwise.
"""
return self._use_mutations

@use_mutations.setter
def use_mutations(self, value):
"""Change mutations use mode.

Mutations are used by default in autocommit
mode and they can be used in transactions
mode in case of this flag set to True.

Args:
value (bool): New flag value.
"""
self._use_mutations = value
if value:
self.autocommit = False

def _session_checkout(self):
"""Get a Cloud Spanner session from the pool.

Expand Down
28 changes: 25 additions & 3 deletions google/cloud/spanner_dbapi/cursor.py
Expand Up @@ -38,6 +38,7 @@
from google.cloud.spanner_dbapi import parse_utils
from google.cloud.spanner_dbapi.parse_utils import get_param_types
from google.cloud.spanner_dbapi.parse_utils import sql_pyformat_args_to_spanner
from google.cloud.spanner_dbapi.parse_utils import RE_INSERT
from google.cloud.spanner_dbapi.utils import PeekIterator
from google.cloud.spanner_dbapi.utils import StreamedManyResultSets

Expand Down Expand Up @@ -258,9 +259,30 @@ def executemany(self, operation, seq_of_params):

many_result_set = StreamedManyResultSets()

for params in seq_of_params:
self.execute(operation, params)
many_result_set.add_iter(self._itr)
if self.connection.autocommit or self.connection.use_mutations:
if classification == parse_utils.STMT_INSERT:
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
match = RE_INSERT.search(operation)

table_name = match["table_name"].strip("`")

cols = []
for col in match["columns"].split(","):
col = col.strip()

if col[0] == '"' and col[-1] == '"':
col = col[1:-1]

col = col.strip("`")
cols.append(col)

transaction = self.connection.transaction_checkout()
transaction.insert(
table=table_name, columns=cols, values=seq_of_params,
)
else:
for params in seq_of_params:
self.execute(operation, params)
many_result_set.add_iter(self._itr)

self._result_set = many_result_set
self._itr = many_result_set
Expand Down
62 changes: 62 additions & 0 deletions tests/unit/spanner_dbapi/test_cursor.py
Expand Up @@ -337,6 +337,68 @@ def test_executemany(self):
(mock.call(operation, (1,)), mock.call(operation, (2,)))
)

def test_executemany_insert_autocommit(self):
from google.cloud.spanner_dbapi import connect

sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)"""

with mock.patch(
"google.cloud.spanner_v1.instance.Instance.exists", return_value=True
):
with mock.patch(
"google.cloud.spanner_v1.database.Database.exists", return_value=True
):
connection = connect("test-instance", "test-database")

connection.autocommit = True
cursor = connection.cursor()
transact_mock = mock.Mock()
transact_mock.insert = mock.Mock()

with mock.patch(
"google.cloud.spanner_dbapi.connection.Connection.transaction_checkout",
return_value=transact_mock,
):
cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)])

transact_mock.insert.assert_called_once_with(
table="table",
columns=["col1", "col2", "col3", '"col4"'],
values=[(1, 2, 3, 4), (5, 6, 7, 8)],
)

def test_executemany_insert_use_mutations(self):
from google.cloud.spanner_dbapi import connect

sql = """INSERT INTO table (col1, "col2", `col3`, `"col4"`) VALUES (%s, %s, %s, %s)"""

with mock.patch(
"google.cloud.spanner_v1.instance.Instance.exists", return_value=True
):
with mock.patch(
"google.cloud.spanner_v1.database.Database.exists", return_value=True,
):
connection = connect("test-instance", "test-database")

connection.use_mutations = True
cursor = connection.cursor()
transact_mock = mock.Mock()
transact_mock.insert = mock.Mock()

self.assertFalse(connection.autocommit)

with mock.patch(
"google.cloud.spanner_dbapi.connection.Connection.transaction_checkout",
return_value=transact_mock,
):
cursor.executemany(sql, [(1, 2, 3, 4), (5, 6, 7, 8)])

transact_mock.insert.assert_called_once_with(
table="table",
columns=["col1", "col2", "col3", '"col4"'],
values=[(1, 2, 3, 4), (5, 6, 7, 8)],
)

@unittest.skipIf(
sys.version_info[0] < 3, "Python 2 has an outdated iterator definition"
)
Expand Down