Skip to content

Commit

Permalink
fix: fix unit tests so they can work in g3 (#714)
Browse files Browse the repository at this point in the history
* fix: mock not working well with g3 test

* update

* update
  • Loading branch information
arithmetic1728 committed Mar 8, 2021
1 parent d7c6300 commit d80c85f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 56 deletions.
12 changes: 6 additions & 6 deletions tests/oauth2/test_service_account.py
Expand Up @@ -203,18 +203,18 @@ def test_apply_with_no_quota_project_id(self):
assert "x-goog-user-project" not in headers
assert "token" in headers["authorization"]

@mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
def test__create_self_signed_jwt(self, from_signing_credentials):
@mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
def test__create_self_signed_jwt(self, jwt):
credentials = service_account.Credentials(
SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
)

audience = "https://pubsub.googleapis.com"
credentials._create_self_signed_jwt(audience)
from_signing_credentials.assert_called_once_with(credentials, audience)
jwt.from_signing_credentials.assert_called_once_with(credentials, audience)

@mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials):
@mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
def test__create_self_signed_jwt_with_user_scopes(self, jwt):
credentials = service_account.Credentials(
SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
)
Expand All @@ -223,7 +223,7 @@ def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials
credentials._create_self_signed_jwt(audience)

# JWT should not be created if there are user-defined scopes
from_signing_credentials.assert_not_called()
jwt.from_signing_credentials.assert_not_called()

@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_success(self, jwt_grant):
Expand Down
12 changes: 6 additions & 6 deletions tests/oauth2/test_sts.py
Expand Up @@ -128,7 +128,7 @@ def test_exchange_token_full_success_without_auth(self):
self.ADDON_HEADERS,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_partial_success_without_auth(self):
Expand Down Expand Up @@ -157,7 +157,7 @@ def test_exchange_token_partial_success_without_auth(self):
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_non200_without_auth(self):
Expand Down Expand Up @@ -227,7 +227,7 @@ def test_exchange_token_full_success_with_basic_auth(self):
self.ADDON_HEADERS,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_partial_success_with_basic_auth(self):
Expand Down Expand Up @@ -259,7 +259,7 @@ def test_exchange_token_partial_success_with_basic_auth(self):
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_non200_with_basic_auth(self):
Expand Down Expand Up @@ -331,7 +331,7 @@ def test_exchange_token_full_success_with_reqbody_auth(self):
self.ADDON_HEADERS,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_partial_success_with_reqbody_auth(self):
Expand Down Expand Up @@ -362,7 +362,7 @@ def test_exchange_token_partial_success_with_reqbody_auth(self):
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)

self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE

def test_exchange_token_non200_with_reqbody_auth(self):
Expand Down
22 changes: 11 additions & 11 deletions tests/test_aws.py
Expand Up @@ -959,15 +959,15 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
)
# Assert region request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0].kwargs, REGION_URL
request.call_args_list[0][1], REGION_URL
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1].kwargs, SECURITY_CREDS_URL
request.call_args_list[1][1], SECURITY_CREDS_URL
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2].kwargs,
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{"Content-Type": "application/json"},
)
Expand All @@ -986,11 +986,11 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
assert len(new_request.call_args_list) == 2
# Assert role request.
self.assert_aws_metadata_request_kwargs(
new_request.call_args_list[0].kwargs, SECURITY_CREDS_URL
new_request.call_args_list[0][1], SECURITY_CREDS_URL
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
new_request.call_args_list[1].kwargs,
new_request.call_args_list[1][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{"Content-Type": "application/json"},
)
Expand Down Expand Up @@ -1193,7 +1193,7 @@ def test_refresh_success_without_impersonation_ignore_default_scopes(self, utcno
assert len(request.call_args_list) == 4
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
request.call_args_list[3].kwargs, token_headers, token_request_data
request.call_args_list[3][1], token_headers, token_request_data
)
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
assert credentials.quota_project_id == QUOTA_PROJECT_ID
Expand Down Expand Up @@ -1249,7 +1249,7 @@ def test_refresh_success_without_impersonation_use_default_scopes(self, utcnow):
assert len(request.call_args_list) == 4
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
request.call_args_list[3].kwargs, token_headers, token_request_data
request.call_args_list[3][1], token_headers, token_request_data
)
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
assert credentials.quota_project_id == QUOTA_PROJECT_ID
Expand Down Expand Up @@ -1326,12 +1326,12 @@ def test_refresh_success_with_impersonation_ignore_default_scopes(self, utcnow):
assert len(request.call_args_list) == 5
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
request.call_args_list[3].kwargs, token_headers, token_request_data
request.call_args_list[3][1], token_headers, token_request_data
)
# Fifth request should be sent to iamcredentials endpoint for service
# account impersonation.
self.assert_impersonation_request_kwargs(
request.call_args_list[4].kwargs,
request.call_args_list[4][1],
impersonation_headers,
impersonation_request_data,
)
Expand Down Expand Up @@ -1410,12 +1410,12 @@ def test_refresh_success_with_impersonation_use_default_scopes(self, utcnow):
assert len(request.call_args_list) == 5
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
request.call_args_list[3].kwargs, token_headers, token_request_data
request.call_args_list[3][1], token_headers, token_request_data
)
# Fifth request should be sent to iamcredentials endpoint for service
# account impersonation.
self.assert_impersonation_request_kwargs(
request.call_args_list[4].kwargs,
request.call_args_list[4][1],
impersonation_headers,
impersonation_request_data,
)
Expand Down
36 changes: 14 additions & 22 deletions tests/test_external_account.py
Expand Up @@ -363,9 +363,7 @@ def test_refresh_without_client_auth_success(self, unused_utcnow):

credentials.refresh(request)

self.assert_token_request_kwargs(
request.call_args.kwargs, headers, request_data
)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
Expand Down Expand Up @@ -422,11 +420,11 @@ def test_refresh_impersonation_without_client_auth_success(self):
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0].kwargs, token_headers, token_request_data
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1].kwargs,
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
Expand All @@ -436,7 +434,7 @@ def test_refresh_impersonation_without_client_auth_success(self):
assert credentials.token == impersonation_response["accessToken"]

def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
self
self,
):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
Expand All @@ -458,9 +456,7 @@ def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default

credentials.refresh(request)

self.assert_token_request_kwargs(
request.call_args.kwargs, headers, request_data
)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
Expand Down Expand Up @@ -488,9 +484,7 @@ def test_refresh_without_client_auth_success_explicit_default_scopes_only(self):

credentials.refresh(request)

self.assert_token_request_kwargs(
request.call_args.kwargs, headers, request_data
)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
Expand Down Expand Up @@ -551,9 +545,7 @@ def test_refresh_with_client_auth_success(self):

credentials.refresh(request)

self.assert_token_request_kwargs(
request.call_args.kwargs, headers, request_data
)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
Expand Down Expand Up @@ -616,11 +608,11 @@ def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(se
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0].kwargs, token_headers, token_request_data
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1].kwargs,
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
Expand Down Expand Up @@ -687,11 +679,11 @@ def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self)
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0].kwargs, token_headers, token_request_data
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1].kwargs,
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
Expand Down Expand Up @@ -1045,11 +1037,11 @@ def test_get_project_id_cloud_resource_manager_success(self):
assert len(request.call_args_list) == 3
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0].kwargs, token_headers, token_request_data
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1].kwargs,
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
Expand All @@ -1061,7 +1053,7 @@ def test_get_project_id_cloud_resource_manager_success(self):
assert credentials.token == impersonation_response["accessToken"]
# Verify cloud resource manager request parameters.
self.assert_resource_manager_request_kwargs(
request.call_args_list[2].kwargs,
request.call_args_list[2][1],
self.PROJECT_NUMBER,
{
"x-goog-user-project": self.QUOTA_PROJECT_ID,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_iam.py
Expand Up @@ -89,7 +89,7 @@ def test_sign_bytes(self):
returned_signature = signer.sign("123")

assert returned_signature == signature
kwargs = request.call_args.kwargs
kwargs = request.call_args[1]
assert kwargs["headers"]["Content-Type"] == "application/json"

def test_sign_bytes_failure(self):
Expand Down
16 changes: 8 additions & 8 deletions tests/test_identity_pool.py
Expand Up @@ -223,10 +223,10 @@ def assert_underlying_credentials_refresh(

assert len(request.call_args_list) == len(requests)
if credential_data:
cls.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
cls.assert_credential_request_kwargs(request.call_args_list[0][1], None)
# Verify token exchange request parameters.
cls.assert_token_request_kwargs(
request.call_args_list[token_request_index].kwargs,
request.call_args_list[token_request_index][1],
token_headers,
token_request_data,
token_url,
Expand All @@ -235,7 +235,7 @@ def assert_underlying_credentials_refresh(
# is processed.
if service_account_impersonation_url:
cls.assert_impersonation_request_kwargs(
request.call_args_list[impersonation_request_index].kwargs,
request.call_args_list[impersonation_request_index][1],
impersonation_headers,
impersonation_request_data,
service_account_impersonation_url,
Expand Down Expand Up @@ -505,7 +505,7 @@ def test_retrieve_subject_token_file_not_found(self):
assert excinfo.match(r"File './not_found.txt' was not found")

def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
self
self,
):
credentials = self.make_credentials(
client_id=CLIENT_ID,
Expand Down Expand Up @@ -677,7 +677,7 @@ def test_retrieve_subject_token_from_url(self):
subject_token = credentials.retrieve_subject_token(request)

assert subject_token == TEXT_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
self.assert_credential_request_kwargs(request.call_args_list[0][1], None)

def test_retrieve_subject_token_from_url_with_headers(self):
credentials = self.make_credentials(
Expand All @@ -688,7 +688,7 @@ def test_retrieve_subject_token_from_url_with_headers(self):

assert subject_token == TEXT_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(
request.call_args_list[0].kwargs, {"foo": "bar"}
request.call_args_list[0][1], {"foo": "bar"}
)

def test_retrieve_subject_token_from_url_json(self):
Expand All @@ -699,7 +699,7 @@ def test_retrieve_subject_token_from_url_json(self):
subject_token = credentials.retrieve_subject_token(request)

assert subject_token == JSON_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
self.assert_credential_request_kwargs(request.call_args_list[0][1], None)

def test_retrieve_subject_token_from_url_json_with_headers(self):
credentials = self.make_credentials(
Expand All @@ -714,7 +714,7 @@ def test_retrieve_subject_token_from_url_json_with_headers(self):

assert subject_token == JSON_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(
request.call_args_list[0].kwargs, {"foo": "bar"}
request.call_args_list[0][1], {"foo": "bar"}
)

def test_retrieve_subject_token_from_url_not_found(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_impersonated_credentials.py
Expand Up @@ -203,7 +203,7 @@ def test_refresh_success_iam_endpoint_override(
assert credentials.valid
assert not credentials.expired
# Confirm override endpoint used.
request_kwargs = request.call_args.kwargs
request_kwargs = request.call_args[1]
assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE

@pytest.mark.parametrize("time_skew", [100, -100])
Expand Down Expand Up @@ -378,7 +378,7 @@ def test_with_quota_project_iam_endpoint_override(
assert quota_project_creds.valid
assert not quota_project_creds.expired
# Confirm override endpoint used.
request_kwargs = request.call_args.kwargs
request_kwargs = request.call_args[1]
assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE

def test_id_token_success(
Expand Down

0 comments on commit d80c85f

Please sign in to comment.