Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: Added asyncio support #3

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ def event_handler(msg_payload, msg_headers, inputs):
return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
```

Memphis Functions support using Async functions through asyncio. When functions are async, set the use_async parameter to true.
```python
import json
import base64
import asyncio
from memphis import create_function

def handler(event, context): # The name of this file and this function should match the handler field in the memphis.yaml file in the following format <file name>.<function name>
return create_function(event, event_handler = event_handler, use_async = True)

async def event_handler(msg_payload, msg_headers, inputs):
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
as_json['modified'] = True
asyncio.sleep(1)

return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
```

If the user would want to have a message that they would want to validate and send to the dead letter station if the validation fails then the user can raise an exception. In the following example, the field `check` is simply a boolean. The following function will send any messages which fail the `check` to the dead letter station.

```python
Expand Down
18 changes: 13 additions & 5 deletions memphis/functions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
import base64
import asyncio

def create_function(
event,
event_handler: callable
event_handler: callable,
use_async: bool = False
) -> None:
"""
This function creates a Memphis function and processes events with the passed-in event_handler function.
Expand All @@ -26,7 +28,8 @@ def create_function(
}
event_handler (callable):
`create_function` assumes the function signature is in the format: <event_handler>(payload, headers, inputs) -> processed_payload, processed_headers.
This function will modify the payload and headers and return them in the modified format.
This function will modify the payload and headers and return them in the modified format. This function may also be async.
If using asyncio set the create_function parameter use_async to True.

Args:
payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding.
Expand All @@ -41,6 +44,8 @@ def create_function(
Error:
Raises an exception of any kind when something goes wrong with processing a message.
The unprocessed message and the exception will be sent to the dead-letter station.
use_async (bool):
When using an async function through asyncio, set this flag to True. This will await the event_handler call instead of calling it directly.

Returns:
handler (callable):
Expand Down Expand Up @@ -71,14 +76,17 @@ def default(self, o):
return str(base64.b64encode(o), encoding='utf-8')
return json.JSONEncoder.default(self, o)

def handler(event):
async def handler(event):
processed_events = {}
processed_events["messages"] = []
processed_events["failed_messages"] = []
for message in event["messages"]:
try:
payload = base64.b64decode(bytes(message['payload'], encoding='utf-8'))
processed_message, processed_headers = event_handler(payload, message['headers'], event["inputs"])
if use_async:
processed_message, processed_headers = await event_handler(payload, message['headers'], event["inputs"])
else:
processed_message, processed_headers = event_handler(payload, message['headers'], event["inputs"])

if isinstance(processed_message, bytes) and isinstance(processed_headers, dict):
processed_events["messages"].append({
Expand All @@ -105,4 +113,4 @@ def handler(event):
except Exception as e:
return f"Returned message types from user function are not able to be converted into JSON: {e}"

return handler(event)
return asyncio.run(handler(event))