Make force_flush available on SDK's tracer provider #594
Conversation
* add the force_flush function on the SDK's tracer provider since it is currently not easily possible to call force flush because the span processor(s) are not accessible.
        flushed_within_time = sp.force_flush() and flushed_within_time
        continue

    remaining_time_millis = (
I think it would be better to calculate deadline = time_ns + timeout_millis * 1000000
first and then compare against that instead of repeatedly subtracting.
Without subtracting, it might lead to an overall timeout of about 2 times the given timeout_millis. E.g. having 2 span processors in the MultiSpanProcessor and the 1st one flushing for almost timeout_millis: the deadline wouldn't have been reached yet, and the 2nd span processor would start flushing with a timeout of timeout_millis.
For the timeout provided to the force_flush calls made here, you can just use deadline - time_ns(). I guess you could also call that "repeatedly subtracting", but that way it does not accumulate errors from reusing the results of previous subtractions.
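For illustration, a minimal sketch of the deadline-based loop being discussed, assuming the processors live in self._span_processors and a time_ns() nanosecond clock as in the diff hunks quoted in this thread (this is not the exact PR code):

    from time import time_ns  # assumption: stdlib time_ns, Python 3.7+

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        # compute the deadline once up front instead of repeatedly subtracting
        deadline_ns = time_ns() + timeout_millis * 1000000
        for sp in self._span_processors:
            current_ts = time_ns()
            if current_ts >= deadline_ns:
                return False  # overall deadline already exceeded
            # give each processor only the time that is left until the deadline
            if not sp.force_flush((deadline_ns - current_ts) // 1000000):
                return False
        return True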
Returns:
    False if the timeout is exceeded, True otherwise.
"""
if timeout_millis is None:
Is this if check required? Can't we call force_flush(timeout_millis) in both cases? I mean, technically there can be a difference if a function forgot to specify a default argument or has a different one, but should we support that?
Note that the same functionality was recently merged for Java: open-telemetry/opentelemetry-java#1068
I think the idea makes sense, but we should unify the different force_flush interfaces. I think having an infinite timeout by default is not the best choice; it'd be better to have a reasonable one. About avoiding an infinite timeout, I leave it to you: if you think it's easier to implement in the other places, go for it!
flush_start_ns = time_ns()
for sp in self._span_processors:
    if timeout_millis is None:
        flushed_within_time = sp.force_flush() and flushed_within_time
A timeout_millis of None should indicate that force_flush could block forever, but force_flush in the span processor has a default timeout of 30 seconds. I think we should avoid providing an option for not having a timeout, or update the span processors to implement it.
My initial idea was to allow individual default timeouts for each span processor (e.g. someone implementing their own span processor) in the MultiSpanProcessor, but I guess that makes it just more confusing. I'll go with adding a default (not None) timeout.
@@ -674,3 +693,18 @@ def shutdown(self):
        if self._atexit_handler is not None:
            atexit.unregister(self._atexit_handler)
            self._atexit_handler = None

    def force_flush(self, timeout_millis: int = None) -> bool:
If we want to allow None, the annotation should be typing.Optional[int].
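For illustration, the signature with the suggested annotation would look something like this (sketch only):

    from typing import Optional

    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        ...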
It's almost good to go. Just a comment to simplify the logic in the loop; I also think @Oberon00's suggestion about the deadline is a nice one.
    flushed_within_time = (
        sp.force_flush(remaining_time_millis) and flushed_within_time
    )

return flushed_within_time
What about the following? Replace:

    flushed_within_time = (
        sp.force_flush(remaining_time_millis) and flushed_within_time
    )
    return flushed_within_time

with:

    if not sp.force_flush(remaining_time_millis):
        return False
    return True
LGTM!
Thanks for the PR! I think it's important to consider some form of threading here to see if processing can be parallelized.
Also is there a specific use case you're working toward for this PR? Wondering what a real-life scenario looks like for a MultiSpanProcessor.
@@ -114,6 +114,18 @@ def shutdown(self) -> None:
        for sp in self._span_processors:
            sp.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        deadline_ns = time_ns() + timeout_millis * 1000000
        for sp in self._span_processors:
Thoughts on this approach vs. something like spawning multiple threads to allow the span processors to flush concurrently?
Force flush will typically fire the exporter downstream, which in many cases is a network call (e.g. DataDog, Stackdriver, or Jaeger). Doing this synchronously, you may find the timeout is reached just from the amount of time it takes to submit to one of the exporters. Also, this approach results in a strong preference for the first span processor's success over the second one.
Might be good to document if that's the intended use case. Right now there's no mention of preference in failure scenarios.
It is true that threading might be helpful here. But then again:
- Spawning threads makes everything more complicated.
- shutdown has the same problem as force_flush, it just lacks timeout support.

I think if we want to add the functionality you suggest, we should add some ConcurrentMultiSpanProcessor, AsyncMultiSpanProcessor, etc. instead of making the currently very simple MultiSpanProcessor more complicated. EDIT: Also, I expect the most common case will be to have just one SpanProcessor, or maybe two, one of which is a file-based / console-logging exporter. In both cases, the overhead of spawning threads would probably not be worth it.
@toumorokoshi, it is not that I had a concrete use case regarding the MultiSpanProcessor in mind when making this PR. It is just that you have to go through the MultiSpanProcessor when wanting to make force_flush available on the TracerProvider. I'll add some documentation to the MultiSpanProcessor and the TracerProvider.

About the parallelism, I'd picture a solution of either having a thread pool executor in the ...MultiSpanProcessor or spawning new threads on every call to force_flush (and shutdown). Since force_flush on the TracerProvider itself can be called from multiple threads, I'm rather leaning towards the approach of having a thread pool executor with a continuous but fixed number of threads.

Either way, since this will introduce additional overhead, I agree with @Oberon00 to implement this in a separate ...MultiSpanProcessor class.
@mariojonke fair enough. I think we may have to revisit the timeout behavior, but am fine with waiting until there's a clearer example of where this type of stacked timeout has issues.
Also fine with maybe having a synchronous vs. async one. Here's a tracking ticket to rename the existing one to "Synchronous": #622
* introduce ConcurrentMultiSpanProcessor, which manages a thread pool executor to parallelize calls to the underlying span processors (a rough sketch follows below).
* make the active span processor in the SDK's tracer provider configurable to allow flushing spans in parallel. By default, use the synchronous multi span processor.
* split/move span processor tests to a separate test module/file
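For a rough picture, here is a sketch of what such a thread-pool-based processor's force_flush could look like; the class shape and names are assumptions pieced together from the diff hunks quoted in this review, not the exact implementation:

    from concurrent import futures
    from time import time_ns  # assumption: stdlib time_ns, Python 3.7+

    from opentelemetry.sdk.trace import SpanProcessor


    class ConcurrentMultiSpanProcessor(SpanProcessor):
        """Sketch: fans force_flush out to a thread pool."""

        def __init__(self, num_threads: int = 2):
            self._span_processors = ()
            self._executor = futures.ThreadPoolExecutor(max_workers=num_threads)

        def add_span_processor(self, span_processor: SpanProcessor) -> None:
            # the real code guards this with a lock; omitted in this sketch
            self._span_processors = self._span_processors + (span_processor,)

        def force_flush(self, timeout_millis: int = 30000) -> bool:
            deadline_ns = time_ns() + timeout_millis * 1000000
            # dispatch force_flush to every registered processor in parallel
            flush_futures = [
                self._executor.submit(sp.force_flush, timeout_millis)
                for sp in self._span_processors
            ]
            flushed_in_time = True
            for future in flush_futures:
                try:
                    # recalculate the remaining time for every future so the
                    # loop as a whole respects the single deadline
                    timeout_sec = (deadline_ns - time_ns()) / 1e9
                    flushed_in_time = future.result(timeout_sec) and flushed_in_time
                except futures.TimeoutError:
                    return False
            return flushed_in_time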
A couple small comments. I think if it's a time priority issue I can approve, but a few refactors I think will help a lot in long-term maintenance.
for mock_processor in mocks:
    multi_processor.add_span_processor(mock_processor)

multi_processor.on_start(mock.Mock(spec=trace.Span))
could use DefaultSpan if you don't need to observe the object itself.
        return True


class ConcurrentMultiSpanProcessor(SpanProcessor):
woah! awesome, thank you.
""" | ||
|
||
def __init__(self, num_threads: int = 2): | ||
# use a tuple to avoid race conditions when adding a new span and |
is this true? IIRC appending to a list is thread safe. I'm not sure what safety the tuple brings for iteration purposes.
I took this one from the (Synchronous)MultiSpanProcessor. I think in CPython a list is thread safe due to the global interpreter lock, but that might not be the case for other Python interpreter implementations.
The idea behind the initial implementation was to avoid using locks in on_start and on_end, as these are called for every span in the system. I based my decision on the following considerations, some of which could be wrong:
- Appending to a list is not thread safe in all implementations (it is in CPython because of the GIL).
- Variable assignment is atomic in all implementations.

I wasn't able to find any authoritative answer about that; much of the documentation I found is CPython specific, and AFAIU the Python specification doesn't state anything about operation atomicity, so the final decision is up to the Python implementation.
It'd be very nice if somebody could clarify all these details to understand whether the current implementation is 100% safe or not.
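To make the pattern being debated concrete, here is a minimal sketch of the locked tuple-swap / lock-free read approach (the class name is illustrative, not the SDK code):

    import threading


    class TupleSwapExample:
        def __init__(self):
            # readers iterate the tuple without a lock;
            # writers replace it under a lock
            self._span_processors = ()
            self._lock = threading.Lock()

        def add_span_processor(self, span_processor):
            with self._lock:
                # build a new tuple and rebind the attribute in one assignment,
                # so concurrent readers see either the old or the new snapshot
                self._span_processors = self._span_processors + (span_processor,)

        def on_end(self, span):
            # lock-free iteration over an immutable snapshot
            for sp in self._span_processors:
                sp.on_end(span)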
try:
    timeout_sec = (deadline_ns - time_ns()) / 1e9
    flushed_in_time = (
        future.result(timeout_sec) and flushed_in_time
I don't believe this will have the intended effect.
Future.result will block and time out for up to timeout_sec per call. As a result, if you have 2 futures, you'll actually be blocked for up to n * timeout_sec:
- the first future.result(timeout_sec) doesn't time out, but finishes just under timeout_sec (~timeout_sec time has passed)
- the second future.result(timeout_sec) doesn't time out, but finishes just under timeout_sec (~timeout_sec * 2 time has passed)

and so on.
I believe map may be a more appropriate choice here: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map
nvm, I see the timeout_sec is calculated every round, I believe this will work, but you could simplify the logic significantly by just passing all the futures to a wait function: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait
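A sketch of the simplification being suggested, assuming flush_futures is the list of submitted force_flush futures and deadline_ns the precomputed deadline from the snippet above:

    from concurrent import futures

    timeout_sec = (deadline_ns - time_ns()) / 1e9
    done, not_done = futures.wait(flush_futures, timeout=timeout_sec)
    if not_done:
        return False  # at least one processor missed the deadline
    return all(future.result() for future in done)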
    ):
-        self._active_span_processor = MultiSpanProcessor()
+        self._active_span_processor = (
+            SynchronousMultiSpanProcessor()
This could be self._active_span_processor = active_span_processor or SynchronousMultiSpanProcessor(), taking advantage of Python's falsy values.
@@ -727,8 +852,8 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
        The span processors are invoked in the same order they are registered.
        """

-        # no lock here because MultiSpanProcessor.add_span_processor is
-        # thread safe
+        # no lock here because add_span_processor is thread safe for both
woo! great.
mock_processor2 = mock.Mock(spec=trace.SpanProcessor)
multi_processor.add_span_processor(mock_processor2)

with (
This test is fairly implementation dependent. That works here, but it is brittle to the implementation details of the code. You could test this by creating a SpanProcessor that uses time.sleep for a specified period (maybe 50ms), and then having a SpanProcessor configured with an aggressive timeout.
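A rough sketch of the kind of test being suggested; the processor and test names here are hypothetical:

    import time
    import unittest

    from opentelemetry.sdk import trace


    class SleepingSpanProcessor(trace.SpanProcessor):
        """Hypothetical processor whose force_flush sleeps for ~50 ms."""

        def force_flush(self, timeout_millis: int = 30000) -> bool:
            time.sleep(0.05)
            return True


    class TestForceFlushTimeout(unittest.TestCase):
        def test_aggressive_timeout_is_exceeded(self):
            multi_processor = trace.ConcurrentMultiSpanProcessor(num_threads=2)
            multi_processor.add_span_processor(SleepingSpanProcessor())
            # a 10 ms timeout should be exceeded by the 50 ms sleep
            self.assertFalse(multi_processor.force_flush(timeout_millis=10))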
# let the thread executing the late_mock continue
wait_event.set()

self.assertFalse(flushed)
This assert could move to line 290?
I think this could become troublesome if the assert actually fails, because the ThreadPoolExecutor joins all its threads in an atexit handler. So this would cause the test process to run indefinitely, since no one is setting the wait_event.
self.assertListEqual(spans_calls_list, expected_list)


class TestSynchronousMultiSpanProcessor(unittest.TestCase):
I'm wondering if we really need two test classes, one per span processor. It seems like they should behave in an almost identical manner, except for how they deal with timeouts.
Thoughts on simplifying the test suite here and consolidating the common tests into a base class?
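A possible shape for that consolidation; this is a sketch, and the base-class and test-method names are made up:

    import unittest
    from unittest import mock

    from opentelemetry.sdk import trace


    class MultiSpanProcessorTestBase:
        """Shared tests; subclasses provide the processor under test."""

        @staticmethod
        def create_multi_span_processor():
            raise NotImplementedError

        def test_force_flush_is_delegated(self):
            multi_processor = self.create_multi_span_processor()
            mock_processor = mock.Mock(spec=trace.SpanProcessor)
            multi_processor.add_span_processor(mock_processor)
            # force_flush on the multi processor should reach every registered processor
            self.assertTrue(multi_processor.force_flush())
            mock_processor.force_flush.assert_called_once()


    class TestSynchronousMultiSpanProcessor(MultiSpanProcessorTestBase, unittest.TestCase):
        @staticmethod
        def create_multi_span_processor():
            return trace.SynchronousMultiSpanProcessor()


    class TestConcurrentMultiSpanProcessor(MultiSpanProcessorTestBase, unittest.TestCase):
        @staticmethod
        def create_multi_span_processor():
            return trace.ConcurrentMultiSpanProcessor()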
* instead of waiting on every future and recalculating the timeout, use the existing futures.wait method.
* consolidate common multi span processor tests into a separate base class
PR has changed and original review is not valid anymore.
I'm not sure about the added value of the ConcurrentMultiSpanProcessor. I'd expect that in a production environment the different span processors will be asynchronous. I know this new implementation was created because of the earlier discussion about parallelizing force_flush; what I was suggesting is something along these lines:

    # multispanprocessor
    def force_flush(self):
        futures = []
        for sp in self._span_processors:
            future = sp.force_flush_async()
            futures.append(future)
        # wait on all futures
@mauriciovasquezbernal that works somewhat, although one design decision we made was having timeout / retry logic live outside the exporter (which could also be argued in one direction or another). You bring up a good point around potentially introducing multiple levels of concurrency: I don't think it's the end of the world for a thread to spawn another thread needlessly, but it would be good to avoid if possible. Looking at our examples, the SpanProcessors don't spawn any threads, so I presume you're referring to the exporters that may spawn them. I don't see any of the exporters spawning a thread explicitly, although Jaeger's reliance on thrift may be doing so.
Regarding adding a new API: I'd argue that we stick to one function for flushing. One nice thing about async is that it can be turned nearly synchronous just by blocking on the method.
LGTM, thanks!
@toumorokoshi the [...] I was suggesting [...]
Awesome work.
Hey Mauricio, I added #783 to make sure we discuss this before GA. I'm going to merge this in, because at least we'll provide the force_flush function that is needed for the MultiSpanProcessor.
Co-authored-by: Yusuke Tsutsumi <yusuke@tsutsumi.io>
Since it is currently not (easily) possible to call force flush on the span processor(s), add a function to the SDK's tracer provider which delegates to the active span processor (MultiSpanProcessor). Also implement the MultiSpanProcessor's force flush function to sequentially call force flush on the known span processors while respecting the given timeout.