Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

problem: no way to cancel receive operation #876

Merged
merged 1 commit into from
May 21, 2020

Conversation

somdoron
Copy link
Member

Solution: add cancellation token to all receive operation (thread safe only, for now)

{
if (TryReceiveBytes(socket, out var bytes))
return new ValueTask<byte[]>(bytes);

// TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
// and probably implement IValueTaskSource
return new ValueTask<byte[]>(Task.Factory.StartNew(socket.ReceiveBytes, TaskCreationOptions.LongRunning));
// TODO: should we avoid lambda here as it cause heap allocation for the environment?
return new ValueTask<byte[]>(Task.Factory.StartNew(() => socket.ReceiveBytes(cancellationToken),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drewnoakes do you think we would benefit here from de-lambda the action? using a real method with the cancellation token passed as a state object?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would avoid a heap allocation for the closure object, yes. You'd need to pass both the socket and token as state, then cast them at the other side.

I played around with this a little:

It'd be interesting to use BenchmarkDotNet to get some data on allocations for send/receive in various scenarios.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I have to wrap both the socket and the token I'm not sure I have a benefit of using state instead of lambda.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With extension method seems interesting:

https://sharplab.io/#gist:e7e76a5ece9d763149b8fb1eeed2587b

The question is, what is __ldftn is doing...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

solved it with private extension method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually casting CancellationToken as object will box, which allocates. I'm not sure there's a way around allocating here. I'd wondered about making the method async ValueTask<> but didn't test it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main thing is that these methods don't allocate when they complete synchronously. If they need to yield, ValueTask<> will allocate anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. I will simplify it back and merge it.

@somdoron somdoron requested a review from drewnoakes May 20, 2020 08:23
@@ -993,7 +995,7 @@ public bool TrySend(ref Msg msg, TimeSpan timeout, bool more)
/// </remarks>
/// <exception cref="FaultException">the Msg must already have been uninitialised</exception>
/// <exception cref="TerminatingException">The socket must not already be stopped.</exception>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to document OperationCanceledException here too, and a few other places in this file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not throwing, I only throw from non-Try methods. The Try methods return false.

{
if (TryReceiveBytes(socket, out var bytes))
return new ValueTask<byte[]>(bytes);

// TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
// and probably implement IValueTaskSource
return new ValueTask<byte[]>(Task.Factory.StartNew(socket.ReceiveBytes, TaskCreationOptions.LongRunning));
// TODO: should we avoid lambda here as it cause heap allocation for the environment?
return new ValueTask<byte[]>(Task.Factory.StartNew(() => socket.ReceiveBytes(cancellationToken),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would avoid a heap allocation for the closure object, yes. You'd need to pass both the socket and token as state, then cast them at the other side.

I played around with this a little:

It'd be interesting to use BenchmarkDotNet to get some data on allocations for send/receive in various scenarios.

src/NetMQ/RoutingIdSocketExtensions.cs Show resolved Hide resolved
Solution: add cancellation token to all receive operation (thread safe only, for now)
@somdoron somdoron force-pushed the thread_safe branch 2 times, most recently from 83f7173 to 2658130 Compare May 21, 2020 06:50
@somdoron somdoron merged commit bad9dc7 into zeromq:master May 21, 2020
@somdoron
Copy link
Member Author

@drewnoakes next is IAsyncEnumerable, I wonder if you know of a convention to get the IAsyncEnumerable. One option is for the socket itself to implement it, however, that means we stick to one message type (byte[] probably). BlockingCollection has a method called GetConsumingEnumerable, I thought maybe to use that convention and to have:

  1. GetConsumingStringAsyncEnumerable
  2. GetConsumingBytesEnumerable

Anyway, not a big fan of it either. What do you think?

@drewnoakes
Copy link
Member

I haven't had much hands on experience with IAsyncEnumerable yet so don't have any strong opinions.

One naming idea to consider is maintaining prefix consistency to help discoverability via IntelliSense.

For example:

  • Receive
  • ReceiveAsync
  • ReceiveAsyncEnumerable

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants