Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: asyncio http request logic and asynchronous credentials logic #572

Merged
merged 9 commits into from Jul 28, 2020
Expand Up @@ -18,25 +18,14 @@

import asyncio
import functools
import logging


import aiohttp
import six


from google.auth import exceptions
from google.auth import transport
from google.auth.transport import requests


_OAUTH_SCOPES = [
"https://www.googleapis.com/auth/appengine.apis",
"https://www.googleapis.com/auth/userinfo.email",
]

_LOGGER = logging.getLogger(__name__)

# Timeout can be re-defined depending on async requirement. Currently made 60s more than
# sync timeout.
_DEFAULT_TIMEOUT = 180 # in seconds
Expand All @@ -51,19 +40,19 @@ class _Response(transport.Response):
"""

def __init__(self, response):
self.response = response
self._response = response

@property
def status(self):
return self.response.status
return self._response.status

@property
def headers(self):
return self.response.headers
return self._response.headers

@property
def data(self):
return self.response.content
return self._response.content


class Request(transport.Request):
Expand All @@ -76,10 +65,9 @@ class Request(transport.Request):
This class can be useful if you want to manually refresh a
:class:`~google.auth.credentials.Credentials` instance::

import google.auth.transport.aiohttp_req
import aiohttp
import google.auth.transport.aiohttp_requests

request = google.auth.transport.aiohttp_req.Request()
request = google.auth.transport.aiohttp_requests.Request()

credentials.refresh(request)

Expand All @@ -91,11 +79,7 @@ class Request(transport.Request):
"""

def __init__(self, session=None):
"""
self.session = None
if not session:
session = aiohttp.ClientSession()
"""

self.session = None

async def __call__(
Expand All @@ -111,7 +95,7 @@ async def __call__(
Make an HTTP request using aiohttp.

Args:
url (str): The URI to be requested.
url (str): The URL to be requested.
method (str): The HTTP method to use for the request. Defaults
to 'GET'.
body (bytes): The payload / body in HTTP request.
Expand All @@ -132,7 +116,7 @@ async def __call__(
try:
if self.session is None: # pragma: NO COVER
self.session = aiohttp.ClientSession() # pragma: NO COVER
_LOGGER.debug("Making request: %s %s", method, url)
requests._LOGGER.debug("Making request: %s %s", method, url)
response = await self.session.request(
method, url, data=body, headers=headers, timeout=timeout, **kwargs
)
Expand All @@ -157,9 +141,9 @@ class AuthorizedSession(aiohttp.ClientSession):
This class is used to perform requests to API endpoints that require
authorization::

import google.auth.transport.aiohttp_req
import google.auth.transport.aiohttp_requests

async with aiohttp_req.AuthorizedSession(credentials) as authed_session:
async with aiohttp_requests.AuthorizedSession(credentials) as authed_session:
response = await authed_session.request(
'GET', 'https://www.googleapis.com/storage/v1/b')

Expand All @@ -176,11 +160,11 @@ class AuthorizedSession(aiohttp.ClientSession):
refresh the credentials and retry the request.
refresh_timeout (Optional[int]): The timeout value in seconds for
credential refresh HTTP requests.
auth_request (google.auth.transport.aiohttp_req.Request):
auth_request (google.auth.transport.aiohttp_requests.Request):
(Optional) An instance of
:class:`~google.auth.transport.aiohttp_req.Request` used when
:class:`~google.auth.transport.aiohttp_requests.Request` used when
refreshing credentials. If not passed,
an instance of :class:`~google.auth.transport.aiohttp_req.Request`
an instance of :class:`~google.auth.transport.aiohttp_requests.Request`
is created.
"""

Expand Down Expand Up @@ -214,56 +198,49 @@ async def request(
**kwargs
):

if self._auth_request is None:
self._auth_request_session = aiohttp.ClientSession()
auth_request = Request(self._auth_request_session)
self._auth_request = auth_request

# Use a kwarg for this instead of an attribute to maintain
# thread-safety.
_credential_refresh_attempt = kwargs.pop("_credential_refresh_attempt", 0)
# Make a copy of the headers. They will be modified by the credentials
# and we want to pass the original headers if we recurse.
request_headers = headers.copy() if headers is not None else {}

# Do not apply the timeout unconditionally in order to not override the
# _auth_request's default timeout.
auth_request = (
self._auth_request
if timeout is None
else functools.partial(self._auth_request, timeout=timeout)
)

remaining_time = max_allowed_time

with requests.TimeoutGuard(remaining_time, asyncio.TimeoutError) as guard:
await self.credentials.before_request(
auth_request, method, url, request_headers
)
"""Implementation of Authorized Session aiohttp request.

with requests.TimeoutGuard(remaining_time, asyncio.TimeoutError) as guard:
response = await super(AuthorizedSession, self).request(
method,
url,
data=data,
headers=request_headers,
timeout=timeout,
**kwargs
)

remaining_time = guard.remaining_timeout
Args:
method: The http request method used (e.g. GET, PUT, DELETE)

url: The url at which the http request is sent.

data, headers: These fields parallel the associated data and headers
fields of a regular http request. Using the aiohttp client session to
send the http request allows us to use this parallel corresponding structure
in our Authorized Session class.

timeout (Optional[Union[float, Tuple[float, float]]]):
The amount of time in seconds to wait for the server response
with each individual request.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

max_allowed_time (Optional[float]):
If the method runs longer than this, a ``Timeout`` exception is
automatically raised. Unlike the ``timeout` parameter, this
value applies to the total method execution time, even if
multiple requests are made under the hood.

Mind that it is not guaranteed that the timeout error is raised
at ``max_allowed_time`. It might take longer, for example, if
an underlying request takes a lot of time, but the request
itself does not timeout, e.g. if a large file is being
transmitted. The timout error will be raised after such
request completes.
"""

if (
response.status in self._refresh_status_codes
and _credential_refresh_attempt < self._max_refresh_attempts
):
async with aiohttp.ClientSession() as self._auth_request_session:
auth_request = Request(self._auth_request_session)
self._auth_request = auth_request

_LOGGER.info(
"Refreshing credentials due to a %s response. Attempt %s/%s.",
response.status,
_credential_refresh_attempt + 1,
self._max_refresh_attempts,
)
# Use a kwarg for this instead of an attribute to maintain
# thread-safety.
_credential_refresh_attempt = kwargs.pop("_credential_refresh_attempt", 0)
# Make a copy of the headers. They will be modified by the credentials
# and we want to pass the original headers if we recurse.
request_headers = headers.copy() if headers is not None else {}

# Do not apply the timeout unconditionally in order to not override the
# _auth_request's default timeout.
Expand All @@ -273,25 +250,64 @@ async def request(
else functools.partial(self._auth_request, timeout=timeout)
)

remaining_time = max_allowed_time

with requests.TimeoutGuard(remaining_time, asyncio.TimeoutError) as guard:
async with self._refresh_lock:
await self._loop.run_in_executor(
None, self.credentials.refresh, auth_request
)
await self.credentials.before_request(
auth_request, method, url, request_headers
)

remaining_time = guard.remaining_timeout
with requests.TimeoutGuard(remaining_time, asyncio.TimeoutError) as guard:
response = await super(AuthorizedSession, self).request(
method,
url,
data=data,
headers=request_headers,
timeout=timeout,
**kwargs
)

return await self.request(
method,
url,
data=data,
headers=headers,
max_allowed_time=remaining_time,
timeout=timeout,
_credential_refresh_attempt=_credential_refresh_attempt + 1,
**kwargs
)
remaining_time = guard.remaining_timeout

await self._auth_request_session.close()
if (
response.status in self._refresh_status_codes
and _credential_refresh_attempt < self._max_refresh_attempts
):

requests._LOGGER.info(
"Refreshing credentials due to a %s response. Attempt %s/%s.",
response.status,
_credential_refresh_attempt + 1,
self._max_refresh_attempts,
)

# Do not apply the timeout unconditionally in order to not override the
# _auth_request's default timeout.
auth_request = (
self._auth_request
if timeout is None
else functools.partial(self._auth_request, timeout=timeout)
)

with requests.TimeoutGuard(
remaining_time, asyncio.TimeoutError
) as guard:
async with self._refresh_lock:
await self._loop.run_in_executor(
None, self.credentials.refresh, auth_request
)

remaining_time = guard.remaining_timeout

return await self.request(
method,
url,
data=data,
headers=headers,
max_allowed_time=remaining_time,
timeout=timeout,
_credential_refresh_attempt=_credential_refresh_attempt + 1,
**kwargs
)

return response
18 changes: 1 addition & 17 deletions noxfile.py
Expand Up @@ -32,22 +32,6 @@
"aioresponses",
]

TEST_DEPENDENCIES2 = [
"flask",
"freezegun",
"mock",
"oauth2client",
"pyopenssl",
"pytest",
"pytest-cov",
"pytest-localserver",
"requests",
"urllib3",
"cryptography",
"responses",
"grpcio",
]

BLACK_VERSION = "black==19.3b0"
BLACK_PATHS = [
"google",
Expand Down Expand Up @@ -107,7 +91,7 @@ def unit(session):

@nox.session(python=["2.7", "3.5"])
def unit_prev_versions(session):
session.install(*TEST_DEPENDENCIES2)
session.install(*TEST_DEPENDENCIES[:-2])
Copy link
Contributor

@crwilcox crwilcox Jul 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little nervous about this, expecting to see folks add more deps :D.

can you make two lists one "dependencies" and one "additional_python_3_dependencies". And for python3 create a set with the two?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in the latest commit to have 2 lists and then a composition of both for the python3.6+ async tests.

session.install(".")
session.run(
"pytest", "--cov=google.auth", "--cov=google.oauth2", "--cov=tests", "tests"
Expand Down