diff --git a/fedcloudclient/auth.py b/fedcloudclient/auth.py
index 7b05cb3..2ac6d8f 100644
--- a/fedcloudclient/auth.py
+++ b/fedcloudclient/auth.py
@@ -5,12 +5,15 @@ import jwt
 import liboidcagent as agent
 import requests
+import os
+import re
 from fedcloudclient.conf import CONF as CONF
 from fedcloudclient.exception import TokenError
 from fedcloudclient.logger import log_and_raise
+
 class Token:
 """
 Abstract object for managing tokens
@@ -23,6 +26,7 @@ def get_token_type(self):
 ...
+
 class OIDCToken(Token):
 """
 OIDC tokens. Managing access tokens, oidc-agent account and mytoken
@@ -35,6 +39,7 @@ def __init__(self, access_token=None):
 self.oidc_agent_account = None
 self.mytoken = None
 self.user_id = None
+ self._VO_PATTERN = "urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu"
 def get_token(self):
 """
@@ -67,6 +72,7 @@ def get_user_id(self) -> str:
 Return use ID
 :return:
 """
+
 if not self.payload:
 self.decode_token()
 return self.user_id
@@ -87,6 +93,8 @@ def get_token_from_oidc_agent(self, oidc_agent_account: str) -> str:
 )
 self.access_token = access_token
 self.oidc_agent_account = oidc_agent_account
+
+ return access_token
 except agent.OidcAgentError as exception:
 error_msg = f"Error getting access token from oidc-agent: {exception}"
@@ -140,6 +148,9 @@ def multiple_token(self, access_token: str, oidc_agent_account: str, mytoken: st
 """
 if mytoken:
 try:
+
+ """need to implement from mytoken and check"""
+
 self.get_token_from_mytoken(mytoken)
 return
 except TokenError:
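Review bot: `import os` is added in the @@ -5 hunk but `os` is not referenced in any of the auth.py hunks shown, so it looks like an unused import (if a later, unshown part of the file uses it, ignore this). The two stdlib imports are also placed after the third-party ones (`jwt`, `liboidcagent`, `requests`); the usual grouping is stdlib first, then third-party, then `fedcloudclient.*`.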
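Review bot: `self._VO_PATTERN = "urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu"` in the @@ -39 hunk is a constant stored per instance; it belongs on the class (`_VO_PATTERN = r"..."` directly under `class OIDCToken`) so it is defined once and visible without constructing a token. The pattern itself matches more than intended: the dots in `egi.eu` and `aai.egi.eu` are unescaped and, since `pattern.match` anchors only at the start, nothing pins the end of the entitlement, so `r"urn:mace:egi\.eu:group:(.+?):(.+:)*role=member#aai\.egi\.eu$"` is the stricter form to use for a membership check. The same string is duplicated as `VO_PATTERN` in auth_test.py; keeping a single class-level definition avoids the two drifting apart.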
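Review bot: `"""need to implement from mytoken and check"""` inside the `try:` of `multiple_token` (@@ -148 hunk) is a string expression statement, not a comment — it is evaluated and discarded at runtime and will not show up as a TODO. Replace it with `# TODO: verify the access token obtained from mytoken` or drop it, since the `self.get_token_from_mytoken(mytoken)` call it refers to is already on the next line. The body of `multiple_token` continues outside this hunk.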
@@ -154,3 +165,42 @@ def multiple_token(self, access_token: str, oidc_agent_account: str, mytoken: st
 self.access_token = access_token
 return
 log_and_raise("Cannot get access token", TokenError)
+
+ def oidc_discover(self) -> dict:
+ """
+ :param oidc_url: CheckIn URL get from payload
+ :return: JSON object of OIDC configuration
+ """
+ oidc_url=self.payload["iss"]
+ request = requests.get(oidc_url + "/.well-known/openid-configuration")
+ request.raise_for_status()
+ self.request_json=request.json()
+ return self.request_json
+
+ def token_list_vos(self):
+ """
+ List VO memberships in EGI Check-in
+ :return: list of VO names
+ """
+
+ oidc_ep = self.request_json
+ z_user_info=oidc_ep["userinfo_endpoint"]
+ z_head={"Authorization": f"Bearer {self.access_token}"}
+
+ request = requests.get(
+ oidc_ep["userinfo_endpoint"],
+ headers={"Authorization": f"Bearer {self.access_token}"},
+ )
+
+ request.raise_for_status()
+ vos = set()
+ pattern = re.compile(self._VO_PATTERN)
+ for claim in request.json().get("eduperson_entitlement", []):
+ vo = pattern.match(claim)
+ if vo:
+ vos.add(vo.groups()[0])
+ request.raise_for_status()
+
+ return sorted(vos)
+
+
diff --git a/fedcloudclient/auth_test.py b/fedcloudclient/auth_test.py
index a12845e..13b5df8 100644
--- a/fedcloudclient/auth_test.py
+++ b/fedcloudclient/auth_test.py
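Review bot: `oidc_discover` takes the discovery URL from `self.payload["iss"]`, i.e. from the token being inspected, and `token_list_vos` then sends that token as a bearer header to whatever `userinfo_endpoint` the fetched discovery document names; unless `decode_token` (outside this hunk) verifies the token signature, the issuer should be compared with the configured Check-in issuer before any request is made, otherwise an unverified token decides where the access token is sent. Both `requests.get` calls also lack a `timeout=`, so a stalled endpoint blocks the client indefinitely. In `token_list_vos`, `z_user_info` and `z_head` are assigned and never used, the second `request.raise_for_status()` after the loop is a no-op, and `self.request_json` is only ever set by `oidc_discover`, so calling `token_list_vos` first raises `AttributeError`; the `oidc_discover` docstring also documents an `oidc_url` parameter the method does not take. A cleaned-up `token_list_vos` (the issuer check and a `timeout=` on `oidc_discover`'s request still need adding):
```python
# … unchanged: oidc_discover (add timeout= to its requests.get as well) …
    def token_list_vos(self):
        """
        List VO memberships in EGI Check-in, read from the userinfo endpoint
        of the discovered OIDC configuration.
        :return: sorted list of VO names
        """
        if not getattr(self, "request_json", None):
            self.oidc_discover()
        request = requests.get(
            self.request_json["userinfo_endpoint"],
            headers={"Authorization": f"Bearer {self.access_token}"},
            timeout=30,  # constructed value; align with the project's other HTTP calls
        )
        request.raise_for_status()
        pattern = re.compile(self._VO_PATTERN)
        vos = set()
        for claim in request.json().get("eduperson_entitlement", []):
            vo = pattern.match(claim)
            if vo:
                vos.add(vo.group(1))
        return sorted(vos)
```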
@@ -2,23 +2,103 @@ Testing unit for auth.py
 """
 import os
+from colorama import init as colorama_init
+from colorama import Fore
+from colorama import Style
 import fedcloudclient.auth as auth
+from fedcloudclient.conf import CONF as CONF
+VO_PATTERN = "urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu"
-def get_token_from_mytoken_decode_verify(mytoken: str, user_id: str):
+def verify_MYTOKEN(mytoken: str) -> str:
 """
 Get access token from mytoken server, decode, get user ID and verify
- :return:
 """
 token = auth.OIDCToken()
- token.get_token_from_mytoken(mytoken)
- token_id = token.get_user_id()
- assert token_id == user_id
+ try:
+ access_token_mytoken=token.get_token_from_mytoken(mytoken, None)
+ return access_token_mytoken
+ except:
+ return print(f"No MYTOKEN")
+
+
+def verify_OIDC_AGENT(user_id:str) -> str:
+ token = auth.OIDCToken()
+ try:
+ access_token_oidc=token.get_token_from_oidc_agent(user_id)
+ return access_token_oidc
+ except:
+ return print(f"No OIDC_AGENT_ACCOUNT")
+
+
+
+def verify_ACCESS_TOKEN(access_token:str) -> str:
+ token = auth.OIDCToken()
+ try:
+ token.access_token=access_token
+ return token.access_token
+ except:
+ return print(f"Error with ACCESS_TOKEN")
+
+def verify_user_id(access_token:str) -> str:
+ token = auth.OIDCToken()
+ token.access_token=access_token
+ try:
+ user_id=token.get_user_id()
+ return user_id
+ except:
+ print("No user_id!")
+
+def verify_pyload(access_token:str) -> dict:
+ token = auth.OIDCToken()
+ token.access_token=access_token
+ #try:
+ user_id=token.get_user_id()
+ payload=token.payload
+ request_json=token.oidc_discover()
+ list_vos=token.token_list_vos()
+ return payload,request_json,list_vos
+ #except:
+ # print("No user_id!")
+
+
+def printing_dict(var_dict:dict):
+ for idx, item in enumerate(var_dict):
+ print(f"{item}:\t {var_dict[item]}")
 if __name__ == "__main__":
- mytoken = os.environ["FEDCLOUD_MYTOKEN"]
- user_id = os.environ["FEDCLOUD_ID"]
- get_token_from_mytoken_decode_verify(mytoken, 
user_id)
+ print(f"Start of verifying auth.py")
+
+ access_token= os.environ.get("ACCESS_TOKEN","")
+ access_token_check=verify_ACCESS_TOKEN(access_token)
+
+ mytoken=os.environ.get("FEDCLOUD_MYTOKEN","")
+ access_token_mytok=verify_MYTOKEN(mytoken)
+
+ oidc_agent_name=os.environ.get("OIDC_AGENT_ACCOUNT","")
+ access_token_oidc=verify_OIDC_AGENT(oidc_agent_name)
+
+ user_id=verify_user_id(access_token_oidc)
+ payload,request_json,list_vos=verify_pyload(access_token_oidc)
+
+
+ print(f"{type(payload)}")
+ printing_dict(payload)
+ print("-------------------------------------------------")
+ printing_dict(request_json)
+ print("-------------------------------------------------")
+ print(list_vos)
+ print(f"Break")
+
+
+
+
+
+
+
+
+
+
diff --git a/fedcloudclient/checkin.py b/fedcloudclient/checkin.py
index 41b03bd..435676d 100644
--- a/fedcloudclient/checkin.py
+++ b/fedcloudclient/checkin.py
@@ -30,6 +30,7 @@ def print_error(message, quiet):
 print(message, file=sys.stderr)
+""" Included in auth.py, line 50"""
 def decode_token(oidc_access_token):
 """
 Decoding access token to a dict
@@ -44,6 +45,7 @@ def decode_token(oidc_access_token):
 return payload
+
 def oidc_discover(oidc_url):
 """
 Discover OIDC endpoints
@@ -57,6 +59,7 @@ def oidc_discover(oidc_url):
 return request.json()
+""" Included in auth.py, line 74"""
 def get_token_from_oidc_agent(oidc_agent_account, quiet=False):
 """
 Get access token from oidc-agent
@@ -82,6 +85,7 @@ def get_token_from_oidc_agent(oidc_agent_account, quiet=False):
 return None
+""" Included in auth.py, line 99"""
 def get_token_from_mytoken_server(mytoken, mytoken_server, quiet=False):
 """
 Get access token from mytoken server
@@ -149,7 +153,7 @@ def check_token(oidc_token, verbose=False):
 def get_checkin_id(
 oidc_token,
-):
+ ):
 """
 Get EGI Check-in ID from access token
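Review bot: Every helper in the auth_test.py hunk (`verify_MYTOKEN`, `verify_OIDC_AGENT`, `verify_ACCESS_TOKEN`, `verify_user_id`) wraps its call in a bare `except:` and then `return print(...)`, which returns `None`, so under `__main__` a failed agent lookup is merely printed and `access_token_oidc` is passed on as `None` to `verify_user_id` and `verify_pyload`; a bare `except:` also swallows `KeyboardInterrupt` and `SystemExit`. Catch the specific `TokenError` from `fedcloudclient.exception` (the error `log_and_raise` produces in auth.py) and re-raise or let it propagate instead of `return print(f"No MYTOKEN")`. With the old `assert token_id == user_id` removed, the file no longer asserts anything, so a wrong user ID can no longer fail the run; keep at least one assertion against `FEDCLOUD_ID`, or move these helpers out of a `_test.py` module. `colorama_init`, `Fore`, `Style`, `CONF` and `VO_PATTERN` are imported or defined but never used in this hunk, and `verify_pyload` looks like a misspelling of `verify_payload`.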
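Review bot: `""" Included in auth.py, line 50"""` added before `decode_token` in checkin.py is a bare string expression at module level, not a comment: it is evaluated on every import and its line reference into auth.py will go stale with the next edit to that file. The same applies to the `line 74` and `line 99` strings in the @@ -57 and @@ -82 hunks of checkin.py and to `""" Imported to the sites """` in the shell.py hunk. Use a `#` comment that names the replacement rather than a line number, e.g. `# Superseded by OIDCToken.decode_token in auth.py`. The re-indented `    ):` in the @@ -149 hunk (and `mytoken_server,):` in the following checkin.py hunk) departs from the formatting the old side had; an auto-formatter such as black would revert both.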
@@ -167,8 +171,7 @@ def get_access_token(
 oidc_access_token,
 oidc_agent_account,
 mytoken,
- mytoken_server,
-):
+ mytoken_server,):
 """
 Get access token
 Generates new access token from oidc-agent
diff --git a/fedcloudclient/conf.py b/fedcloudclient/conf.py
index 5f55474..5631a16 100644
--- a/fedcloudclient/conf.py
+++ b/fedcloudclient/conf.py
@@ -10,7 +10,7 @@ import yaml
 from tabulate import tabulate
-from fedcloudclient.exception import ConfigError
+#from fedcloudclient.exception import ConfigError
 DEFAULT_CONFIG_LOCATION = Path.home() / ".config/fedcloud/config.yaml"
 DEFAULT_SETTINGS = {
@@ -137,6 +137,7 @@ def create(config_file: str):
 envvar="FEDCLOUD_CONFIG_FILE",
 show_default=True,
 )
+
 @click.option(
 "--output-format",
 "-f",
@@ -144,6 +145,7 @@ def create(config_file: str):
 help="Output format",
 type=click.Choice(["text", "YAML", "JSON"], case_sensitive=False),
 )
+
 def show(config_file: str, output_format: str):
 """Show actual client configuration """
 saved_config = load_config(config_file)
diff --git a/fedcloudclient/shell.py b/fedcloudclient/shell.py
index 88b2413..77ae92b 100644
--- a/fedcloudclient/shell.py
+++ b/fedcloudclient/shell.py
@@ -35,7 +35,7 @@ def get_shell_type():
 return Shell.LINUX
-
+""" Imported to the sites """
 def print_set_env_command(name, value):
 """
 Print command to set environment variable,
@@ -62,3 +62,14 @@ def print_comment(comment):
 print(f"# {comment!s}")
 else:
 print(f"rem {comment!s}")
+
+
+out_1=Shell(1)
+
+print(type(out_1))
+print(Shell.LINUX)
+
+print(print_comment({"gewgweg": False}))
+print(f"Done")
+
+
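Review bot: Commenting out `from fedcloudclient.exception import ConfigError` in conf.py leaves any remaining `ConfigError` reference in that file (none is visible in this hunk, so this is inferred) to fail with a `NameError` at the moment the error would be raised, which is the worst time to discover it. If the class is genuinely unused, delete the import line rather than leaving `#from fedcloudclient.exception import ConfigError` in place; if it is still raised anywhere in conf.py, keep the import.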
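Review bot: The lines appended after `print_comment` in shell.py run at import time: `out_1=Shell(1)`, `print(type(out_1))`, `print(Shell.LINUX)`, `print(print_comment({"gewgweg": False}))` and `print(f"Done")` execute whenever `fedcloudclient.shell` is imported, so every command that imports this module writes debug output to stdout, including a `# …` or `rem …` line from `print_comment` followed by `None` (its return value), which corrupts the shell snippets this module exists to emit. Remove them, or move them under `if __name__ == "__main__":` or into a test module.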