Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

leftJoin against KeyProducer should support window on both streams #690

Open
pankajroark opened this issue Sep 28, 2016 · 1 comment
Open

Comments

@pankajroark
Copy link
Contributor

Right now it supports window on only one stream.
e.g. if x.leftJoin(y, buffer) then only events of y are kept for a window of time.

This is a problem when processing is needed to be done on only a part of payload data. Let's say X is a big thrift event, which wants an upper limit applied on only one field on it.

class X {
val id: Long
val charge: Int
...
}

Let's say we want to keep track of sum of charges per id and reduce it if needed to keep the maximum charge limited. e.g. if limit is 100 and current sum is at 99 and we get an event with charge of 2 we reduce it to 1. If current sum is already 100 then we reduce charge to 0 for all subsequent events.

The efficient way to do it would be to send only charge and id to a summer to apply the limits and join back with the original stream. But since the current join is windowed on only one stream there will be a race condition and we can fail to join.

We should support joins that are windowed on both streams.

@johnynek
Copy link
Collaborator

I'm not super crazy about windows, but mostly because merging offline and online results seems like a challenge. If you have a clear implementation idea, I'm happy to hear it.

One problem with summingbird is that you have to pass the storage for such things (Buffer), because storm has no storage, nor do some other streaming systems.

That said, windows can be done by a special data type and using sumByKey with the correct Semigroup[V]. So, if we have a Window[V] that manages this, I think we can leverage the existing merging of offline and online.

I do think the race conditions in complex topologies is an issue. I think this could be solved with new APIs for sumByKey and write For instance:

// write out, and get a way to read the store (service) and an event stream of changes
def sumByKeyEff(s: Store[K, V])(implicit s: Semigroup[V]): Effect[(Service[K, V], Producer[(K, (Option[V], V)))]

def writeEff[T](s: Sink[T]): Effect[Producer[T]]

Where Effect[T] is a monad like Execution[T] or Future[T]. This will allow us to make sure that effects (writing external to the system) in a way that we can be sure that one write happens before another read.

Planning this offline seems doable pretty easily. Online there can be real challenges since ordering the reads and writes is not trivial (also at Twitter data-rates, it might not always scale easily).

# for free to subscribe to this conversation on GitHub. Already have an account? #.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants