Skip to content

Commit

Permalink
Use hack to enqueue frames to write before writing
Browse files Browse the repository at this point in the history
Tornado has an internal queue that uses to hold frames before writing
them. To avoid needing to track and wait on various `Future`s and the
amount of data sent, we can just enqueue all of the frames we want to
send before a send even happens and then start the write. This way
Tornado already has all of the data we plan to send once it starts
working. In the meantime, we are able to carry on with other tasks while
this gets handled in the background.
  • Loading branch information
jakirkham committed Feb 16, 2021
1 parent 39f4f71 commit 72e9dde
Showing 1 changed file with 6 additions and 10 deletions.
16 changes: 6 additions & 10 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,13 @@ async def write(self, msg, serializers=None, on_error="message"):
frames = [b"".join(frames)]

try:
# hack to enque all frames for writing
for each_frame in frames:
# Can't wait for the write() Future as it may be lost
# ("If write is called again before that Future has resolved,
# the previous future will be orphaned and will never resolve")
each_frame_nbytes = nbytes(each_frame)
if each_frame_nbytes:
future = stream.write(each_frame)
bytes_since_last_yield += each_frame_nbytes
if bytes_since_last_yield > 32e6:
await future
bytes_since_last_yield = 0
stream._write_buffer.append(each_frame)
stream._total_write_index += frames_nbytes

# start writing frames
stream.write(b"")
except StreamClosedError as e:
self.stream = None
self._closed = True
Expand Down

0 comments on commit 72e9dde

Please # to comment.