From 72e9ddebb1632c9c27e305810455e536f00dda5e Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 16 Feb 2021 02:11:01 -0800 Subject: [PATCH] Use hack to enqueue frames to write before writing Tornado has an internal queue that uses to hold frames before writing them. To avoid needing to track and wait on various `Future`s and the amount of data sent, we can just enqueue all of the frames we want to send before a send even happens and then start the write. This way Tornado already has all of the data we plan to send once it starts working. In the meantime, we are able to carry on with other tasks while this gets handled in the background. --- distributed/comm/tcp.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 92cc698b3ca..855490b39bb 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -256,17 +256,13 @@ async def write(self, msg, serializers=None, on_error="message"): frames = [b"".join(frames)] try: + # hack to enque all frames for writing for each_frame in frames: - # Can't wait for the write() Future as it may be lost - # ("If write is called again before that Future has resolved, - # the previous future will be orphaned and will never resolve") - each_frame_nbytes = nbytes(each_frame) - if each_frame_nbytes: - future = stream.write(each_frame) - bytes_since_last_yield += each_frame_nbytes - if bytes_since_last_yield > 32e6: - await future - bytes_since_last_yield = 0 + stream._write_buffer.append(each_frame) + stream._total_write_index += frames_nbytes + + # start writing frames + stream.write(b"") except StreamClosedError as e: self.stream = None self._closed = True