From bc8f666ce590422833435acdce507fdad658d87b Mon Sep 17 00:00:00 2001 From: John Peters Date: Wed, 29 Nov 2023 22:24:33 -0600 Subject: [PATCH 1/2] "Added asyncio support" --- memphis/functions.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/memphis/functions.py b/memphis/functions.py index 2aa0044..4b8a4ee 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -1,9 +1,11 @@ import json import base64 +import asyncio def create_function( event, - event_handler: callable + event_handler: callable, + use_async = False ) -> None: """ This function creates a Memphis function and processes events with the passed-in event_handler function. @@ -71,14 +73,17 @@ def default(self, o): return str(base64.b64encode(o), encoding='utf-8') return json.JSONEncoder.default(self, o) - def handler(event): + async def handler(event): processed_events = {} processed_events["messages"] = [] processed_events["failed_messages"] = [] for message in event["messages"]: try: payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) - processed_message, processed_headers = event_handler(payload, message['headers'], event["inputs"]) + if use_async: + processed_message, processed_headers = await event_handler(payload, message['headers'], event["inputs"]) + else: + processed_message, processed_headers = event_handler(payload, message['headers'], event["inputs"]) if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): processed_events["messages"].append({ @@ -105,4 +110,4 @@ def handler(event): except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" - return handler(event) + return asyncio.run(handler(event)) From cef71e648f37ef7b9d037f05926a281200700066 Mon Sep 17 00:00:00 2001 From: John Peters Date: Wed, 29 Nov 2023 22:29:58 -0600 Subject: [PATCH 2/2] "added documentation" --- README.md | 19 +++++++++++++++++++ memphis/functions.py | 7 +++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index eaa54d6..125f5d4 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,25 @@ def event_handler(msg_payload, msg_headers, inputs): return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers ``` +Memphis Functions support using Async functions through asyncio. When functions are async, set the use_async parameter to true. +```python +import json +import base64 +import asyncio +from memphis import create_function + +def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format . + return create_function(event, event_handler = event_handler, use_async = True) + +async def event_handler(msg_payload, msg_headers, inputs): + payload = str(msg_payload, 'utf-8') + as_json = json.loads(payload) + as_json['modified'] = True + asyncio.sleep(1) + + return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers +``` + If the user would want to have a message that they would want to validate and send to the dead letter station if the validation fails then the user can raise an exception. In the following example, the field `check` is simply a boolean. The following function will send any messages which fail the `check` to the dead letter station. ```python diff --git a/memphis/functions.py b/memphis/functions.py index 4b8a4ee..8937f1c 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -5,7 +5,7 @@ def create_function( event, event_handler: callable, - use_async = False + use_async: bool = False ) -> None: """ This function creates a Memphis function and processes events with the passed-in event_handler function. @@ -28,7 +28,8 @@ def create_function( } event_handler (callable): `create_function` assumes the function signature is in the format: (payload, headers, inputs) -> processed_payload, processed_headers. - This function will modify the payload and headers and return them in the modified format. + This function will modify the payload and headers and return them in the modified format. This function may also be async. + If using asyncio set the create_function parameter use_async to True. Args: payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. @@ -43,6 +44,8 @@ def create_function( Error: Raises an exception of any kind when something goes wrong with processing a message. The unprocessed message and the exception will be sent to the dead-letter station. + use_async (bool): + When using an async function through asyncio, set this flag to True. This will await the event_handler call instead of calling it directly. Returns: handler (callable):