Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
arithmetic1728 committed Apr 7, 2021
1 parent 5a5adfe commit 7d5dbd1
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 81 deletions.
22 changes: 1 addition & 21 deletions google/oauth2/challenges.py
Expand Up @@ -133,27 +133,7 @@ def obtain_challenge_input(self, metadata):
return None


class SamlChallenge(ReauthChallenge):
"""Challenge that asks the users to browse to their ID Providers."""

@property
def name(self):
return "SAML"

@property
def is_locally_eligible(self):
return True

def obtain_challenge_input(self, metadata):
# Magic Arch has not fully supported returning a proper dedirect URL
# for programmatic SAML users today. So we error our here and request
# users to complete a web login.
raise exceptions.ReauthFailError(
"SAML login is required for the current account to complete reauthentication."
)


AVAILABLE_CHALLENGES = {
challenge.name: challenge
for challenge in [SecurityKeyChallenge(), PasswordChallenge(), SamlChallenge()]
for challenge in [SecurityKeyChallenge(), PasswordChallenge()]
}
44 changes: 20 additions & 24 deletions google/oauth2/reauth.py
Expand Up @@ -54,6 +54,11 @@
_CHALLENGE_PENDING = "CHALLENGE_PENDING"


# Override this global variable to set custom max number of rounds of reauth
# challenges should be run.
RUN_CHALLENGE_RETRY_LIMIT = 5


def _get_challenges(
request, supported_challenge_types, access_token, requested_scopes=None
):
Expand Down Expand Up @@ -161,40 +166,32 @@ def _run_next_challenge(msg, request, access_token):
return None


def _obtain_rapt(request, access_token, requested_scopes, rounds_num=5):
def _obtain_rapt(request, access_token, requested_scopes):
"""Given an http request method and reauth access token, get rapt token.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
access_token (str): reauth access token
requested_scopes (Sequence[str]): scopes required by the client application
rounds_num (Optional(int)): max number of attempts to get a rapt after the next
challenge, before failing the reauth. This defines total number of
challenges + number of additional retries if the chalenge input
wasn't accepted.
Returns:
str: The rapt token.
Raises:
google.auth.exceptions.ReauthError: if reauth failed
"""
msg = None

for _ in range(0, rounds_num):
msg = _get_challenges(
request,
list(challenges.AVAILABLE_CHALLENGES.keys()),
access_token,
requested_scopes,
)

if not msg:
msg = _get_challenges(
request,
list(challenges.AVAILABLE_CHALLENGES.keys()),
access_token,
requested_scopes,
)

if msg["status"] == _AUTHENTICATED:
return msg["encodedProofOfReauthToken"]
if msg["status"] == _AUTHENTICATED:
return msg["encodedProofOfReauthToken"]

for _ in range(0, RUN_CHALLENGE_RETRY_LIMIT):
if not (
msg["status"] == _CHALLENGE_REQUIRED or msg["status"] == _CHALLENGE_PENDING
):
Expand All @@ -204,18 +201,17 @@ def _obtain_rapt(request, access_token, requested_scopes, rounds_num=5):
)
)

"""Check if we are in an interractive environment.
If the rapt token needs refreshing, the user needs to answer the
challenges.
"""
if not _helpers.is_interactive():
raise exceptions.ReauthFailError(
"Reauthentication challenge could not be answered because you are not in an interactive session."
"Reauthentication challenge could not be answered because you are not"
" in an interactive session."
)

msg = _run_next_challenge(msg, request, access_token)

if msg["status"] == _AUTHENTICATED:
return msg["encodedProofOfReauthToken"]

# If we got here it means we didn't get authenticated.
raise exceptions.ReauthFailError()

Expand Down
14 changes: 0 additions & 14 deletions tests/oauth2/test_challenges.py
Expand Up @@ -125,17 +125,3 @@ def test_password_challenge(getpass_mock):
assert challenges.PasswordChallenge().obtain_challenge_input({}) == {
"credential": " "
}


def test_saml_challenge():
metadata = {
"status": "READY",
"challengeId": 1,
"challengeType": "SAML",
"securityKey": {},
}
challenge = challenges.SamlChallenge()
assert challenge.is_locally_eligible
assert challenge.name == "SAML"
with pytest.raises(exceptions.ReauthFailError):
challenge.obtain_challenge_input(metadata)
48 changes: 26 additions & 22 deletions tests/oauth2/test_reauth.py
Expand Up @@ -162,21 +162,30 @@ def test__run_next_challenge_success():
)


def test__obtain_rapt_not_authenticated():
with pytest.raises(exceptions.ReauthFailError) as excinfo:
reauth._obtain_rapt(MOCK_REQUEST, "token", None, rounds_num=0)
assert excinfo.match(r"Reauthentication failed. None")


def test__obtain_rapt_authenticated():
with mock.patch(
"google.oauth2.reauth._get_challenges",
return_value=CHALLENGES_RESPONSE_AUTHENTICATED,
):
assert (
reauth._obtain_rapt(MOCK_REQUEST, "token", None, rounds_num=1)
== "new_rapt_token"
)
assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"


def test__obtain_rapt_authenticated_after_run_next_challenge():
with mock.patch(
"google.oauth2.reauth._get_challenges",
return_value=CHALLENGES_RESPONSE_TEMPLATE,
):
with mock.patch(
"google.oauth2.reauth._run_next_challenge",
side_effect=[
CHALLENGES_RESPONSE_TEMPLATE,
CHALLENGES_RESPONSE_AUTHENTICATED,
],
):
with mock.patch("google.auth._helpers.is_interactive", return_value=True):
assert (
reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
)


def test__obtain_rapt_unsupported_status():
Expand All @@ -186,7 +195,7 @@ def test__obtain_rapt_unsupported_status():
"google.oauth2.reauth._get_challenges", return_value=challenges_response
):
with pytest.raises(exceptions.ReauthFailError) as excinfo:
reauth._obtain_rapt(MOCK_REQUEST, "token", None, rounds_num=1)
reauth._obtain_rapt(MOCK_REQUEST, "token", None)
assert excinfo.match(r"API error: STATUS_UNSPECIFIED")


Expand All @@ -197,24 +206,19 @@ def test__obtain_rapt_not_interactive():
):
with mock.patch("google.auth._helpers.is_interactive", return_value=False):
with pytest.raises(exceptions.ReauthFailError) as excinfo:
reauth._obtain_rapt(MOCK_REQUEST, "token", None, rounds_num=1)
reauth._obtain_rapt(MOCK_REQUEST, "token", None)
assert excinfo.match(r"not in an interactive session")


def test__obtain_rapt_run_next_challenge():
def test__obtain_rapt_not_authenticated():
with mock.patch(
"google.oauth2.reauth._get_challenges",
return_value=CHALLENGES_RESPONSE_TEMPLATE,
):
with mock.patch(
"google.oauth2.reauth._run_next_challenge",
return_value=CHALLENGES_RESPONSE_AUTHENTICATED,
):
with mock.patch("google.auth._helpers.is_interactive", return_value=True):
assert (
reauth._obtain_rapt(MOCK_REQUEST, "token", None, rounds_num=2)
== "new_rapt_token"
)
with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0):
with pytest.raises(exceptions.ReauthFailError) as excinfo:
reauth._obtain_rapt(MOCK_REQUEST, "token", None)
assert excinfo.match(r"Reauthentication failed")


def test_get_rapt_token():
Expand Down

0 comments on commit 7d5dbd1

Please sign in to comment.