From 3410bbdacc2aff7afba58474a52ccc7e2b45e959 Mon Sep 17 00:00:00 2001
From: Forum Gala
Date: Thu, 25 Jul 2024 10:54:16 -0700
Subject: [PATCH] single producer changes

---
 server/app.py | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/server/app.py b/server/app.py
index 4719fc38..8a882c25 100644
--- a/server/app.py
+++ b/server/app.py
@@ -22,7 +22,7 @@
 
 logger = logging.getLogger(__name__)
 
-
+single_producer = None
 
 settings = Settings()
 
@@ -166,24 +166,30 @@ async def submit_batch(batch: BatchData):
     Returns:
         Response: Generic response indicating status of request
     """
-
+    global single_producer
     topic = get_input_topic_name(batch.job_id)
-    producer = AIOKafkaProducer(
-        bootstrap_servers=settings.kafka_bootstrap_servers,
-        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
-    )
-    await producer.start()
-
+    if not single_producer:
+        producer = AIOKafkaProducer(
+            bootstrap_servers=settings.kafka_bootstrap_servers,
+            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
+        )
+        single_producer = producer
+        await single_producer.start()
+
+    import asyncio
     try:
         for record in batch.data:
-            await producer.send_and_wait(topic, value=record)
+            await single_producer.send_and_wait(topic, value=record)
+            await asyncio.sleep(0.1)  # throttle sends without blocking the event loop
     except UnknownTopicOrPartitionError:
-        await producer.stop()
+        # The shared producer stays open across requests, so it is not stopped here.
         raise HTTPException(
             status_code=500, detail=f"{topic=} for job {batch.job_id} not found"
         )
-    finally:
-        await producer.stop()
+    except Exception as e:
+        logger.error(f"Failed to send batch records to Kafka: {e}")
+    # NOTE: no finally/producer.stop() -- the single shared producer lives for
+    # the lifetime of the process rather than being torn down per request.
 
     return Response[BatchSubmitted](data=BatchSubmitted(job_id=batch.job_id))
 
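
A note on producer lifecycle (not part of the patch): with a single long-lived
producer, an alternative to lazy initialization inside the request handler is to
create and stop the producer in the application's lifespan, which also avoids the
race where two concurrent requests both observe single_producer as None and build
two producers. The sketch below is a minimal illustration, assuming it would live
in server/app.py next to the existing module-level settings object and FastAPI app;
the lifespan wiring shown here is an assumption about how the app could be
restructured, not the repository's current code.

    import json
    from contextlib import asynccontextmanager

    from aiokafka import AIOKafkaProducer
    from fastapi import FastAPI

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Create and start one producer for the whole process at startup.
        producer = AIOKafkaProducer(
            bootstrap_servers=settings.kafka_bootstrap_servers,  # `settings` as already defined in app.py
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )
        await producer.start()
        app.state.kafka_producer = producer
        try:
            yield
        finally:
            # Flush and close the producer exactly once, at shutdown.
            await producer.stop()

    app = FastAPI(lifespan=lifespan)

Handlers would then read the producer from app.state (or a module-level global set
at startup), so the per-request None check, start(), and commented-out stop() calls
in submit_batch would no longer be needed.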