diff --git a/src/kinesis/consumer.py b/src/kinesis/consumer.py
index f5441f4..fe594ae 100644
--- a/src/kinesis/consumer.py
+++ b/src/kinesis/consumer.py
@@ -109,70 +109,89 @@ def shutdown_shard_reader(self, shard_id):
 
     def setup_shards(self):
         log.debug("Describing stream")
-        self.stream_data = self.kinesis_client.describe_stream(StreamName=self.stream_name)
-        # XXX TODO: handle StreamStatus -- our stream might not be ready, or might be deleting
-
-        setup_again = False
-        for shard_data in self.stream_data['StreamDescription']['Shards']:
-            # see if we can get a lock on this shard id
-            try:
-                shard_locked = self.state.lock_shard(shard_data['ShardId'], self.LOCK_DURATION)
-            except AttributeError:
-                # no self.state
-                pass
-            else:
-                if not shard_locked:
-                    # if we currently have a shard reader running we stop it
-                    if shard_data['ShardId'] in self.shards:
-                        log.warn("We lost our lock on shard %s, stopping shard reader", shard_data['ShardId'])
-                        self.shutdown_shard_reader(shard_data['ShardId'])
-
-                    # since we failed to lock the shard we just continue to the next one
-                    continue
-
-            # we should try to start a shard reader if the shard id specified isn't in our shards
-            if shard_data['ShardId'] not in self.shards:
-                if 'EndingSequenceNumber' in shard_data['SequenceNumberRange']:
-                    log.debug("Shard %s closed, skipping shard reader creation...", shard_data['ShardId'])
-                    continue
+        try:
+            self.stream_data = self.kinesis_client.describe_stream(StreamName=self.stream_name)
+        except ClientError as exc:
+            if exc.response['Error']['Code'] in RETRY_EXCEPTIONS:
+                # sleep for 1 second when self.retries is 0, then 2, 4, 6, 8, ... (self.retries * 2), capped at 30,
+                # until we complete a successful describe_stream call
+                time_to_sleep = min((
+                    30,
+                    (self.retries * 2) or 1
+                ))
+                log.debug("Retrying describe_stream (#%d %ds): %s", self.retries+1, time_to_sleep, exc)
 
-                log.info("Shard reader for %s does not exist, creating...", shard_data['ShardId'])
+                time.sleep(time_to_sleep)
+                self.retries += 1
+                setup_again = True
+            else:
+                log.error("Client error occurred while reading: %s", exc)
+                setup_again = False
+        except Exception:
+            setup_again = False
+            log.exception('Describe Stream Error')
+        else:
+            setup_again = False
+            for shard_data in self.stream_data['StreamDescription']['Shards']:
+                # see if we can get a lock on this shard id
                 try:
-                    iterator_args = self.state.get_iterator_args(shard_data['ShardId'])
+                    shard_locked = self.state.lock_shard(shard_data['ShardId'], self.LOCK_DURATION)
                 except AttributeError:
                     # no self.state
-                    iterator_args = dict(ShardIteratorType='LATEST')
-
-                log.info("%s iterator arguments: %s", shard_data['ShardId'], iterator_args)
-
-                # get our initial iterator
-                shard_iter = self.kinesis_client.get_shard_iterator(
-                    StreamName=self.stream_name,
-                    ShardId=shard_data['ShardId'],
-                    **iterator_args
-                )
-
-                self.shards[shard_data['ShardId']] = ShardReader(
-                    shard_data['ShardId'],
-                    shard_iter['ShardIterator'],
-                    self.record_queue,
-                    self.error_queue,
-                    boto3_session=self.boto3_session,
-                    sleep_time=self.reader_sleep_time,
-                    endpoint_url=self.endpoint_url
-                )
-            else:
-                log.debug(
-                    "Checking shard reader %s process at pid %d",
-                    shard_data['ShardId'],
-                    self.shards[shard_data['ShardId']].process.pid
-                )
-
-                if not self.shards[shard_data['ShardId']].process.is_alive():
-                    self.shutdown_shard_reader(shard_data['ShardId'])
-                    setup_again = True
+                    pass
                 else:
-                    log.debug("Shard reader %s alive & well", shard_data['ShardId'])
+                    if not shard_locked:
+                        # if we currently have a shard reader running we stop it
+                        if shard_data['ShardId'] in self.shards:
+                            log.warn("We lost our lock on shard %s, stopping shard reader", shard_data['ShardId'])
+                            self.shutdown_shard_reader(shard_data['ShardId'])
+
+                        # since we failed to lock the shard we just continue to the next one
+                        continue
+
+                # we should try to start a shard reader if the shard id specified isn't in our shards
+                if shard_data['ShardId'] not in self.shards:
+                    if 'EndingSequenceNumber' in shard_data['SequenceNumberRange']:
+                        log.debug("Shard %s closed, skipping shard reader creation...", shard_data['ShardId'])
+                        continue
+
+                    log.info("Shard reader for %s does not exist, creating...", shard_data['ShardId'])
+                    try:
+                        iterator_args = self.state.get_iterator_args(shard_data['ShardId'])
+                    except AttributeError:
+                        # no self.state
+                        iterator_args = dict(ShardIteratorType='LATEST')
+
+                    log.info("%s iterator arguments: %s", shard_data['ShardId'], iterator_args)
+
+                    # get our initial iterator
+                    shard_iter = self.kinesis_client.get_shard_iterator(
+                        StreamName=self.stream_name,
+                        ShardId=shard_data['ShardId'],
+                        **iterator_args
+                    )
+
+                    self.shards[shard_data['ShardId']] = ShardReader(
+                        shard_data['ShardId'],
+                        shard_iter['ShardIterator'],
+                        self.record_queue,
+                        self.error_queue,
+                        boto3_session=self.boto3_session,
+                        sleep_time=self.reader_sleep_time,
+                        endpoint_url=self.endpoint_url
+                    )
+                else:
+                    log.debug(
+                        "Checking shard reader %s process at pid %d",
+                        shard_data['ShardId'],
+                        self.shards[shard_data['ShardId']].process.pid
+                    )
+
+                    if not self.shards[shard_data['ShardId']].process.is_alive():
+                        self.shutdown_shard_reader(shard_data['ShardId'])
+                        setup_again = True
+                    else:
+                        log.debug("Shard reader %s alive & well", shard_data['ShardId'])
 
         if setup_again:
             self.setup_shards()
diff --git a/src/kinesis/exceptions.py b/src/kinesis/exceptions.py
index 323c5d8..721c517 100644
--- a/src/kinesis/exceptions.py
+++ b/src/kinesis/exceptions.py
@@ -1,5 +1,6 @@
 # These are exceptions we retry
 RETRY_EXCEPTIONS = (
     'ProvisionedThroughputExceededException',
-    'ThrottlingException'
+    'ThrottlingException',
+    'LimitExceededException'
 )