forked from kvc0/CircuitPython_async
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanaged_resource.py
executable file
·69 lines (59 loc) · 3.03 KB
/
managed_resource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import tasko
class ManagedResource:
"""
Manages a singleton resource with your functions that initialize a resource and clean it up between uses.
This class vends access to `resource` via a fair queue. Intended use is with something like a busio.SPI
with on_acquire setting a chip select pin and on_release resetting that pin.
A ManagedResource instance should be shared among all users of `resource`.
"""
def __init__(self, resource, on_acquire=lambda *args, **kwargs: None, on_release=lambda *args, **kwargs: None, loop=tasko.get_loop()):
"""
:param resource: The resource you want to manage access to (e.g., a busio.SPI)
:param on_acquire: function(*args, **kwargs) => void acquires your singleton resource (CS pin low or something)
:param on_release: function(*args, **kwargs) => void releases your singleton resource (CS pin high or something)
"""
self._resource = resource
self._on_acquire = on_acquire
self._on_release = on_release
self._loop = loop
self._ownership_queue = []
self._owned = False
def handle(self, *args, **kwargs):
"""
returns a reusable, reentrant handle to the managed resource.
args and kwargs are passed to on_acquire and on_release functions you provided with the resource.
"""
return ManagedResource.Handle(self, args, kwargs)
async def _aenter(self, args, kwargs):
if self._owned:
# queue up for access to the resource later
await_handle, resume_fn = self._loop.suspend()
self._ownership_queue.append(resume_fn)
# This leverages the suspend() feature in tasko; this current coroutine is not considered again until
# the owning job is complete and __aexit__s below. This keeps waiting handles as cheap as possible.
await await_handle
self._owned = True
self._on_acquire(*args, **kwargs)
return self._resource
async def _aexit(self, args, kwargs):
assert self._owned, 'Exited from a context where a managed resource was not owned'
self._on_release(*args, **kwargs)
if len(self._ownership_queue) > 0:
resume_fn = self._ownership_queue.pop(0)
# Note that the awaiter has already passed the ownership check.
# By not resetting to unowned here we avoid unfair resource starvation in certain code constructs.
resume_fn()
else:
self._owned = False
class Handle:
"""
For binding resource initialization/teardown args to a resource.
"""
def __init__(self, managed_resource, args, kwargs):
self._managed_resource = managed_resource
self._args = args
self._kwargs = kwargs
async def __aenter__(self):
return await self._managed_resource._aenter(self._args, self._kwargs)
async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self._managed_resource._aexit(self._args, self._kwargs)