-
Notifications
You must be signed in to change notification settings - Fork 384
refactor(azure-iot-device): Pipeline Refactor Part 1 #392
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Conversation
|
||
@pipeline_thread.runs_on_pipeline_thread | ||
def add_callback(self, callback): | ||
self.callbacks.append(callback) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
append [](start = 23, length = 6)
Something to think about: If our collection of callbacks is a list (in the abstract sense), then the call order is not obvious (do we call the first item in the list first, or do we call the last item first?). Since we're using append here, i think we call the last one first, but that's the opposite of what I would assume.
However, if we think of our collection of callbacks as a stack, then the order is much more intuitive. When we add a callback, we push it on the stack. When the operation is complete, we pop the first callback and call it, then we pop the next. This matches the order we want for callbacks (LIFO). #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, one of the internal changes I've been considering is to refer to this as the callback_stack
In reply to: 345985415 [](ancestors = 345985415)
"{}: Operation no longer complete! Suspending completion".format(self.name) | ||
) | ||
break | ||
callback = self.callbacks.pop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pop [](start = 42, length = 3)
ah ha! there's the pop. If only the list object had a method named push(). #Closed
break | ||
callback = self.callbacks.pop() | ||
try: | ||
callback(op=self, error=error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error=error [](start = 38, length = 11)
I like this code, but there are 2 big implications here:
- You suspend callback processing by "uncompleting" the op, which also clears the error.
- Inside a callback, you can't change the error to a different error, and you also can't change success to error.
I think these are both perfectly acceptable, and maybe even desirable. I just want to put it out there and keep it in mind. #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some further documentation, I think this is clearer now.
In reply to: 345991747 [](ancestors = 345991747)
def spawn_worker_op(self, worker_op_type, **kwargs): | ||
logger.debug("{}: creating worker op of type {}".format(self.name, worker_op_type.__name__)) | ||
|
||
# CT-TODO: does this need to use a weakref? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CT-TODO: does this need to use a weakref? [](start = 8, length = 43)
personal opinion: it doesn't hurt to use weak references, but why make the code more confusing? I think we only need weak references if we're passing a callback to a timer function or to an external component. All other cases are (I think) superfluous. Maybe we need a coding guideline for this in the wiki? #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kwargs["callback"] = on_worker_op_complete | ||
worker_op = worker_op_type(**kwargs) | ||
worker_op.add_callback(provided_callback) | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I like this code. You're making some big assumptions here about the nature of the pre-existing callback, and you're making for a confusing flow if there is a pre-existing callback. If I understand the code, I think completing a worker with a pre-existing callback will first call all the callbacks on the parent op, then, after all that is done, it will call the pre-existing callback. This doesn't seem useful. #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe you are mistaken, unless I am mistaken.
What this does is, if the user provides a callback as kwarg for creating a worker op, that callback will be added to the callback_stack AFTER the callback that completes the original op. And because it's a stack, that means that this user provided callback will be called FIRST and BEFORE completing the parent op that spawned the worker. Completing the parent op is ALWAYS last.
In reply to: 345996131 [](ancestors = 345996131)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I can see the utility in this. As long as there's a UT to validate the ordering, I'm OK with this.
In reply to: 346547761 [](ancestors = 346547761,345996131)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I've double checked, there are tests for this, and as a result requirements as well.
In reply to: 347011108 [](ancestors = 347011108,346547761,345996131)
else: | ||
self.send_op_down(op) | ||
|
||
@pipeline_thread.runs_on_pipeline_thread | ||
def _on_intercepted_return(self, op, error): | ||
def _clear_timer(self, op, error): | ||
# When an op comes back, delete the timer and pass it right up. | ||
if op.timeout_timer: | ||
op.timeout_timer.cancel() | ||
op.timeout_timer = None | ||
logger.debug("{}({}): Op completed".format(self.name, op.name)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.debug("{}({}): Op completed".format(self.name, op.name)) [](start = 8, length = 63)
this debug string seems like noise. I would maybe change it to "cancelling timer" and move it into the if block above. #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
) | ||
else: | ||
self.send_op_down(op) | ||
op.add_callback(self._do_retry_if_necessary) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_do_retry_if_necessary [](start = 33, length = 22)
This is great. I like that the name of the callback describes what it does. Huge improvement. #Closed
@@ -852,14 +728,14 @@ def do_retry(): | |||
) | |||
|
|||
# if we don't keep track of this op, it might get collected. | |||
op.uncomplete() | |||
self.ops_waiting_to_retry.append(op) | |||
op.retry_timer = Timer(self.retry_intervals[type(op)], do_retry) | |||
op.retry_timer.start() | |||
|
|||
else: | |||
if op.retry_timer: | |||
op.retry_timer = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
retry_timer [](start = 19, length = 11)
possible unrelated bug: op.retry_timer.cancel() should be called. #Pending
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made the change (and also in the do_retry
callback slightly above), but it seems like there are no tests for this entire code branch. The do_retry
callback has tests that I've adjusted, but I can't seem to find them for this else block. Am I just missing something?
In reply to: 345999596 [](ancestors = 345999596)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're correct. I have a test to make sure the callback gets called but we don't have a test to make sure the timer is cancelled:
@pytest.mark.it("Calls the original callback with success when the retried op succeeds")
In truth, I'm having trouble figuring out how this code could even be hit because it means that op completes while the op is waiting to be retried (which means it must be completed already). Can you mark this with a BKTODO please? I'm reworking this code path and I need to make sure I think this through.
In reply to: 347059768 [](ancestors = 347059768,345999596)
@@ -255,6 +255,8 @@ def _on_mqtt_disconnected(self, cause=None): | |||
op = self._pending_connection_op | |||
self._pending_connection_op = None | |||
|
|||
# Swallow any errors, because we intended to disconnect - even if something went wrong, we |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errors [](start = 26, length = 6)
You have the (almost) exact same comment 3 lines down. #Closed
callback=op.callback, | ||
), | ||
op=op, | ||
worker_op = op.spawn_worker_op( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spawn_worker_op [](start = 27, length = 15)
Just thinking out loud, does the verb "spawn" imply that this function also sends it down? It feels like it does, but I don't know why. Why is this "spawn_worker_op" instead of "create_worker_op"? What does "spawn" do that "create" doesn't do? #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think spawn refers to creating something that will start on a different process. Also verb wise spawn refers to multiple instances and create refers to only 1.
In reply to: 346001572 [](ancestors = 346001572)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think spawn implies that it sends it down at all. The reason I chose "spawn" over "create" is that I think it makes the relationship between the original op and the worker op clearer. The original op spawns a worker op, i.e. the worker op comes from the original op. With "create", that isn't as clear.
In reply to: 346998748 [](ancestors = 346998748,346001572)
op.uncomplete() | ||
worker_op = op.spawn_worker_op( | ||
worker_op_type=pipeline_ops_base.ReconnectOperation, | ||
callback=on_reconnect_complete, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
on_reconnect_complete [](start = 33, length = 21)
You could remove this callback (it only logs), especially if this is the only example we have with a callback inside the kwargs on a spawn_worker_op call. #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I had considered it but felt the log might be valuable. If you think it can be removed I'll go ahead and do it though.
In reply to: 346003283 [](ancestors = 346003283)
assert op.complete.call_args == mocker.call(error=error) | ||
|
||
@pytest.mark.it( | ||
"Returns a worker operation, which, when completed, triggers the 'callback' optionally provided in the **kwargs parameter, prior to completing the operation that spawned it" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prior to completing the operation that spawned it [](start = 134, length = 50)
This doesn't actually test the order that these callbacks are called. #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It actually does, due to the assertion in the callback. However, there were some bugs in this test anyway, which I have fixed up, as well as adding some better clarity.
In reply to: 346495978 [](ancestors = 346495978)
assert len(op.callbacks) == 1 | ||
|
||
def cb3(op, error): | ||
# Callback 3 has been triggered, but no others have been. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
allback 3 has been triggered, but no others have been. [](start = 19, length = 54)
Nit: Comments on these 3 callbacks are not symmetrical. First 2 comments describe pre-conditions (state before the callback is called) and last commend describes post-condition (state after the callback is called). #Closed
# the assertions in the above callbacks won't be able to directly raise AssertionErrors that will | ||
# allow for testing normally. Instead we should check the background_exception_handler to see if | ||
# any of the assertions raised errors and sent them there. | ||
assert handle_exceptions.handle_background_exception.call_count == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert handle_exceptions.handle_background_exception.call_count == 0 [](start = 12, length = 68)
This assumption may be correct but it's an assumption nonetheless. In other words, you assume the asserts in cb1, cb2, and cb3 all pass because there are no calls to handle_background_exception, but you don't verify this. If I comment out cb1_mock.side_effect = cb1
, this test would still pass without ever running the checks inside cb1. Likewise if I break the exception handling around the callback invocation, it's possible that those asserts could fail, but the test would pass. #ByDesign
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps the comment is unclear - doing an assertion raises an AssertionError
. The AssertionError
normally is able to bubble up because no Exception handling accounts for it, until it is intercepted by the testing framework, which uses that to fail the test. However, due to us catching ALL exceptions that come from a callback, the AssertionError is also sent to the background handler - like any other exception would be.
I agree that if you comment out the mock side effect lines, this test will be broken, but that's because that's a crucial line of the test. You could say that about MANY lines of tests. So long as the mock side effects ARE set up, this test works correctly.
To your second point, if the Exception handling around the callback invocation were broken within the .complete()
method, then this test would still fail, as suddenly the AssertionError
s raised by the callback would be able to propagate to the test framework and immediately cause a failure.
In reply to: 346502193 [](ancestors = 346502193)
assert handle_exceptions.handle_background_exception.call_count == 0 | ||
|
||
@pytest.mark.it( | ||
"Handles any Exceptions raised during execution of a callback by sending them to the background exception handler, and continuing on with completion" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
continuing on with completion [](start = 131, length = 29)
I'm not convinced that continuing with completion is the correct behavior in this scenario. Yes, we are reporting the exception (as best we can), but continuing with completion is also like ignoring the exception and that seems wrong. #Pending
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm open to changing the implementation, I'm not particularly tied to this logic. Open to suggestion for exactly what you'd like to see from this behaviour.
In reply to: 346503501 [](ancestors = 346503501)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think an exception in a callback becomes the op error. This is a deviation from the old behavior, but I think your callback stack makes this better.
Say we have an op that has a stack with 3 callbacks. The op completes with some status. For our example, lets say it succeeds.
Now, inside the first callback, let's say there is some exception. This means that something failed in the completion path for this op. Can we say that this op succeeded? No. At this point, the best thing to do is to catch this exception and pass it as the op error to the next callback.
In a different comment thread, I talked about ops that change from error1->error2 or success->error. This is how that might be accomplished and it feels perfectly intuitive to me.
In reply to: 347047620 [](ancestors = 347047620,346503501)
) | ||
|
||
@pytest.mark.it( | ||
"Allows any BaseExceptions raised during execution of a callback to propagate" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
propagate [](start = 80, length = 9)
Do you want to add asserts that verify that cb3_mock was called but cb1_mock was not called?
#Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
) | ||
def test_feature_name(self, cls_type, init_kwargs): | ||
op = cls_type(**init_kwargs) | ||
assert op.feature_name == init_kwargs["feature_name"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the benefit of writing these tests by hand rather than using kwargs to parametrize them? #ByDesign
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, it allows for simpler, more readable tests. I don't mind using a function to add a series of test classes dynamically into a test module, but only when those tests are still statically laid out in a common place. When the tests themselves are dynamically created, I feel it becomes harder to follow (especially for someone not familiar with the codebase)
In reply to: 346512691 [](ancestors = 346512691)
op.callback = mocker.MagicMock() | ||
add_mock_method_waiter(op, "callback") | ||
op.callbacks.append(mocker.MagicMock()) # TODO: make this simpler | ||
# add_mock_method_waiter(op, "callback") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add_mock_method_waiter(op, "callback") [](start = 12, length = 40)
If these tests pass without this, then you've removed all consumers of this and you can remove this line completely #Closed
@@ -80,8 +80,8 @@ class LocalTestObject(StageRunOpTestBase): | |||
@pytest.fixture(params=unknown_ops) | |||
def op(self, request, mocker): | |||
op = make_mock_op_or_event(request.param) | |||
op.callback = mocker.MagicMock() | |||
add_mock_method_waiter(op, "callback") | |||
op.callbacks.append(mocker.MagicMock()) # TODO: make this simpler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: make this simpler [](start = 53, length = 25)
please take ownership of this TODOD (# CT-TODO) and/or make it easier to understand. I'm not sure how this particular line of code could be any simpler. #Closed
@@ -351,7 +233,8 @@ def __init__(self, pipeline_configuration): | |||
self.pipeline_configuration = pipeline_configuration | |||
|
|||
def run_op(self, op): | |||
op.callback = pipeline_thread.invoke_on_callback_thread_nowait(op.callback) | |||
# CT-TODO: make this more elegant | |||
op.callbacks[0] = pipeline_thread.invoke_on_callback_thread_nowait(op.callbacks[0]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
op.callbacks[0] [](start = 75, length = 15)
is this the top of the stack ? i think it should be , then can this be pop ? #Pending
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is, however, .pop()
removes the item from the stack, which we do not want.
I agree however that this is not ideal at all, and want to come up with something better.
In reply to: 346992405 [](ancestors = 346992405)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm...this seems to be the only way to do peek....
In reply to: 347044769 [](ancestors = 347044769,346992405)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, now that I think about it, we could pop and re-append... that said, I feel like that's maybe a little less clear about what's happening?
In reply to: 347050315 [](ancestors = 347050315,347044769,346992405)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my $.02: popping, adding the wrapper, and then pushing again seems completely clear to me.
In reply to: 347050862 [](ancestors = 347050862,347050315,347044769,346992405)
@@ -97,100 +95,91 @@ def _execute_op(self, op): | |||
) | |||
) | |||
|
|||
# make a callback that can call the user's callback after the reconnect is complete | |||
def on_reconnect_complete(reconnect_op, error): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
on_reconnect_complete [](start = 16, length = 21)
should this have a complete ? #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. A worker op automatically completes its parent op.
Also this callback no longer exists.
In reply to: 347018814 [](ancestors = 347018814)
"Initializes the 'resource_location' attribute with the provided 'resource_location' parameter" | ||
) | ||
def test_resource_location(self, cls_type, init_kwargs): | ||
op = cls_type(**init_kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
op = cls_type(**init_kwargs) [](start = 8, length = 28)
nit: this line is repeated many many times, op could be a fixture, either in this class, RequestOperationTestConfig, or in some base class. If you do this, you can remove cls_type from the test parameters while you add op. Feel free to ignore. I tend to over-design. #ByDesign
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rationale here is that as the test is testing the instantiation, the instantiation should be in the test
In reply to: 347139500 [](ancestors = 347139500)
else: | ||
# Operation is now in the process of completing | ||
self.completing = True | ||
self.error = error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.error = error [](start = 12, length = 18)
not sure what I think about this. Now error is a parameter that's passed to the callback and also an attribute of the op. I think we have to pick one. If it stays as an attribute, then maybe we should remove it from the callback parameter list. (not as part of this CR) #Pending
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert mock_mqtt_transport.unsubscribe.call_count == 1 | ||
assert mock_mqtt_transport.unsubscribe.call_args[1]["topic"] == fake_sub_unsub_topic | ||
# # ------------------------------------------------------------------------- | ||
# # Copyright (c) Microsoft Corporation. All rights reserved. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pet peeve/Nit: huge deltas like this make for painful source management (merges, rebases, git blame, etc) If you want to comment out the tests, just add an underscore to the beginning of the class/test name so pytest doesn't pick it up. #Pending
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.completed = False # Operation has been fully completed | ||
self.completing = False # Operation is in the process of completing | ||
self.error = None # Error associated with Operation completion | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something to think about: These 3 attributes (completed, completing, and error) are all part of one thing. I hesitate to use the word state, but these combine together to form the state of the op, and not all combinations are valid. For states, I think we have: not_complete, completing_with_error, completing_without_error, completed_with_error, and completed_without_error. With these 3 attributes, there's 8 possible combinations, but only 5 valid states. This worries me, especially as code grows and the possibility of internally inconsistent ops gets bigger. #Pending
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'm a little concerned about this as well - for now I feel safe with the value checks I put on each operation, but it's not an ideal situation. As I was working on this class, some of the functionality started to resemble a state machine a bit too much for my liking.
In reply to: 347141190 [](ancestors = 347141190)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PLEASE NOTE:
provisioning_pipeline.py
tests have been disabled, pending an upcoming PR from Oliva. This too, will be resolved in the Part 2 PRerror
parameter, which is no longer necessary due toerror
being stored as an attribute on an Operation)