Skip to content

Commit

Permalink
refactor: create base transaction class (#81)
Browse files Browse the repository at this point in the history
towards #65
  • Loading branch information
rafilong committed Jul 14, 2020
1 parent 9aac83b commit 546eb90
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 167 deletions.
166 changes: 166 additions & 0 deletions google/cloud/firestore_v1/base_transaction.py
@@ -0,0 +1,166 @@
# Copyright 2017 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for applying Google Cloud Firestore changes in a transaction."""


from google.cloud.firestore_v1 import types

MAX_ATTEMPTS = 5
"""int: Default number of transaction attempts (with retries)."""
_CANT_BEGIN = "The transaction has already begun. Current transaction ID: {!r}."
_MISSING_ID_TEMPLATE = "The transaction has no transaction ID, so it cannot be {}."
_CANT_ROLLBACK = _MISSING_ID_TEMPLATE.format("rolled back")
_CANT_COMMIT = _MISSING_ID_TEMPLATE.format("committed")
_WRITE_READ_ONLY = "Cannot perform write operation in read-only transaction."
_INITIAL_SLEEP = 1.0
"""float: Initial "max" for sleep interval. To be used in :func:`_sleep`."""
_MAX_SLEEP = 30.0
"""float: Eventual "max" sleep time. To be used in :func:`_sleep`."""
_MULTIPLIER = 2.0
"""float: Multiplier for exponential backoff. To be used in :func:`_sleep`."""
_EXCEED_ATTEMPTS_TEMPLATE = "Failed to commit transaction in {:d} attempts."
_CANT_RETRY_READ_ONLY = "Only read-write transactions can be retried."


class BaseTransaction(object):
"""Accumulate read-and-write operations to be sent in a transaction.
Args:
max_attempts (Optional[int]): The maximum number of attempts for
the transaction (i.e. allowing retries). Defaults to
:attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`.
read_only (Optional[bool]): Flag indicating if the transaction
should be read-only or should allow writes. Defaults to
:data:`False`.
"""

def __init__(self, max_attempts=MAX_ATTEMPTS, read_only=False):
self._max_attempts = max_attempts
self._read_only = read_only
self._id = None

def _add_write_pbs(self, write_pbs):
raise NotImplementedError

def _options_protobuf(self, retry_id):
"""Convert the current object to protobuf.
The ``retry_id`` value is used when retrying a transaction that
failed (e.g. due to contention). It is intended to be the "first"
transaction that failed (i.e. if multiple retries are needed).
Args:
retry_id (Union[bytes, NoneType]): Transaction ID of a transaction
to be retried.
Returns:
Optional[google.cloud.firestore_v1.types.TransactionOptions]:
The protobuf ``TransactionOptions`` if ``read_only==True`` or if
there is a transaction ID to be retried, else :data:`None`.
Raises:
ValueError: If ``retry_id`` is not :data:`None` but the
transaction is read-only.
"""
if retry_id is not None:
if self._read_only:
raise ValueError(_CANT_RETRY_READ_ONLY)

return types.TransactionOptions(
read_write=types.TransactionOptions.ReadWrite(
retry_transaction=retry_id
)
)
elif self._read_only:
return types.TransactionOptions(
read_only=types.TransactionOptions.ReadOnly()
)
else:
return None

@property
def in_progress(self):
"""Determine if this transaction has already begun.
Returns:
bool: Indicates if the transaction has started.
"""
return self._id is not None

@property
def id(self):
"""Get the current transaction ID.
Returns:
Optional[bytes]: The transaction ID (or :data:`None` if the
current transaction is not in progress).
"""
return self._id

def _clean_up(self):
"""Clean up the instance after :meth:`_rollback`` or :meth:`_commit``.
This intended to occur on success or failure of the associated RPCs.
"""
self._write_pbs = []
self._id = None

def _begin(self, retry_id=None):
raise NotImplementedError

def _rollback(self):
raise NotImplementedError

def _commit(self):
raise NotImplementedError

def get_all(self, references):
raise NotImplementedError

def get(self, ref_or_query):
raise NotImplementedError


class _BaseTransactional(object):
"""Provide a callable object to use as a transactional decorater.
This is surfaced via
:func:`~google.cloud.firestore_v1.transaction.transactional`.
Args:
to_wrap (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]):
A callable that should be run (and retried) in a transaction.
"""

def __init__(self, to_wrap):
self.to_wrap = to_wrap
self.current_id = None
"""Optional[bytes]: The current transaction ID."""
self.retry_id = None
"""Optional[bytes]: The ID of the first attempted transaction."""

def _reset(self):
"""Unset the transaction IDs."""
self.current_id = None
self.retry_id = None

def _pre_commit(self, transaction, *args, **kwargs):
raise NotImplementedError

def _maybe_commit(self, transaction):
raise NotImplementedError

def __call__(self, transaction, *args, **kwargs):
raise NotImplementedError
114 changes: 18 additions & 96 deletions google/cloud/firestore_v1/transaction.py
Expand Up @@ -20,31 +20,27 @@

import six

from google.cloud.firestore_v1.base_transaction import (
_BaseTransactional,
BaseTransaction,
MAX_ATTEMPTS,
_CANT_BEGIN,
_CANT_ROLLBACK,
_CANT_COMMIT,
_WRITE_READ_ONLY,
_INITIAL_SLEEP,
_MAX_SLEEP,
_MULTIPLIER,
_EXCEED_ATTEMPTS_TEMPLATE,
)

from google.api_core import exceptions
from google.cloud.firestore_v1 import batch
from google.cloud.firestore_v1 import types
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.query import Query


MAX_ATTEMPTS = 5
"""int: Default number of transaction attempts (with retries)."""
_CANT_BEGIN = "The transaction has already begun. Current transaction ID: {!r}."
_MISSING_ID_TEMPLATE = "The transaction has no transaction ID, so it cannot be {}."
_CANT_ROLLBACK = _MISSING_ID_TEMPLATE.format("rolled back")
_CANT_COMMIT = _MISSING_ID_TEMPLATE.format("committed")
_WRITE_READ_ONLY = "Cannot perform write operation in read-only transaction."
_INITIAL_SLEEP = 1.0
"""float: Initial "max" for sleep interval. To be used in :func:`_sleep`."""
_MAX_SLEEP = 30.0
"""float: Eventual "max" sleep time. To be used in :func:`_sleep`."""
_MULTIPLIER = 2.0
"""float: Multiplier for exponential backoff. To be used in :func:`_sleep`."""
_EXCEED_ATTEMPTS_TEMPLATE = "Failed to commit transaction in {:d} attempts."
_CANT_RETRY_READ_ONLY = "Only read-write transactions can be retried."


class Transaction(batch.WriteBatch):
class Transaction(batch.WriteBatch, BaseTransaction):
"""Accumulate read-and-write operations to be sent in a transaction.
Args:
Expand All @@ -60,9 +56,7 @@ class Transaction(batch.WriteBatch):

def __init__(self, client, max_attempts=MAX_ATTEMPTS, read_only=False):
super(Transaction, self).__init__(client)
self._max_attempts = max_attempts
self._read_only = read_only
self._id = None
BaseTransaction.__init__(self, max_attempts, read_only)

def _add_write_pbs(self, write_pbs):
"""Add `Write`` protobufs to this transaction.
Expand All @@ -79,61 +73,6 @@ def _add_write_pbs(self, write_pbs):

super(Transaction, self)._add_write_pbs(write_pbs)

def _options_protobuf(self, retry_id):
"""Convert the current object to protobuf.
The ``retry_id`` value is used when retrying a transaction that
failed (e.g. due to contention). It is intended to be the "first"
transaction that failed (i.e. if multiple retries are needed).
Args:
retry_id (Union[bytes, NoneType]): Transaction ID of a transaction
to be retried.
Returns:
Optional[google.cloud.firestore_v1.types.TransactionOptions]:
The protobuf ``TransactionOptions`` if ``read_only==True`` or if
there is a transaction ID to be retried, else :data:`None`.
Raises:
ValueError: If ``retry_id`` is not :data:`None` but the
transaction is read-only.
"""
if retry_id is not None:
if self._read_only:
raise ValueError(_CANT_RETRY_READ_ONLY)

return types.TransactionOptions(
read_write=types.TransactionOptions.ReadWrite(
retry_transaction=retry_id
)
)
elif self._read_only:
return types.TransactionOptions(
read_only=types.TransactionOptions.ReadOnly()
)
else:
return None

@property
def in_progress(self):
"""Determine if this transaction has already begun.
Returns:
bool: Indicates if the transaction has started.
"""
return self._id is not None

@property
def id(self):
"""Get the current transaction ID.
Returns:
Optional[bytes]: The transaction ID (or :data:`None` if the
current transaction is not in progress).
"""
return self._id

def _begin(self, retry_id=None):
"""Begin the transaction.
Expand All @@ -157,14 +96,6 @@ def _begin(self, retry_id=None):
)
self._id = transaction_response.transaction

def _clean_up(self):
"""Clean up the instance after :meth:`_rollback`` or :meth:`_commit``.
This intended to occur on success or failure of the associated RPCs.
"""
self._write_pbs = []
self._id = None

def _rollback(self):
"""Roll back the transaction.
Expand Down Expand Up @@ -238,7 +169,7 @@ def get(self, ref_or_query):
)


class _Transactional(object):
class _Transactional(_BaseTransactional):
"""Provide a callable object to use as a transactional decorater.
This is surfaced via
Expand All @@ -250,16 +181,7 @@ class _Transactional(object):
"""

def __init__(self, to_wrap):
self.to_wrap = to_wrap
self.current_id = None
"""Optional[bytes]: The current transaction ID."""
self.retry_id = None
"""Optional[bytes]: The ID of the first attempted transaction."""

def _reset(self):
"""Unset the transaction IDs."""
self.current_id = None
self.retry_id = None
super(_Transactional, self).__init__(to_wrap)

def _pre_commit(self, transaction, *args, **kwargs):
"""Begin transaction and call the wrapped callable.
Expand Down

0 comments on commit 546eb90

Please sign in to comment.