diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py index 40a4ca219..648541e2b 100644 --- a/tests/oauth2/test_service_account.py +++ b/tests/oauth2/test_service_account.py @@ -203,18 +203,18 @@ def test_apply_with_no_quota_project_id(self): assert "x-goog-user-project" not in headers assert "token" in headers["authorization"] - @mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True) - def test__create_self_signed_jwt(self, from_signing_credentials): + @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True) + def test__create_self_signed_jwt(self, jwt): credentials = service_account.Credentials( SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI ) audience = "https://pubsub.googleapis.com" credentials._create_self_signed_jwt(audience) - from_signing_credentials.assert_called_once_with(credentials, audience) + jwt.from_signing_credentials.assert_called_once_with(credentials, audience) - @mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True) - def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials): + @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True) + def test__create_self_signed_jwt_with_user_scopes(self, jwt): credentials = service_account.Credentials( SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"] ) @@ -223,7 +223,7 @@ def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials credentials._create_self_signed_jwt(audience) # JWT should not be created if there are user-defined scopes - from_signing_credentials.assert_not_called() + jwt.from_signing_credentials.assert_not_called() @mock.patch("google.oauth2._client.jwt_grant", autospec=True) def test_refresh_success(self, jwt_grant): diff --git a/tests/oauth2/test_sts.py b/tests/oauth2/test_sts.py index 8792bd6bc..e8e008df5 100644 --- a/tests/oauth2/test_sts.py +++ b/tests/oauth2/test_sts.py @@ -128,7 +128,7 @@ def test_exchange_token_full_success_without_auth(self): self.ADDON_HEADERS, ) - self.assert_request_kwargs(request.call_args.kwargs, headers, request_data) + self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_partial_success_without_auth(self): @@ -157,7 +157,7 @@ def test_exchange_token_partial_success_without_auth(self): requested_token_type=self.REQUESTED_TOKEN_TYPE, ) - self.assert_request_kwargs(request.call_args.kwargs, headers, request_data) + self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_non200_without_auth(self): @@ -227,7 +227,7 @@ def test_exchange_token_full_success_with_basic_auth(self): self.ADDON_HEADERS, ) - self.assert_request_kwargs(request.call_args.kwargs, headers, request_data) + self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_partial_success_with_basic_auth(self): @@ -259,7 +259,7 @@ def test_exchange_token_partial_success_with_basic_auth(self): requested_token_type=self.REQUESTED_TOKEN_TYPE, ) - self.assert_request_kwargs(request.call_args.kwargs, headers, request_data) + self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_non200_with_basic_auth(self): @@ -331,7 +331,7 @@ def test_exchange_token_full_success_with_reqbody_auth(self): self.ADDON_HEADERS, ) - self.assert_request_kwargs(request.call_args.kwargs, headers, request_data) + self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_partial_success_with_reqbody_auth(self): @@ -362,7 +362,7 @@ def test_exchange_token_partial_success_with_reqbody_auth(self): requested_token_type=self.REQUESTED_TOKEN_TYPE, ) - self.assert_request_kwargs(request.call_args.kwargs, headers, request_data) + self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_non200_with_reqbody_auth(self): diff --git a/tests/test_aws.py b/tests/test_aws.py index 9a8f98eec..7a55841ca 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -959,15 +959,15 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars( ) # Assert region request. self.assert_aws_metadata_request_kwargs( - request.call_args_list[0].kwargs, REGION_URL + request.call_args_list[0][1], REGION_URL ) # Assert role request. self.assert_aws_metadata_request_kwargs( - request.call_args_list[1].kwargs, SECURITY_CREDS_URL + request.call_args_list[1][1], SECURITY_CREDS_URL ) # Assert security credentials request. self.assert_aws_metadata_request_kwargs( - request.call_args_list[2].kwargs, + request.call_args_list[2][1], "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE), {"Content-Type": "application/json"}, ) @@ -986,11 +986,11 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars( assert len(new_request.call_args_list) == 2 # Assert role request. self.assert_aws_metadata_request_kwargs( - new_request.call_args_list[0].kwargs, SECURITY_CREDS_URL + new_request.call_args_list[0][1], SECURITY_CREDS_URL ) # Assert security credentials request. self.assert_aws_metadata_request_kwargs( - new_request.call_args_list[1].kwargs, + new_request.call_args_list[1][1], "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE), {"Content-Type": "application/json"}, ) @@ -1193,7 +1193,7 @@ def test_refresh_success_without_impersonation_ignore_default_scopes(self, utcno assert len(request.call_args_list) == 4 # Fourth request should be sent to GCP STS endpoint. self.assert_token_request_kwargs( - request.call_args_list[3].kwargs, token_headers, token_request_data + request.call_args_list[3][1], token_headers, token_request_data ) assert credentials.token == self.SUCCESS_RESPONSE["access_token"] assert credentials.quota_project_id == QUOTA_PROJECT_ID @@ -1249,7 +1249,7 @@ def test_refresh_success_without_impersonation_use_default_scopes(self, utcnow): assert len(request.call_args_list) == 4 # Fourth request should be sent to GCP STS endpoint. self.assert_token_request_kwargs( - request.call_args_list[3].kwargs, token_headers, token_request_data + request.call_args_list[3][1], token_headers, token_request_data ) assert credentials.token == self.SUCCESS_RESPONSE["access_token"] assert credentials.quota_project_id == QUOTA_PROJECT_ID @@ -1326,12 +1326,12 @@ def test_refresh_success_with_impersonation_ignore_default_scopes(self, utcnow): assert len(request.call_args_list) == 5 # Fourth request should be sent to GCP STS endpoint. self.assert_token_request_kwargs( - request.call_args_list[3].kwargs, token_headers, token_request_data + request.call_args_list[3][1], token_headers, token_request_data ) # Fifth request should be sent to iamcredentials endpoint for service # account impersonation. self.assert_impersonation_request_kwargs( - request.call_args_list[4].kwargs, + request.call_args_list[4][1], impersonation_headers, impersonation_request_data, ) @@ -1410,12 +1410,12 @@ def test_refresh_success_with_impersonation_use_default_scopes(self, utcnow): assert len(request.call_args_list) == 5 # Fourth request should be sent to GCP STS endpoint. self.assert_token_request_kwargs( - request.call_args_list[3].kwargs, token_headers, token_request_data + request.call_args_list[3][1], token_headers, token_request_data ) # Fifth request should be sent to iamcredentials endpoint for service # account impersonation. self.assert_impersonation_request_kwargs( - request.call_args_list[4].kwargs, + request.call_args_list[4][1], impersonation_headers, impersonation_request_data, ) diff --git a/tests/test_external_account.py b/tests/test_external_account.py index 42e53ecb5..8f8d98009 100644 --- a/tests/test_external_account.py +++ b/tests/test_external_account.py @@ -363,9 +363,7 @@ def test_refresh_without_client_auth_success(self, unused_utcnow): credentials.refresh(request) - self.assert_token_request_kwargs( - request.call_args.kwargs, headers, request_data - ) + self.assert_token_request_kwargs(request.call_args[1], headers, request_data) assert credentials.valid assert credentials.expiry == expected_expiry assert not credentials.expired @@ -422,11 +420,11 @@ def test_refresh_impersonation_without_client_auth_success(self): assert len(request.call_args_list) == 2 # Verify token exchange request parameters. self.assert_token_request_kwargs( - request.call_args_list[0].kwargs, token_headers, token_request_data + request.call_args_list[0][1], token_headers, token_request_data ) # Verify service account impersonation request parameters. self.assert_impersonation_request_kwargs( - request.call_args_list[1].kwargs, + request.call_args_list[1][1], impersonation_headers, impersonation_request_data, ) @@ -436,7 +434,7 @@ def test_refresh_impersonation_without_client_auth_success(self): assert credentials.token == impersonation_response["accessToken"] def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes( - self + self, ): headers = {"Content-Type": "application/x-www-form-urlencoded"} request_data = { @@ -458,9 +456,7 @@ def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default credentials.refresh(request) - self.assert_token_request_kwargs( - request.call_args.kwargs, headers, request_data - ) + self.assert_token_request_kwargs(request.call_args[1], headers, request_data) assert credentials.valid assert not credentials.expired assert credentials.token == self.SUCCESS_RESPONSE["access_token"] @@ -488,9 +484,7 @@ def test_refresh_without_client_auth_success_explicit_default_scopes_only(self): credentials.refresh(request) - self.assert_token_request_kwargs( - request.call_args.kwargs, headers, request_data - ) + self.assert_token_request_kwargs(request.call_args[1], headers, request_data) assert credentials.valid assert not credentials.expired assert credentials.token == self.SUCCESS_RESPONSE["access_token"] @@ -551,9 +545,7 @@ def test_refresh_with_client_auth_success(self): credentials.refresh(request) - self.assert_token_request_kwargs( - request.call_args.kwargs, headers, request_data - ) + self.assert_token_request_kwargs(request.call_args[1], headers, request_data) assert credentials.valid assert not credentials.expired assert credentials.token == self.SUCCESS_RESPONSE["access_token"] @@ -616,11 +608,11 @@ def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(se assert len(request.call_args_list) == 2 # Verify token exchange request parameters. self.assert_token_request_kwargs( - request.call_args_list[0].kwargs, token_headers, token_request_data + request.call_args_list[0][1], token_headers, token_request_data ) # Verify service account impersonation request parameters. self.assert_impersonation_request_kwargs( - request.call_args_list[1].kwargs, + request.call_args_list[1][1], impersonation_headers, impersonation_request_data, ) @@ -687,11 +679,11 @@ def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self) assert len(request.call_args_list) == 2 # Verify token exchange request parameters. self.assert_token_request_kwargs( - request.call_args_list[0].kwargs, token_headers, token_request_data + request.call_args_list[0][1], token_headers, token_request_data ) # Verify service account impersonation request parameters. self.assert_impersonation_request_kwargs( - request.call_args_list[1].kwargs, + request.call_args_list[1][1], impersonation_headers, impersonation_request_data, ) @@ -1045,11 +1037,11 @@ def test_get_project_id_cloud_resource_manager_success(self): assert len(request.call_args_list) == 3 # Verify token exchange request parameters. self.assert_token_request_kwargs( - request.call_args_list[0].kwargs, token_headers, token_request_data + request.call_args_list[0][1], token_headers, token_request_data ) # Verify service account impersonation request parameters. self.assert_impersonation_request_kwargs( - request.call_args_list[1].kwargs, + request.call_args_list[1][1], impersonation_headers, impersonation_request_data, ) @@ -1061,7 +1053,7 @@ def test_get_project_id_cloud_resource_manager_success(self): assert credentials.token == impersonation_response["accessToken"] # Verify cloud resource manager request parameters. self.assert_resource_manager_request_kwargs( - request.call_args_list[2].kwargs, + request.call_args_list[2][1], self.PROJECT_NUMBER, { "x-goog-user-project": self.QUOTA_PROJECT_ID, diff --git a/tests/test_iam.py b/tests/test_iam.py index fbd3e418d..382713b9b 100644 --- a/tests/test_iam.py +++ b/tests/test_iam.py @@ -89,7 +89,7 @@ def test_sign_bytes(self): returned_signature = signer.sign("123") assert returned_signature == signature - kwargs = request.call_args.kwargs + kwargs = request.call_args[1] assert kwargs["headers"]["Content-Type"] == "application/json" def test_sign_bytes_failure(self): diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py index c017ab59f..90a0e2549 100644 --- a/tests/test_identity_pool.py +++ b/tests/test_identity_pool.py @@ -223,10 +223,10 @@ def assert_underlying_credentials_refresh( assert len(request.call_args_list) == len(requests) if credential_data: - cls.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None) + cls.assert_credential_request_kwargs(request.call_args_list[0][1], None) # Verify token exchange request parameters. cls.assert_token_request_kwargs( - request.call_args_list[token_request_index].kwargs, + request.call_args_list[token_request_index][1], token_headers, token_request_data, token_url, @@ -235,7 +235,7 @@ def assert_underlying_credentials_refresh( # is processed. if service_account_impersonation_url: cls.assert_impersonation_request_kwargs( - request.call_args_list[impersonation_request_index].kwargs, + request.call_args_list[impersonation_request_index][1], impersonation_headers, impersonation_request_data, service_account_impersonation_url, @@ -505,7 +505,7 @@ def test_retrieve_subject_token_file_not_found(self): assert excinfo.match(r"File './not_found.txt' was not found") def test_refresh_text_file_success_without_impersonation_ignore_default_scopes( - self + self, ): credentials = self.make_credentials( client_id=CLIENT_ID, @@ -677,7 +677,7 @@ def test_retrieve_subject_token_from_url(self): subject_token = credentials.retrieve_subject_token(request) assert subject_token == TEXT_FILE_SUBJECT_TOKEN - self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None) + self.assert_credential_request_kwargs(request.call_args_list[0][1], None) def test_retrieve_subject_token_from_url_with_headers(self): credentials = self.make_credentials( @@ -688,7 +688,7 @@ def test_retrieve_subject_token_from_url_with_headers(self): assert subject_token == TEXT_FILE_SUBJECT_TOKEN self.assert_credential_request_kwargs( - request.call_args_list[0].kwargs, {"foo": "bar"} + request.call_args_list[0][1], {"foo": "bar"} ) def test_retrieve_subject_token_from_url_json(self): @@ -699,7 +699,7 @@ def test_retrieve_subject_token_from_url_json(self): subject_token = credentials.retrieve_subject_token(request) assert subject_token == JSON_FILE_SUBJECT_TOKEN - self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None) + self.assert_credential_request_kwargs(request.call_args_list[0][1], None) def test_retrieve_subject_token_from_url_json_with_headers(self): credentials = self.make_credentials( @@ -714,7 +714,7 @@ def test_retrieve_subject_token_from_url_json_with_headers(self): assert subject_token == JSON_FILE_SUBJECT_TOKEN self.assert_credential_request_kwargs( - request.call_args_list[0].kwargs, {"foo": "bar"} + request.call_args_list[0][1], {"foo": "bar"} ) def test_retrieve_subject_token_from_url_not_found(self): diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index 430c770d3..90de704a2 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -203,7 +203,7 @@ def test_refresh_success_iam_endpoint_override( assert credentials.valid assert not credentials.expired # Confirm override endpoint used. - request_kwargs = request.call_args.kwargs + request_kwargs = request.call_args[1] assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE @pytest.mark.parametrize("time_skew", [100, -100]) @@ -378,7 +378,7 @@ def test_with_quota_project_iam_endpoint_override( assert quota_project_creds.valid assert not quota_project_creds.expired # Confirm override endpoint used. - request_kwargs = request.call_args.kwargs + request_kwargs = request.call_args[1] assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE def test_id_token_success(