Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ADC with impersonated workforce pools #877

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions google/auth/_default.py
Expand Up @@ -54,6 +54,9 @@
added. Or you can use service accounts instead. For more information \
about service accounts, see https://cloud.google.com/docs/authentication/"""

# The subject token type used for AWS external_account credentials.
_AWS_SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request"


def _warn_about_problematic_credentials(credentials):
"""Determines if the credentials are problematic.
Expand Down Expand Up @@ -321,14 +324,14 @@ def _get_external_account_credentials(
is in the wrong format or is missing required information.
"""
# There are currently 2 types of external_account credentials.
try:
if info.get("subject_token_type") == _AWS_SUBJECT_TOKEN_TYPE:
# Check if configuration corresponds to an AWS credentials.
from google.auth import aws

credentials = aws.Credentials.from_info(
info, scopes=scopes, default_scopes=default_scopes
)
except ValueError:
else:
try:
# Check if configuration corresponds to an Identity Pool credentials.
from google.auth import identity_pool
Expand Down
191 changes: 190 additions & 1 deletion tests/test__default.py
Expand Up @@ -55,6 +55,10 @@
SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
TOKEN_URL = "https://sts.googleapis.com/v1/token"
AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
WORKFORCE_AUDIENCE = (
"//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
)
WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
CRED_VERIFICATION_URL = (
Expand All @@ -79,6 +83,49 @@
"regional_cred_verification_url": CRED_VERIFICATION_URL,
},
}
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
SERVICE_ACCOUNT_IMPERSONATION_URL = (
"https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
+ "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
)
IMPERSONATED_IDENTITY_POOL_DATA = {
"type": "external_account",
"audience": AUDIENCE,
"subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
"token_url": TOKEN_URL,
"credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
"service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
}
IMPERSONATED_AWS_DATA = {
"type": "external_account",
"audience": AUDIENCE,
"subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
"token_url": TOKEN_URL,
"credential_source": {
"environment_id": "aws1",
"region_url": REGION_URL,
"url": SECURITY_CREDS_URL,
"regional_cred_verification_url": CRED_VERIFICATION_URL,
},
"service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
}
IDENTITY_POOL_WORKFORCE_DATA = {
"type": "external_account",
"audience": WORKFORCE_AUDIENCE,
"subject_token_type": "urn:ietf:params:oauth:token-type:id_token",
"token_url": TOKEN_URL,
"credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
"workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
}
IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA = {
"type": "external_account",
"audience": WORKFORCE_AUDIENCE,
"subject_token_type": "urn:ietf:params:oauth:token-type:id_token",
"token_url": TOKEN_URL,
"credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
"service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
"workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
}

MOCK_CREDENTIALS = mock.Mock(spec=credentials.CredentialsWithQuotaProject)
MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS
Expand Down Expand Up @@ -256,6 +303,68 @@ def test_load_credentials_from_file_external_account_aws(get_project_id, tmpdir)
assert get_project_id.called


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_load_credentials_from_file_external_account_identity_pool_impersonated(
get_project_id, tmpdir
):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_DATA))
credentials, project_id = _default.load_credentials_from_file(str(config_file))

assert isinstance(credentials, identity_pool.Credentials)
assert not credentials.is_user
assert not credentials.is_workforce_pool
# Since no scopes are specified, the project ID cannot be determined.
assert project_id is None
assert get_project_id.called


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_load_credentials_from_file_external_account_aws_impersonated(
get_project_id, tmpdir
):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IMPERSONATED_AWS_DATA))
credentials, project_id = _default.load_credentials_from_file(str(config_file))

assert isinstance(credentials, aws.Credentials)
assert not credentials.is_user
assert not credentials.is_workforce_pool
# Since no scopes are specified, the project ID cannot be determined.
assert project_id is None
assert get_project_id.called


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_load_credentials_from_file_external_account_workforce(get_project_id, tmpdir):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IDENTITY_POOL_WORKFORCE_DATA))
credentials, project_id = _default.load_credentials_from_file(str(config_file))

assert isinstance(credentials, identity_pool.Credentials)
assert credentials.is_user
assert credentials.is_workforce_pool
# Since no scopes are specified, the project ID cannot be determined.
assert project_id is None
assert get_project_id.called


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_load_credentials_from_file_external_account_workforce_impersonated(
get_project_id, tmpdir
):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA))
credentials, project_id = _default.load_credentials_from_file(str(config_file))

assert isinstance(credentials, identity_pool.Credentials)
assert not credentials.is_user
assert credentials.is_workforce_pool
# Since no scopes are specified, the project ID cannot be determined.
assert project_id is None
assert get_project_id.called


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_load_credentials_from_file_external_account_with_user_and_default_scopes(
get_project_id, tmpdir
Expand Down Expand Up @@ -718,18 +827,98 @@ def test_default_no_app_engine_compute_engine_module(unused_get):


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_default_environ_external_credentials(get_project_id, monkeypatch, tmpdir):
def test_default_environ_external_credentials_identity_pool(
get_project_id, monkeypatch, tmpdir
):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IDENTITY_POOL_DATA))
monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))

credentials, project_id = _default.default()

assert isinstance(credentials, identity_pool.Credentials)
assert not credentials.is_user
assert not credentials.is_workforce_pool
# Without scopes, project ID cannot be determined.
assert project_id is None


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_default_environ_external_credentials_identity_pool_impersonated(
get_project_id, monkeypatch, tmpdir
):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_DATA))
monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))

credentials, project_id = _default.default(
scopes=["https://www.google.com/calendar/feeds"]
)

assert isinstance(credentials, identity_pool.Credentials)
assert not credentials.is_user
assert not credentials.is_workforce_pool
assert project_id is mock.sentinel.project_id
assert credentials.scopes == ["https://www.google.com/calendar/feeds"]


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_default_environ_external_credentials_aws_impersonated(
get_project_id, monkeypatch, tmpdir
):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IMPERSONATED_AWS_DATA))
monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))

credentials, project_id = _default.default(
scopes=["https://www.google.com/calendar/feeds"]
)

assert isinstance(credentials, aws.Credentials)
assert not credentials.is_user
assert not credentials.is_workforce_pool
assert project_id is mock.sentinel.project_id
assert credentials.scopes == ["https://www.google.com/calendar/feeds"]


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_default_environ_external_credentials_workforce(
get_project_id, monkeypatch, tmpdir
):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IDENTITY_POOL_WORKFORCE_DATA))
monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))

credentials, project_id = _default.default(
scopes=["https://www.google.com/calendar/feeds"]
)

assert isinstance(credentials, identity_pool.Credentials)
assert credentials.is_user
assert credentials.is_workforce_pool
assert project_id is mock.sentinel.project_id
assert credentials.scopes == ["https://www.google.com/calendar/feeds"]


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_default_environ_external_credentials_workforce_impersonated(
get_project_id, monkeypatch, tmpdir
):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA))
monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))

credentials, project_id = _default.default(
scopes=["https://www.google.com/calendar/feeds"]
)

assert isinstance(credentials, identity_pool.Credentials)
assert not credentials.is_user
assert credentials.is_workforce_pool
assert project_id is mock.sentinel.project_id
assert credentials.scopes == ["https://www.google.com/calendar/feeds"]


@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_default_environ_external_credentials_with_user_and_default_scopes_and_quota_project_id(
get_project_id, monkeypatch, tmpdir
Expand Down