diff --git a/system_tests/system_tests_sync/test_external_accounts.py b/system_tests/system_tests_sync/test_external_accounts.py index 59fbd4bef..29355f479 100644 --- a/system_tests/system_tests_sync/test_external_accounts.py +++ b/system_tests/system_tests_sync/test_external_accounts.py @@ -42,6 +42,7 @@ import sys import google.auth +from google.auth import _helpers from googleapiclient import discovery from six.moves import BaseHTTPServer from google.oauth2 import service_account @@ -132,7 +133,6 @@ def get_project_dns(dns_access, credential_data): with NamedTemporaryFile() as credfile: credfile.write(json.dumps(credential_data).encode("utf-8")) credfile.flush() - old_credentials = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") with patch.dict(os.environ, {"GOOGLE_APPLICATION_CREDENTIALS": credfile.name}): # If our setup and credential file are correct, @@ -150,9 +150,7 @@ def get_xml_value_by_tagname(data, tagname): # This test makes sure that setting an accesible credential file # works to allow access to Google resources. -def test_file_based_external_account( - oidc_credentials, service_account_info, dns_access -): +def test_file_based_external_account(oidc_credentials, dns_access): with NamedTemporaryFile() as tmpfile: tmpfile.write(oidc_credentials.token.encode("utf-8")) tmpfile.flush() @@ -173,10 +171,11 @@ def test_file_based_external_account( }, ) + # This test makes sure that setting a token lifetime works # for service account impersonation. def test_file_based_external_account_with_configure_token_lifetime( - oidc_credentials, service_account_info, dns_access + oidc_credentials, dns_access ): with NamedTemporaryFile() as tmpfile: tmpfile.write(oidc_credentials.token.encode("utf-8")) @@ -202,6 +201,47 @@ def test_file_based_external_account_with_configure_token_lifetime( ) +def test_configurable_token_lifespan(oidc_credentials, http_request): + TOKEN_LIFETIME_SECONDS = 2800 + BUFFER_SECONDS = 5 + + def check_impersonation_expiration(): + # First, get the default credentials. + credentials, _ = google.auth.default( + scopes=["https://www.googleapis.com/auth/cloud-platform.read-only"], + request=http_request, + ) + + utcmax = _helpers.utcnow() + datetime.timedelta(seconds=TOKEN_LIFETIME_SECONDS) + utcmin = utcmax - datetime.timedelta(seconds=BUFFER_SECONDS) + assert utcmin < credentials._impersonated_credentials.expiry <= utcmax + + return True + + with NamedTemporaryFile() as tmpfile: + tmpfile.write(oidc_credentials.token.encode("utf-8")) + tmpfile.flush() + + assert get_project_dns( + check_impersonation_expiration, + { + "type": "external_account", + "audience": _AUDIENCE_OIDC, + "subject_token_type": "urn:ietf:params:oauth:token-type:jwt", + "token_url": "https://sts.googleapis.com/v1/token", + "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format( + oidc_credentials.service_account_email + ), + "service_account_impersonation": { + "token_lifetime_seconds": TOKEN_LIFETIME_SECONDS, + }, + "credential_source": { + "file": tmpfile.name, + }, + }, + ) + + # This test makes sure that setting up an http server to provide credentials # works to allow access to Google resources. def test_url_based_external_account(dns_access, oidc_credentials, service_account_info): @@ -337,9 +377,7 @@ def test_aws_based_external_account( # This test makes sure that setting up an executable to provide credentials # works to allow access to Google resources. -def test_pluggable_external_account( - oidc_credentials, service_account_info, dns_access -): +def test_pluggable_external_account(oidc_credentials, service_account_info, dns_access): now = datetime.datetime.now() unix_seconds = time.mktime(now.timetuple()) expiration_time = (unix_seconds + 1 * 60 * 60) * 1000 @@ -354,7 +392,7 @@ def test_pluggable_external_account( tmpfile = NamedTemporaryFile(delete=True) with open(tmpfile.name, "w") as f: f.write("#!/bin/bash\n") - f.write("echo \"{}\"\n".format(json.dumps(credential).replace('"', '\\"'))) + f.write('echo "{}"\n'.format(json.dumps(credential).replace('"', '\\"'))) tmpfile.file.close() os.chmod(tmpfile.name, 0o777)