Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement aborted transactions retry mechanism #1

Closed
wants to merge 13 commits into from
21 changes: 21 additions & 0 deletions google/cloud/spanner_v1/_helpers.py
Expand Up @@ -256,3 +256,24 @@ def _metadata_with_prefix(prefix, **kw):
List[Tuple[str, str]]: RPC metadata with supplied prefix
"""
return [("google-cloud-resource-prefix", prefix)]


def _compare_checksums(original, retried):
"""Compare the given checksums.

Raise an error if the given checksums have consumed
the same number of results, but are not equal.

:type original: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum`
:param original: results checksum of the original transaction.

:type retried: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum`
:param retried: results checksum of the retried transaction.

:raises: :exc:`RuntimeError` in case if checksums are not equal.
"""
if original is not None:
if len(retried) == len(original) and retried != original:
raise RuntimeError(
"The underlying data being changed while retrying an aborted transaction."
)
28 changes: 24 additions & 4 deletions google/cloud/spanner_v1/session.py
Expand Up @@ -278,9 +278,13 @@ def batch(self):

return Batch(self)

def transaction(self):
def transaction(self, original_results_checksum=None):
"""Create a transaction to perform a set of reads with shared staleness.

:type original_results_checksum: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum`
:param original_results_checksum: original transaction results
checksum.

:rtype: :class:`~google.cloud.spanner_v1.transaction.Transaction`
:returns: a transaction bound to this session
:raises ValueError: if the session has not yet been created.
Expand All @@ -292,12 +296,21 @@ def transaction(self):
self._transaction.rolled_back = True
del self._transaction
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved

txn = self._transaction = Transaction(self)
return txn
self._transaction = Transaction(
self, original_results_checksum=original_results_checksum
)
return self._transaction

def run_in_transaction(self, func, *args, **kw):
"""Perform a unit of work in a transaction, retrying on abort.

While executing the transaction operations a checksum
of their results is calculated. On aborted transaction
retry the checksum of the original transaction is compared
with the checksum of the retried transaction to ensure
the retried transaction has the same results as the
original one had before it was aborted.

:type func: callable
:param func: takes a required positional argument, the transaction,
and additional positional / keyword arguments as supplied
Expand All @@ -317,13 +330,16 @@ def run_in_transaction(self, func, *args, **kw):

:raises Exception:
reraises any non-ABORT exceptions raised by ``func``.
:raises: :exc:`RuntimeError` in case the data changed while
retrying an aborted transaction.
"""
deadline = time.time() + kw.pop("timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS)
original_results_checksum = None
attempts = 0

while True:
if self._transaction is None:
txn = self.transaction()
txn = self.transaction(original_results_checksum)
else:
txn = self._transaction
if txn._transaction_id is None:
Expand All @@ -333,6 +349,8 @@ def run_in_transaction(self, func, *args, **kw):
attempts += 1
return_value = func(txn, *args, **kw)
except Aborted as exc:
if attempts == 1:
original_results_checksum = self._transaction.results_checksum
del self._transaction
_delay_until_retry(exc, deadline, attempts)
continue
Expand All @@ -346,6 +364,8 @@ def run_in_transaction(self, func, *args, **kw):
try:
txn.commit()
except Aborted as exc:
if attempts == 1:
original_results_checksum = self._transaction.results_checksum
del self._transaction
_delay_until_retry(exc, deadline, attempts)
except GoogleAPICallError:
Expand Down
34 changes: 30 additions & 4 deletions google/cloud/spanner_v1/snapshot.py
Expand Up @@ -171,9 +171,22 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
self._read_request_count += 1

if self._multi_use:
return StreamedResultSet(iterator, source=self)
return StreamedResultSet(
iterator,
source=self,
results_checksum=getattr(self, "results_checksum", None),
original_results_checksum=getattr(
self, "_original_results_checksum", None
),
)
else:
return StreamedResultSet(iterator)
return StreamedResultSet(
iterator,
results_checksum=getattr(self, "results_checksum", None),
original_results_checksum=getattr(
self, "_original_results_checksum", None
),
)
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved

def execute_sql(
self,
Expand Down Expand Up @@ -278,9 +291,22 @@ def execute_sql(
self._execute_sql_count += 1

if self._multi_use:
return StreamedResultSet(iterator, source=self)
return StreamedResultSet(
iterator,
source=self,
results_checksum=getattr(self, "results_checksum", None),
original_results_checksum=getattr(
self, "_original_results_checksum", None
),
)
else:
return StreamedResultSet(iterator)
return StreamedResultSet(
iterator,
results_checksum=getattr(self, "results_checksum", None),
original_results_checksum=getattr(
self, "_original_results_checksum", None
),
)

def partition_read(
self,
Expand Down
27 changes: 25 additions & 2 deletions google/cloud/spanner_v1/streamed.py
Expand Up @@ -18,6 +18,7 @@
from google.protobuf.struct_pb2 import Value
from google.cloud import exceptions
from google.cloud.spanner_v1.proto import type_pb2
from google.cloud.spanner_v1._helpers import _compare_checksums
import six

# pylint: disable=ungrouped-imports
Expand All @@ -37,16 +38,32 @@ class StreamedResultSet(object):

:type source: :class:`~google.cloud.spanner_v1.snapshot.Snapshot`
:param source: Snapshot from which the result set was fetched.

:type results_checksum: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum`
:param results_checksum: A checksum to which streamed rows from this
result set must be added.

:type original_results_checksum: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum`
:param original_results_checksum: Results checksum of the original
transaction.
"""

def __init__(self, response_iterator, source=None):
def __init__(
    self,
    response_iterator,
    source=None,
    results_checksum=None,
    original_results_checksum=None,
):
    """Set up the streaming state for this result set."""
    self._response_iterator = response_iterator
    self._source = source  # snapshot the results were fetched from
    # Checksum bookkeeping used to validate retries of aborted transactions.
    self._results_checksum = results_checksum
    self._original_results_checksum = original_results_checksum
    # Streaming state, filled in as partial result sets arrive.
    self._metadata = None  # set from the first partial result set
    self._stats = None  # set from the last partial result set
    self._rows = []  # fully-processed rows
    self._current_row = []  # accumulated values of a not-yet-complete row
    self._pending_chunk = None  # partial value awaiting its continuation

@property
def fields(self):
Expand Down Expand Up @@ -143,7 +160,13 @@ def __iter__(self):
return
iter_rows, self._rows[:] = self._rows[:], ()
while iter_rows:
yield iter_rows.pop(0)
row = iter_rows.pop(0)
if self._results_checksum is not None:
self._results_checksum.consume_result(row)
_compare_checksums(
self._original_results_checksum, self._results_checksum
)
yield row
Comment on lines +163 to +169
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java version actually consumes the results up to the same point that the user did before retrying before doing the comparison: https://github.com/googleapis/java-spanner/blob/059ef1ef1f03e80f4ff2705b45a62c553bb4e83d/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java#L145-L153. Here we actually yield the rows used in the comparison. How come this doesn't result in the user getting the same rows twice after an internal retry?

It's also a bit confusing to see this do the checksum comparison with each row. I know _compare_checksums passes if it sees fewer retried rows than original, but that's not what I'd expect it to do without reading it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@olavloite, @skuruppu mentioned you might be able to review and compare this to the java client behavior.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come this doesn't result in the user getting the same rows twice after an internal retry?

Seems to me a user will not use the rows read within a transaction in operations outside the transaction until the transaction is finished. It probably sounds hard, so consider this case:

A user starts a transaction, reads several rows, then calls their own (non-Spanner) API, which uses these rows to write some data somewhere. Then they call more functions in the same transaction and commit it. This is an incorrect usage pattern — one should not act on data the transaction is using at that moment, because the transaction can fail and all the data/changes will be rolled back; the records made through the user's API will not be rolled back, so the user will end up with desynchronized data.

With this, the data that was read within transaction are considered as a temporary inexact-until-transaction-commit data, and if we're retrying a transaction, all that data should be re-read and recalculated, that's how I see it.

Copy link
Collaborator Author

@IlyaFaer IlyaFaer Sep 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's also a bit confusing to see this do the checksum comparison with each row. I know _compare_checksums passes if it sees fewer retried rows than original, but that's not what I'd expect it to do without reading it.

Yeah, that's something I'm not 100% like too, but retry logic looks too apportioned across classes and functions to me, so I've decided to make many things deep under the hood. Thus, in the transaction class we're not thinking about results number of a result set of one of the operations of this transaction, we're just calling == and it takes care about such a things deeper in the result set itself. Result set object is much more closer to these details than transaction.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have absolutely no Python experience, so bear with me if I misread or misunderstood (parts of) the code here, but if I understand it correctly the suggested implementation for Python here does the following (during the initial transaction):

  1. The __iter__ method will normally (i.e. in the original transaction) iterate over the PartialResultSets that are received from Spanner, and for each of these iterate over the rows in that PartialResultSet.
  2. If the ResultSet has a checksum, it will update the running checksum with the contents of the current row.
  3. It then checks whether the current checksum is equal to the checksum of the original transaction. If this is the original transaction and not a retry, the value of _original_results_checksum will be None and the comparison will always succeed.
  4. It then returns the row.

If a transaction is retried, the following happens:

  1. The __iter__ method will iterate over the PartialResultSets that are received from Spanner for the retried query, and for each of these iterate over the rows in that PartialResultSet.
  2. If the ResultSet has a checksum, it will update the running checksum with the contents of the current row.
  3. It then checks whether the current checksum is equal to the checksum of the original transaction. As far as I can see, this would always fail if the result set contained more than 1 row, as it is a running checksum.

Assume that a query returns the following rows during the original transaction:

Key Value
1 One
2 Two

During the original transaction the entire result set was consumed so the running checksum was calculated based on all the rows:

  • After row 1 the checksum is C1
  • After row 2 the checksum is C2
    So the original checksum is now C2.

During a retry the query will be retried and the checksum will be calculated again (I assume that the results during the retry are equal to the original query):

  • After row 1 the checksum is C1. That is compared to the original checksum that is C2 and the comparison will fail.

The checksum should therefore not be compared for each row, but after consuming the same number of rows during the retry as during original query.

(But I get a feeling that there is a more fundamental misunderstanding about this retry logic. I'll elaborate on that in a separate comment.)

Copy link
Collaborator Author

@IlyaFaer IlyaFaer Sep 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @olavloite

It then checks whether the current checksum is equal to the checksum of the original transaction. As far as I can see, this would always fail if the result set contained more than 1 row, as it is a running checksum.

Yeah, that's the thing @c24t mentioned as something that we don't expect. In fact I'm redefining == operator for ResultsChecksum object, so that it compares checksums in this way:

if original is not None:
        if retried != original:
            if not retried < original:  # retried has less results

At the first line it checks if we're in retried transaction - if not, than we have nothing to compare.

The second checks if checksums are equal, considering their length (number of consumed results).

And the third line checks if the retried transaction came to the original transaction failure point.

So, if the given checksums have different length, it'll be considered like there are no problems so far, 'cause a) retried transaction is not at the point, where the original failed, so it's too soon to compare checksums; b) retried transaction went farther than the original transaction = the checksums were compared = they were equal at the original transaction failure point).

Looking at this, I think there can be a problem when retried transaction went farther than the original. I'll double check

Copy link
Collaborator Author

@IlyaFaer IlyaFaer Sep 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@c24t, hm-m, I think the logic here can be little bit simpler, if I'll use a __len__ magic method.

UPDATE: yes, it looks better, PTAL


def one(self):
"""Return exactly one result, or raise an exception.
Expand Down
70 changes: 69 additions & 1 deletion google/cloud/spanner_v1/transaction.py
Expand Up @@ -14,10 +14,14 @@

"""Spanner read-write transaction support."""

import hashlib
import pickle

from google.protobuf.struct_pb2 import Struct

from google.cloud._helpers import _pb_timestamp_to_datetime
from google.cloud.spanner_v1._helpers import (
_compare_checksums,
_make_value_pb,
_merge_query_options,
_metadata_with_prefix,
Expand All @@ -35,6 +39,10 @@ class Transaction(_SnapshotBase, _BatchBase):
:type session: :class:`~google.cloud.spanner_v1.session.Session`
:param session: the session used to perform the commit

:type original_results_checksum: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum`
:param original_results_checksum: results checksum of the
original transaction.

:raises ValueError: if session has an existing transaction
"""

Expand All @@ -44,11 +52,21 @@ class Transaction(_SnapshotBase, _BatchBase):
_multi_use = True
_execute_sql_count = 0

def __init__(self, session):
def __init__(self, session, original_results_checksum=None):
    # A session may carry at most one active transaction at a time.
    if session._transaction is not None:
        raise ValueError("Session has existing transaction.")

    super(Transaction, self).__init__(session)
    self._results_checksum = ResultsChecksum()  # this transaction results checksum
    # Checksum of the aborted original transaction when this one is a
    # retry; ``None`` when this transaction is not retrying anything.
    self._original_results_checksum = original_results_checksum

@property
def results_checksum(self):
    """
    Cumulative checksum of all the results returned
    by all the operations run within this transaction.

    :rtype: :class:`ResultsChecksum`
    :returns: this transaction's running results checksum.
    """
    return self._results_checksum

def _check_state(self):
"""Helper for :meth:`commit` et al.
Expand Down Expand Up @@ -232,6 +250,9 @@ def execute_update(
seqno=seqno,
metadata=metadata,
)
self._results_checksum.consume_result(response.stats.row_count_exact)

_compare_checksums(self._original_results_checksum, self._results_checksum)
return response.stats.row_count_exact

def batch_update(self, statements):
Expand Down Expand Up @@ -292,6 +313,9 @@ def batch_update(self, statements):
row_counts = [
result_set.stats.row_count_exact for result_set in response.result_sets
]
self._results_checksum.consume_result(row_counts)

_compare_checksums(self._original_results_checksum, self._results_checksum)
return response.status, row_counts

def __enter__(self):
Expand All @@ -305,3 +329,47 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.commit()
else:
self.rollback()


class ResultsChecksum:
    """Cumulative checksum.

    Used to calculate a total checksum of all the results
    returned by operations executed within a transaction.
    Includes methods for checksums comparison.

    These checksums are used while retrying an aborted
    transaction to check if the results of a retried transaction
    are equal to the results of the original transaction.
    """

    def __init__(self):
        self.checksum = hashlib.sha256()
        self.count = 0  # counter of consumed results

    def __len__(self):
        """Return the number of consumed results.

        Returns:
            int: The number of results.
        """
        return self.count

    def __eq__(self, other):
        """Check if checksums are equal.

        Args:
            other (ResultsChecksum):
                Another checksum to compare with this one.

        Returns:
            bool: True if the checksums are equal, False otherwise.
        """
        if not isinstance(other, ResultsChecksum):
            # Defer to Python's default comparison instead of raising
            # AttributeError when ``other`` has no ``checksum`` attribute.
            return NotImplemented
        return self.checksum.digest() == other.checksum.digest()

    def consume_result(self, result):
        """Add the given result into the checksum.

        Args:
            result (Union[int, list]):
                Streamed row or row count from an UPDATE operation.
        """
        self.checksum.update(pickle.dumps(result))
        self.count += 1
45 changes: 45 additions & 0 deletions tests/unit/test__helpers.py
Expand Up @@ -526,3 +526,48 @@ def test(self):
prefix = "prefix"
metadata = self._call_fut(prefix)
self.assertEqual(metadata, [("google-cloud-resource-prefix", prefix)])


class Test_compare_checksums(unittest.TestCase):
    """Unit tests for the ``_compare_checksums`` helper."""

    def _call_fut(self, *args, **kw):
        # Import at call time, following the repo convention, so a
        # broken import surfaces as a test failure rather than a
        # collection error.
        from google.cloud.spanner_v1._helpers import _compare_checksums

        return _compare_checksums(*args, **kw)

    def test_no_original_checksum(self):
        # No original checksum means this is not a retry: no-op.
        from google.cloud.spanner_v1.transaction import ResultsChecksum

        self.assertIsNone(self._call_fut(None, ResultsChecksum()))

    def test_equal(self):
        # Same length, same contents: comparison passes silently.
        from google.cloud.spanner_v1.transaction import ResultsChecksum

        original = ResultsChecksum()
        original.consume_result(5)

        retried = ResultsChecksum()
        retried.consume_result(5)

        self.assertIsNone(self._call_fut(original, retried))

    def test_less_results(self):
        # Retry consumed fewer results than the original: too soon
        # to compare, so no error is raised.
        from google.cloud.spanner_v1.transaction import ResultsChecksum

        original = ResultsChecksum()
        original.consume_result(5)

        retried = ResultsChecksum()

        self.assertIsNone(self._call_fut(original, retried))

    def test_mismatch(self):
        # Same length but different contents: underlying data changed.
        from google.cloud.spanner_v1.transaction import ResultsChecksum

        original = ResultsChecksum()
        original.consume_result(5)

        retried = ResultsChecksum()
        retried.consume_result(2)

        with self.assertRaises(RuntimeError):
            self._call_fut(original, retried)