Introduce job property for channels #260

Closed
elizarov opened this issue Feb 27, 2018 · 19 comments

@elizarov
Contributor

elizarov commented Feb 27, 2018

When data is being consumed by an actor, there is sometimes a need to wait until all the messages have been processed after the channel is closed. Currently this requires the following code pattern:

val job = Job() // create parent job
val actor = actor(parent = job) { ... } 
// ... work with actor
actor.close() // close an actor
job.join() // wait for actor's completion

The proposal is to introduce an ActorSendChannel<T> interface that extends the SendChannel<T> interface and adds a job property giving direct access to the actor's job, so that this code can be written in a slightly more straightforward way:

val actor = actor { ... } 
// ... work with actor
actor.close() // close an actor
actor.job.join() // wait for actor's completion

It makes sense to also introduce ProducerReceiveChannel for symmetry, although its usefulness is not immediately apparent at the moment.

Note that in some older versions of the kotlinx.coroutines library there were ActorJob<T> and ProducerJob<T> interfaces that extended both SendChannel<T>/ReceiveChannel<T> and Job. They served a similar role, but mixing channel and job methods on a single receiver proved harmful (see #127). This issue basically reboots that idea, but in a cleaner way, by moving job into a separate property.
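The shape of the proposal can be sketched with today's public API without implementing SendChannel at all. This is only an illustration under stated assumptions: ActorHandle, actorWithJob, and runActorDemo are hypothetical names invented here, not part of kotlinx.coroutines, and the sketch exposes the channel and the job as two properties of a holder rather than as the ActorSendChannel<T> interface described above.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical holder: keeps the channel and the job as separate properties,
// so channel methods and job methods never share a receiver.
class ActorHandle<T>(val channel: SendChannel<T>, val job: Job)

// Hypothetical builder: launches a coroutine that consumes the channel
// and returns both the sending side and the consumer's job.
fun <T> CoroutineScope.actorWithJob(handle: suspend (T) -> Unit): ActorHandle<T> {
    val channel = Channel<T>()
    val job = launch {
        // The loop ends once the channel is closed and drained.
        for (message in channel) handle(message)
    }
    return ActorHandle(channel, job)
}

// Small demo: send three messages, close, then wait for processing to finish.
fun runActorDemo(): List<Int> = runBlocking {
    val processed = mutableListOf<Int>()
    val actor = actorWithJob<Int> { processed += it }
    repeat(3) { actor.channel.send(it) }
    actor.channel.close()  // close the actor's mailbox
    actor.job.join()       // wait for the actor's completion
    processed
}
```

Calling runActorDemo() returns [0, 1, 2]: join() resumes only after the consuming coroutine has handled every message sent before close().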

@fvasco
Contributor

fvasco commented Feb 27, 2018

A tiny consideration: the Job exposes a lot of other useful information (context and children).
It is possible to achieve the same goal by introducing a flush() method into the SendChannel interface.

Error: job.await -> job.join

@elizarov
Contributor Author

@fvasco Thanks for the join correction.

The flush() concept is interesting; however, I find it somewhat counter-intuitive that it actually "waits". Usually, flush (on a socket, for example) is associated with "send data to the underlying transport", but it does not actually wait for the data to be physically transferred, let alone processed.

Also note that we are looking for a common nomenclature between regular (object) channels and the byte channels that are a work in progress in kotlinx-coroutines-io. Byte channels already have a concept of flush because they are all buffered: unlike object channels, writing a few bytes does not immediately resume the receiving coroutine (for performance reasons) but only stores the bytes in the buffer, so you must flush your byte channel to resume the coroutine on the other end.

In order for the flush concept to work on a regular SendChannel we'll need to figure out:

  • A different name (awaitProcessed?)
  • A way to define semantics for this method when the SendChannel does not have an actor on the other side, but some arbitrary code running and receiving from it.

@fvasco
Contributor

fvasco commented Feb 27, 2018

awaitProcessed would work well only for the special CLOSE token (how would we establish whether an item was processed?), so awaitClose.

A way to define semantics for this method when the SendChannel does not have an actor on the other side, but some arbitrary code running and receiving from it.

This looks interesting. In that case we should consider implementing nonblocking, blocking, and select-clause methods (close, closeAndJoin, onJoin) (see #240).

@fvasco
Contributor

fvasco commented Feb 27, 2018

I am considering this as an opportunity to introduce a Disposable, Closeable or Cancellable interface, however dispose, close and cancel have different meanings so triangulation is difficult.

Another consideration is to introduce an (optional?) job property in SendChannel (or a dedicated interface, as proposed), but it may be a bit confusing to have both sendChannel.close() and sendChannel.job.cancel().

@elizarov
Contributor Author

elizarov commented Feb 27, 2018

@fvasco If we add a job property to SendChannel, then it is not clear what sendChannel.job.join() does if the channel is not backed by an actor. What should it wait for?

@fvasco
Contributor

fvasco commented Feb 27, 2018

The CLOSE token is received, so the channel has no pending actions.

@elizarov
Contributor Author

elizarov commented Feb 28, 2018

@fvasco This can be done, but it would have different semantics from actors: when you wait for an actor's job, you wait not just until it receives the close token, but until it actually processes the close token.

@fvasco
Contributor

fvasco commented Feb 28, 2018

Not only that: map/reduce functions should be rewritten to take the parent job into account.
The actor's goal is shared with other use cases.

broadcastChannel
    .openSubscription()
    .filter { it > 10 }
    .map { TooHigh(it) }
    .sendTo(actor)

@elizarov
Contributor Author

@fvasco Good point. We need to carry these semantics across the whole chain of transformations. This is a strong argument in favor of adding a job property to all channels.

@fvasco
Contributor

fvasco commented Mar 1, 2018

The actor's job should also support the start method.
I have some use cases that require lazy actors (start = CoroutineStart.LAZY).

@elizarov
Contributor Author

elizarov commented Mar 1, 2018

@fvasco Lazy actors are supported now. You can do actor(start = CoroutineStart.LAZY) { ... }

@fvasco
Contributor

fvasco commented Mar 1, 2018

I mean the Job.start() method:

val actor = actor<Int>(start = CoroutineStart.LAZY) {
    ...
}
// other stuff
actor.job.start()

@elizarov
Contributor Author

Another argument in favor of adding a job property to all channels was mentioned in #280. It is a performance consideration. Right now both produce and actor have to return a channel with some special behaviors, so they "wrap" the underlying channel implementation using delegation. This involves an extra dispatch on each send/receive invocation on the resulting channel. If the channel implementations natively supported an attached job, this delegation would not be required.

@elizarov
Contributor Author

Another use-case just popped up in public Slack:

kasper.kondzielski [14:48]
Hi I trying to wrap callback based api with coroutines. This api can fire multiple times so I am using channels. Now I am facing the problem that when user stops using channel my callback is still attached to the source. Is there any way to add onCancelListener to channel so I can cancel my internal callback?

If we have a job available on each channel, then we could indeed use channel.job.invokeOnCompletion { ... } to get a prompt cancellation notification.
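For comparison, the use case above can also be sketched with a hook that already exists on channels, SendChannel.invokeOnClose, which is not discussed in this thread and is marked experimental in kotlinx.coroutines. The EventSource class, the events() extension, and runCallbackDemo below are hypothetical names made up for this illustration; the sketch assumes a single-threaded caller and is not a recommendation from the maintainers.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based API that can fire multiple times.
class EventSource {
    val listeners = mutableListOf<(Int) -> Unit>()
    fun fire(value: Int) {
        // Iterate over a copy so listeners may detach while events are delivered.
        listeners.toList().forEach { it(value) }
    }
}

// Hypothetical adapter: exposes the callbacks as a channel and detaches
// the listener when the channel is closed or its receiving side is cancelled.
@OptIn(ExperimentalCoroutinesApi::class)
fun EventSource.events(): ReceiveChannel<Int> {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val listener: (Int) -> Unit = { channel.trySend(it) }
    listeners += listener
    channel.invokeOnClose { listeners -= listener }
    return channel
}

// Small demo: receive two events, cancel, and report how many listeners remain.
fun runCallbackDemo(): Pair<List<Int>, Int> = runBlocking {
    val source = EventSource()
    val events = source.events()
    source.fire(1)
    source.fire(2)
    val received = listOf(events.receive(), events.receive())
    events.cancel()  // the consumer stops using the channel
    received to source.listeners.size
}
```

Here runCallbackDemo() returns the pair ([1, 2], 0): the listener is detached as soon as the receiving side cancels the channel.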

@LouisCAD
Contributor

Only channels created with actor and produce would have a job property? If so, it doesn't solve #341

@qwwdfsad
Collaborator

All channels will have a job property.

@LouisCAD
Contributor

So the title of this issue is outdated.
BTW, will it be only on Channel or will it be also on SendChannel and ReceiveChannel?

@qwwdfsad
Collaborator

qwwdfsad commented Jun 6, 2018

@LouisCAD It's still unclear to us which semantics job should carry when defined on both SendChannel and ReceiveChannel (because the job can be "owned" by both produce and actor builders, but they have asymmetric semantics and requirements for the job.cancel <-> ch.cancel/ch.close relationship). One possible solution is to expose job only on SendChannel.

If you have any arguments for why job should be exposed in both interfaces, feel free to share them.

elizarov changed the title from "Introduce ActorSendChannel and ProducerReceiveChannel with job property" to "Introduce job property for channels" on Jun 26, 2018
@qwwdfsad
Collaborator

We decided not to introduce Job to channels, for multiple reasons:

  1. produce-like and actor-like builders have different semantics in terms of close and cancellation for the channel and the job.
  2. Channel is a communication primitive that is frequently not bound to any lifecycle. For example, it can be thought of as the BlockingQueue of the coroutines world, and a queue doesn't have a lifecycle.
  3. For a standalone channel, it's unclear which semantics job.join and job.invokeOnCompletion carry and how they relate to channel.close and channel.cancel.

New actors don't expose a channel but do expose a job, so this issue is irrelevant to them.
A job for produce is postponed until cold streams arrive or there is demand for a channel lifecycle.
