-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathauthenticator_async.py
142 lines (116 loc) · 5.08 KB
/
authenticator_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta, timezone
import aiofiles
from aiohttp import ClientError
from .api_async import ApiAsync
from .authenticator_common import (
Authentication,
AuthenticationState,
AuthenticatorCommon,
ValidationResult,
from_authentication_json,
to_authentication_json,
)
from .exceptions import AugustApiAIOHTTPError
# Module-level logger named after this module; used by AuthenticatorAsync below.
_LOGGER = logging.getLogger(__name__)
class AuthenticatorAsync(AuthenticatorCommon):
    """Class to manage authentication with the August API.

    Asynchronous variant: network calls go through ``ApiAsync`` and the
    access-token cache file is read and written with ``aiofiles``.
    """

    _api: ApiAsync

    async def _read_access_token_file(
        self,
        access_token_cache_file: str,
        file: aiofiles.threadpool.text.AsyncTextIOWrapper,
    ) -> None:
        """Load the cached authentication from an already opened cache file.

        The file contents are parsed as JSON and stored in
        ``self._authentication``. An expired token is discarded and replaced
        with a ``REQUIRES_AUTHENTICATION`` state; a token expiring in less
        than 7 days is kept but a warning is logged.

        Args:
            access_token_cache_file: Path of the cache file; only used in the
                log message.
            file: Open async file handle to read from (the caller in this
                class opens it in text mode).

        Raises:
            json.JSONDecodeError: If the file contents are not valid JSON.
        """
        contents = await file.read()
        self._authentication = from_authentication_json(json.loads(contents))

        if self._authentication.is_expired():
            # An expired token cannot be used: start over unauthenticated.
            _LOGGER.error("Token has expired.")
            self._authentication = Authentication(
                AuthenticationState.REQUIRES_AUTHENTICATION,
                install_id=self._install_id,
            )
        elif (
            self._authentication.parsed_expiration_time() - datetime.now(timezone.utc)
        ) < timedelta(days=7):
            # Token is still valid but expires in less than 7 days: warn only.
            exp_time = self._authentication.access_token_expires
            _LOGGER.warning(
                "API Token is going to expire at %s. "
                "Deleting file %s will result "
                "in a new token being requested next"
                " time",
                exp_time,
                access_token_cache_file,
            )

    async def async_setup_authentication(self) -> None:
        """Initialise ``self._authentication`` from the cache file if possible.

        When a cache file is configured and can be read and parsed, the
        authentication loaded from it is used. If no cache file is
        configured, the file does not exist, or it does not contain valid
        JSON, the state is reset to ``REQUIRES_AUTHENTICATION``.
        """
        if access_token_cache_file := self._access_token_cache_file:
            try:
                async with aiofiles.open(access_token_cache_file, "r") as file:
                    await self._read_access_token_file(access_token_cache_file, file)
            except FileNotFoundError:
                _LOGGER.debug("Cache file not found: %s", access_token_cache_file)
            except json.decoder.JSONDecodeError as error:
                _LOGGER.error(
                    "Unable to read cache file (%s): %s",
                    access_token_cache_file,
                    error,
                )
            else:
                # Cache was loaded; keep whatever state the reader stored.
                return

        self._authentication = Authentication(
            AuthenticationState.REQUIRES_AUTHENTICATION, install_id=self._install_id
        )

    async def async_authenticate(self) -> Authentication:
        """Authenticate against the API, reusing the current session if valid.

        If the current state is already ``AUTHENTICATED`` it is returned
        unchanged. Otherwise a session is requested with the install id, the
        identifier ``"<login_method>:<username>"`` and the password, and the
        response is converted into an ``Authentication``. A result in the
        ``AUTHENTICATED`` state is written to the cache file.

        Returns:
            The current or newly obtained authentication.
        """
        if self._authentication.state is AuthenticationState.AUTHENTICATED:
            return self._authentication

        identifier = self._login_method + ":" + self._username
        install_id = self._authentication.install_id
        response = await self._api.async_get_session(
            install_id, identifier, self._password
        )
        json_dict = await response.json()
        authentication = self._authentication_from_session_response(
            install_id, response.headers, json_dict
        )

        if authentication.state is AuthenticationState.AUTHENTICATED:
            await self._async_cache_authentication(authentication)

        return authentication

    async def async_validate_verification_code(
        self, verification_code: str
    ) -> ValidationResult:
        """Submit a verification code to the API.

        Args:
            verification_code: Code to validate.

        Returns:
            ``ValidationResult.INVALID_VERIFICATION_CODE`` if the code is
            empty or the API call raises ``AugustApiAIOHTTPError`` or
            ``ClientError``; ``ValidationResult.VALIDATED`` otherwise.
        """
        if not verification_code:
            return ValidationResult.INVALID_VERIFICATION_CODE

        try:
            await self._api.async_validate_verification_code(
                self._authentication.access_token,
                self._login_method,
                self._username,
                verification_code,
            )
        except (AugustApiAIOHTTPError, ClientError):
            return ValidationResult.INVALID_VERIFICATION_CODE

        return ValidationResult.VALIDATED

    async def async_send_verification_code(self) -> bool:
        """Ask the API to send a verification code for the configured login.

        Returns:
            Always ``True``; exceptions raised by the API call are not caught
            here and propagate to the caller.
        """
        await self._api.async_send_verification_code(
            self._authentication.access_token, self._login_method, self._username
        )
        return True

    async def async_refresh_access_token(
        self, force: bool = False
    ) -> Authentication | None:
        """Refresh the access token when needed (or when forced).

        Args:
            force: Refresh even if ``should_refresh()`` reports that no
                refresh is needed.

        Returns:
            The current authentication when no refresh is needed or when the
            state is not ``AUTHENTICATED``; otherwise the authentication built
            from the refreshed token, which is also written to the cache file.
        """
        if not self.should_refresh() and not force:
            return self._authentication

        if self._authentication.state is not AuthenticationState.AUTHENTICATED:
            _LOGGER.warning("Tried to refresh access token when not authenticated")
            return self._authentication

        refreshed_token = await self._api.async_refresh_access_token(
            self._authentication.access_token
        )
        authentication = self._process_refreshed_access_token(refreshed_token)
        await self._async_cache_authentication(authentication)
        return authentication

    async def _async_cache_authentication(self, authentication: Authentication) -> None:
        """Write ``authentication`` to the cache file, if one is configured.

        Args:
            authentication: Authentication to serialise with
                ``to_authentication_json`` and store.
        """
        # Truthiness check (not ``is not None``) so an empty path is treated
        # as "no cache file", matching async_setup_authentication.
        if not self._access_token_cache_file:
            return

        async with aiofiles.open(self._access_token_cache_file, "w") as file:
            await file.write(to_authentication_json(authentication))