Skip to content
This repository was archived by the owner on Jul 28, 2023. It is now read-only.

Allow websocket retries with backoff time in between #341

Merged
merged 28 commits into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4f1f48f
Allow retries with backoff time in between.
Sep 12, 2019
abd9f5d
Updated changelog.
Sep 12, 2019
33d33f4
Add PR # to changelog.
Sep 13, 2019
b383422
Wrap self._connect within retry loop.
Sep 13, 2019
ed2a509
Resolve merge conflicts.
Sep 24, 2019
3a89767
Refactor exception message printing.
Sep 24, 2019
2079e40
Fix return type.
Sep 24, 2019
08e3201
Merge branch 'master' into retry-websocket-backoff
Sep 24, 2019
a814010
Merge branch 'master' into retry-websocket-backoff
Sep 24, 2019
3caac9e
Return instead of break.
Sep 26, 2019
9ee8911
Remove unneeded exceptions.
Sep 26, 2019
3d6889d
Preserve original exception in handling.
Sep 26, 2019
5da4edb
Change reset counter position.
Sep 26, 2019
66990bf
Merge branch 'retry-websocket-backoff' of https://github.com/luisg5/q…
Sep 26, 2019
69756c3
Merge branch 'master' into retry-websocket-backoff
Sep 26, 2019
d29832a
Merge branch 'master' into retry-websocket-backoff
lgarc15 Sep 27, 2019
cd9e969
Merge branch 'master' into retry-websocket-backoff
Sep 30, 2019
e2b8c9b
Isolate certain expcetions to prevent retrying.
Sep 30, 2019
e643df5
Merge branch 'master' into retry-websocket-backoff
Sep 30, 2019
867ef14
Merge branch 'retry-websocket-backoff' of https://github.com/luisg5/q…
Sep 30, 2019
bfb0de7
Modify exception handling.
Sep 30, 2019
06ef611
Add InvalidURI exception for non retry.
Oct 1, 2019
78a91cf
Remove SSLError docstring.
Oct 2, 2019
61fdd9b
Resolve merge conflicts.
Oct 2, 2019
d725534
Raise WebsocketTimeoutError.
Oct 2, 2019
f96515c
Refactor WebsocketError exception handling.
Oct 2, 2019
4f458fc
Fix lint.
Oct 2, 2019
401c60c
Update docstring return type.
Oct 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ The format is based on [Keep a Changelog].
- The `backend.properties()` function now accepts an optional `datetime`
parameter. If specified, the function returns the backend properties closest
to, but older than, the specified datetime filter (\#277).
- The `WebsocketClient.get_job_status()` method now accepts two optional
parameters: `retries` and `backoff_factor`. `retries` specifies the
maximum number of retries to attempt in case a websocket connection
is closed. `backoff_factor` is used to calculate the amount of time to
wait between retries (\#341).

### Removed

Expand Down
203 changes: 134 additions & 69 deletions qiskit/providers/ibmq/api/clients/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import time
from typing import Dict, Generator, Union, Optional, Any
from concurrent import futures
from ssl import SSLError
import warnings

import nest_asyncio
from websockets import connect, ConnectionClosed
from websockets.client import Connect
from websockets.client import WebSocketClientProtocol
from websockets.exceptions import InvalidURI

from qiskit.providers.ibmq.apiconstants import ApiJobStatus, API_JOB_FINAL_STATES
from ..exceptions import (WebsocketError, WebsocketTimeoutError,
Expand Down Expand Up @@ -82,17 +84,18 @@ class WebsocketClient(BaseClient):
websocket_url (str): URL for websocket communication with IBM Q.
access_token (str): access token for IBM Q.
"""
BACKOFF_MAX = 8 # Maximum time to wait between retries.

def __init__(self, websocket_url: str, access_token: str) -> None:
self.websocket_url = websocket_url.rstrip('/')
self.access_token = access_token

@asyncio.coroutine
def _connect(self, url: str) -> Generator[Any, None, Connect]:
def _connect(self, url: str) -> Generator[Any, None, WebSocketClientProtocol]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also update the docstring for the return type (from Connect to WebSocketClientProtocol)?

"""Authenticate against the websocket server, returning the connection.

Returns:
Connect: an open websocket connection.
WebSocketClientProtocol: an open websocket connection.

Raises:
WebsocketError: if the connection to the websocket server could
Expand All @@ -109,6 +112,10 @@ def _connect(self, url: str) -> Generator[Any, None, Connect]:
warnings.filterwarnings("ignore", category=DeprecationWarning)
websocket = yield from connect(url)

# Isolate specific exceptions, so they are not retried in `get_job_status`.
except (SSLError, InvalidURI) as ex:
raise ex

# pylint: disable=broad-except
except Exception as ex:
raise WebsocketError('Could not connect to server') from ex
Expand Down Expand Up @@ -139,18 +146,36 @@ def _connect(self, url: str) -> Generator[Any, None, Connect]:
def get_job_status(
self,
job_id: str,
timeout: Optional[float] = None
timeout: Optional[float] = None,
retries: int = 5,
backoff_factor: float = 0.5
) -> Generator[Any, None, Dict[str, str]]:
"""Return the status of a job.

Reads status messages from the API, which are issued at regular
intervals. When a final state is reached, the server
closes the socket. If the websocket connection is closed without
a reason, there is an attempt to retry one time.
a reason, the exponential backoff algorithm is used as a basis to
reestablish connections. The algorithm takes effect when a
connection closes, it is given by:

1. When a connection closes, sleep for a calculated backoff
time.
2. Try to retrieve another socket and increment a retry
counter.
3. Attempt to get the job status.
- If the connection is closed, go back to step 1.
- If the job status is read successfully, reset the retry
counter.
4. Continue until the job status is complete or the maximum
number of retries is met.

Args:
job_id (str): id of the job.
timeout (float): timeout, in seconds.
retries (int): max number of retries.
backoff_factor (float): backoff factor used to calculate the
time to wait between retries.

Returns:
dict: the API response for the status of a job, as a dict that
Expand All @@ -161,77 +186,117 @@ def get_job_status(
WebsocketTimeoutError: if the timeout has been reached.
"""
url = '{}/jobs/{}/status'.format(self.websocket_url, job_id)
websocket = yield from self._connect(url)

original_timeout = timeout
start_time = time.time()
attempt_retry = True # By default, attempt to retry if the websocket connection closes.
current_retry_attempt = 0
last_status = None
websocket = None

while current_retry_attempt <= retries:
try:
websocket = yield from self._connect(url)
# Read messages from the server until the connection is closed or
# a timeout has been reached.
while True:
try:
with warnings.catch_warnings():
# Suppress websockets deprecation warnings until the fix is available
warnings.filterwarnings("ignore", category=DeprecationWarning)
if timeout:
response_raw = yield from asyncio.wait_for(
websocket.recv(), timeout=timeout)

# Decrease the timeout.
timeout = original_timeout - (time.time() - start_time)
else:
response_raw = yield from websocket.recv()
logger.debug('Received message from websocket: %s',
response_raw)

response = WebsocketMessage.from_bytes(response_raw)
last_status = response.data

# Successfully received and parsed a message, reset retry counter.
current_retry_attempt = 0

job_status = response.data.get('status')
if (job_status and
ApiJobStatus(job_status) in API_JOB_FINAL_STATES):
return last_status

if timeout and timeout <= 0:
raise WebsocketTimeoutError('Timeout reached')

except futures.TimeoutError:
# Timeout during our wait.
raise WebsocketTimeoutError('Timeout reached') from None
except ConnectionClosed as ex:
# From the API:
# 4001: closed due to an internal errors
# 4002: closed on purpose (no more updates to send)
# 4003: closed due to job not found.
message = 'Unexpected error'
if ex.code == 4001:
message = 'Internal server error'
elif ex.code == 4002:
return last_status
elif ex.code == 4003:
attempt_retry = False # No point in retrying.
message = 'Job id not found'

raise WebsocketError('Connection with websocket closed '
'unexpectedly: {}(status_code={})'
.format(message, ex.code)) from ex

except WebsocketError as ex:
logger.warning('%s', ex)

# Specific `WebsocketError` exceptions that are not worth retrying.
if isinstance(ex, (WebsocketTimeoutError, WebsocketIBMQProtocolError)):
raise ex

current_retry_attempt = current_retry_attempt + 1
if (current_retry_attempt > retries) or (not attempt_retry):
raise ex

# Sleep, and then `continue` with retrying.
backoff_time = self._backoff_time(backoff_factor, current_retry_attempt)
logger.warning('Retrying get_job_status after %s seconds: '
'Attempt #%s.', backoff_time, current_retry_attempt)
yield from asyncio.sleep(backoff_time) # Block asyncio loop for given backoff time.

continue # Continues next iteration after `finally` block.

finally:
with warnings.catch_warnings():
# Suppress websockets deprecation warnings until the fix is available
warnings.filterwarnings("ignore", category=DeprecationWarning)
if websocket is not None:
yield from websocket.close()

# Execution should not reach here, sanity check.
raise WebsocketError('Failed to establish a websocket '
'connection after {} retries.'.format(retries))

def _backoff_time(self, backoff_factor, current_retry_attempt):
"""Calculate the backoff time to sleep for.

Exponential backoff time formula:
{backoff_factor} * (2 ** (current_retry_attempt - 1))

try:
# Read messages from the server until the connection is closed or
# a timeout has been reached.
while True:
try:
with warnings.catch_warnings():
# Suppress websockets deprecation warnings until the fix is available
warnings.filterwarnings("ignore", category=DeprecationWarning)
if timeout:
response_raw = yield from asyncio.wait_for(
websocket.recv(), timeout=timeout)

# Decrease the timeout.
timeout = original_timeout - (time.time() - start_time)
else:
response_raw = yield from websocket.recv()
logger.debug('Received message from websocket: %s',
response_raw)

response = WebsocketMessage.from_bytes(response_raw)
last_status = response.data

job_status = response.data.get('status')
if (job_status and
ApiJobStatus(job_status) in API_JOB_FINAL_STATES):
break

if timeout and timeout <= 0:
raise WebsocketTimeoutError('Timeout reached')

except futures.TimeoutError:
# Timeout during our wait.
raise WebsocketTimeoutError('Timeout reached') from None
except ConnectionClosed as ex:
# From the API:
# 4001: closed due to an internal errors
# 4002: closed on purpose (no more updates to send)
# 4003: closed due to job not found.
message = 'Unexpected error'
if ex.code == 4001:
message = 'Internal server error'
elif ex.code == 4002:
break
elif ex.code == 4003:
attempt_retry = False # No point in retrying.
message = 'Job id not found'

if attempt_retry:
logger.warning('Connection with the websocket closed '
'unexpectedly: %s(status_code=%s). '
'Retrying get_job_status.', message, ex.code)
attempt_retry = False # Disallow further retries.
websocket = yield from self._connect(url)
continue

raise WebsocketError('Connection with websocket closed '
'unexpectedly: {}'.format(message)) from ex
finally:
with warnings.catch_warnings():
# Suppress websockets deprecation warnings until the fix is available
warnings.filterwarnings("ignore", category=DeprecationWarning)
yield from websocket.close()
Args:
backoff_factor (float): backoff factor, in seconds.
current_retry_attempt (int): current number of retry
attempts.

return last_status
Returns:
float: The number of seconds to sleep for, before a
retry attempt is made.
"""
backoff_time = backoff_factor * (2 ** (current_retry_attempt - 1))
return min(self.BACKOFF_MAX, backoff_time)

def _authentication_message(self) -> 'WebsocketMessage':
"""Return the message used for authenticating against the server."""
Expand Down