Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix unit tests so they can work in g3 #714

Merged
merged 4 commits into from Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions tests/oauth2/test_service_account.py
Expand Up @@ -203,18 +203,18 @@ def test_apply_with_no_quota_project_id(self):
assert "x-goog-user-project" not in headers
assert "token" in headers["authorization"]

@mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
def test__create_self_signed_jwt(self, from_signing_credentials):
@mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
def test__create_self_signed_jwt(self, jwt):
credentials = service_account.Credentials(
SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
)

audience = "https://pubsub.googleapis.com"
credentials._create_self_signed_jwt(audience)
from_signing_credentials.assert_called_once_with(credentials, audience)
jwt.from_signing_credentials.assert_called_once_with(credentials, audience)

@mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials):
@mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
def test__create_self_signed_jwt_with_user_scopes(self, jwt):
credentials = service_account.Credentials(
SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
)
Expand All @@ -223,7 +223,7 @@ def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials
credentials._create_self_signed_jwt(audience)

# JWT should not be created if there are user-defined scopes
from_signing_credentials.assert_not_called()
jwt.from_signing_credentials.assert_not_called()

@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_success(self, jwt_grant):
Expand Down
12 changes: 6 additions & 6 deletions tests/oauth2/test_sts.py
Expand Up @@ -128,7 +128,7 @@ def test_exchange_token_full_success_without_auth(self):
self.ADDON_HEADERS,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_partial_success_without_auth(self):
Expand Down Expand Up @@ -157,7 +157,7 @@ def test_exchange_token_partial_success_without_auth(self):
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_non200_without_auth(self):
Expand Down Expand Up @@ -227,7 +227,7 @@ def test_exchange_token_full_success_with_basic_auth(self):
self.ADDON_HEADERS,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_partial_success_with_basic_auth(self):
Expand Down Expand Up @@ -259,7 +259,7 @@ def test_exchange_token_partial_success_with_basic_auth(self):
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_non200_with_basic_auth(self):
Expand Down Expand Up @@ -331,7 +331,7 @@ def test_exchange_token_full_success_with_reqbody_auth(self):
self.ADDON_HEADERS,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_partial_success_with_reqbody_auth(self):
Expand Down Expand Up @@ -362,7 +362,7 @@ def test_exchange_token_partial_success_with_reqbody_auth(self):
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_non200_with_reqbody_auth(self):
Expand Down
22 changes: 11 additions & 11 deletions tests/test_aws.py
Expand Up @@ -959,15 +959,15 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
)
# Assert region request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0].kwargs, REGION_URL
request.call_args_list[0][1], REGION_URL
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1].kwargs, SECURITY_CREDS_URL
request.call_args_list[1][1], SECURITY_CREDS_URL
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2].kwargs,
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{"Content-Type": "application/json"},
)
Expand All @@ -986,11 +986,11 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
assert len(new_request.call_args_list) == 2
# Assert role request.
self.assert_aws_metadata_request_kwargs(
new_request.call_args_list[0].kwargs, SECURITY_CREDS_URL
new_request.call_args_list[0][1], SECURITY_CREDS_URL
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
new_request.call_args_list[1].kwargs,
new_request.call_args_list[1][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{"Content-Type": "application/json"},
)
Expand Down Expand Up @@ -1193,7 +1193,7 @@ def test_refresh_success_without_impersonation_ignore_default_scopes(self, utcno
assert len(request.call_args_list) == 4
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
request.call_args_list[3].kwargs, token_headers, token_request_data
request.call_args_list[3][1], token_headers, token_request_data
)
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
assert credentials.quota_project_id == QUOTA_PROJECT_ID
Expand Down Expand Up @@ -1249,7 +1249,7 @@ def test_refresh_success_without_impersonation_use_default_scopes(self, utcnow):
assert len(request.call_args_list) == 4
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
request.call_args_list[3].kwargs, token_headers, token_request_data
request.call_args_list[3][1], token_headers, token_request_data
)
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
assert credentials.quota_project_id == QUOTA_PROJECT_ID
Expand Down Expand Up @@ -1326,12 +1326,12 @@ def test_refresh_success_with_impersonation_ignore_default_scopes(self, utcnow):
assert len(request.call_args_list) == 5
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
request.call_args_list[3].kwargs, token_headers, token_request_data
request.call_args_list[3][1], token_headers, token_request_data
)
# Fifth request should be sent to iamcredentials endpoint for service
# account impersonation.
self.assert_impersonation_request_kwargs(
request.call_args_list[4].kwargs,
request.call_args_list[4][1],
impersonation_headers,
impersonation_request_data,
)
Expand Down Expand Up @@ -1410,12 +1410,12 @@ def test_refresh_success_with_impersonation_use_default_scopes(self, utcnow):
assert len(request.call_args_list) == 5
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
request.call_args_list[3].kwargs, token_headers, token_request_data
request.call_args_list[3][1], token_headers, token_request_data
)
# Fifth request should be sent to iamcredentials endpoint for service
# account impersonation.
self.assert_impersonation_request_kwargs(
request.call_args_list[4].kwargs,
request.call_args_list[4][1],
impersonation_headers,
impersonation_request_data,
)
Expand Down
36 changes: 14 additions & 22 deletions tests/test_external_account.py
Expand Up @@ -363,9 +363,7 @@ def test_refresh_without_client_auth_success(self, unused_utcnow):

credentials.refresh(request)

self.assert_token_request_kwargs(
request.call_args.kwargs, headers, request_data
)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
Expand Down Expand Up @@ -422,11 +420,11 @@ def test_refresh_impersonation_without_client_auth_success(self):
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0].kwargs, token_headers, token_request_data
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1].kwargs,
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
Expand All @@ -436,7 +434,7 @@ def test_refresh_impersonation_without_client_auth_success(self):
assert credentials.token == impersonation_response["accessToken"]

def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
self
self,
):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
Expand All @@ -458,9 +456,7 @@ def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default

credentials.refresh(request)

self.assert_token_request_kwargs(
request.call_args.kwargs, headers, request_data
)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
Expand Down Expand Up @@ -488,9 +484,7 @@ def test_refresh_without_client_auth_success_explicit_default_scopes_only(self):

credentials.refresh(request)

self.assert_token_request_kwargs(
request.call_args.kwargs, headers, request_data
)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
Expand Down Expand Up @@ -551,9 +545,7 @@ def test_refresh_with_client_auth_success(self):

credentials.refresh(request)

self.assert_token_request_kwargs(
request.call_args.kwargs, headers, request_data
)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
Expand Down Expand Up @@ -616,11 +608,11 @@ def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(se
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0].kwargs, token_headers, token_request_data
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1].kwargs,
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
Expand Down Expand Up @@ -687,11 +679,11 @@ def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self)
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0].kwargs, token_headers, token_request_data
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1].kwargs,
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
Expand Down Expand Up @@ -1045,11 +1037,11 @@ def test_get_project_id_cloud_resource_manager_success(self):
assert len(request.call_args_list) == 3
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0].kwargs, token_headers, token_request_data
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1].kwargs,
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
Expand All @@ -1061,7 +1053,7 @@ def test_get_project_id_cloud_resource_manager_success(self):
assert credentials.token == impersonation_response["accessToken"]
# Verify cloud resource manager request parameters.
self.assert_resource_manager_request_kwargs(
request.call_args_list[2].kwargs,
request.call_args_list[2][1],
self.PROJECT_NUMBER,
{
"x-goog-user-project": self.QUOTA_PROJECT_ID,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_iam.py
Expand Up @@ -89,7 +89,7 @@ def test_sign_bytes(self):
returned_signature = signer.sign("123")

assert returned_signature == signature
kwargs = request.call_args.kwargs
kwargs = request.call_args[1]
assert kwargs["headers"]["Content-Type"] == "application/json"

def test_sign_bytes_failure(self):
Expand Down
16 changes: 8 additions & 8 deletions tests/test_identity_pool.py
Expand Up @@ -223,10 +223,10 @@ def assert_underlying_credentials_refresh(

assert len(request.call_args_list) == len(requests)
if credential_data:
cls.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
cls.assert_credential_request_kwargs(request.call_args_list[0][1], None)
# Verify token exchange request parameters.
cls.assert_token_request_kwargs(
request.call_args_list[token_request_index].kwargs,
request.call_args_list[token_request_index][1],
token_headers,
token_request_data,
token_url,
Expand All @@ -235,7 +235,7 @@ def assert_underlying_credentials_refresh(
# is processed.
if service_account_impersonation_url:
cls.assert_impersonation_request_kwargs(
request.call_args_list[impersonation_request_index].kwargs,
request.call_args_list[impersonation_request_index][1],
impersonation_headers,
impersonation_request_data,
service_account_impersonation_url,
Expand Down Expand Up @@ -505,7 +505,7 @@ def test_retrieve_subject_token_file_not_found(self):
assert excinfo.match(r"File './not_found.txt' was not found")

def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
self
self,
):
credentials = self.make_credentials(
client_id=CLIENT_ID,
Expand Down Expand Up @@ -677,7 +677,7 @@ def test_retrieve_subject_token_from_url(self):
subject_token = credentials.retrieve_subject_token(request)

assert subject_token == TEXT_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
self.assert_credential_request_kwargs(request.call_args_list[0][1], None)

def test_retrieve_subject_token_from_url_with_headers(self):
credentials = self.make_credentials(
Expand All @@ -688,7 +688,7 @@ def test_retrieve_subject_token_from_url_with_headers(self):

assert subject_token == TEXT_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(
request.call_args_list[0].kwargs, {"foo": "bar"}
request.call_args_list[0][1], {"foo": "bar"}
)

def test_retrieve_subject_token_from_url_json(self):
Expand All @@ -699,7 +699,7 @@ def test_retrieve_subject_token_from_url_json(self):
subject_token = credentials.retrieve_subject_token(request)

assert subject_token == JSON_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
self.assert_credential_request_kwargs(request.call_args_list[0][1], None)

def test_retrieve_subject_token_from_url_json_with_headers(self):
credentials = self.make_credentials(
Expand All @@ -714,7 +714,7 @@ def test_retrieve_subject_token_from_url_json_with_headers(self):

assert subject_token == JSON_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(
request.call_args_list[0].kwargs, {"foo": "bar"}
request.call_args_list[0][1], {"foo": "bar"}
)

def test_retrieve_subject_token_from_url_not_found(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_impersonated_credentials.py
Expand Up @@ -203,7 +203,7 @@ def test_refresh_success_iam_endpoint_override(
assert credentials.valid
assert not credentials.expired
# Confirm override endpoint used.
request_kwargs = request.call_args.kwargs
request_kwargs = request.call_args[1]
assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE

@pytest.mark.parametrize("time_skew", [100, -100])
Expand Down Expand Up @@ -378,7 +378,7 @@ def test_with_quota_project_iam_endpoint_override(
assert quota_project_creds.valid
assert not quota_project_creds.expired
# Confirm override endpoint used.
request_kwargs = request.call_args.kwargs
request_kwargs = request.call_args[1]
assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE

def test_id_token_success(
Expand Down