feat: support aborted transactions internal retry #544

Closed

Changes from 17 commits

72 changes: 72 additions & 0 deletions google/cloud/spanner_dbapi/checksum.py
@@ -0,0 +1,72 @@
# Copyright 2020 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

"""API to calculate checksums of SQL statements results."""

import hashlib
import pickle

from google.cloud.spanner_dbapi.exceptions import AbortedRetried


class ResultsChecksum:
    """Cumulative checksum.

    Used to calculate a total checksum of all the results
    returned by operations executed within a transaction.
    Includes methods for checksum comparison.
    These checksums are used while retrying an aborted
    transaction to check if the results of the retried
    transaction are equal to the results of the original one.
    """

    def __init__(self):
        self.checksum = hashlib.sha256()
        self.count = 0  # counter of consumed results

    def __len__(self):
        """Return the number of consumed results.

        :rtype: :class:`int`
        :returns: The number of results.
        """
        return self.count

    def __eq__(self, other):
        """Check if checksums are equal.

        :type other: :class:`google.cloud.spanner_dbapi.checksum.ResultsChecksum`
        :param other: Another checksum to compare with this one.
        """
        return self.checksum.digest() == other.checksum.digest()

    def consume_result(self, result):
        """Add the given result into the checksum.

        :type result: Union[int, list]
        :param result: Streamed row or row count from an UPDATE operation.
        """
        self.checksum.update(pickle.dumps(result))
        self.count += 1


def _compare_checksums(original, retried):
    """Compare the given checksums.

    Raise an error if the given checksums are not equal.

    :type original: :class:`~google.cloud.spanner_dbapi.checksum.ResultsChecksum`
    :param original: Results checksum of the original transaction.

    :type retried: :class:`~google.cloud.spanner_dbapi.checksum.ResultsChecksum`
    :param retried: Results checksum of the retried transaction.

    :raises: :exc:`google.cloud.spanner_dbapi.exceptions.AbortedRetried` if the checksums are not equal.
    """
    if retried != original:
        raise AbortedRetried(
            "The transaction was aborted and could not be retried due to a concurrent modification."
        )
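
To make the mechanics concrete, here is a minimal usage sketch (the rows and values are illustrative, not taken from the PR): two checksums fed the same results compare equal, while a result stream changed by a concurrent modification makes `_compare_checksums` raise.

```python
from google.cloud.spanner_dbapi.checksum import ResultsChecksum, _compare_checksums
from google.cloud.spanner_dbapi.exceptions import AbortedRetried

original = ResultsChecksum()
for row in (["Allison"], ["Morrison"], ["Pieterson"]):  # rows of the first attempt
    original.consume_result(row)

retried = ResultsChecksum()
for row in (["Allison"], ["Morrison"]):  # 'Pieterson' was deleted concurrently
    retried.consume_result(row)

assert len(original) == 3 and len(retried) == 2

try:
    _compare_checksums(original, retried)
except AbortedRetried:
    print("retry returned different results; cannot transparently retry")
```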
110 changes: 106 additions & 4 deletions google/cloud/spanner_dbapi/connection.py
@@ -6,18 +6,24 @@

"""DB-API Connection for the Google Cloud Spanner."""

import time
import warnings

from google.api_core.exceptions import Aborted
from google.api_core.gapic_v1.client_info import ClientInfo
from google.cloud import spanner_v1 as spanner
from google.cloud.spanner_v1.session import _get_retry_delay

from google.cloud.spanner_dbapi.checksum import _compare_checksums
from google.cloud.spanner_dbapi.checksum import ResultsChecksum
from google.cloud.spanner_dbapi.cursor import Cursor
from google.cloud.spanner_dbapi.exceptions import InterfaceError
from google.cloud.spanner_dbapi.version import DEFAULT_USER_AGENT
from google.cloud.spanner_dbapi.version import PY_VERSION


AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode"
MAX_INTERNAL_RETRIES = 50


class Connection:
@@ -40,6 +46,9 @@ def __init__(self, instance, database):

        self._transaction = None
        self._session = None
        # SQL statements executed
        # within the current transaction
        self._statements = []

        self.is_closed = False
        self._autocommit = False
@@ -106,6 +115,60 @@ def _release_session(self):
        self.database._pool.put(self._session)
        self._session = None

    def retry_transaction(self):
        """Retry the aborted transaction.

        All the statements executed in the original transaction
        will be re-executed in a new one. The results checksums
        of the original statements and of the retried ones
        will be compared.

        :raises: :class:`google.cloud.spanner_dbapi.exceptions.AbortedRetried`
            If the results checksum of a retried statement is
            not equal to the checksum of the original one.
        """
        attempt = 0
        while True:
            self._transaction = None
            attempt += 1

            try:
                self._rerun_previous_statements()
                break
            except Aborted as exc:
                if attempt >= MAX_INTERNAL_RETRIES:
                    raise

                delay = _get_retry_delay(exc.errors[0], attempt)
                if delay:
                    time.sleep(delay)

    def _rerun_previous_statements(self):
        """
        Helper to re-run all the statements remembered
        from the last transaction.
        """
        for statement in self._statements:
            res_iter, retried_checksum = self.run_statement(
                statement, retried=True
            )
            # re-execute all the completed statements
            if statement != self._statements[-1]:
                for res in res_iter:
                    retried_checksum.consume_result(res)

                _compare_checksums(statement.checksum, retried_checksum)
            # re-execute the failed statement
            else:
                # stream up to the failed result or
                # to the end of the streaming iterator
                while len(retried_checksum) < len(statement.checksum):
                    try:
                        res = next(iter(res_iter))
                        retried_checksum.consume_result(res)
                    except StopIteration:
                        break

                _compare_checksums(statement.checksum, retried_checksum)

    def transaction_checkout(self):
        """Get a Cloud Spanner transaction.

@@ -160,8 +223,19 @@ def commit(self):
        if self._autocommit:
            warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2)
        elif self._transaction:
-            self._transaction.commit()
-            self._release_session()
+            try:
+                self._transaction.commit()
+                self._release_session()
+                self._statements = []
+            except Aborted:
+                while True:
+                    try:
+                        self.retry_transaction()
+                        break
+                    except Aborted:
+                        pass
+
+                self.commit()

    def rollback(self):
        """Rolls back any pending transaction.
@@ -174,6 +248,7 @@ def rollback(self):
        elif self._transaction:
            self._transaction.rollback()
            self._release_session()
            self._statements = []

    def cursor(self):
        """Factory to create a DB-API Cursor."""
@@ -190,6 +265,33 @@ def run_prior_DDL_statements(self):

        return self.database.update_ddl(ddl_statements).result()

    def run_statement(self, statement, retried=False):
        """Run a single SQL statement in the begun transaction.

        This method is never used in autocommit mode. In
        non-autocommit mode it remembers every executed
        SQL statement with its parameters.

        :type statement: :class:`collections.namedtuple`
        :param statement: SQL statement to execute.

        :type retried: :class:`bool`
        :param retried: (Optional) Indicates that the statement is
                        being re-executed as part of a transaction retry.

        :rtype: :class:`google.cloud.spanner_v1.streamed.StreamedResultSet`,
                :class:`google.cloud.spanner_dbapi.checksum.ResultsChecksum`
        :returns: Streamed result set of the statement and a
                  checksum of this statement's results.
        """
        transaction = self.transaction_checkout()

        if not retried:
            self._statements.append(statement)

        return (
            transaction.execute_sql(
                statement.sql,
                statement.params,
                param_types=statement.param_types,
            ),
            ResultsChecksum() if retried else statement.checksum,
        )

    def __enter__(self):
        return self

@@ -243,11 +345,11 @@ def connect(
    """

    client_info = ClientInfo(
-        user_agent=user_agent or DEFAULT_USER_AGENT, python_version=PY_VERSION,
+        user_agent=user_agent or DEFAULT_USER_AGENT, python_version=PY_VERSION
    )

    client = spanner.Client(
-        project=project, credentials=credentials, client_info=client_info,
+        project=project, credentials=credentials, client_info=client_info
    )

    instance = client.instance(instance_id)
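
Taken together, a hedged sketch of the user-facing behavior this diff adds (the instance/database IDs and the UPDATE statement are placeholders, and the `connect()` signature is assumed from the diff context): in non-autocommit mode the connection records each statement, and `commit()` transparently replays them if Cloud Spanner aborts the transaction.

```python
from google.cloud.spanner_dbapi.connection import connect
from google.cloud.spanner_dbapi.exceptions import AbortedRetried

conn = connect("my-instance", "my-database")  # hypothetical IDs
cur = conn.cursor()

cur.execute(
    "UPDATE Singers SET LastName = %s WHERE SingerId = %s", ("Morrison", 1)
)

try:
    # if the transaction is aborted here, the connection re-runs the
    # recorded UPDATE and compares results checksums before committing
    conn.commit()
except AbortedRetried:
    # the data touched by the transaction was changed concurrently,
    # so the retry could not be validated
    conn.rollback()
```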
62 changes: 43 additions & 19 deletions google/cloud/spanner_dbapi/cursor.py
@@ -6,6 +6,7 @@

"""Database cursor for Google Cloud Spanner DB-API."""

from google.api_core.exceptions import Aborted
from google.api_core.exceptions import AlreadyExists
from google.api_core.exceptions import FailedPrecondition
from google.api_core.exceptions import InternalServerError
@@ -14,7 +15,7 @@
from collections import namedtuple

from google.cloud import spanner_v1 as spanner

from google.cloud.spanner_dbapi.checksum import ResultsChecksum
from google.cloud.spanner_dbapi.exceptions import IntegrityError
from google.cloud.spanner_dbapi.exceptions import InterfaceError
from google.cloud.spanner_dbapi.exceptions import OperationalError
@@ -26,11 +27,13 @@

from google.cloud.spanner_dbapi import parse_utils
from google.cloud.spanner_dbapi.parse_utils import get_param_types
from google.cloud.spanner_dbapi.parse_utils import sql_pyformat_args_to_spanner
from google.cloud.spanner_dbapi.utils import PeekIterator

_UNSET_COUNT = -1

ColumnDetails = namedtuple("column_details", ["null_ok", "spanner_type"])
Statement = namedtuple("Statement", "sql, params, param_types, checksum")


class Cursor(object):
@@ -46,6 +49,8 @@ def __init__(self, connection):
        self._row_count = _UNSET_COUNT
        self.connection = connection
        self._is_closed = False
        # the currently running SQL statement results checksum
        self._checksum = None

        # the number of rows to fetch at a time with fetchmany()
        self.arraysize = 1
@@ -158,15 +163,15 @@ def execute(self, sql, args=None):
            self.connection.run_prior_DDL_statements()

            if not self.connection.autocommit:
-                transaction = self.connection.transaction_checkout()
-
-                sql, params = parse_utils.sql_pyformat_args_to_spanner(
-                    sql, args
-                )
+                sql, params = sql_pyformat_args_to_spanner(sql, args)

-                self._result_set = transaction.execute_sql(
-                    sql, params, param_types=get_param_types(params)
+                statement = Statement(
+                    sql, params, get_param_types(params), ResultsChecksum(),
                )
+                (
+                    self._result_set,
+                    self._checksum,
+                ) = self.connection.run_statement(statement)
                self._itr = PeekIterator(self._result_set)
                return

@@ -207,9 +212,31 @@ def fetchone(self):
        self._raise_if_closed()

        try:
-            return next(self)
+            res = next(self)
+            self._checksum.consume_result(res)
+            return res
        except StopIteration:
-            return None
+            return
+        except Aborted:
+            self.connection.retry_transaction()
+            return self.fetchone()
Contributor:

I think this will be a problem. Assuming that this is using the ExecuteStreamingSql RPC, each next() call could potentially mean that a new RPC is executed. So for the sake of simplicity, assume in the example below that each call to next() executes the ExecuteStreamingSql RPC.

Assume the following situation:

  1. The table Singers contains the following singers (last names): Allison, Morrison, Pieterson.
  2. The application executes the query SELECT LastName FROM Singers ORDER BY LastName in transaction 1.
  3. The client application calls fetchone(), which returns 'Allison'.
  4. Some other transaction executes `DELETE FROM Singers WHERE LastName='Pieterson'`.
  5. The first transaction is aborted by the backend. A retry is executed, and the retry logic checks that the checksum of the retried result set is equal to that of the original attempt, which it is, as the first record is still 'Allison'.
  6. The client application calls fetchone() again. This should return 'Morrison', but as it needs to call ExecuteStreamingSql, it will (probably) use the transaction id of the original transaction (unless that transaction id has somehow been replaced in the underlying iterator). If it does use the old transaction id, the RPC will fail with yet another Aborted error, and that will repeat itself until the transaction retry limit has been reached.

Contributor Author (IlyaFaer):

Seems to me we can just drop the _transaction property, so that Connection will initiate a new one on the next execute() call.

Contributor:

@IlyaFaer @c24t

Sorry for reopening this, and this comment should not be considered blocking for merging this PR, but I think we need to look into this once more. Only dropping the _transaction property will in this case not be enough for the following reason:

  1. When executeSql is called, a streaming iterator is returned to the application.
  2. That streaming iterator is linked with the transaction that was active at that moment, and a reference to that transaction is also held in the iterator.
  3. If a transaction is aborted and the client application has consumed only parts of a streaming iterator, that iterator is no longer valid (at least: it will also throw an exception if it needs to receive more data from the server).

The JDBC driver client solves the above problem by wrapping all streaming iterators before returning these to the client application. That makes it possible for the JDBC driver to replace the underlying streaming iterator with a new one when a transaction has been aborted and successfully retried.

We should add that to the Python DBApi as well, but we could do that in a separate PR to prevent this PR from becoming even bigger than it already is.
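
A hypothetical sketch of such a wrapper in Python (the class name and API here are illustrative assumptions, not part of this PR): the cursor would hand out only the wrapper, and the retry logic would swap in a fresh iterator after a successful replay.

```python
class ReplaceableIterator:
    """Hypothetical wrapper that allows swapping the underlying
    streamed iterator after an aborted transaction is retried."""

    def __init__(self, iterator):
        self._iterator = iterator

    def replace(self, iterator):
        # called by the retry logic after a successful replay
        self._iterator = iterator

    def __iter__(self):
        return self

    def __next__(self):
        # delegates to whichever iterator is currently installed
        return next(self._iterator)
```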

Contributor Author (IlyaFaer), Nov 13, 2020:

@c24t, @olavloite, hmm. I think we're protected from errors here, because our connection API doesn't actually give streaming result objects to a user.

Here is where we're getting a streaming iterator:

    self._result_set = transaction.execute_sql(
        sql, params, param_types=get_param_types(params)
    )
    self._itr = PeekIterator(self._result_set)

So, the iterator is held in the protected property _itr, and users stream it with the Cursor.fetch*() methods, without actual access to the iterator itself:

    def fetchone(self):
        """Fetch the next row of a query result set, returning a single
        sequence, or None when no more data is available."""
        self._raise_if_closed()
        try:
            return next(self)
        except StopIteration:
            return None

Where next(self) is calling next(self._itr) here:

    def __next__(self):
        if self._itr is None:
            raise ProgrammingError("no results to return")
        return next(self._itr)

Thus, if a transaction fails, the connection will drop the transaction, check out a new one, and re-run all the statements, each of which will replace _itr with a new streamed iterator. So, all the iterators are processed internally and will be replaced on a retry, as I see it.


    def fetchall(self):
        """Fetch all (remaining) rows of a query result, returning them as
        a sequence of sequences.
        """
        self._raise_if_closed()

        res = []
        try:
            for row in self:
                self._checksum.consume_result(row)
                res.append(row)
        except Aborted:
            self.connection.retry_transaction()
            return self.fetchall()

        return res

    def fetchmany(self, size=None):
        """Fetch the next set of rows of a query result, returning a sequence
@@ -230,20 +257,17 @@ def fetchmany(self, size=None):
        items = []
        for i in range(size):
            try:
-                items.append(tuple(self.__next__()))
+                res = next(self)
+                self._checksum.consume_result(res)
+                items.append(res)
            except StopIteration:
                break
+            except Aborted:
+                self.connection.retry_transaction()
+                return self.fetchmany(size)

        return items

-    def fetchall(self):
-        """Fetch all (remaining) rows of a query result, returning them as
-        a sequence of sequences.
-        """
-        self._raise_if_closed()
-
-        return list(self.__iter__())

    def nextset(self):
        """A no-op, raising an error if the cursor or connection is closed."""
        self._raise_if_closed()
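
A brief usage sketch of the cursor-side behavior (continuing the hypothetical connection from the earlier sketch; the table and query are illustrative): the fetch methods feed every consumed row into the running checksum and transparently retry on Aborted.

```python
cur = conn.cursor()
cur.execute("SELECT LastName FROM Singers ORDER BY LastName")

first = cur.fetchone()  # row is fed into the running results checksum
rest = cur.fetchall()   # an Aborted here triggers a checksum-verified replay
```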
10 changes: 10 additions & 0 deletions google/cloud/spanner_dbapi/exceptions.py
@@ -92,3 +92,13 @@ class NotSupportedError(DatabaseError):
"""

pass


class AbortedRetried(OperationalError):
    """
    Error raised when an aborted transaction could not
    be retried, because the underlying data was changed
    by another transaction during the retry.
    """

    pass
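
Since AbortedRetried subclasses OperationalError, a caller could handle it like any other DB-API operational failure. A small sketch of that, assuming the commit-retry flow above (the helper function name is illustrative):

```python
from google.cloud.spanner_dbapi.exceptions import AbortedRetried


def commit_with_fallback(connection):
    """Commit; roll back when the internal retry detects
    a concurrent modification (illustrative helper)."""
    try:
        connection.commit()
    except AbortedRetried:
        # the replayed statements produced different results, so the
        # transaction could not be transparently retried
        connection.rollback()
        raise  # let the caller re-run its business logic
```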