Skip to content

Commit

Permalink
Handle describe_stream errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Crowe committed Jul 30, 2024
1 parent f8a2a52 commit 1a238c8
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 60 deletions.
137 changes: 78 additions & 59 deletions src/kinesis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,70 +109,89 @@ def shutdown_shard_reader(self, shard_id):

def setup_shards(self):
log.debug("Describing stream")
self.stream_data = self.kinesis_client.describe_stream(StreamName=self.stream_name)
# XXX TODO: handle StreamStatus -- our stream might not be ready, or might be deleting

setup_again = False
for shard_data in self.stream_data['StreamDescription']['Shards']:
# see if we can get a lock on this shard id
try:
shard_locked = self.state.lock_shard(shard_data['ShardId'], self.LOCK_DURATION)
except AttributeError:
# no self.state
pass
else:
if not shard_locked:
# if we currently have a shard reader running we stop it
if shard_data['ShardId'] in self.shards:
log.warn("We lost our lock on shard %s, stopping shard reader", shard_data['ShardId'])
self.shutdown_shard_reader(shard_data['ShardId'])

# since we failed to lock the shard we just continue to the next one
continue

# we should try to start a shard reader if the shard id specified isn't in our shards
if shard_data['ShardId'] not in self.shards:
if 'EndingSequenceNumber' in shard_data['SequenceNumberRange']:
log.debug("Shard %s closed, skipping shard reader creation...", shard_data['ShardId'])
continue
try:
self.stream_data = self.kinesis_client.describe_stream(StreamName=self.stream_name)
except ClientError as exc:
if exc.response['Error']['Code'] in RETRY_EXCEPTIONS:
# sleep for 1 second the first loop, 1 second the next, then 2, 4, 6, 8, ..., up to a max of 30 or
# until we complete a successful describe_stream call
time_to_sleep = min((
30,
(self.retries * 2) or 1
))
log.debug("Retrying get_records (#%d %ds): %s", self.retries+1, time_to_sleep, exc)

log.info("Shard reader for %s does not exist, creating...", shard_data['ShardId'])
time.sleep(time_to_sleep)
self.retries += 1
setup_again = True
else:
log.error("Client error occurred while reading: %s", exc)
setup_again = False
except:
setup_again = False
log.exception(f'Describe Stream Error')
else:
setup_again = False
for shard_data in self.stream_data['StreamDescription']['Shards']:
# see if we can get a lock on this shard id
try:
iterator_args = self.state.get_iterator_args(shard_data['ShardId'])
shard_locked = self.state.lock_shard(shard_data['ShardId'], self.LOCK_DURATION)
except AttributeError:
# no self.state
iterator_args = dict(ShardIteratorType='LATEST')

log.info("%s iterator arguments: %s", shard_data['ShardId'], iterator_args)

# get our initial iterator
shard_iter = self.kinesis_client.get_shard_iterator(
StreamName=self.stream_name,
ShardId=shard_data['ShardId'],
**iterator_args
)

self.shards[shard_data['ShardId']] = ShardReader(
shard_data['ShardId'],
shard_iter['ShardIterator'],
self.record_queue,
self.error_queue,
boto3_session=self.boto3_session,
sleep_time=self.reader_sleep_time,
endpoint_url=self.endpoint_url
)
else:
log.debug(
"Checking shard reader %s process at pid %d",
shard_data['ShardId'],
self.shards[shard_data['ShardId']].process.pid
)

if not self.shards[shard_data['ShardId']].process.is_alive():
self.shutdown_shard_reader(shard_data['ShardId'])
setup_again = True
pass
else:
log.debug("Shard reader %s alive & well", shard_data['ShardId'])
if not shard_locked:
# if we currently have a shard reader running we stop it
if shard_data['ShardId'] in self.shards:
log.warn("We lost our lock on shard %s, stopping shard reader", shard_data['ShardId'])
self.shutdown_shard_reader(shard_data['ShardId'])

# since we failed to lock the shard we just continue to the next one
continue

# we should try to start a shard reader if the shard id specified isn't in our shards
if shard_data['ShardId'] not in self.shards:
if 'EndingSequenceNumber' in shard_data['SequenceNumberRange']:
log.debug("Shard %s closed, skipping shard reader creation...", shard_data['ShardId'])
continue

log.info("Shard reader for %s does not exist, creating...", shard_data['ShardId'])
try:
iterator_args = self.state.get_iterator_args(shard_data['ShardId'])
except AttributeError:
# no self.state
iterator_args = dict(ShardIteratorType='LATEST')

log.info("%s iterator arguments: %s", shard_data['ShardId'], iterator_args)

# get our initial iterator
shard_iter = self.kinesis_client.get_shard_iterator(
StreamName=self.stream_name,
ShardId=shard_data['ShardId'],
**iterator_args
)

self.shards[shard_data['ShardId']] = ShardReader(
shard_data['ShardId'],
shard_iter['ShardIterator'],
self.record_queue,
self.error_queue,
boto3_session=self.boto3_session,
sleep_time=self.reader_sleep_time,
endpoint_url=self.endpoint_url
)
else:
log.debug(
"Checking shard reader %s process at pid %d",
shard_data['ShardId'],
self.shards[shard_data['ShardId']].process.pid
)

if not self.shards[shard_data['ShardId']].process.is_alive():
self.shutdown_shard_reader(shard_data['ShardId'])
setup_again = True
else:
log.debug("Shard reader %s alive & well", shard_data['ShardId'])

if setup_again:
self.setup_shards()
Expand Down
3 changes: 2 additions & 1 deletion src/kinesis/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# These are exceptions we retry
RETRY_EXCEPTIONS = (
'ProvisionedThroughputExceededException',
'ThrottlingException'
'ThrottlingException',
'LimitExceededException'
)

0 comments on commit 1a238c8

Please # to comment.