Skip to content

Commit 89e7081

Browse files
authored
Merge pull request #544 from splitio/kerberos-sessions-pr
Used four sessions per split host and reconnect when timing out
2 parents 2aa5ad1 + d187e8c commit 89e7081

File tree

2 files changed

+382
-124
lines changed

2 files changed

+382
-124
lines changed

splitio/api/client.py

+136-57
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,23 @@ def _get_headers(self, extra_headers, sdk_key):
119119
headers.update(extra_headers)
120120
return headers
121121

122+
def _record_telemetry(self, status_code, elapsed):
123+
"""
124+
Record Telemetry info
125+
126+
:param status_code: http request status code
127+
:type status_code: int
128+
129+
:param elapsed: response time elapsed.
130+
:type status_code: int
131+
"""
132+
self._telemetry_runtime_producer.record_sync_latency(self._metric_name, elapsed)
133+
if 200 <= status_code < 300:
134+
self._telemetry_runtime_producer.record_successful_sync(self._metric_name, get_current_epoch_time_ms())
135+
return
136+
137+
self._telemetry_runtime_producer.record_sync_error(self._metric_name, status_code)
138+
122139
class HttpClient(HttpClientBase):
123140
"""HttpClient wrapper."""
124141

@@ -140,7 +157,6 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
140157
_LOGGER.debug("Initializing httpclient")
141158
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
142159
self._urls = _construct_urls(sdk_url, events_url, auth_url, telemetry_url)
143-
self._lock = threading.RLock()
144160

145161
def get(self, server, path, sdk_key, query=None, extra_headers=None): # pylint: disable=too-many-arguments
146162
"""
@@ -208,23 +224,6 @@ def post(self, server, path, sdk_key, body, query=None, extra_headers=None): #
208224
except Exception as exc: # pylint: disable=broad-except
209225
raise HttpClientException(_EXC_MSG.format(source='request')) from exc
210226

211-
def _record_telemetry(self, status_code, elapsed):
212-
"""
213-
Record Telemetry info
214-
215-
:param status_code: http request status code
216-
:type status_code: int
217-
218-
:param elapsed: response time elapsed.
219-
:type status_code: int
220-
"""
221-
self._telemetry_runtime_producer.record_sync_latency(self._metric_name, elapsed)
222-
if 200 <= status_code < 300:
223-
self._telemetry_runtime_producer.record_successful_sync(self._metric_name, get_current_epoch_time_ms())
224-
return
225-
226-
self._telemetry_runtime_producer.record_sync_error(self._metric_name, status_code)
227-
228227
class HttpClientAsync(HttpClientBase):
229228
"""HttpClientAsync wrapper."""
230229

@@ -350,7 +349,7 @@ async def close_session(self):
350349
if not self._session.closed:
351350
await self._session.close()
352351

353-
class HttpClientKerberos(HttpClient):
352+
class HttpClientKerberos(HttpClientBase):
354353
"""HttpClient wrapper."""
355354

356355
def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, telemetry_url=None, authentication_scheme=None, authentication_params=None):
@@ -367,11 +366,22 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
367366
:type auth_url: str
368367
:param telemetry_url: Optional alternative telemetry URL.
369368
:type telemetry_url: str
369+
:param authentication_scheme: Optional authentication scheme to use.
370+
:type authentication_scheme: splitio.client.config.AuthenticateScheme
371+
:param authentication_params: Optional authentication username and password to use.
372+
:type authentication_params: [str, str]
370373
"""
371374
_LOGGER.debug("Initializing httpclient for Kerberos auth")
372-
HttpClient.__init__(self, timeout=timeout, sdk_url=sdk_url, events_url=events_url, auth_url=auth_url, telemetry_url=telemetry_url)
375+
self._timeout = timeout/1000 if timeout else None # Convert ms to seconds.
376+
self._urls = _construct_urls(sdk_url, events_url, auth_url, telemetry_url)
373377
self._authentication_scheme = authentication_scheme
374378
self._authentication_params = authentication_params
379+
self._lock = threading.RLock()
380+
self._sessions = {'sdk': requests.Session(),
381+
'events': requests.Session(),
382+
'auth': requests.Session(),
383+
'telemetry': requests.Session()}
384+
self._set_authentication()
375385

376386
def get(self, server, path, sdk_key, query=None, extra_headers=None): # pylint: disable=too-many-arguments
377387
"""
@@ -392,21 +402,49 @@ def get(self, server, path, sdk_key, query=None, extra_headers=None): # pylint:
392402
"""
393403
with self._lock:
394404
start = get_current_epoch_time_ms()
395-
with requests.Session() as session:
396-
self._set_authentication(session)
405+
try:
406+
return self._do_get(server, path, sdk_key, query, extra_headers, start)
407+
408+
except requests.exceptions.ProxyError as exc:
409+
_LOGGER.debug("Proxy Exception caught, resetting the http session")
410+
self._sessions[server].close()
411+
self._sessions[server] = requests.Session()
412+
self._set_authentication(server_name=server)
397413
try:
398-
response = session.get(
399-
_build_url(server, path, self._urls),
400-
headers=self._get_headers(extra_headers, sdk_key),
401-
params=query,
402-
timeout=self._timeout
403-
)
404-
self._record_telemetry(response.status_code, get_current_epoch_time_ms() - start)
405-
return HttpResponse(response.status_code, response.text, response.headers)
406-
407-
except Exception as exc: # pylint: disable=broad-except
414+
return self._do_get(server, path, sdk_key, query, extra_headers, start)
415+
416+
except Exception as exc:
408417
raise HttpClientException(_EXC_MSG.format(source='request')) from exc
409418

419+
except Exception as exc: # pylint: disable=broad-except
420+
raise HttpClientException(_EXC_MSG.format(source='request')) from exc
421+
422+
def _do_get(self, server, path, sdk_key, query, extra_headers, start):
423+
"""
424+
Issue a get request.
425+
:param server: Whether the request is for SDK server, Events server or Auth server.
426+
:typee server: str
427+
:param path: path to append to the host url.
428+
:type path: str
429+
:param sdk_key: sdk key.
430+
:type sdk_key: str
431+
:param query: Query string passed as dictionary.
432+
:type query: dict
433+
:param extra_headers: key/value pairs of possible extra headers.
434+
:type extra_headers: dict
435+
436+
:return: Tuple of status_code & response text
437+
:rtype: HttpResponse
438+
"""
439+
with self._sessions[server].get(
440+
_build_url(server, path, self._urls),
441+
headers=self._get_headers(extra_headers, sdk_key),
442+
params=query,
443+
timeout=self._timeout
444+
) as response:
445+
self._record_telemetry(response.status_code, get_current_epoch_time_ms() - start)
446+
return HttpResponse(response.status_code, response.text, response.headers)
447+
410448
def post(self, server, path, sdk_key, body, query=None, extra_headers=None): # pylint: disable=too-many-arguments
411449
"""
412450
Issue a POST request.
@@ -429,31 +467,72 @@ def post(self, server, path, sdk_key, body, query=None, extra_headers=None): #
429467
"""
430468
with self._lock:
431469
start = get_current_epoch_time_ms()
432-
with requests.Session() as session:
433-
self._set_authentication(session)
470+
try:
471+
return self._do_post(server, path, sdk_key, query, extra_headers, body, start)
472+
473+
except requests.exceptions.ProxyError as exc:
474+
_LOGGER.debug("Proxy Exception caught, resetting the http session")
475+
self._sessions[server].close()
476+
self._sessions[server] = requests.Session()
477+
self._set_authentication(server_name=server)
434478
try:
435-
response = session.post(
436-
_build_url(server, path, self._urls),
437-
params=query,
438-
headers=self._get_headers(extra_headers, sdk_key),
439-
json=body,
440-
timeout=self._timeout,
441-
)
442-
self._record_telemetry(response.status_code, get_current_epoch_time_ms() - start)
443-
return HttpResponse(response.status_code, response.text, response.headers)
444-
except Exception as exc: # pylint: disable=broad-except
479+
return self._do_post(server, path, sdk_key, query, extra_headers, body, start)
480+
481+
except Exception as exc:
445482
raise HttpClientException(_EXC_MSG.format(source='request')) from exc
446483

447-
def _set_authentication(self, session):
448-
if self._authentication_scheme == AuthenticateScheme.KERBEROS_SPNEGO:
449-
_LOGGER.debug("Using Kerberos Spnego Authentication")
450-
if self._authentication_params != [None, None]:
451-
session.auth = HTTPKerberosAuth(principal=self._authentication_params[0], password=self._authentication_params[1], mutual_authentication=OPTIONAL)
452-
else:
453-
session.auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
454-
elif self._authentication_scheme == AuthenticateScheme.KERBEROS_PROXY:
455-
_LOGGER.debug("Using Kerberos Proxy Authentication")
456-
if self._authentication_params != [None, None]:
457-
session.mount('https://', HTTPAdapterWithProxyKerberosAuth(principal=self._authentication_params[0], password=self._authentication_params[1]))
458-
else:
459-
session.mount('https://', HTTPAdapterWithProxyKerberosAuth())
484+
except Exception as exc: # pylint: disable=broad-except
485+
raise HttpClientException(_EXC_MSG.format(source='request')) from exc
486+
487+
def _do_post(self, server, path, sdk_key, query, extra_headers, body, start):
488+
"""
489+
Issue a POST request.
490+
491+
:param server: Whether the request is for SDK server or Events server.
492+
:typee server: str
493+
:param path: path to append to the host url.
494+
:type path: str
495+
:param sdk_key: sdk key.
496+
:type sdk_key: str
497+
:param body: body sent in the request.
498+
:type body: str
499+
:param query: Query string passed as dictionary.
500+
:type query: dict
501+
:param extra_headers: key/value pairs of possible extra headers.
502+
:type extra_headers: dict
503+
504+
:return: Tuple of status_code & response text
505+
:rtype: HttpResponse
506+
"""
507+
with self._sessions[server].post(
508+
_build_url(server, path, self._urls),
509+
params=query,
510+
headers=self._get_headers(extra_headers, sdk_key),
511+
json=body,
512+
timeout=self._timeout,
513+
) as response:
514+
self._record_telemetry(response.status_code, get_current_epoch_time_ms() - start)
515+
return HttpResponse(response.status_code, response.text, response.headers)
516+
517+
def _set_authentication(self, server_name=None):
518+
"""
519+
Set the authentication for all self._sessions variables based on authentication scheme.
520+
521+
:param server: If set, will only add the auth for its session variable, otherwise will set all sessions.
522+
:typee server: str
523+
"""
524+
for server in ['sdk', 'events', 'auth', 'telemetry']:
525+
if server_name is not None and server_name != server:
526+
continue
527+
if self._authentication_scheme == AuthenticateScheme.KERBEROS_SPNEGO:
528+
_LOGGER.debug("Using Kerberos Spnego Authentication")
529+
if self._authentication_params != [None, None]:
530+
self._sessions[server].auth = HTTPKerberosAuth(principal=self._authentication_params[0], password=self._authentication_params[1], mutual_authentication=OPTIONAL)
531+
else:
532+
self._sessions[server].auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
533+
elif self._authentication_scheme == AuthenticateScheme.KERBEROS_PROXY:
534+
_LOGGER.debug("Using Kerberos Proxy Authentication")
535+
if self._authentication_params != [None, None]:
536+
self._sessions[server].mount('https://', HTTPAdapterWithProxyKerberosAuth(principal=self._authentication_params[0], password=self._authentication_params[1]))
537+
else:
538+
self._sessions[server].mount('https://', HTTPAdapterWithProxyKerberosAuth())

0 commit comments

Comments
 (0)