From d5227994a4a5e2300905d6619742664dcd909443 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 9 Dec 2020 18:09:48 -0500 Subject: [PATCH] fix: make HTTPDatastoreAPI compatible w/ microgen Gapic API (#136) * tests: refactor to helpers for repeated tests * fix: update 'HTTPDatastoreAPI.allocate_ids' to follow request-only pattern Toward #124 * fix: add missing 'HTTPDatastoreAPI.reserve_ids' method Closes #134. * fix: update 'HTTPDatastoreAPI.rollback' to follow request-only pattern Toward #124 * fix: update 'HTTPDatastoreAPI.commit' to follow request-only pattern Toward #124 * fix: update 'HTTPDatastoreAPI.begin_transaction' to follow request-only pattern Toward #124 * fix: update 'HTTPDatastoreAPI.run_query' to follow request-only pattern Toward #124 * fix: update 'HTTPDatastoreAPI.lookup' to follow request-only pattern Toward #124 * fix: add 'retry' / 'timeout' args to HTTPDatastoreAPI methods Closes #124. * chore: lint * tests: run systests also w/ GRPC disabled. Closes #133. --- google/cloud/datastore/_http.py | 264 ++++++---- noxfile.py | 13 +- tests/unit/test__http.py | 907 +++++++++++++++++--------------- 3 files changed, 664 insertions(+), 520 deletions(-) diff --git a/google/cloud/datastore/_http.py b/google/cloud/datastore/_http.py index 8f2c9c58..9e13567b 100644 --- a/google/cloud/datastore/_http.py +++ b/google/cloud/datastore/_http.py @@ -31,7 +31,30 @@ """A template for the URL of a particular API call.""" -def _request(http, project, method, data, base_url, client_info): +def _make_retry_timeout_kwargs(retry, timeout): + """Helper for methods taking optional retry / timout args.""" + kwargs = {} + + if retry is not None: + kwargs["retry"] = retry + + if timeout is not None: + kwargs["timeout"] = timeout + + return kwargs + + +def _make_request_pb(request, request_pb_type): + """Helper for converting dicts to request messages.""" + if not isinstance(request, request_pb_type): + request = request_pb_type(**request) + + return request + + +def _request( + http, project, method, data, base_url, client_info, retry=None, timeout=None, +): """Make a request over the Http transport to the Cloud Datastore API. :type http: :class:`requests.Session` @@ -54,6 +77,12 @@ def _request(http, project, method, data, base_url, client_info): :type client_info: :class:`google.api_core.client_info.ClientInfo` :param client_info: used to generate user agent. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request + + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request + :rtype: str :returns: The string response content from the API call. :raises: :class:`google.cloud.exceptions.GoogleCloudError` if the @@ -67,7 +96,17 @@ def _request(http, project, method, data, base_url, client_info): } api_url = build_api_url(project, method, base_url) - response = http.request(url=api_url, method="POST", headers=headers, data=data) + requester = http.request + + if retry is not None: + requester = retry(requester) + + if timeout is not None: + response = requester( + url=api_url, method="POST", headers=headers, data=data, timeout=timeout, + ) + else: + response = requester(url=api_url, method="POST", headers=headers, data=data) if response.status_code != 200: error_status = status_pb2.Status.FromString(response.content) @@ -78,7 +117,17 @@ def _request(http, project, method, data, base_url, client_info): return response.content -def _rpc(http, project, method, base_url, client_info, request_pb, response_pb_cls): +def _rpc( + http, + project, + method, + base_url, + client_info, + request_pb, + response_pb_cls, + retry=None, + timeout=None, +): """Make a protobuf RPC request. :type http: :class:`requests.Session` @@ -105,11 +154,20 @@ def _rpc(http, project, method, base_url, client_info, request_pb, response_pb_c :param response_pb_cls: The class used to unmarshall the response protobuf. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request + + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request + :rtype: :class:`google.protobuf.message.Message` :returns: The RPC message parsed from the response. """ req_data = request_pb._pb.SerializeToString() - response = _request(http, project, method, req_data, base_url, client_info) + kwargs = _make_retry_timeout_kwargs(retry, timeout) + response = _request( + http, project, method, req_data, base_url, client_info, **kwargs + ) return response_pb_cls.deserialize(response) @@ -149,27 +207,25 @@ class HTTPDatastoreAPI(object): def __init__(self, client): self.client = client - def lookup(self, project_id, keys, read_options=None): + def lookup(self, request, retry=None, timeout=None): """Perform a ``lookup`` request. - :type project_id: str - :param project_id: The project to connect to. This is - usually your project name in the cloud console. + :type request: :class:`_datastore_pb2.LookupRequest` or dict + :param request: + Parameter bundle for API request. - :type keys: List[.entity_pb2.Key] - :param keys: The keys to retrieve from the datastore. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request - :type read_options: :class:`.datastore_pb2.ReadOptions` - :param read_options: (Optional) The options for this lookup. Contains - either the transaction for the read or - ``STRONG`` or ``EVENTUAL`` read consistency. + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request :rtype: :class:`.datastore_pb2.LookupResponse` :returns: The returned protobuf response object. """ - request_pb = _datastore_pb2.LookupRequest( - project_id=project_id, read_options=read_options, keys=keys - ) + request_pb = _make_request_pb(request, _datastore_pb2.LookupRequest) + project_id = request_pb.project_id + return _rpc( self.client._http, project_id, @@ -178,44 +234,29 @@ def lookup(self, project_id, keys, read_options=None): self.client._client_info, request_pb, _datastore_pb2.LookupResponse, + retry=retry, + timeout=timeout, ) - def run_query( - self, project_id, partition_id, read_options=None, query=None, gql_query=None - ): + def run_query(self, request, retry=None, timeout=None): """Perform a ``runQuery`` request. - :type project_id: str - :param project_id: The project to connect to. This is - usually your project name in the cloud console. + :type request: :class:`_datastore_pb2.BeginTransactionRequest` or dict + :param request: + Parameter bundle for API request. - :type partition_id: :class:`.entity_pb2.PartitionId` - :param partition_id: Partition ID corresponding to an optional - namespace and project ID. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request - :type read_options: :class:`.datastore_pb2.ReadOptions` - :param read_options: (Optional) The options for this query. Contains - either the transaction for the read or - ``STRONG`` or ``EVENTUAL`` read consistency. - - :type query: :class:`.query_pb2.Query` - :param query: (Optional) The query protobuf to run. At most one of - ``query`` and ``gql_query`` can be specified. - - :type gql_query: :class:`.query_pb2.GqlQuery` - :param gql_query: (Optional) The GQL query to run. At most one of - ``query`` and ``gql_query`` can be specified. + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request :rtype: :class:`.datastore_pb2.RunQueryResponse` :returns: The returned protobuf response object. """ - request_pb = _datastore_pb2.RunQueryRequest( - project_id=project_id, - partition_id=partition_id, - read_options=read_options, - query=query, - gql_query=gql_query, - ) + request_pb = _make_request_pb(request, _datastore_pb2.RunQueryRequest) + project_id = request_pb.project_id + return _rpc( self.client._http, project_id, @@ -224,22 +265,29 @@ def run_query( self.client._client_info, request_pb, _datastore_pb2.RunQueryResponse, + retry=retry, + timeout=timeout, ) - def begin_transaction(self, project_id, transaction_options=None): + def begin_transaction(self, request, retry=None, timeout=None): """Perform a ``beginTransaction`` request. - :type project_id: str - :param project_id: The project to connect to. This is - usually your project name in the cloud console. + :type request: :class:`_datastore_pb2.BeginTransactionRequest` or dict + :param request: + Parameter bundle for API request. + + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request - :type transaction_options: ~.datastore_v1.types.TransactionOptions - :param transaction_options: (Optional) Options for a new transaction. + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request :rtype: :class:`.datastore_pb2.BeginTransactionResponse` :returns: The returned protobuf response object. """ - request_pb = _datastore_pb2.BeginTransactionRequest() + request_pb = _make_request_pb(request, _datastore_pb2.BeginTransactionRequest) + project_id = request_pb.project_id + return _rpc( self.client._http, project_id, @@ -248,37 +296,29 @@ def begin_transaction(self, project_id, transaction_options=None): self.client._client_info, request_pb, _datastore_pb2.BeginTransactionResponse, + retry=retry, + timeout=timeout, ) - def commit(self, project_id, mode, mutations, transaction=None): + def commit(self, request, retry=None, timeout=None): """Perform a ``commit`` request. - :type project_id: str - :param project_id: The project to connect to. This is - usually your project name in the cloud console. + :type request: :class:`_datastore_pb2.CommitRequest` or dict + :param request: + Parameter bundle for API request. - :type mode: :class:`.gapic.datastore.v1.enums.CommitRequest.Mode` - :param mode: The type of commit to perform. Expected to be one of - ``TRANSACTIONAL`` or ``NON_TRANSACTIONAL``. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request - :type mutations: list - :param mutations: List of :class:`.datastore_pb2.Mutation`, the - mutations to perform. - - :type transaction: bytes - :param transaction: (Optional) The transaction ID returned from - :meth:`begin_transaction`. Non-transactional - commits must pass :data:`None`. + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request :rtype: :class:`.datastore_pb2.CommitResponse` :returns: The returned protobuf response object. """ - request_pb = _datastore_pb2.CommitRequest( - project_id=project_id, - mode=mode, - transaction=transaction, - mutations=mutations, - ) + request_pb = _make_request_pb(request, _datastore_pb2.CommitRequest) + project_id = request_pb.project_id + return _rpc( self.client._http, project_id, @@ -287,25 +327,29 @@ def commit(self, project_id, mode, mutations, transaction=None): self.client._client_info, request_pb, _datastore_pb2.CommitResponse, + retry=retry, + timeout=timeout, ) - def rollback(self, project_id, transaction): + def rollback(self, request, retry=None, timeout=None): """Perform a ``rollback`` request. - :type project_id: str - :param project_id: The project to connect to. This is - usually your project name in the cloud console. + :type request: :class:`_datastore_pb2.RollbackRequest` or dict + :param request: + Parameter bundle for API request. - :type transaction: bytes - :param transaction: The transaction ID to rollback. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request + + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request :rtype: :class:`.datastore_pb2.RollbackResponse` :returns: The returned protobuf response object. """ - request_pb = _datastore_pb2.RollbackRequest( - project_id=project_id, transaction=transaction - ) - # Response is empty (i.e. no fields) but we return it anyway. + request_pb = _make_request_pb(request, _datastore_pb2.RollbackRequest) + project_id = request_pb.project_id + return _rpc( self.client._http, project_id, @@ -314,22 +358,29 @@ def rollback(self, project_id, transaction): self.client._client_info, request_pb, _datastore_pb2.RollbackResponse, + retry=retry, + timeout=timeout, ) - def allocate_ids(self, project_id, keys): + def allocate_ids(self, request, retry=None, timeout=None): """Perform an ``allocateIds`` request. - :type project_id: str - :param project_id: The project to connect to. This is - usually your project name in the cloud console. + :type request: :class:`_datastore_pb2.AllocateIdsRequest` or dict + :param request: + Parameter bundle for API request. - :type keys: List[.entity_pb2.Key] - :param keys: The keys for which the backend should allocate IDs. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request + + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request :rtype: :class:`.datastore_pb2.AllocateIdsResponse` :returns: The returned protobuf response object. """ - request_pb = _datastore_pb2.AllocateIdsRequest(keys=keys) + request_pb = _make_request_pb(request, _datastore_pb2.AllocateIdsRequest) + project_id = request_pb.project_id + return _rpc( self.client._http, project_id, @@ -338,4 +389,37 @@ def allocate_ids(self, project_id, keys): self.client._client_info, request_pb, _datastore_pb2.AllocateIdsResponse, + retry=retry, + timeout=timeout, + ) + + def reserve_ids(self, request, retry=None, timeout=None): + """Perform an ``reserveIds`` request. + + :type request: :class:`_datastore_pb2.ReserveIdsRequest` or dict + :param request: + Parameter bundle for API request. + + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) retry policy for the request + + :type timeout: float or tuple(float, float) + :param timeout: (Optional) timeout for the request + + :rtype: :class:`.datastore_pb2.ReserveIdsResponse` + :returns: The returned protobuf response object. + """ + request_pb = _make_request_pb(request, _datastore_pb2.ReserveIdsRequest) + project_id = request_pb.project_id + + return _rpc( + self.client._http, + project_id, + "reserveIds", + self.client._base_url, + self.client._client_info, + request_pb, + _datastore_pb2.ReserveIdsResponse, + retry=retry, + timeout=timeout, ) diff --git a/noxfile.py b/noxfile.py index 21cdf161..a4bcedd7 100644 --- a/noxfile.py +++ b/noxfile.py @@ -98,7 +98,8 @@ def unit(session): @nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS) -def system(session): +@nox.parametrize("disable_grpc", [False, True]) +def system(session, disable_grpc): """Run the system test suite.""" system_test_path = os.path.join("tests", "system.py") system_test_folder_path = os.path.join("tests", "system") @@ -126,11 +127,17 @@ def system(session): ) session.install("-e", ".") + env = {} + if disable_grpc: + env["GOOGLE_CLOUD_DISABLE_GRPC"] = "True" + # Run py.test against the system tests. if system_test_exists: - session.run("py.test", "--quiet", system_test_path, *session.posargs) + session.run("py.test", "--quiet", system_test_path, env=env, *session.posargs) if system_test_folder_exists: - session.run("py.test", "--quiet", system_test_folder_path, *session.posargs) + session.run( + "py.test", "--quiet", system_test_folder_path, env=env, *session.posargs + ) @nox.session(python=DEFAULT_PYTHON_VERSION) diff --git a/tests/unit/test__http.py b/tests/unit/test__http.py index 6048d40b..2e8da9e9 100644 --- a/tests/unit/test__http.py +++ b/tests/unit/test__http.py @@ -20,6 +20,82 @@ import requests +class Test__make_retry_timeout_kwargs(unittest.TestCase): + @staticmethod + def _call_fut(retry, timeout): + from google.cloud.datastore._http import _make_retry_timeout_kwargs + + return _make_retry_timeout_kwargs(retry, timeout) + + def test_empty(self): + expected = {} + self.assertEqual(self._call_fut(None, None), expected) + + def test_w_retry(self): + retry = object() + expected = {"retry": retry} + self.assertEqual(self._call_fut(retry, None), expected) + + def test_w_timeout(self): + timeout = 5.0 + expected = {"timeout": timeout} + self.assertEqual(self._call_fut(None, timeout), expected) + + def test_w_retry_w_timeout(self): + retry = object() + timeout = 5.0 + expected = {"retry": retry, "timeout": timeout} + self.assertEqual(self._call_fut(retry, timeout), expected) + + +class Foo: + def __init__(self, bar=None, baz=None): + self.bar = bar + self.baz = baz + + +class Test__make_request_pb(unittest.TestCase): + @staticmethod + def _call_fut(request, request_pb_type): + from google.cloud.datastore._http import _make_request_pb + + return _make_request_pb(request, request_pb_type) + + def test_w_empty_dict(self): + request = {} + + foo = self._call_fut(request, Foo) + + self.assertIsInstance(foo, Foo) + self.assertIsNone(foo.bar) + self.assertIsNone(foo.baz) + + def test_w_partial_dict(self): + request = {"bar": "Bar"} + + foo = self._call_fut(request, Foo) + + self.assertIsInstance(foo, Foo) + self.assertEqual(foo.bar, "Bar") + self.assertIsNone(foo.baz) + + def test_w_complete_dict(self): + request = {"bar": "Bar", "baz": "Baz"} + + foo = self._call_fut(request, Foo) + + self.assertIsInstance(foo, Foo) + self.assertEqual(foo.bar, "Bar") + self.assertEqual(foo.baz, "Baz") + + def test_w_instance(self): + passed = Foo() + + foo = self._call_fut(passed, Foo) + + self.assertIs(foo, passed) + + class Test__request(unittest.TestCase): @staticmethod def _call_fut(*args, **kwargs): @@ -27,7 +103,7 @@ def _call_fut(*args, **kwargs): return _request(*args, **kwargs) - def test_success(self): + def _helper(self, retry=None, timeout=None): from google.cloud import _http as connection_module project = "PROJECT" @@ -40,8 +116,11 @@ def test_success(self): http = _make_requests_session([_make_response(content=response_data)]) - # Call actual function under test. - response = self._call_fut(http, project, method, data, base_url, client_info) + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) + + response = self._call_fut( + http, project, method, data, base_url, client_info, **kwargs + ) self.assertEqual(response, response_data) # Check that the mocks were called as expected. @@ -51,10 +130,30 @@ def test_success(self): "User-Agent": user_agent, connection_module.CLIENT_INFO_HEADER: user_agent, } + + if retry is not None: + retry.assert_called_once_with(http.request) + + kwargs.pop("retry", None) http.request.assert_called_once_with( - method="POST", url=expected_url, headers=expected_headers, data=data + method="POST", + url=expected_url, + headers=expected_headers, + data=data, + **kwargs ) + def test_ok(self): + self._helper() + + def test_w_retry(self): + retry = mock.MagicMock() + self._helper(retry=retry) + + def test_w_timeout(self): + timeout = 5.0 + self._helper(timeout=timeout) + def test_failure(self): from google.cloud.exceptions import BadRequest from google.rpc import code_pb2 @@ -89,7 +188,7 @@ def _call_fut(*args, **kwargs): return _rpc(*args, **kwargs) - def test_it(self): + def _helper(self, retry=None, timeout=None): from google.cloud.datastore_v1.types import datastore as datastore_pb2 http = object() @@ -100,6 +199,9 @@ def test_it(self): request_pb = datastore_pb2.BeginTransactionRequest(project_id=project) response_pb = datastore_pb2.BeginTransactionResponse(transaction=b"7830rmc") + + kwargs = _make_retry_timeout_kwargs(retry, timeout) + patch = mock.patch( "google.cloud.datastore._http._request", return_value=response_pb._pb.SerializeToString(), @@ -113,17 +215,31 @@ def test_it(self): client_info, request_pb, datastore_pb2.BeginTransactionResponse, + **kwargs ) - self.assertEqual(result, response_pb._pb) - mock_request.assert_called_once_with( - http, - project, - method, - request_pb._pb.SerializeToString(), - base_url, - client_info, - ) + self.assertEqual(result, response_pb._pb) + + mock_request.assert_called_once_with( + http, + project, + method, + request_pb._pb.SerializeToString(), + base_url, + client_info, + **kwargs + ) + + def test_defaults(self): + self._helper() + + def test_w_retry(self): + retry = mock.MagicMock() + self._helper(retry=retry) + + def test_w_timeout(self): + timeout = 5.0 + self._helper(timeout=timeout) class TestHTTPDatastoreAPI(unittest.TestCase): @@ -147,52 +263,35 @@ def test_constructor(self): ds_api = self._make_one(client) self.assertIs(ds_api.client, client) - def test_lookup_single_key_empty_response(self): + def _lookup_single_helper( + self, + read_consistency=None, + transaction=None, + empty=True, + retry=None, + timeout=None, + ): from google.cloud.datastore_v1.types import datastore as datastore_pb2 + from google.cloud.datastore_v1.types import entity as entity_pb2 project = "PROJECT" key_pb = _make_key_pb(project) - rsp_pb = datastore_pb2.LookupResponse() - read_options = datastore_pb2.ReadOptions() - - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], - ) - # Make request. - ds_api = self._make_one(client) - response = ds_api.lookup(project, [key_pb], read_options=read_options) + options_kw = {} + if read_consistency is not None: + options_kw["read_consistency"] = read_consistency + if transaction is not None: + options_kw["transaction"] = transaction - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) - uri = _build_expected_url(client._base_url, project, "lookup") - self.assertEqual(len(response.found), 0) - self.assertEqual(len(response.missing), 0) - self.assertEqual(len(response.deferred), 0) + read_options = datastore_pb2.ReadOptions(**options_kw) - request = _verify_protobuf_call(http, uri, datastore_pb2.LookupRequest()) - self.assertEqual(list(request.keys), [key_pb._pb]) - self.assertEqual(request.read_options, read_options._pb) - - def test_lookup_single_key_empty_response_w_eventual(self): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 - - project = "PROJECT" - key_pb = _make_key_pb(project) rsp_pb = datastore_pb2.LookupResponse() - read_options = datastore_pb2.ReadOptions( - read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL - ) - # Create mock HTTP and client with response. + if not empty: + entity = entity_pb2.Entity() + entity.key._pb.CopyFrom(key_pb._pb) + rsp_pb._pb.found.add(entity=entity._pb) + http = _make_requests_session( [_make_response(content=rsp_pb._pb.SerializeToString())] ) @@ -203,110 +302,97 @@ def test_lookup_single_key_empty_response_w_eventual(self): _client_info=client_info, spec=["_http", "_base_url", "_client_info"], ) - - # Make request. ds_api = self._make_one(client) - response = ds_api.lookup(project, [key_pb], read_options=read_options) + request = { + "project_id": project, + "keys": [key_pb], + "read_options": read_options, + } + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) - uri = _build_expected_url(client._base_url, project, "lookup") - self.assertEqual(len(response.found), 0) - self.assertEqual(len(response.missing), 0) - self.assertEqual(len(response.deferred), 0) + response = ds_api.lookup(request=request, **kwargs) - request = _verify_protobuf_call(http, uri, datastore_pb2.LookupRequest()) - self.assertEqual(list(request.keys), [key_pb._pb]) - self.assertEqual(request.read_options, read_options._pb) + self.assertEqual(response, rsp_pb._pb) - def test_lookup_single_key_empty_response_w_transaction(self): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 + if empty: + self.assertEqual(len(response.found), 0) + else: + self.assertEqual(len(response.found), 1) - project = "PROJECT" - transaction = b"TRANSACTION" - key_pb = _make_key_pb(project) - rsp_pb = datastore_pb2.LookupResponse() - read_options = datastore_pb2.ReadOptions(transaction=transaction) + self.assertEqual(len(response.missing), 0) + self.assertEqual(len(response.deferred), 0) - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], + uri = _build_expected_url(client._base_url, project, "lookup") + request = _verify_protobuf_call( + http, uri, datastore_pb2.LookupRequest(), retry=retry, timeout=timeout, ) - # Make request. - ds_api = self._make_one(client) - response = ds_api.lookup(project, [key_pb], read_options=read_options) - - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) - uri = _build_expected_url(client._base_url, project, "lookup") - self.assertEqual(len(response.found), 0) - self.assertEqual(len(response.missing), 0) - self.assertEqual(len(response.deferred), 0) + if retry is not None: + retry.assert_called_once_with(http.request) - request = _verify_protobuf_call(http, uri, datastore_pb2.LookupRequest()) self.assertEqual(list(request.keys), [key_pb._pb]) self.assertEqual(request.read_options, read_options._pb) - def test_lookup_single_key_nonempty_response(self): + def test_lookup_single_key_miss(self): + self._lookup_single_helper() + + def test_lookup_single_key_miss_w_read_consistency(self): from google.cloud.datastore_v1.types import datastore as datastore_pb2 - from google.cloud.datastore_v1.types import entity as entity_pb2 - project = "PROJECT" - key_pb = _make_key_pb(project) - rsp_pb = datastore_pb2.LookupResponse() - entity = entity_pb2.Entity() - entity.key._pb.CopyFrom(key_pb._pb) - rsp_pb._pb.found.add(entity=entity._pb) - read_options = datastore_pb2.ReadOptions() + read_consistency = datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL + self._lookup_single_helper(read_consistency=read_consistency) - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], - ) + def test_lookup_single_key_miss_w_transaction(self): + transaction = b"TRANSACTION" + self._lookup_single_helper(transaction=transaction) - # Make request. - ds_api = self._make_one(client) - response = ds_api.lookup(project, [key_pb], read_options=read_options) + def test_lookup_single_key_hit(self): + self._lookup_single_helper(empty=False) - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) - uri = _build_expected_url(client._base_url, project, "lookup") - self.assertEqual(len(response.found), 1) - self.assertEqual(len(response.missing), 0) - self.assertEqual(len(response.deferred), 0) - found = response.found[0].entity - self.assertEqual(found.key.path[0].kind, "Kind") - self.assertEqual(found.key.path[0].id, 1234) + def test_lookup_single_key_hit_w_retry(self): + retry = mock.MagicMock() + self._lookup_single_helper(empty=False, retry=retry) - request = _verify_protobuf_call(http, uri, datastore_pb2.LookupRequest()) - self.assertEqual(list(request.keys), [key_pb._pb]) - self.assertEqual(request.read_options, read_options._pb) + def test_lookup_single_key_hit_w_timeout(self): + timeout = 5.0 + self._lookup_single_helper(empty=False, timeout=timeout) - def test_lookup_multiple_keys_empty_response(self): + def _lookup_multiple_helper( + self, found=0, missing=0, deferred=0, retry=None, timeout=None, + ): from google.cloud.datastore_v1.types import datastore as datastore_pb2 + from google.cloud.datastore_v1.types import entity as entity_pb2 project = "PROJECT" key_pb1 = _make_key_pb(project) key_pb2 = _make_key_pb(project, id_=2345) - rsp_pb = datastore_pb2.LookupResponse() + keys = [key_pb1, key_pb2] read_options = datastore_pb2.ReadOptions() - # Create mock HTTP and client with response. + rsp_pb = datastore_pb2.LookupResponse() + + found_keys = [] + for i_found in range(found): + key = keys[i_found] + found_keys.append(key._pb) + entity = entity_pb2.Entity() + entity.key._pb.CopyFrom(key._pb) + rsp_pb._pb.found.add(entity=entity._pb) + + missing_keys = [] + for i_missing in range(missing): + key = keys[i_missing] + missing_keys.append(key._pb) + entity = entity_pb2.Entity() + entity.key._pb.CopyFrom(key._pb) + rsp_pb._pb.missing.add(entity=entity._pb) + + deferred_keys = [] + for i_deferred in range(deferred): + key = keys[i_deferred] + deferred_keys.append(key._pb) + rsp_pb._pb.deferred.append(key._pb) + http = _make_requests_session( [_make_response(content=rsp_pb._pb.SerializeToString())] ) @@ -317,168 +403,95 @@ def test_lookup_multiple_keys_empty_response(self): _client_info=client_info, spec=["_http", "_base_url", "_client_info"], ) - - # Make request. ds_api = self._make_one(client) - response = ds_api.lookup(project, [key_pb1, key_pb2], read_options=read_options) - - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) - uri = _build_expected_url(client._base_url, project, "lookup") - self.assertEqual(len(response.found), 0) - self.assertEqual(len(response.missing), 0) - self.assertEqual(len(response.deferred), 0) + request = { + "project_id": project, + "keys": keys, + "read_options": read_options, + } + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) - request = _verify_protobuf_call(http, uri, datastore_pb2.LookupRequest()) - self.assertEqual(list(request.keys), [key_pb1._pb, key_pb2._pb]) - self.assertEqual(request.read_options, read_options._pb) + response = ds_api.lookup(request=request, **kwargs) - def test_lookup_multiple_keys_w_missing(self): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 - - project = "PROJECT" - key_pb1 = _make_key_pb(project) - key_pb2 = _make_key_pb(project, id_=2345) - rsp_pb = datastore_pb2.LookupResponse() - er_1 = rsp_pb._pb.missing.add() - er_1.entity.key.CopyFrom(key_pb1._pb) - er_2 = rsp_pb._pb.missing.add() - er_2.entity.key.CopyFrom(key_pb2._pb) - read_options = datastore_pb2.ReadOptions() + self.assertEqual(response, rsp_pb._pb) - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], + self.assertEqual([found.entity.key for found in response.found], found_keys) + self.assertEqual( + [missing.entity.key for missing in response.missing], missing_keys ) + self.assertEqual(list(response.deferred), deferred_keys) - # Make request. - ds_api = self._make_one(client) - response = ds_api.lookup(project, [key_pb1, key_pb2], read_options=read_options) - - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) uri = _build_expected_url(client._base_url, project, "lookup") - self.assertEqual(len(response.found), 0) - self.assertEqual(len(response.deferred), 0) - missing_keys = [result.entity.key for result in response.missing] - self.assertEqual(missing_keys, [key_pb1._pb, key_pb2._pb]) - - request = _verify_protobuf_call(http, uri, datastore_pb2.LookupRequest()) + request = _verify_protobuf_call( + http, uri, datastore_pb2.LookupRequest(), retry=retry, timeout=timeout, + ) self.assertEqual(list(request.keys), [key_pb1._pb, key_pb2._pb]) self.assertEqual(request.read_options, read_options._pb) - def test_lookup_multiple_keys_w_deferred(self): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 - - project = "PROJECT" - key_pb1 = _make_key_pb(project) - key_pb2 = _make_key_pb(project, id_=2345) - rsp_pb = datastore_pb2.LookupResponse() - rsp_pb._pb.deferred.add().CopyFrom(key_pb1._pb) - rsp_pb._pb.deferred.add().CopyFrom(key_pb2._pb) - read_options = datastore_pb2.ReadOptions() + def test_lookup_multiple_keys_w_empty_response(self): + self._lookup_multiple_helper() - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], - ) + def test_lookup_multiple_keys_w_retry(self): + retry = mock.MagicMock() + self._lookup_multiple_helper(retry=retry) - # Make request. - ds_api = self._make_one(client) - response = ds_api.lookup(project, [key_pb1, key_pb2], read_options=read_options) + def test_lookup_multiple_keys_w_timeout(self): + timeout = 5.0 + self._lookup_multiple_helper(timeout=timeout) - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) - uri = _build_expected_url(client._base_url, project, "lookup") - self.assertEqual(len(response.found), 0) - self.assertEqual(len(response.missing), 0) - self.assertEqual(list(response.deferred), [key_pb1._pb, key_pb2._pb]) + def test_lookup_multiple_keys_w_found(self): + self._lookup_multiple_helper(found=2) - request = _verify_protobuf_call(http, uri, datastore_pb2.LookupRequest()) - self.assertEqual(list(request.keys), [key_pb1._pb, key_pb2._pb]) - self.assertEqual(request.read_options, read_options._pb) + def test_lookup_multiple_keys_w_missing(self): + self._lookup_multiple_helper(missing=2) - def test_run_query_w_eventual_no_transaction(self): + def test_lookup_multiple_keys_w_deferred(self): + self._lookup_multiple_helper(deferred=2) + + def _run_query_helper( + self, + read_consistency=None, + transaction=None, + namespace=None, + found=0, + retry=None, + timeout=None, + ): from google.cloud.datastore_v1.types import datastore as datastore_pb2 from google.cloud.datastore_v1.types import entity as entity_pb2 from google.cloud.datastore_v1.types import query as query_pb2 project = "PROJECT" kind = "Nonesuch" - cursor = b"\x00" query_pb = self._make_query_pb(kind) - partition_id = entity_pb2.PartitionId(project_id=project) - read_options = datastore_pb2.ReadOptions( - read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL - ) - rsp_pb = datastore_pb2.RunQueryResponse( - batch=query_pb2.QueryResultBatch( - entity_result_type=query_pb2.EntityResult.ResultType.FULL, - end_cursor=cursor, - more_results=query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS, - ) - ) - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], - ) - - # Make request. - ds_api = self._make_one(client) - response = ds_api.run_query(project, partition_id, read_options, query=query_pb) + partition_kw = {"project_id": project} + if namespace is not None: + partition_kw["namespace_id"] = namespace - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) - - uri = _build_expected_url(client._base_url, project, "runQuery") - request = _verify_protobuf_call(http, uri, datastore_pb2.RunQueryRequest()) - self.assertEqual(request.partition_id, partition_id._pb) - self.assertEqual(request.query, query_pb._pb) - self.assertEqual(request.read_options, read_options._pb) + partition_id = entity_pb2.PartitionId(**partition_kw) - def test_run_query_wo_eventual_w_transaction(self): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 - from google.cloud.datastore_v1.types import entity as entity_pb2 - from google.cloud.datastore_v1.types import query as query_pb2 + options_kw = {} + if read_consistency is not None: + options_kw["read_consistency"] = read_consistency + if transaction is not None: + options_kw["transaction"] = transaction + read_options = datastore_pb2.ReadOptions(**options_kw) - project = "PROJECT" - kind = "Nonesuch" cursor = b"\x00" - transaction = b"TRANSACTION" - query_pb = self._make_query_pb(kind) - partition_id = entity_pb2.PartitionId(project_id=project) - read_options = datastore_pb2.ReadOptions(transaction=transaction) + batch_kw = { + "entity_result_type": query_pb2.EntityResult.ResultType.FULL, + "end_cursor": cursor, + "more_results": query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS, + } + if found: + batch_kw["entity_results"] = [ + query_pb2.EntityResult(entity=entity_pb2.Entity()) + ] * found rsp_pb = datastore_pb2.RunQueryResponse( - batch=query_pb2.QueryResultBatch( - entity_result_type=query_pb2.EntityResult.ResultType.FULL, - end_cursor=cursor, - more_results=query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS, - ) + batch=query_pb2.QueryResultBatch(**batch_kw) ) - # Create mock HTTP and client with response. http = _make_requests_session( [_make_response(content=rsp_pb._pb.SerializeToString())] ) @@ -489,110 +502,53 @@ def test_run_query_wo_eventual_w_transaction(self): _client_info=client_info, spec=["_http", "_base_url", "_client_info"], ) - - # Make request. ds_api = self._make_one(client) - response = ds_api.run_query(project, partition_id, read_options, query=query_pb) + request = { + "project_id": project, + "partition_id": partition_id, + "read_options": read_options, + "query": query_pb, + } + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) + + response = ds_api.run_query(request=request, **kwargs) - # Check the result and verify the callers. self.assertEqual(response, rsp_pb._pb) uri = _build_expected_url(client._base_url, project, "runQuery") - request = _verify_protobuf_call(http, uri, datastore_pb2.RunQueryRequest()) + request = _verify_protobuf_call( + http, uri, datastore_pb2.RunQueryRequest(), retry=retry, timeout=timeout, + ) self.assertEqual(request.partition_id, partition_id._pb) self.assertEqual(request.query, query_pb._pb) self.assertEqual(request.read_options, read_options._pb) - def test_run_query_wo_namespace_empty_result(self): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 - from google.cloud.datastore_v1.types import entity as entity_pb2 - from google.cloud.datastore_v1.types import query as query_pb2 + def test_run_query_simple(self): + self._run_query_helper() - project = "PROJECT" - kind = "Nonesuch" - cursor = b"\x00" - query_pb = self._make_query_pb(kind) - partition_id = entity_pb2.PartitionId(project_id=project) - read_options = datastore_pb2.ReadOptions() - rsp_pb = datastore_pb2.RunQueryResponse( - batch=query_pb2.QueryResultBatch( - entity_result_type=query_pb2.EntityResult.ResultType.FULL, - end_cursor=cursor, - more_results=query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS, - ) - ) + def test_run_query_w_retry(self): + retry = mock.MagicMock() + self._run_query_helper(retry=retry) - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], - ) + def test_run_query_w_timeout(self): + timeout = 5.0 + self._run_query_helper(timeout=timeout) - # Make request. - ds_api = self._make_one(client) - response = ds_api.run_query(project, partition_id, read_options, query=query_pb) + def test_run_query_w_read_consistency(self): + from google.cloud.datastore_v1.types import datastore as datastore_pb2 - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) + read_consistency = datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL + self._run_query_helper(read_consistency=read_consistency) - uri = _build_expected_url(client._base_url, project, "runQuery") - request = _verify_protobuf_call(http, uri, datastore_pb2.RunQueryRequest()) - self.assertEqual(request.partition_id, partition_id._pb) - self.assertEqual(request.query, query_pb._pb) - self.assertEqual(request.read_options, read_options._pb) + def test_run_query_w_transaction(self): + transaction = b"TRANSACTION" + self._run_query_helper(transaction=transaction) def test_run_query_w_namespace_nonempty_result(self): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 - from google.cloud.datastore_v1.types import entity as entity_pb2 - from google.cloud.datastore_v1.types import query as query_pb2 - - project = "PROJECT" - kind = "Kind" namespace = "NS" - query_pb = self._make_query_pb(kind) - partition_id = entity_pb2.PartitionId( - project_id=project, namespace_id=namespace - ) - read_options = datastore_pb2.ReadOptions() - rsp_pb = datastore_pb2.RunQueryResponse( - batch=query_pb2.QueryResultBatch( - entity_result_type=query_pb2.EntityResult.ResultType.FULL, - entity_results=[query_pb2.EntityResult(entity=entity_pb2.Entity())], - more_results=query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS, - ) - ) - - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], - ) - - # Make request. - ds_api = self._make_one(client) - response = ds_api.run_query(project, partition_id, read_options, query=query_pb) - - # Check the result and verify the callers. - self.assertEqual(response, rsp_pb._pb) - - uri = _build_expected_url(client._base_url, project, "runQuery") - request = _verify_protobuf_call(http, uri, datastore_pb2.RunQueryRequest()) - self.assertEqual(request.partition_id, partition_id._pb) - self.assertEqual(request.query, query_pb._pb) + self._run_query_helper(namespace=namespace, found=1) - def test_begin_transaction(self): + def _begin_transaction_helper(self, options=None, retry=None, timeout=None): from google.cloud.datastore_v1.types import datastore as datastore_pb2 project = "PROJECT" @@ -614,19 +570,46 @@ def test_begin_transaction(self): # Make request. ds_api = self._make_one(client) - response = ds_api.begin_transaction(project) + request = {"project_id": project} + + if options is not None: + request["transaction_options"] = options + + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) + + response = ds_api.begin_transaction(request=request, **kwargs) # Check the result and verify the callers. self.assertEqual(response, rsp_pb._pb) uri = _build_expected_url(client._base_url, project, "beginTransaction") request = _verify_protobuf_call( - http, uri, datastore_pb2.BeginTransactionRequest() + http, + uri, + datastore_pb2.BeginTransactionRequest(), + retry=retry, + timeout=timeout, ) - # The RPC-over-HTTP request does not set the project in the request. - self.assertEqual(request.project_id, u"") - def test_commit_wo_transaction(self): + def test_begin_transaction_wo_options(self): + self._begin_transaction_helper() + + def test_begin_transaction_w_options(self): + from google.cloud.datastore_v1.types import TransactionOptions + + read_only = TransactionOptions.ReadOnly._meta.pb() + options = TransactionOptions(read_only=read_only) + self._begin_transaction_helper(options=options) + + def test_begin_transaction_w_retry(self): + retry = mock.MagicMock() + self._begin_transaction_helper(retry=retry) + + def test_begin_transaction_w_timeout(self): + timeout = 5.0 + self._begin_transaction_helper(timeout=timeout) + + def _commit_helper(self, transaction=None, retry=None, timeout=None): from google.cloud.datastore_v1.types import datastore as datastore_pb2 from google.cloud.datastore.helpers import _new_value_pb @@ -640,7 +623,6 @@ def test_commit_wo_transaction(self): value_pb = _new_value_pb(insert, "foo") value_pb.string_value = u"Foo" - # Create mock HTTP and client with response. http = _make_requests_session( [_make_response(content=rsp_pb._pb.SerializeToString())] ) @@ -652,63 +634,52 @@ def test_commit_wo_transaction(self): spec=["_http", "_base_url", "_client_info"], ) - # Make request. rq_class = datastore_pb2.CommitRequest ds_api = self._make_one(client) - mode = rq_class.Mode.NON_TRANSACTIONAL - result = ds_api.commit(project, mode, [mutation]) - # Check the result and verify the callers. + request = {"project_id": project, "mutations": [mutation]} + + if transaction is not None: + request["transaction"] = transaction + mode = request["mode"] = rq_class.Mode.TRANSACTIONAL + else: + mode = request["mode"] = rq_class.Mode.NON_TRANSACTIONAL + + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) + + result = ds_api.commit(request=request, **kwargs) + self.assertEqual(result, rsp_pb._pb) uri = _build_expected_url(client._base_url, project, "commit") - request = _verify_protobuf_call(http, uri, rq_class()) - self.assertEqual(request.transaction, b"") + request = _verify_protobuf_call( + http, uri, rq_class(), retry=retry, timeout=timeout, + ) self.assertEqual(list(request.mutations), [mutation]) - self.assertEqual(request.mode, rq_class.Mode.NON_TRANSACTIONAL) + self.assertEqual(request.mode, mode) - def test_commit_w_transaction(self): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 - from google.cloud.datastore.helpers import _new_value_pb + if transaction is not None: + self.assertEqual(request.transaction, transaction) + else: + self.assertEqual(request.transaction, b"") - project = "PROJECT" - key_pb = _make_key_pb(project) - rsp_pb = datastore_pb2.CommitResponse() - req_pb = datastore_pb2.CommitRequest() - mutation = req_pb._pb.mutations.add() - insert = mutation.upsert - insert.key.CopyFrom(key_pb._pb) - value_pb = _new_value_pb(insert, "foo") - value_pb.string_value = u"Foo" + def test_commit_wo_transaction(self): + self._commit_helper() - # Create mock HTTP and client with response. - http = _make_requests_session( - [_make_response(content=rsp_pb._pb.SerializeToString())] - ) - client_info = _make_client_info() - client = mock.Mock( - _http=http, - _base_url="test.invalid", - _client_info=client_info, - spec=["_http", "_base_url", "_client_info"], - ) + def test_commit_w_transaction(self): + transaction = b"xact" - # Make request. - rq_class = datastore_pb2.CommitRequest - ds_api = self._make_one(client) - mode = rq_class.Mode.TRANSACTIONAL - result = ds_api.commit(project, mode, [mutation], transaction=b"xact") + self._commit_helper(transaction=transaction) - # Check the result and verify the callers. - self.assertEqual(result, rsp_pb._pb) + def test_commit_w_retry(self): + retry = mock.MagicMock() + self._commit_helper(retry=retry) - uri = _build_expected_url(client._base_url, project, "commit") - request = _verify_protobuf_call(http, uri, rq_class()) - self.assertEqual(request.transaction, b"xact") - self.assertEqual(list(request.mutations), [mutation]) - self.assertEqual(request.mode, rq_class.Mode.TRANSACTIONAL) + def test_commit_w_timeout(self): + timeout = 5.0 + self._commit_helper(timeout=timeout) - def test_rollback_ok(self): + def _rollback_helper(self, retry=None, timeout=None): from google.cloud.datastore_v1.types import datastore as datastore_pb2 project = "PROJECT" @@ -729,22 +700,46 @@ def test_rollback_ok(self): # Make request. ds_api = self._make_one(client) - response = ds_api.rollback(project, transaction) + request = {"project_id": project, "transaction": transaction} + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) + + response = ds_api.rollback(request=request, **kwargs) # Check the result and verify the callers. self.assertEqual(response, rsp_pb._pb) uri = _build_expected_url(client._base_url, project, "rollback") - request = _verify_protobuf_call(http, uri, datastore_pb2.RollbackRequest()) + request = _verify_protobuf_call( + http, uri, datastore_pb2.RollbackRequest(), retry=retry, timeout=timeout, + ) self.assertEqual(request.transaction, transaction) - def test_allocate_ids_empty(self): + def test_rollback_ok(self): + self._rollback_helper() + + def test_rollback_w_retry(self): + retry = mock.MagicMock() + self._rollback_helper(retry=retry) + + def test_rollback_w_timeout(self): + timeout = 5.0 + self._rollback_helper(timeout=timeout) + + def _allocate_ids_helper(self, count=0, retry=None, timeout=None): from google.cloud.datastore_v1.types import datastore as datastore_pb2 project = "PROJECT" + before_key_pbs = [] + after_key_pbs = [] rsp_pb = datastore_pb2.AllocateIdsResponse() - # Create mock HTTP and client with response. + for i_count in range(count): + requested = _make_key_pb(project, id_=None) + before_key_pbs.append(requested) + allocated = _make_key_pb(project, id_=i_count) + after_key_pbs.append(allocated) + rsp_pb._pb.keys.add().CopyFrom(allocated._pb) + http = _make_requests_session( [_make_response(content=rsp_pb._pb.SerializeToString())] ) @@ -755,33 +750,49 @@ def test_allocate_ids_empty(self): _client_info=client_info, spec=["_http", "_base_url", "_client_info"], ) - - # Make request. ds_api = self._make_one(client) - response = ds_api.allocate_ids(project, []) - # Check the result and verify the callers. + request = {"project_id": project, "keys": before_key_pbs} + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) + + response = ds_api.allocate_ids(request=request, **kwargs) + self.assertEqual(response, rsp_pb._pb) - self.assertEqual(list(response.keys), []) + self.assertEqual(list(response.keys), [i._pb for i in after_key_pbs]) uri = _build_expected_url(client._base_url, project, "allocateIds") - request = _verify_protobuf_call(http, uri, datastore_pb2.AllocateIdsRequest()) - self.assertEqual(list(request.keys), []) + request = _verify_protobuf_call( + http, uri, datastore_pb2.AllocateIdsRequest(), retry=retry, timeout=timeout, + ) + self.assertEqual(len(request.keys), len(before_key_pbs)) + for key_before, key_after in zip(before_key_pbs, request.keys): + self.assertEqual(key_before, key_after) + + def test_allocate_ids_empty(self): + self._allocate_ids_helper() def test_allocate_ids_non_empty(self): + self._allocate_ids_helper(count=2) + + def test_allocate_ids_w_retry(self): + retry = mock.MagicMock() + self._allocate_ids_helper(retry=retry) + + def test_allocate_ids_w_timeout(self): + timeout = 5.0 + self._allocate_ids_helper(timeout=timeout) + + def _reserve_ids_helper(self, count=0, retry=None, timeout=None): from google.cloud.datastore_v1.types import datastore as datastore_pb2 project = "PROJECT" - before_key_pbs = [ - _make_key_pb(project, id_=None), - _make_key_pb(project, id_=None), - ] - after_key_pbs = [_make_key_pb(project), _make_key_pb(project, id_=2345)] - rsp_pb = datastore_pb2.AllocateIdsResponse() - rsp_pb._pb.keys.add().CopyFrom(after_key_pbs[0]._pb) - rsp_pb._pb.keys.add().CopyFrom(after_key_pbs[1]._pb) + before_key_pbs = [] + rsp_pb = datastore_pb2.ReserveIdsResponse() + + for i_count in range(count): + requested = _make_key_pb(project, id_=i_count) + before_key_pbs.append(requested) - # Create mock HTTP and client with response. http = _make_requests_session( [_make_response(content=rsp_pb._pb.SerializeToString())] ) @@ -792,21 +803,37 @@ def test_allocate_ids_non_empty(self): _client_info=client_info, spec=["_http", "_base_url", "_client_info"], ) - - # Make request. ds_api = self._make_one(client) - response = ds_api.allocate_ids(project, before_key_pbs) - # Check the result and verify the callers. - self.assertEqual(list(response.keys), [i._pb for i in after_key_pbs]) + request = {"project_id": project, "keys": before_key_pbs} + kwargs = _make_retry_timeout_kwargs(retry, timeout, http) + + response = ds_api.reserve_ids(request=request, **kwargs) + self.assertEqual(response, rsp_pb._pb) - uri = _build_expected_url(client._base_url, project, "allocateIds") - request = _verify_protobuf_call(http, uri, datastore_pb2.AllocateIdsRequest()) + uri = _build_expected_url(client._base_url, project, "reserveIds") + request = _verify_protobuf_call( + http, uri, datastore_pb2.AllocateIdsRequest(), retry=retry, timeout=timeout, + ) self.assertEqual(len(request.keys), len(before_key_pbs)) for key_before, key_after in zip(before_key_pbs, request.keys): self.assertEqual(key_before, key_after) + def test_reserve_ids_empty(self): + self._reserve_ids_helper() + + def test_reserve_ids_non_empty(self): + self._reserve_ids_helper(count=2) + + def test_reserve_ids_w_retry(self): + retry = mock.MagicMock() + self._reserve_ids_helper(retry=retry) + + def test_reserve_ids_w_timeout(self): + timeout = 5.0 + self._reserve_ids_helper(timeout=timeout) + def _make_response(status=client.OK, content=b"", headers={}): response = requests.Response() @@ -849,7 +876,7 @@ def _make_client_info(user_agent=_USER_AGENT): return client_info -def _verify_protobuf_call(http, expected_url, pb): +def _verify_protobuf_call(http, expected_url, pb, retry=None, timeout=None): from google.cloud import _http as connection_module expected_headers = { @@ -858,10 +885,36 @@ def _verify_protobuf_call(http, expected_url, pb): connection_module.CLIENT_INFO_HEADER: _USER_AGENT, } - http.request.assert_called_once_with( - method="POST", url=expected_url, headers=expected_headers, data=mock.ANY - ) + if retry is not None: + retry.assert_called_once_with(http.request) + + if timeout is not None: + http.request.assert_called_once_with( + method="POST", + url=expected_url, + headers=expected_headers, + data=mock.ANY, + timeout=timeout, + ) + else: + http.request.assert_called_once_with( + method="POST", url=expected_url, headers=expected_headers, data=mock.ANY + ) data = http.request.mock_calls[0][2]["data"] pb._pb.ParseFromString(data) return pb + + +def _make_retry_timeout_kwargs(retry, timeout, http=None): + kwargs = {} + + if retry is not None: + kwargs["retry"] = retry + if http is not None: + retry.return_value = http.request + + if timeout is not None: + kwargs["timeout"] = timeout + + return kwargs