You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I have a pending_message_monitor that processes messages in the PEL (pending entries list), like this:
def consume_and_ack_messages(self, group: GroupInfo):
    """Read one batch of messages for ``group`` and hand it to a processor.

    A ``TaskDispatchMessageProcessor`` is built for the group's stream on
    every call, then up to ``BATCH_CONSUME_COUNT`` entries are requested
    through ``self.consumer.read_group``. Every message list in the response
    is passed to ``process_message_list``; acknowledgement presumably happens
    inside the processor (not visible here -- confirm).

    Args:
        group: Supplies ``stream_name``, ``group_name`` and ``name_suffix``
            for the read and for the processor.

    Returns:
        None. Returns early when ``read_group`` yields an empty response.
    """
    message_processor = TaskDispatchMessageProcessor(
        stream=group.stream_name,
        group_name=group.group_name,
        consumer=self.consumer,
        name_suffix=group.name_suffix,
        message_processor_role='main_message_processor',
    )
    logging.info('%s start consume from %s', group.group_name, group.stream_name)
    response = self.consumer.read_group(
        group_name=group.group_name,
        consumer_name='consumer',
        stream=group.stream_name,
        block=10,
        count=BATCH_CONSUME_COUNT,
    )
    if not response:
        return

    for _, message_list in response:
        try:
            message_processor.process_message_list(message_list)
        except Exception as e:
            # Best-effort: one failing batch must not stop the consume loop.
            # logging.exception keeps the same message but also records the
            # traceback, which the previous str(e)-only log line discarded.
            logging.exception('Fail to consume message %s with: %s', message_list, e)
def start(self):
    """Public entry point; simply delegates to ``run``."""
    self.run()
def start_pending_message_monitor(self):
    """Launch the pending-message monitor in a daemon thread, at most once.

    "Already started" is decided solely by
    ``self.pending_message_monitor_thread`` being set; in that case only a
    warning is logged and no new thread is created.
    """
    if self.pending_message_monitor_thread is not None:
        logging.warning('pending message monitor thread is already running')
        return

    monitor = threading.Thread(
        target=self.pending_messages_monitor_thread_impl,
        daemon=True,
    )
    # Publish the handle before starting, as the original did.
    self.pending_message_monitor_thread = monitor
    monitor.start()
def run(self):
    """Set up the consumer group, start the monitor, then consume forever.

    After the one-time setup calls, this loops endlessly over
    ``self.master_groups`` and calls ``consume_and_ack_messages`` for each
    group in turn. There is no exit condition; the method only ends if a
    call raises.
    """
    self._create_consumer_group()
    self.start_pending_message_monitor()

    while True:
        for master_group in self.master_groups:
            self.consume_and_ack_messages(master_group)
def monitor_pending_messages(self):
    """Re-process entries that have been idle in the group's pending list.

    Reads the pending summary for ``self.stream`` / ``self.group_name``,
    lists the pending entries between the summary's ``min`` and ``max`` ids
    that satisfy the ``idle`` filter, fetches each entry by id with
    ``xrange`` and passes the result to
    ``self.message_processor.process_message_list``. Ids for which ``xrange``
    returns nothing are skipped.

    Returns:
        None. Returns early when the summary carries no id bounds.
    """
    pending_info = self.consumer.xpending(self.stream, self.group_name)
    # An empty pending list has no id bounds (presumably reported as None
    # min/max, as in the Redis XPENDING summary -- confirm against the
    # consumer wrapper). There is nothing to scan, so don't forward None
    # bounds into the range query.
    if not pending_info or pending_info['min'] is None or pending_info['max'] is None:
        return

    # NOTE(review): pending_count is not defined in this method; presumably a
    # module-level constant -- confirm it exists.
    pending_messages = self.consumer.xpending_range(
        stream=self.stream,
        group_name=self.group_name,
        min=pending_info['min'],
        max=pending_info['max'],
        count=pending_count,
        idle=60000,  # 60s
    )
    for msg in pending_messages:
        msg_id = msg['message_id']
        # Look the entry up by its exact id to get the payload for reprocessing.
        if pending_message_list := self.consumer.xrange(
            self.stream, start_id=msg_id, end_id=msg_id, count=1
        ):
            self.message_processor.process_message_list(pending_message_list)
The message_id returned by xpending_range is a str, but the message_id returned by xread_group in my main consumer is bytes. When I try to ack these two different types of message IDs, the str one is acked correctly, but for the bytes one xack returns 0. The pending_message_monitor and the main consumer use the same Redis client.
However, if we force the IDs to a uniform type, like this, xack no longer returns 0.
I have a pending_message_monitor that processes messages in the PEL (pending entries list), like this:
The message_id returned by xpending_range is a str, but the message_id returned by xread_group in my main consumer is bytes. When I try to ack these two different types of message IDs, the str one is acked correctly, but for the bytes one xack returns 0. The pending_message_monitor and the main consumer use the same Redis client.
However, if we force the IDs to a uniform type, like this, xack no longer returns 0.
Can anyone give me some suggestions? Thanks.
The text was updated successfully, but these errors were encountered: