-
-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Investigate a non-class-based version #8
Comments
This would need to have a separate registry object. Here's a potential design: # Register three functions
registry = AsyncRegistry(func1, func2, func3)
# Add another function later on
registry.register(func4)
# Resolve the result:
result = await registry.resolve(func1, arg=x) That |
Since functions are no longer being directly coupled to one particular registry (in their class) I could skip the bit where the method itself is rewritten by the decorator to work magically. That's actually potential usability improvement since it's less surprising. |
Initial rough prototype: from functools import wraps
import inspect
import graphlib
import asyncio
class AsyncRegistry:
def __init__(self, *fns):
self._registry = {}
self._graph = None
for fn in fns:
self.register(fn)
def register(self, fn):
self._registry[fn.__name__] = fn
# Clear _graph cache:
self._graph = None
@property
def graph(self):
if self._graph is None:
self._graph = {
key: {
p
for p in inspect.signature(fn).parameters.keys()
if not p.startswith("_")
}
for key, fn in self._registry.items()
}
return self._graph
async def resolve(self, fn):
try:
name = fn.__name__
except AttributeError:
name = fn
return (await self.resolve_multi([name]))[name]
async def resolve_multi(self, names, results=None):
if results is None:
results = {}
# Come up with an execution plan, just for these nodes
ts = graphlib.TopologicalSorter()
to_do = set(names)
done = set()
while to_do:
item = to_do.pop()
dependencies = self.graph.get(item) or set()
ts.add(item, *dependencies)
done.add(item)
# Add any not-done dependencies to the queue
to_do.update({k for k in dependencies if k not in done})
ts.prepare()
plan = []
while ts.is_active():
node_group = ts.get_ready()
plan.append(node_group)
ts.done(*node_group)
#instance._log(
# "Resolving {} in {}>".format(names, repr(instance).split(" object at ")[0])
#)
for node_group in plan:
awaitable_names = [name for name in node_group if name in self._registry]
# instance._log(" Run {}".format(awaitable_names))
awaitables = [
self._registry[name](
**{k: v for k, v in results.items() if k in self.graph[name]},
)
for name in awaitable_names
]
awaitable_results = await asyncio.gather(*awaitables)
results.update(dict(zip(awaitable_names, awaitable_results)))
print("results:", results)
return results
def _make_fn(fn, registry):
parameters = inspect.signature(fn).parameters
@wraps(fn)
async def inner(**kwargs):
# Any parameters not provided by kwargs are resolved from registry
to_resolve = [
p
for p in parameters
# Not already provided
if p not in kwargs
# Doesn't have a default value
and parameters[p].default is inspect._empty
]
missing = [p for p in to_resolve if p not in registry]
assert (
not missing
), "The following DI parameters could not be found in the registry: {}".format(
missing
)
results = {}
results.update(kwargs)
if to_resolve:
resolved_parameters = await resolve(registry, to_resolve, results)
results.update(resolved_parameters)
return await method(
self, **{k: v for k, v in results.items() if k in parameters}
)
return inner |
Got this working in a branch. Updated documentation is here: https://github.com/simonw/asyncinject/blob/ed8fbbdef513ff8385f91cc345226439deab6515/README.md |
I'm thinking about using this with Datasette plugins, which aren't well suited to the current class-based mechanism because plugins may want to register their own additional dependency injection functions.
The text was updated successfully, but these errors were encountered: