Skip to content

Commit

Permalink
feat: add integration tests for configurable token lifespan (#1103)
Browse files Browse the repository at this point in the history
* feat: add integration tests for configurable token lifespan

* Reducing buffer to 5 seconds
  • Loading branch information
ScruffyProdigy authored Aug 18, 2022
1 parent 05f125d commit 124bae6
Showing 1 changed file with 47 additions and 9 deletions.
56 changes: 47 additions & 9 deletions system_tests/system_tests_sync/test_external_accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import sys
import google.auth
from google.auth import _helpers
from googleapiclient import discovery
from six.moves import BaseHTTPServer
from google.oauth2 import service_account
Expand Down Expand Up @@ -132,7 +133,6 @@ def get_project_dns(dns_access, credential_data):
with NamedTemporaryFile() as credfile:
credfile.write(json.dumps(credential_data).encode("utf-8"))
credfile.flush()
old_credentials = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")

with patch.dict(os.environ, {"GOOGLE_APPLICATION_CREDENTIALS": credfile.name}):
# If our setup and credential file are correct,
Expand All @@ -150,9 +150,7 @@ def get_xml_value_by_tagname(data, tagname):

# This test makes sure that setting an accesible credential file
# works to allow access to Google resources.
def test_file_based_external_account(
oidc_credentials, service_account_info, dns_access
):
def test_file_based_external_account(oidc_credentials, dns_access):
with NamedTemporaryFile() as tmpfile:
tmpfile.write(oidc_credentials.token.encode("utf-8"))
tmpfile.flush()
Expand All @@ -173,10 +171,11 @@ def test_file_based_external_account(
},
)


# This test makes sure that setting a token lifetime works
# for service account impersonation.
def test_file_based_external_account_with_configure_token_lifetime(
oidc_credentials, service_account_info, dns_access
oidc_credentials, dns_access
):
with NamedTemporaryFile() as tmpfile:
tmpfile.write(oidc_credentials.token.encode("utf-8"))
Expand All @@ -202,6 +201,47 @@ def test_file_based_external_account_with_configure_token_lifetime(
)


def test_configurable_token_lifespan(oidc_credentials, http_request):
TOKEN_LIFETIME_SECONDS = 2800
BUFFER_SECONDS = 5

def check_impersonation_expiration():
# First, get the default credentials.
credentials, _ = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform.read-only"],
request=http_request,
)

utcmax = _helpers.utcnow() + datetime.timedelta(seconds=TOKEN_LIFETIME_SECONDS)
utcmin = utcmax - datetime.timedelta(seconds=BUFFER_SECONDS)
assert utcmin < credentials._impersonated_credentials.expiry <= utcmax

return True

with NamedTemporaryFile() as tmpfile:
tmpfile.write(oidc_credentials.token.encode("utf-8"))
tmpfile.flush()

assert get_project_dns(
check_impersonation_expiration,
{
"type": "external_account",
"audience": _AUDIENCE_OIDC,
"subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
"token_url": "https://sts.googleapis.com/v1/token",
"service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
oidc_credentials.service_account_email
),
"service_account_impersonation": {
"token_lifetime_seconds": TOKEN_LIFETIME_SECONDS,
},
"credential_source": {
"file": tmpfile.name,
},
},
)


# This test makes sure that setting up an http server to provide credentials
# works to allow access to Google resources.
def test_url_based_external_account(dns_access, oidc_credentials, service_account_info):
Expand Down Expand Up @@ -337,9 +377,7 @@ def test_aws_based_external_account(

# This test makes sure that setting up an executable to provide credentials
# works to allow access to Google resources.
def test_pluggable_external_account(
oidc_credentials, service_account_info, dns_access
):
def test_pluggable_external_account(oidc_credentials, service_account_info, dns_access):
now = datetime.datetime.now()
unix_seconds = time.mktime(now.timetuple())
expiration_time = (unix_seconds + 1 * 60 * 60) * 1000
Expand All @@ -354,7 +392,7 @@ def test_pluggable_external_account(
tmpfile = NamedTemporaryFile(delete=True)
with open(tmpfile.name, "w") as f:
f.write("#!/bin/bash\n")
f.write("echo \"{}\"\n".format(json.dumps(credential).replace('"', '\\"')))
f.write('echo "{}"\n'.format(json.dumps(credential).replace('"', '\\"')))
tmpfile.file.close()

os.chmod(tmpfile.name, 0o777)
Expand Down

0 comments on commit 124bae6

Please # to comment.