diff --git a/google/cloud/datastore/batch.py b/google/cloud/datastore/batch.py index dc8463f3..294c1b45 100644 --- a/google/cloud/datastore/batch.py +++ b/google/cloud/datastore/batch.py @@ -236,7 +236,7 @@ def begin(self): raise ValueError("Batch already started previously.") self._status = self._IN_PROGRESS - def _commit(self): + def _commit(self, retry, timeout): """Commits the batch. This is called by :meth:`commit`. @@ -246,8 +246,16 @@ def _commit(self): else: mode = _datastore_pb2.CommitRequest.TRANSACTIONAL + kwargs = {} + + if retry is not None: + kwargs["retry"] = retry + + if timeout is not None: + kwargs["timeout"] = timeout + commit_response_pb = self._client._datastore_api.commit( - self.project, mode, self._mutations, transaction=self._id + self.project, mode, self._mutations, transaction=self._id, **kwargs ) _, updated_keys = _parse_commit_response(commit_response_pb) # If the back-end returns without error, we are guaranteed that @@ -257,13 +265,24 @@ def _commit(self): new_id = new_key_pb.path[-1].id entity.key = entity.key.completed_key(new_id) - def commit(self): + def commit(self, retry=None, timeout=None): """Commits the batch. This is called automatically upon exiting a with statement, however it can be called explicitly if you don't want to use a context manager. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. + :raises: :class:`~exceptions.ValueError` if the batch is not in progress. """ @@ -271,7 +290,7 @@ def commit(self): raise ValueError("Batch must be in progress to commit()") try: - self._commit() + self._commit(retry=retry, timeout=timeout) finally: self._status = self._FINISHED diff --git a/google/cloud/datastore/client.py b/google/cloud/datastore/client.py index 7dd286d3..0a446630 100644 --- a/google/cloud/datastore/client.py +++ b/google/cloud/datastore/client.py @@ -89,6 +89,19 @@ def _determine_default_project(project=None): return project +def _make_retry_timeout_kwargs(retry, timeout): + """Helper: make optional retry / timeout kwargs dict.""" + kwargs = {} + + if retry is not None: + kwargs["retry"] = retry + + if timeout is not None: + kwargs["timeout"] = timeout + + return kwargs + + def _extended_lookup( datastore_api, project, @@ -97,6 +110,8 @@ def _extended_lookup( deferred=None, eventual=False, transaction_id=None, + retry=None, + timeout=None, ): """Repeat lookup until all keys found (unless stop requested). @@ -133,6 +148,17 @@ def _extended_lookup( the given transaction. Incompatible with ``eventual==True``. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. + :rtype: list of :class:`.entity_pb2.Entity` :returns: The requested entities. :raises: :class:`ValueError` if missing / deferred are not null or @@ -144,6 +170,8 @@ def _extended_lookup( if deferred is not None and deferred != []: raise ValueError("deferred must be None or an empty list") + kwargs = _make_retry_timeout_kwargs(retry, timeout) + results = [] loop_num = 0 @@ -151,7 +179,7 @@ def _extended_lookup( while loop_num < _MAX_LOOPS: # loop against possible deferred. loop_num += 1 lookup_response = datastore_api.lookup( - project, key_pbs, read_options=read_options + project, key_pbs, read_options=read_options, **kwargs ) # Accumulate the new results. @@ -338,7 +366,16 @@ def current_transaction(self): if isinstance(transaction, Transaction): return transaction - def get(self, key, missing=None, deferred=None, transaction=None, eventual=False): + def get( + self, + key, + missing=None, + deferred=None, + transaction=None, + eventual=False, + retry=None, + timeout=None, + ): """Retrieve an entity from a single key (if it exists). .. note:: @@ -369,6 +406,17 @@ def get(self, key, missing=None, deferred=None, transaction=None, eventual=False Setting True will use eventual consistency, but cannot be used inside a transaction or will raise ValueError. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. + :rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType`` :returns: The requested entity if it exists. @@ -380,12 +428,21 @@ def get(self, key, missing=None, deferred=None, transaction=None, eventual=False deferred=deferred, transaction=transaction, eventual=eventual, + retry=retry, + timeout=timeout, ) if entities: return entities[0] def get_multi( - self, keys, missing=None, deferred=None, transaction=None, eventual=False + self, + keys, + missing=None, + deferred=None, + transaction=None, + eventual=False, + retry=None, + timeout=None, ): """Retrieve entities, along with their attributes. @@ -412,6 +469,17 @@ def get_multi( Setting True will use eventual consistency, but cannot be used inside a transaction or will raise ValueError. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. + :rtype: list of :class:`google.cloud.datastore.entity.Entity` :returns: The requested entities. :raises: :class:`ValueError` if one or more of ``keys`` has a project @@ -437,6 +505,8 @@ def get_multi( missing=missing, deferred=deferred, transaction_id=transaction and transaction.id, + retry=retry, + timeout=timeout, ) if missing is not None: @@ -451,7 +521,7 @@ def get_multi( return [helpers.entity_from_protobuf(entity_pb) for entity_pb in entity_pbs] - def put(self, entity): + def put(self, entity, retry=None, timeout=None): """Save an entity in the Cloud Datastore. .. note:: @@ -462,15 +532,41 @@ def put(self, entity): :type entity: :class:`google.cloud.datastore.entity.Entity` :param entity: The entity to be saved to the datastore. + + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + Only meaningful outside of another batch / transaction. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. Only meaningful outside of another + batch / transaction. """ - self.put_multi(entities=[entity]) + self.put_multi(entities=[entity], retry=retry, timeout=timeout) - def put_multi(self, entities): + def put_multi(self, entities, retry=None, timeout=None): """Save entities in the Cloud Datastore. :type entities: list of :class:`google.cloud.datastore.entity.Entity` :param entities: The entities to be saved to the datastore. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + Only meaningful outside of another batch / transaction. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. Only meaningful outside of another + batch / transaction. + :raises: :class:`ValueError` if ``entities`` is a single entity. """ if isinstance(entities, Entity): @@ -490,9 +586,9 @@ def put_multi(self, entities): current.put(entity) if not in_batch: - current.commit() + current.commit(retry=retry, timeout=timeout) - def delete(self, key): + def delete(self, key, retry=None, timeout=None): """Delete the key in the Cloud Datastore. .. note:: @@ -503,14 +599,40 @@ def delete(self, key): :type key: :class:`google.cloud.datastore.key.Key` :param key: The key to be deleted from the datastore. + + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + Only meaningful outside of another batch / transaction. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. Only meaningful outside of another + batch / transaction. """ - self.delete_multi(keys=[key]) + self.delete_multi(keys=[key], retry=retry, timeout=timeout) - def delete_multi(self, keys): + def delete_multi(self, keys, retry=None, timeout=None): """Delete keys from the Cloud Datastore. :type keys: list of :class:`google.cloud.datastore.key.Key` :param keys: The keys to be deleted from the Datastore. + + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + Only meaningful outside of another batch / transaction. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. Only meaningful outside of another + batch / transaction. """ if not keys: return @@ -527,9 +649,9 @@ def delete_multi(self, keys): current.delete(key) if not in_batch: - current.commit() + current.commit(retry=retry, timeout=timeout) - def allocate_ids(self, incomplete_key, num_ids): + def allocate_ids(self, incomplete_key, num_ids, retry=None, timeout=None): """Allocate a list of IDs from a partial key. :type incomplete_key: :class:`google.cloud.datastore.key.Key` @@ -538,6 +660,17 @@ def allocate_ids(self, incomplete_key, num_ids): :type num_ids: int :param num_ids: The number of IDs to allocate. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. + :rtype: list of :class:`google.cloud.datastore.key.Key` :returns: The (complete) keys allocated with ``incomplete_key`` as root. @@ -550,8 +683,10 @@ def allocate_ids(self, incomplete_key, num_ids): incomplete_key_pb = incomplete_key.to_protobuf() incomplete_key_pbs = [incomplete_key_pb] * num_ids + kwargs = _make_retry_timeout_kwargs(retry, timeout) + response_pb = self._datastore_api.allocate_ids( - incomplete_key.project, incomplete_key_pbs + incomplete_key.project, incomplete_key_pbs, **kwargs ) allocated_ids = [ allocated_key_pb.path[-1].id for allocated_key_pb in response_pb.keys @@ -666,7 +801,7 @@ def do_something(entity): kwargs["namespace"] = self.namespace return Query(self, **kwargs) - def reserve_ids(self, complete_key, num_ids): + def reserve_ids(self, complete_key, num_ids, retry=None, timeout=None): """Reserve a list of IDs from a complete key. :type complete_key: :class:`google.cloud.datastore.key.Key` @@ -675,6 +810,17 @@ def reserve_ids(self, complete_key, num_ids): :type num_ids: int :param num_ids: The number of IDs to reserve. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. + :rtype: class:`NoneType` :returns: None :raises: :class:`ValueError` if `complete_key`` is not a @@ -686,9 +832,13 @@ def reserve_ids(self, complete_key, num_ids): if not isinstance(num_ids, int): raise ValueError(("num_ids is not a valid integer.", num_ids)) + kwargs = _make_retry_timeout_kwargs(retry, timeout) + complete_key_pb = complete_key.to_protobuf() complete_key_pbs = [complete_key_pb] * num_ids - self._datastore_api.reserve_ids(complete_key.project, complete_key_pbs) + self._datastore_api.reserve_ids( + complete_key.project, complete_key_pbs, **kwargs + ) return None diff --git a/google/cloud/datastore/query.py b/google/cloud/datastore/query.py index 78a153cb..7a4bedeb 100644 --- a/google/cloud/datastore/query.py +++ b/google/cloud/datastore/query.py @@ -344,6 +344,8 @@ def fetch( end_cursor=None, client=None, eventual=False, + retry=None, + timeout=None, ): """Execute the Query; return an iterator for the matching entities. @@ -380,6 +382,17 @@ def fetch( but cannot be used inside a transaction or will raise ValueError. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. + :rtype: :class:`Iterator` :returns: The iterator for the query. """ @@ -394,6 +407,8 @@ def fetch( start_cursor=start_cursor, end_cursor=end_cursor, eventual=eventual, + retry=retry, + timeout=timeout, ) @@ -427,6 +442,17 @@ class Iterator(page_iterator.Iterator): Setting True will use eventual consistency, but cannot be used inside a transaction or will raise ValueError. + + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. """ next_page_token = None @@ -440,6 +466,8 @@ def __init__( start_cursor=None, end_cursor=None, eventual=False, + retry=None, + timeout=None, ): super(Iterator, self).__init__( client=client, @@ -451,6 +479,8 @@ def __init__( self._offset = offset self._end_cursor = end_cursor self._eventual = eventual + self._retry = retry + self._timeout = timeout # The attributes below will change over the life of the iterator. self._more_results = True self._skipped_results = 0 @@ -536,8 +566,17 @@ def _next_page(self): partition_id = entity_pb2.PartitionId( project_id=self._query.project, namespace_id=self._query.namespace ) + + kwargs = {} + + if self._retry is not None: + kwargs["retry"] = self._retry + + if self._timeout is not None: + kwargs["timeout"] = self._timeout + response_pb = self.client._datastore_api.run_query( - self._query.project, partition_id, read_options, query=query_pb + self._query.project, partition_id, read_options, query=query_pb, **kwargs ) while ( @@ -551,7 +590,11 @@ def _next_page(self): query_pb.start_cursor = response_pb.batch.skipped_cursor query_pb.offset -= response_pb.batch.skipped_results response_pb = self.client._datastore_api.run_query( - self._query.project, partition_id, read_options, query=query_pb + self._query.project, + partition_id, + read_options, + query=query_pb, + **kwargs ) entity_pbs = self._process_query_results(response_pb) diff --git a/google/cloud/datastore/transaction.py b/google/cloud/datastore/transaction.py index ccff5561..705cc059 100644 --- a/google/cloud/datastore/transaction.py +++ b/google/cloud/datastore/transaction.py @@ -18,6 +18,19 @@ from google.cloud.datastore_v1.types import TransactionOptions +def _make_retry_timeout_kwargs(retry, timeout): + """Helper: make optional retry / timeout kwargs dict.""" + kwargs = {} + + if retry is not None: + kwargs["retry"] = retry + + if timeout is not None: + kwargs["timeout"] = timeout + + return kwargs + + class Transaction(Batch): """An abstraction representing datastore Transactions. @@ -193,40 +206,69 @@ def current(self): if isinstance(top, Transaction): return top - def begin(self): + def begin(self, retry=None, timeout=None): """Begins a transaction. This method is called automatically when entering a with statement, however it can be called explicitly if you don't want to use a context manager. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. + :raises: :class:`~exceptions.ValueError` if the transaction has already begun. """ super(Transaction, self).begin() + + kwargs = _make_retry_timeout_kwargs(retry, timeout) + try: - response_pb = self._client._datastore_api.begin_transaction(self.project) + response_pb = self._client._datastore_api.begin_transaction( + self.project, **kwargs + ) self._id = response_pb.transaction except: # noqa: E722 do not use bare except, specify exception instead self._status = self._ABORTED raise - def rollback(self): + def rollback(self, retry=None, timeout=None): """Rolls back the current transaction. This method has necessary side-effects: - Sets the current transaction's ID to None. + + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. """ + kwargs = _make_retry_timeout_kwargs(retry, timeout) + try: # No need to use the response it contains nothing. - self._client._datastore_api.rollback(self.project, self._id) + self._client._datastore_api.rollback(self.project, self._id, **kwargs) finally: super(Transaction, self).rollback() # Clear our own ID in case this gets accidentally reused. self._id = None - def commit(self): + def commit(self, retry=None, timeout=None): """Commits the transaction. This is called automatically upon exiting a with statement, @@ -236,9 +278,22 @@ def commit(self): This method has necessary side-effects: - Sets the current transaction's ID to None. + + :type retry: :class:`google.api_core.retry.Retry` + :param retry: + A retry object used to retry requests. If ``None`` is specified, + requests will be retried using a default configuration. + + :type timeout: float + :param timeout: + Time, in seconds, to wait for the request to complete. + Note that if ``retry`` is specified, the timeout applies + to each individual attempt. """ + kwargs = _make_retry_timeout_kwargs(retry, timeout) + try: - super(Transaction, self).commit() + super(Transaction, self).commit(**kwargs) finally: # Clear our own ID in case this gets accidentally reused. self._id = None diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 8516e78c..7ad2aeab 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -229,6 +229,46 @@ def test_commit(self): mode = datastore_pb2.CommitRequest.NON_TRANSACTIONAL commit_method.assert_called_with(project, mode, [], transaction=None) + def test_commit_w_timeout(self): + from google.cloud.datastore_v1.proto import datastore_pb2 + + project = "PROJECT" + client = _Client(project) + batch = self._make_one(client) + timeout = 100000 + + self.assertEqual(batch._status, batch._INITIAL) + batch.begin() + self.assertEqual(batch._status, batch._IN_PROGRESS) + batch.commit(timeout=timeout) + self.assertEqual(batch._status, batch._FINISHED) + + commit_method = client._datastore_api.commit + mode = datastore_pb2.CommitRequest.NON_TRANSACTIONAL + commit_method.assert_called_with( + project, mode, [], transaction=None, timeout=timeout + ) + + def test_commit_w_retry(self): + from google.cloud.datastore_v1.proto import datastore_pb2 + + project = "PROJECT" + client = _Client(project) + batch = self._make_one(client) + retry = mock.Mock() + + self.assertEqual(batch._status, batch._INITIAL) + batch.begin() + self.assertEqual(batch._status, batch._IN_PROGRESS) + batch.commit(retry=retry) + self.assertEqual(batch._status, batch._FINISHED) + + commit_method = client._datastore_api.commit + mode = datastore_pb2.CommitRequest.NON_TRANSACTIONAL + commit_method.assert_called_with( + project, mode, [], transaction=None, retry=retry + ) + def test_commit_wrong_status(self): project = "PROJECT" client = _Client(project) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index f172044e..ab186a8d 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -356,25 +356,24 @@ def test__push_batch_and__pop_batch(self): self.assertEqual(list(client._batch_stack), []) def test_get_miss(self): - _called_with = [] - - def _get_multi(*args, **kw): - _called_with.append((args, kw)) - return [] creds = _make_credentials() client = self._make_one(credentials=creds) - client.get_multi = _get_multi + get_multi = client.get_multi = mock.Mock(return_value=[]) key = object() self.assertIsNone(client.get(key)) - self.assertEqual(_called_with[0][0], ()) - self.assertEqual(_called_with[0][1]["keys"], [key]) - self.assertIsNone(_called_with[0][1]["missing"]) - self.assertIsNone(_called_with[0][1]["deferred"]) - self.assertIsNone(_called_with[0][1]["transaction"]) + get_multi.assert_called_once_with( + keys=[key], + missing=None, + deferred=None, + transaction=None, + eventual=False, + retry=None, + timeout=None, + ) def test_get_hit(self): TXN_ID = "123" @@ -554,13 +553,15 @@ def test_get_multi_w_deferred_from_backend_but_not_passed(self): self.PROJECT, [key1_pb, key2_pb], read_options=read_options ) - def test_get_multi_hit(self): + def test_get_multi_hit_w_retry_w_timeout(self): from google.cloud.datastore_v1.proto import datastore_pb2 from google.cloud.datastore.key import Key kind = "Kind" id_ = 1234 path = [{"kind": kind, "id": id_}] + retry = mock.Mock() + timeout = 100000 # Make a found entity pb to be returned from mock backend. entity_pb = _make_entity_pb(self.PROJECT, kind, id_, "foo", "Foo") @@ -573,7 +574,7 @@ def test_get_multi_hit(self): client._datastore_api_internal = ds_api key = Key(kind, id_, project=self.PROJECT) - (result,) = client.get_multi([key]) + (result,) = client.get_multi([key], retry=retry, timeout=timeout) new_key = result.key # Check the returned value is as expected. @@ -585,7 +586,11 @@ def test_get_multi_hit(self): read_options = datastore_pb2.ReadOptions() ds_api.lookup.assert_called_once_with( - self.PROJECT, [key.to_protobuf()], read_options=read_options + self.PROJECT, + [key.to_protobuf()], + read_options=read_options, + retry=retry, + timeout=timeout, ) def test_get_multi_hit_w_transaction(self): @@ -711,20 +716,30 @@ def test_get_multi_max_loops(self): ds_api.lookup.assert_not_called() def test_put(self): - _called_with = [] - - def _put_multi(*args, **kw): - _called_with.append((args, kw)) creds = _make_credentials() client = self._make_one(credentials=creds) - client.put_multi = _put_multi - entity = object() + put_multi = client.put_multi = mock.Mock() + entity = mock.Mock() client.put(entity) - self.assertEqual(_called_with[0][0], ()) - self.assertEqual(_called_with[0][1]["entities"], [entity]) + put_multi.assert_called_once_with(entities=[entity], retry=None, timeout=None) + + def test_put_w_retry_w_timeout(self): + + creds = _make_credentials() + client = self._make_one(credentials=creds) + put_multi = client.put_multi = mock.Mock() + entity = mock.Mock() + retry = mock.Mock() + timeout = 100000 + + client.put(entity, retry=retry, timeout=timeout) + + put_multi.assert_called_once_with( + entities=[entity], retry=retry, timeout=timeout + ) def test_put_multi_no_entities(self): creds = _make_credentials() @@ -739,13 +754,15 @@ def test_put_multi_w_single_empty_entity(self): client = self._make_one(credentials=creds) self.assertRaises(ValueError, client.put_multi, Entity()) - def test_put_multi_no_batch_w_partial_key(self): + def test_put_multi_no_batch_w_partial_key_w_retry_w_timeout(self): from google.cloud.datastore_v1.proto import datastore_pb2 from google.cloud.datastore.helpers import _property_tuples entity = _Entity(foo=u"bar") key = entity.key = _Key(self.PROJECT) key._id = None + retry = mock.Mock() + timeout = 100000 creds = _make_credentials() client = self._make_one(credentials=creds) @@ -753,12 +770,13 @@ def test_put_multi_no_batch_w_partial_key(self): ds_api = _make_datastore_api(key_pb) client._datastore_api_internal = ds_api - result = client.put_multi([entity]) + result = client.put_multi([entity], retry=retry, timeout=timeout) self.assertIsNone(result) self.assertEqual(ds_api.commit.call_count, 1) _, positional, keyword = ds_api.commit.mock_calls[0] - self.assertEqual(keyword, {"transaction": None}) + expected_kw = {"transaction": None, "retry": retry, "timeout": timeout} + self.assertEqual(keyword, expected_kw) self.assertEqual(len(positional), 3) self.assertEqual(positional[0], self.PROJECT) @@ -796,20 +814,26 @@ def test_put_multi_existing_batch_w_completed_key(self): self.assertEqual(value_pb.string_value, u"bar") def test_delete(self): - _called_with = [] + creds = _make_credentials() + client = self._make_one(credentials=creds) + delete_multi = client.delete_multi = mock.Mock() + key = mock.Mock() - def _delete_multi(*args, **kw): - _called_with.append((args, kw)) + client.delete(key) + delete_multi.assert_called_once_with(keys=[key], retry=None, timeout=None) + + def test_delete_w_retry_w_timeout(self): creds = _make_credentials() client = self._make_one(credentials=creds) - client.delete_multi = _delete_multi - key = object() + delete_multi = client.delete_multi = mock.Mock() + key = mock.Mock() + retry = mock.Mock() + timeout = 100000 - client.delete(key) + client.delete(key, retry=retry, timeout=timeout) - self.assertEqual(_called_with[0][0], ()) - self.assertEqual(_called_with[0][1]["keys"], [key]) + delete_multi.assert_called_once_with(keys=[key], retry=retry, timeout=timeout) def test_delete_multi_no_keys(self): creds = _make_credentials() @@ -820,22 +844,25 @@ def test_delete_multi_no_keys(self): self.assertIsNone(result) client._datastore_api_internal.commit.assert_not_called() - def test_delete_multi_no_batch(self): + def test_delete_multi_no_batch_w_retry_w_timeout(self): from google.cloud.datastore_v1.proto import datastore_pb2 key = _Key(self.PROJECT) + retry = mock.Mock() + timeout = 100000 creds = _make_credentials() client = self._make_one(credentials=creds) ds_api = _make_datastore_api() client._datastore_api_internal = ds_api - result = client.delete_multi([key]) + result = client.delete_multi([key], retry=retry, timeout=timeout) self.assertIsNone(result) self.assertEqual(ds_api.commit.call_count, 1) _, positional, keyword = ds_api.commit.mock_calls[0] - self.assertEqual(keyword, {"transaction": None}) + expected_kw = {"transaction": None, "retry": retry, "timeout": timeout} + self.assertEqual(keyword, expected_kw) self.assertEqual(len(positional), 3) self.assertEqual(positional[0], self.PROJECT) @@ -893,6 +920,36 @@ def test_allocate_ids_w_partial_key(self): # Check the IDs returned. self.assertEqual([key._id for key in result], list(range(num_ids))) + expected_keys = [incomplete_key.to_protobuf()] * num_ids + alloc_ids.assert_called_once_with(self.PROJECT, expected_keys) + + def test_allocate_ids_w_partial_key_w_retry_w_timeout(self): + num_ids = 2 + + incomplete_key = _Key(self.PROJECT) + incomplete_key._id = None + retry = mock.Mock() + timeout = 100000 + + creds = _make_credentials() + client = self._make_one(credentials=creds, _use_grpc=False) + allocated = mock.Mock(keys=[_KeyPB(i) for i in range(num_ids)], spec=["keys"]) + alloc_ids = mock.Mock(return_value=allocated, spec=[]) + ds_api = mock.Mock(allocate_ids=alloc_ids, spec=["allocate_ids"]) + client._datastore_api_internal = ds_api + + result = client.allocate_ids( + incomplete_key, num_ids, retry=retry, timeout=timeout + ) + + # Check the IDs returned. + self.assertEqual([key._id for key in result], list(range(num_ids))) + + expected_keys = [incomplete_key.to_protobuf()] * num_ids + alloc_ids.assert_called_once_with( + self.PROJECT, expected_keys, retry=retry, timeout=timeout + ) + def test_allocate_ids_w_completed_key(self): creds = _make_credentials() client = self._make_one(credentials=creds) @@ -913,6 +970,26 @@ def test_reserve_ids_w_completed_key(self): expected_keys = [complete_key.to_protobuf()] * num_ids reserve_ids.assert_called_once_with(self.PROJECT, expected_keys) + def test_reserve_ids_w_completed_key_w_retry_w_timeout(self): + num_ids = 2 + retry = mock.Mock() + timeout = 100000 + + creds = _make_credentials() + client = self._make_one(credentials=creds, _use_grpc=False) + complete_key = _Key(self.PROJECT) + self.assertTrue(not complete_key.is_partial) + reserve_ids = mock.Mock() + ds_api = mock.Mock(reserve_ids=reserve_ids, spec=["reserve_ids"]) + client._datastore_api_internal = ds_api + + client.reserve_ids(complete_key, num_ids, retry=retry, timeout=timeout) + + expected_keys = [complete_key.to_protobuf()] * num_ids + reserve_ids.assert_called_once_with( + self.PROJECT, expected_keys, retry=retry, timeout=timeout + ) + def test_reserve_ids_w_partial_key(self): num_ids = 2 incomplete_key = _Key(self.PROJECT) diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py index ed6cbc9d..fbaadb28 100644 --- a/tests/unit/test_query.py +++ b/tests/unit/test_query.py @@ -332,6 +332,7 @@ def test_fetch_defaults_w_client_attr(self): client = self._make_client() query = self._make_one(client) + iterator = query.fetch() self.assertIsInstance(iterator, Iterator) @@ -339,19 +340,29 @@ def test_fetch_defaults_w_client_attr(self): self.assertIs(iterator.client, client) self.assertIsNone(iterator.max_results) self.assertEqual(iterator._offset, 0) + self.assertIsNone(iterator._retry) + self.assertIsNone(iterator._timeout) - def test_fetch_w_explicit_client(self): + def test_fetch_w_explicit_client_w_retry_w_timeout(self): from google.cloud.datastore.query import Iterator client = self._make_client() other_client = self._make_client() query = self._make_one(client) - iterator = query.fetch(limit=7, offset=8, client=other_client) + retry = mock.Mock() + timeout = 100000 + + iterator = query.fetch( + limit=7, offset=8, client=other_client, retry=retry, timeout=timeout + ) + self.assertIsInstance(iterator, Iterator) self.assertIs(iterator._query, query) self.assertIs(iterator.client, other_client) self.assertEqual(iterator.max_results, 7) self.assertEqual(iterator._offset, 8) + self.assertEqual(iterator._retry, retry) + self.assertEqual(iterator._timeout, timeout) class TestIterator(unittest.TestCase): @@ -367,6 +378,7 @@ def _make_one(self, *args, **kw): def test_constructor_defaults(self): query = object() client = object() + iterator = self._make_one(query, client) self.assertFalse(iterator._started) @@ -379,6 +391,8 @@ def test_constructor_defaults(self): self.assertIsNone(iterator._offset) self.assertIsNone(iterator._end_cursor) self.assertTrue(iterator._more_results) + self.assertIsNone(iterator._retry) + self.assertIsNone(iterator._timeout) def test_constructor_explicit(self): query = object() @@ -387,6 +401,9 @@ def test_constructor_explicit(self): offset = 9 start_cursor = b"8290\xff" end_cursor = b"so20rc\ta" + retry = mock.Mock() + timeout = 100000 + iterator = self._make_one( query, client, @@ -394,6 +411,8 @@ def test_constructor_explicit(self): offset=offset, start_cursor=start_cursor, end_cursor=end_cursor, + retry=retry, + timeout=timeout, ) self.assertFalse(iterator._started) @@ -406,6 +425,8 @@ def test_constructor_explicit(self): self.assertEqual(iterator._offset, offset) self.assertEqual(iterator._end_cursor, end_cursor) self.assertTrue(iterator._more_results) + self.assertEqual(iterator._retry, retry) + self.assertEqual(iterator._timeout, timeout) def test__build_protobuf_empty(self): from google.cloud.datastore_v1.proto import query_pb2 @@ -513,7 +534,7 @@ def test__process_query_results_bad_enum(self): with self.assertRaises(ValueError): iterator._process_query_results(response_pb) - def _next_page_helper(self, txn_id=None): + def _next_page_helper(self, txn_id=None, retry=None, timeout=None): from google.api_core import page_iterator from google.cloud.datastore_v1.proto import datastore_pb2 from google.cloud.datastore_v1.proto import entity_pb2 @@ -531,9 +552,18 @@ def _next_page_helper(self, txn_id=None): client = _Client(project, datastore_api=ds_api, transaction=transaction) query = Query(client) - iterator = self._make_one(query, client) + kwargs = {} + + if retry is not None: + kwargs["retry"] = retry + + if timeout is not None: + kwargs["timeout"] = timeout + + iterator = self._make_one(query, client, **kwargs) page = iterator._next_page() + self.assertIsInstance(page, page_iterator.Page) self.assertIs(page._parent, iterator) @@ -544,12 +574,18 @@ def _next_page_helper(self, txn_id=None): read_options = datastore_pb2.ReadOptions(transaction=txn_id) empty_query = query_pb2.Query() ds_api.run_query.assert_called_once_with( - project, partition_id, read_options, query=empty_query + project, partition_id, read_options, query=empty_query, **kwargs ) def test__next_page(self): self._next_page_helper() + def test__next_page_w_retry(self): + self._next_page_helper(retry=mock.Mock()) + + def test__next_page_w_timeout(self): + self._next_page_helper(timeout=100000) + def test__next_page_in_transaction(self): txn_id = b"1xo1md\xe2\x98\x83" self._next_page_helper(txn_id) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index a1e23610..b285db1f 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -94,6 +94,23 @@ def test_begin(self): self.assertEqual(xact.id, id_) ds_api.begin_transaction.assert_called_once_with(project) + def test_begin_w_retry_w_timeout(self): + project = "PROJECT" + id_ = 889 + retry = mock.Mock() + timeout = 100000 + + ds_api = _make_datastore_api(xact_id=id_) + client = _Client(project, datastore_api=ds_api) + xact = self._make_one(client) + + xact.begin(retry=retry, timeout=timeout) + + self.assertEqual(xact.id, id_) + ds_api.begin_transaction.assert_called_once_with( + project, retry=retry, timeout=timeout + ) + def test_begin_tombstoned(self): project = "PROJECT" id_ = 1094 @@ -131,52 +148,77 @@ def test_rollback(self): client = _Client(project, datastore_api=ds_api) xact = self._make_one(client) xact.begin() + xact.rollback() - client._datastore_api.rollback.assert_called_once_with(project, id_) + self.assertIsNone(xact.id) - ds_api.begin_transaction.assert_called_once_with(project) + ds_api.rollback.assert_called_once_with(project, id_) + + def test_rollback_w_retry_w_timeout(self): + project = "PROJECT" + id_ = 239 + retry = mock.Mock() + timeout = 100000 + + ds_api = _make_datastore_api(xact_id=id_) + client = _Client(project, datastore_api=ds_api) + xact = self._make_one(client) + xact.begin() + + xact.rollback(retry=retry, timeout=timeout) + + self.assertIsNone(xact.id) + ds_api.rollback.assert_called_once_with( + project, id_, retry=retry, timeout=timeout + ) def test_commit_no_partial_keys(self): from google.cloud.datastore_v1.proto import datastore_pb2 project = "PROJECT" id_ = 1002930 + mode = datastore_pb2.CommitRequest.TRANSACTIONAL + ds_api = _make_datastore_api(xact_id=id_) client = _Client(project, datastore_api=ds_api) xact = self._make_one(client) xact.begin() xact.commit() - mode = datastore_pb2.CommitRequest.TRANSACTIONAL - client._datastore_api.commit.assert_called_once_with( - project, mode, [], transaction=id_ - ) + ds_api.commit.assert_called_once_with(project, mode, [], transaction=id_) self.assertIsNone(xact.id) - ds_api.begin_transaction.assert_called_once_with(project) - def test_commit_w_partial_keys(self): + def test_commit_w_partial_keys_w_retry_w_timeout(self): from google.cloud.datastore_v1.proto import datastore_pb2 project = "PROJECT" kind = "KIND" id1 = 123 + mode = datastore_pb2.CommitRequest.TRANSACTIONAL key = _make_key(kind, id1, project) id2 = 234 + retry = mock.Mock() + timeout = 100000 + ds_api = _make_datastore_api(key, xact_id=id2) client = _Client(project, datastore_api=ds_api) xact = self._make_one(client) xact.begin() entity = _Entity() + xact.put(entity) - xact.commit() + xact.commit(retry=retry, timeout=timeout) - mode = datastore_pb2.CommitRequest.TRANSACTIONAL ds_api.commit.assert_called_once_with( - project, mode, xact.mutations, transaction=id2 + project, + mode, + xact.mutations, + transaction=id2, + retry=retry, + timeout=timeout, ) self.assertIsNone(xact.id) self.assertEqual(entity.key.path, [{"kind": kind, "id": id1}]) - ds_api.begin_transaction.assert_called_once_with(project) def test_context_manager_no_raise(self): from google.cloud.datastore_v1.proto import datastore_pb2 @@ -238,6 +280,7 @@ def test_put_read_only(self): entity = _Entity() xact = self._make_one(client, read_only=True) xact.begin() + with self.assertRaises(RuntimeError): xact.put(entity)