Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[Python] Add initial support for asyncio #471

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions source/neuropod/bindings/neuropod_native.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ py::dict infer(Neuropod &neuropod, py::dict &inputs_dict)
return to_numpy_dict(*outputs);
}

py::object infer_async(Neuropod &neuropod, py::dict &inputs_dict)
{
// Convert from a py::dict of numpy arrays to an unordered_map of `NeuropodTensor`s
auto allocator = neuropod.get_tensor_allocator();
NeuropodValueMap inputs = from_numpy_dict(*allocator, inputs_dict);

// Run inference
auto outputs = neuropod.infer(inputs);

// Convert the outputs to a python dict of numpy arrays
auto result = to_numpy_dict(*outputs);

// Get the event loop and create a future
// (note can't use get_running_loop because this isn't a coroutine)
py::object loop = py::module::import("asyncio").attr("get_event_loop")();
py::object f = loop.attr("create_future")();

// Set the result
// This can be called from another thread (still need to keep track of the GIL)
loop.attr("call_soon_threadsafe")(f.attr("set_result"), result);

return f;
}

py::array deserialize_tensor_binding(py::bytes buffer)
{
// Deserialize to a NeuropodTensor
Expand Down Expand Up @@ -157,6 +181,7 @@ PYBIND11_MODULE(neuropod_native, m)
const std::vector<BackendLoadSpec> &default_backend_overrides,
py::kwargs kwargs) { return make_neuropod(kwargs, path, default_backend_overrides); }))
.def("infer", &infer)
.def("infer_async", &infer_async)
.def("get_inputs", &Neuropod::get_inputs)
.def("get_outputs", &Neuropod::get_outputs)
.def("get_name", &Neuropod::get_name)
Expand Down
23 changes: 23 additions & 0 deletions source/python/neuropod/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,29 @@ def infer(self, inputs):
inputs = maybe_convert_bindings_types(inputs)
return self.model.infer(inputs)

def infer_async(self, inputs):
"""
Run inference in an asyncio compatible way.

Callers should ensure they limit the number of in-flight requests to avoid memory
growth (e.g. by using asyncio.Semaphore)

Note: this method currently blocks the current python thread. This will change
in the future

:param inputs: A dict mapping input names to values. This must match the input
spec in the neuropod config for the loaded model.
Ex: {'x1': np.array([5]), 'x2': np.array([6])}
*Note:* all the keys in this dict must be strings and all the
values must be numpy arrays

:returns: asyncio.Future containing the same result type as `infer` above
"""
inputs = maybe_convert_bindings_types(inputs)

# Pass the inputs to the native code and return a future
return self.model.infer_async(inputs)

def __enter__(self):
# Needed in order to be used as a contextmanager
return self
Expand Down
94 changes: 94 additions & 0 deletions source/python/neuropod/tests/test_async_infer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright (c) 2021 UATC, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import six
import numpy as np
import unittest
from testpath.tempdir import TemporaryDirectory

from neuropod.loader import load_neuropod
from neuropod.packagers import create_python_neuropod
from neuropod.tests.utils import get_addition_model_spec

ADDITION_MODEL_SOURCE = """
import sys

def addition_model(x, y):
return {
"out": x + y
}

def get_model(_):
return addition_model
"""


@unittest.skipIf(six.PY2, "Skipping asyncio test for Python 2")
class TestAsync(unittest.TestCase):
def package_simple_addition_model(self, test_dir, do_fail=False):
neuropod_path = os.path.join(test_dir, "test_neuropod")
model_code_dir = os.path.join(test_dir, "model_code")
os.makedirs(model_code_dir)

with open(os.path.join(model_code_dir, "addition_model.py"), "w") as f:
f.write(ADDITION_MODEL_SOURCE)

# `create_python_neuropod` runs inference with the test data immediately
# after creating the neuropod. Raises a ValueError if the model output
# does not match the expected output.
create_python_neuropod(
neuropod_path=neuropod_path,
model_name="addition_model",
data_paths=[],
code_path_spec=[
{
"python_root": model_code_dir,
"dirs_to_package": [""], # Package everything in the python_root
}
],
entrypoint_package="addition_model",
entrypoint="get_model",
# Get the input/output spec along with test data
**get_addition_model_spec(do_fail=do_fail)
)

return neuropod_path

def test_async_inference(self):
# Get an event loop
import asyncio

loop = asyncio.get_event_loop()

with TemporaryDirectory() as test_dir:
# Package a model
path = self.package_simple_addition_model(test_dir)

# Sample input data
input_data = {
"x": np.array([0.5], dtype=np.float32),
"y": np.array([1.5], dtype=np.float32),
}

with load_neuropod(path) as model:
# Async infer
result = loop.run_until_complete(model.infer_async(input_data))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close loop? Or use "run" that cares about it (I see it has "debug" that maybe good for test) https://docs.python.org/3/library/asyncio-task.html#running-an-asyncio-program


# Ensure the output is what we expect
self.assertEqual(result["out"][0], 2.0)


if __name__ == "__main__":
unittest.main()