diff --git a/README.md b/README.md index eaa54d6..125f5d4 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,25 @@ def event_handler(msg_payload, msg_headers, inputs): return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers ``` +Memphis Functions support using Async functions through asyncio. When functions are async, set the use_async parameter to true. +```python +import json +import base64 +import asyncio +from memphis import create_function + +def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format . + return create_function(event, event_handler = event_handler, use_async = True) + +async def event_handler(msg_payload, msg_headers, inputs): + payload = str(msg_payload, 'utf-8') + as_json = json.loads(payload) + as_json['modified'] = True + asyncio.sleep(1) + + return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers +``` + If the user would want to have a message that they would want to validate and send to the dead letter station if the validation fails then the user can raise an exception. In the following example, the field `check` is simply a boolean. The following function will send any messages which fail the `check` to the dead letter station. ```python diff --git a/memphis/functions.py b/memphis/functions.py index 2aa0044..8937f1c 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -1,9 +1,11 @@ import json import base64 +import asyncio def create_function( event, - event_handler: callable + event_handler: callable, + use_async: bool = False ) -> None: """ This function creates a Memphis function and processes events with the passed-in event_handler function. @@ -26,7 +28,8 @@ def create_function( } event_handler (callable): `create_function` assumes the function signature is in the format: (payload, headers, inputs) -> processed_payload, processed_headers. - This function will modify the payload and headers and return them in the modified format. + This function will modify the payload and headers and return them in the modified format. This function may also be async. + If using asyncio set the create_function parameter use_async to True. Args: payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. @@ -41,6 +44,8 @@ def create_function( Error: Raises an exception of any kind when something goes wrong with processing a message. The unprocessed message and the exception will be sent to the dead-letter station. + use_async (bool): + When using an async function through asyncio, set this flag to True. This will await the event_handler call instead of calling it directly. Returns: handler (callable): @@ -71,14 +76,17 @@ def default(self, o): return str(base64.b64encode(o), encoding='utf-8') return json.JSONEncoder.default(self, o) - def handler(event): + async def handler(event): processed_events = {} processed_events["messages"] = [] processed_events["failed_messages"] = [] for message in event["messages"]: try: payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) - processed_message, processed_headers = event_handler(payload, message['headers'], event["inputs"]) + if use_async: + processed_message, processed_headers = await event_handler(payload, message['headers'], event["inputs"]) + else: + processed_message, processed_headers = event_handler(payload, message['headers'], event["inputs"]) if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): processed_events["messages"].append({ @@ -105,4 +113,4 @@ def handler(event): except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" - return handler(event) + return asyncio.run(handler(event))