Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(db_api): add an ability to set ReadOnly/ReadWrite connection mode #475

Merged
merged 37 commits into from Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4d7684e
feat(db_api): add an ability to set ReadOnly/ReadWrite connection mode
Aug 2, 2021
990fe3e
update the error message
Aug 2, 2021
2381c6d
update another error message
Aug 2, 2021
9cec921
don't check error messages with regexes
Aug 2, 2021
d6783b8
don't commit/rollback ReadOnly transactions
Aug 5, 2021
131a6fc
clear the transaction
Aug 5, 2021
e1a3db2
add read-only transactions data visibility test
Aug 5, 2021
4bd65e7
Apply suggestions from code review
Aug 5, 2021
4186609
add conditions for edge cases
Aug 6, 2021
0f3e716
Merge branch 'read_only_transactions' of https://github.com/q-logic/p…
Aug 6, 2021
8332d49
don't calc checksum for read-only transactions
Aug 6, 2021
61fc8ad
Merge branch 'master' into read_only_transactions
Aug 6, 2021
0be92af
use Snapshot for reads
Aug 10, 2021
3a72694
update docstrings
Aug 10, 2021
00887ff
Merge branch 'master' into read_only_transactions
Aug 10, 2021
b8eefed
Merge branch 'master' into read_only_transactions
Aug 11, 2021
de0e47e
use multi-use snapshots in !autocommit mode
Aug 11, 2021
9f1896a
return the read_only docstring back, erase excess unit test
Aug 11, 2021
4422ad5
Merge branch 'master' into read_only_transactions
larkee Aug 16, 2021
f11ea8c
erase excess ifs
Aug 16, 2021
9bafc76
Merge branch 'read_only_transactions' of https://github.com/q-logic/p…
Aug 16, 2021
783951f
Merge branch 'master' into read_only_transactions
Aug 20, 2021
d807a2d
add additional check into the snapshot_checkout() method
Aug 23, 2021
76e7caf
Merge branch 'master' into read_only_transactions
Aug 26, 2021
c61f212
add new style system test
Sep 13, 2021
9118987
Merge branch 'read_only_transactions' of https://github.com/q-logic/p…
Sep 13, 2021
70fc3ac
resolve conflict
Sep 13, 2021
22e5e73
don't use error message regexes
Sep 13, 2021
1cdccbe
erase excess import
Sep 13, 2021
9bb0ebc
Merge branch 'master' into read_only_transactions
Sep 14, 2021
ac8c4b2
refactor
Sep 14, 2021
ae9cd00
Merge branch 'main' into read_only_transactions
larkee Sep 16, 2021
6973748
add unit test to check that read-only transactions are not retried
Sep 16, 2021
5dab169
Merge branch 'read_only_transactions' of https://github.com/q-logic/p…
Sep 16, 2021
5761c30
Merge branch 'main' into read_only_transactions
skuruppu Sep 21, 2021
afeb585
Merge branch 'main' into read_only_transactions
skuruppu Oct 5, 2021
1ccb387
Merge branch 'main' into read_only_transactions
Oct 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 69 additions & 3 deletions google/cloud/spanner_dbapi/connection.py
Expand Up @@ -21,6 +21,7 @@
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import spanner_v1 as spanner
from google.cloud.spanner_v1.session import _get_retry_delay
from google.cloud.spanner_v1.snapshot import Snapshot

from google.cloud.spanner_dbapi._helpers import _execute_insert_heterogenous
from google.cloud.spanner_dbapi._helpers import _execute_insert_homogenous
Expand Down Expand Up @@ -50,15 +51,31 @@ class Connection:

:type database: :class:`~google.cloud.spanner_v1.database.Database`
:param database: The database to which the connection is linked.

:type read_only: bool
:param read_only:
Flag to indicate that the connection may only execute queries and no update or DDL statements.
If True and autocommit is enabled, the connection will use a single-use read-only transaction with strong timestamp
bound for each new statement, and will immediately see any changes that have been committed by
any other transaction.
If autocommit is false, the connection will automatically start a new multi use read-only transaction
with strong timestamp bound when the first statement is executed. This read-only transaction will be
used for all subsequent statements until either commit() or rollback() is called on the connection. The
read-only transaction will read from a consistent snapshot of the database at the time that the
transaction started. This means that the transaction will not see any changes that have been
committed by other transactions since the start of the read-only transaction. Committing or rolling back
the read-only transaction is semantically the same, and only indicates that the read-only transaction
should end and that a new one should be started when the next statement is executed.
"""

def __init__(self, instance, database):
def __init__(self, instance, database, read_only=False):
self._instance = instance
self._database = database
self._ddl_statements = []

self._transaction = None
self._session = None
self._snapshot = None
# SQL statements, which were executed
# within the current transaction
self._statements = []
Expand All @@ -69,6 +86,7 @@ def __init__(self, instance, database):
# this connection should be cleared on the
# connection close
self._own_pool = True
self._read_only = read_only

@property
def autocommit(self):
Expand Down Expand Up @@ -123,6 +141,30 @@ def instance(self):
"""
return self._instance

@property
def read_only(self):
"""Flag: the connection can be used only for database reads.

Returns:
bool:
True if the connection may only be used for database reads.
"""
# Plain accessor for the stored flag; writes go through the
# `read_only` setter, which checks the transaction state first.
return self._read_only

@read_only.setter
def read_only(self, value):
"""`read_only` flag setter.

Args:
value (bool): True for ReadOnly mode, False for ReadWrite.
"""
# Switching modes is rejected while a transaction is in progress;
# the caller has to commit or roll back first (see the message below).
if self.inside_transaction:
raise ValueError(
"Connection read/write mode can't be changed while a transaction is in progress. "
"Commit or rollback the current transaction and try again."
)
# No transaction is active, so the new mode is simply recorded.
self._read_only = value
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved

def _session_checkout(self):
"""Get a Cloud Spanner session from the pool.

Expand Down Expand Up @@ -231,6 +273,22 @@ def transaction_checkout(self):

return self._transaction

def snapshot_checkout(self):
"""Get a Cloud Spanner snapshot.

Initiate a new multi-use snapshot, if there is no snapshot in
this connection yet. Return the existing one otherwise.

:rtype: :class:`google.cloud.spanner_v1.snapshot.Snapshot`
:returns: A Cloud Spanner snapshot object, ready to use.
"""
# A connection-level snapshot is only created for read-only
# connections that are not in autocommit mode.
if self.read_only and not self.autocommit:
# Lazily create the multi-use snapshot on first request and begin it
# right away; later calls reuse the object cached in `_snapshot`.
if not self._snapshot:
self._snapshot = Snapshot(self._session_checkout(), multi_use=True)
self._snapshot.begin()

# NOTE(review): indentation is not visible in this diff view - confirm
# whether this return belongs to the outer `if` or to the method body.
return self._snapshot

def _raise_if_closed(self):
"""Helper to check the connection state before running a query.
Raises an exception if this connection is closed.
Expand Down Expand Up @@ -259,14 +317,18 @@ def commit(self):

This method is non-operational in autocommit mode.
"""
self._snapshot = None

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make more sense to add the read_only check here and exit early. If the connection is read only, then none of the code below should be relevant. WDYT?

This also applies to rollback().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The self._release_session() call would also be needed in the early exit branch in that case.

if self._autocommit:
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
return

self.run_prior_DDL_statements()
if self.inside_transaction:
try:
self._transaction.commit()
if not self.read_only:
self._transaction.commit()

self._release_session()
self._statements = []
except Aborted:
Expand All @@ -279,10 +341,14 @@ def rollback(self):
This is a no-op if there is no active transaction or if the connection
is in autocommit mode.
"""
self._snapshot = None

if self._autocommit:
warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
elif self._transaction:
self._transaction.rollback()
if not self.read_only:
self._transaction.rollback()

self._release_session()
self._statements = []

Expand Down
86 changes: 47 additions & 39 deletions google/cloud/spanner_dbapi/cursor.py
Expand Up @@ -185,6 +185,10 @@ def execute(self, sql, args=None):

# Classify whether this is a read-only SQL statement.
try:
if self.connection.read_only:
self._handle_DQL(sql, args or None)
return

classification = parse_utils.classify_stmt(sql)
if classification == parse_utils.STMT_DDL:
ddl_statements = []
Expand Down Expand Up @@ -324,14 +328,15 @@ def fetchone(self):

try:
res = next(self)
if not self.connection.autocommit:
if not self.connection.autocommit and not self.connection.read_only:
self._checksum.consume_result(res)
return res
except StopIteration:
return
except Aborted:
self.connection.retry_transaction()
return self.fetchone()
if not self.connection.read_only:
self.connection.retry_transaction()
return self.fetchone()

def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as
Expand All @@ -342,12 +347,13 @@ def fetchall(self):
res = []
try:
for row in self:
if not self.connection.autocommit:
if not self.connection.autocommit and not self.connection.read_only:
self._checksum.consume_result(row)
res.append(row)
except Aborted:
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
self.connection.retry_transaction()
return self.fetchall()
if not self.connection.read_only:
self.connection.retry_transaction()
return self.fetchall()

return res

Expand All @@ -371,14 +377,15 @@ def fetchmany(self, size=None):
for i in range(size):
try:
res = next(self)
if not self.connection.autocommit:
if not self.connection.autocommit and not self.connection.read_only:
self._checksum.consume_result(res)
items.append(res)
except StopIteration:
break
except Aborted:
self.connection.retry_transaction()
return self.fetchmany(size)
if not self.connection.read_only:
self.connection.retry_transaction()
return self.fetchmany(size)

return items

Expand All @@ -394,38 +401,39 @@ def setoutputsize(self, size, column=None):
"""A no-op, raising an error if the cursor or connection is closed."""
self._raise_if_closed()

def _handle_DQL_with_snapshot(self, snapshot, sql, params):
# Run the query on the given snapshot and keep the result stream on
# the cursor: sets `_result_set`, `_itr` and `_row_count`. There is no
# return statement; callers read the cursor state afterwards.
#
# Reference
# https://googleapis.dev/python/spanner/latest/session-api.html#google.cloud.spanner_v1.session.Session.execute_sql
# Presumably rewrites pyformat-style placeholders/arguments into the
# form Spanner expects - verify against `parse_utils`.
sql, params = parse_utils.sql_pyformat_args_to_spanner(sql, params)
res = snapshot.execute_sql(
sql, params=params, param_types=get_param_types(params)
)
# Immediately using:
# iter(response)
# here, because this Spanner API doesn't provide
# easy mechanisms to detect when only a single item
# is returned or many, yet mixing results that
# are for .fetchone() with those that would result in
# many items returns a RuntimeError if .fetchone() is
# invoked and vice versa.
self._result_set = res
# Read the first element so that the StreamedResultSet can
# return the metadata after a DQL statement. See issue #155.
self._itr = PeekIterator(self._result_set)
# Unfortunately, Spanner doesn't seem to send back
# information about the number of rows available.
self._row_count = _UNSET_COUNT

def _handle_DQL(self, sql, params):
with self.connection.database.snapshot() as snapshot:
# Reference
# https://googleapis.dev/python/spanner/latest/session-api.html#google.cloud.spanner_v1.session.Session.execute_sql
sql, params = parse_utils.sql_pyformat_args_to_spanner(sql, params)
res = snapshot.execute_sql(
sql, params=params, param_types=get_param_types(params)
if self.connection.read_only and not self.connection.autocommit:
# initiate or use the existing multi-use snapshot
self._handle_DQL_with_snapshot(
self.connection.snapshot_checkout(), sql, params
)
if type(res) == int:
self._row_count = res
self._itr = None
else:
# Immediately using:
# iter(response)
# here, because this Spanner API doesn't provide
# easy mechanisms to detect when only a single item
# is returned or many, yet mixing results that
# are for .fetchone() with those that would result in
# many items returns a RuntimeError if .fetchone() is
# invoked and vice versa.
self._result_set = res
# Read the first element so that the StreamedResultSet can
# return the metadata after a DQL statement. See issue #155.
while True:
try:
self._itr = PeekIterator(self._result_set)
break
except Aborted:
self.connection.retry_transaction()
# Unfortunately, Spanner doesn't seem to send back
# information about the number of rows available.
self._row_count = _UNSET_COUNT
else:
# execute with single-use snapshot
with self.connection.database.snapshot() as snapshot:
self._handle_DQL_with_snapshot(snapshot, sql, params)
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved

def __enter__(self):
return self
Expand Down
26 changes: 25 additions & 1 deletion tests/system/test_dbapi.py
Expand Up @@ -19,9 +19,12 @@
import pytest

from google.cloud import spanner_v1
from google.cloud.spanner_dbapi.connection import connect, Connection
from google.cloud.spanner_dbapi.connection import connect
from google.cloud.spanner_dbapi.connection import Connection
from google.cloud.spanner_dbapi.exceptions import ProgrammingError
from . import _helpers


DATABASE_NAME = "dbapi-txn"

DDL_STATEMENTS = (
Expand Down Expand Up @@ -367,3 +370,24 @@ def test_user_agent(shared_instance, dbapi_database):
conn.instance._client._client_info.user_agent
== "dbapi/" + pkg_resources.get_distribution("google-cloud-spanner").version
)


def test_read_only(shared_instance, dbapi_database):
"""
Check that connection set to `read_only=True` uses
ReadOnly transactions.
"""
conn = Connection(shared_instance, dbapi_database, read_only=True)
cur = conn.cursor()

# An UPDATE statement on a read-only connection is expected to fail
# with ProgrammingError.
with pytest.raises(ProgrammingError):
cur.execute(
"""
UPDATE contacts
SET first_name = 'updated-first-name'
WHERE first_name = 'first-name'
"""
)

# A plain SELECT followed by commit() is expected to run without raising.
cur.execute("SELECT * FROM contacts")
conn.commit()
46 changes: 44 additions & 2 deletions tests/unit/spanner_dbapi/test_connection.py
Expand Up @@ -39,14 +39,14 @@ def _get_client_info(self):

return ClientInfo(user_agent=USER_AGENT)

def _make_connection(self):
def _make_connection(self, **kwargs):
from google.cloud.spanner_dbapi import Connection
from google.cloud.spanner_v1.instance import Instance

# We don't need a real Client object to test the constructor
instance = Instance(INSTANCE, client=None)
database = instance.database(DATABASE)
return Connection(instance, database)
return Connection(instance, database, **kwargs)

@mock.patch("google.cloud.spanner_dbapi.connection.Connection.commit")
def test_autocommit_setter_transaction_not_started(self, mock_commit):
Expand Down Expand Up @@ -105,6 +105,22 @@ def test_property_instance(self):
self.assertIsInstance(connection.instance, Instance)
self.assertEqual(connection.instance, connection._instance)

def test_read_only_connection(self):
# The constructor keyword must be reflected by the `read_only` property.
connection = self._make_connection(read_only=True)
self.assertTrue(connection.read_only)

# Fake an unfinished transaction (neither committed nor rolled back);
# presumably this makes `inside_transaction` report True - verify
# against the property definition. The setter must then refuse the change.
connection._transaction = mock.Mock(committed=False, rolled_back=False)
with self.assertRaisesRegex(
ValueError,
"Connection read/write mode can't be changed while a transaction is in progress. "
"Commit or rollback the current transaction and try again.",
):
connection.read_only = False

# With no transaction attached the mode switch must go through.
connection._transaction = None
connection.read_only = False
self.assertFalse(connection.read_only)

@staticmethod
def _make_pool():
from google.cloud.spanner_v1.pool import AbstractSessionPool
Expand Down Expand Up @@ -160,6 +176,32 @@ def test_transaction_checkout(self):
connection._autocommit = True
self.assertIsNone(connection.transaction_checkout())

def test_snapshot_checkout(self):
from google.cloud.spanner_dbapi import Connection

# Read-only and non-autocommit: the mode in which snapshot_checkout()
# creates a connection-level snapshot.
connection = Connection(INSTANCE, DATABASE, read_only=True)
connection.autocommit = False

# Replace the session checkout with a mock so the call can be counted.
session_checkout = mock.MagicMock(autospec=True)
connection._session_checkout = session_checkout

# The first checkout must request a session exactly once.
snapshot = connection.snapshot_checkout()
session_checkout.assert_called_once()

# A second checkout must hand back the same snapshot object.
self.assertEqual(snapshot, connection.snapshot_checkout())

# commit() must drop the cached snapshot.
connection.commit()
self.assertIsNone(connection._snapshot)

# A checkout after commit() must create a snapshot again.
connection.snapshot_checkout()
self.assertIsNotNone(connection._snapshot)

# rollback() must drop the cached snapshot as well.
connection.rollback()
self.assertIsNone(connection._snapshot)

# In autocommit mode no connection-level snapshot is handed out.
connection.autocommit = True
self.assertIsNone(connection.snapshot_checkout())

@mock.patch("google.cloud.spanner_v1.Client")
def test_close(self, mock_client):
from google.cloud.spanner_dbapi import connect
Expand Down