Double firing of stream in the same transaction multiply applies function. #151

Closed
greyson opened this issue Mar 7, 2018 · 7 comments

greyson commented Mar 7, 2018

We have an application that accumulates a pool and submits a request once that pool satisfies a threshold (set by an external stimulus of some kind). Submitting, of course, deducts the submitted amount from the pool.

Unfortunately, it appears that the pool is being deducted from more than once in a transaction. I have written a test case which reproduces this. Logically, I would assume that adding 10 to the pool and the resulting immediate deduction from the pool would be merged into a single (x = x + 10 - 10), which would result in x = 0.

The last assertion in this test fails, showing x = -10.

The submitPooledAmount stream should only be triggered once, but it appears to be triggered twice, applying itself to the pool twice; the duplicate is only filtered out at the end, so that listen sees just one firing.

Either that or I've done something incredibly dumb here and am operating under some false assumption. I'm sorry that the test case is so dense, but it's the smallest, clearest amount of code I could cobble together to replicate the problem.


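// Assumed imports: JUnit 4 and the Sodium Java package (nz.sodium).
import static org.junit.Assert.assertEquals;

import java.util.LinkedList;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import org.junit.Test;

import nz.sodium.*;

public class TestMultiFire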
{
   @Test
   public void poolDoubleSubtraction()
   {
      CellSink< Integer > threshold = new CellSink<>( 10 );
      StreamSink< Integer > addPoolSink = new StreamSink<>();

      Tuple2< Stream< Integer >, Cell< Integer > > dat = Transaction.run( () ->
      {
         StreamLoop< Integer > submitPooledAmount = new StreamLoop<>();

         // Ways that the pool is modified.
         Stream< UnaryOperator< Integer > > pool_addByInput = addPoolSink
            .map( i -> x -> x + i );
         Stream< UnaryOperator< Integer > > pool_removeByUsage = submitPooledAmount
            .map( i -> x -> x - i );

         // The current level of the pool
         Cell< Integer > pool = pool_addByInput
            .merge( pool_removeByUsage, ( f, g ) -> x -> g.apply( f.apply( x ) ) )
            .accum( 0, Function::apply );

         // The current input changes combined with the pool as a stream
         Stream< Integer > input_byAdded = Stream.filterOptional( pool_addByInput
            .snapshot( pool, threshold, ( f, x, t ) -> f.apply( x ) >= t
               ? Optional.of( f.apply( x ) )
               : Optional.empty() ) );

         // Simple rising edge on pool threshold satisfaction.
         Stream< Integer > input_bySatisfaction = Stream
            .filterOptional( Operational.updates( pool )
               .snapshot( pool, threshold, ( neu, alt, t ) -> neu >= t && alt < t
                  ? Optional.of( neu )
                  : Optional.empty() ) );

         submitPooledAmount.loop( input_byAdded.merge( input_bySatisfaction, Math::max ) );

         return new Tuple2<>( submitPooledAmount, pool );
      });

      Stream< Integer > input = dat.a;
      Cell< Integer > pool = dat.b;

      LinkedList< Integer > submissions = new LinkedList<>();
      Listener listener = input.listen( submissions::add );

      // Add amount which can be immediately used based on threshold.
      // Pool should remain zero after the transaction is complete.
      addPoolSink.send( 10 );

      assertEquals( 1, submissions.size() );
      assertEquals( Integer.valueOf( 10 ), submissions.get( 0 ) );
      assertEquals( Integer.valueOf( 0 ), pool.sample() );
   }
}
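
The arithmetic behind the expected and observed results can be checked without Sodium. The sketch below is plain Java, not FRP code; the class name PoolArithmetic and the helper applyAll are hypothetical and only illustrate that one addition followed by one removal leaves the pool at 0, while a second application of the same removal leaves it at -10.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class PoolArithmetic {
    // Apply each pool update in order, starting from the given level.
    static int applyAll(int start, List<UnaryOperator<Integer>> updates) {
        int level = start;
        for (UnaryOperator<Integer> update : updates) {
            level = update.apply(level);
        }
        return level;
    }

    public static void main(String[] args) {
        UnaryOperator<Integer> add10 = x -> x + 10;
        UnaryOperator<Integer> remove10 = x -> x - 10;

        // Expected: the addition and a single removal cancel out.
        System.out.println(applyAll(0, Arrays.asList(add10, remove10)));           // prints 0

        // Reported: the removal is applied twice in the same transaction.
        System.out.println(applyAll(0, Arrays.asList(add10, remove10, remove10))); // prints -10
    }
}
```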
Author

greyson commented Mar 7, 2018

Note that this particular test case can be "fixed" by changing input_bySatisfaction to:

         // Simple rising edge on pool threshold satisfaction.
         Stream< Integer > input_bySatisfaction = Stream.filterOptional(
            Operational.updates( threshold )
               .snapshot( pool, (t, x) -> x >= t
                  ? Optional.of( x )
                  : Optional.empty() ) );

But this still does not explain why the firing of both input_bySatisfaction and input_byAdded in the same transaction would not be properly merged by their maximum value.

jam40jeff added a commit that referenced this issue Mar 7, 2018
Contributor

jam40jeff commented Mar 7, 2018

This is not a legal FRP circuit because there is a dependency cycle. In fact, the C# code throws an appropriate exception on the line:

submitPooledAmount.loop( input_byAdded.merge( input_bySatisfaction, Math::max ) );

to indicate where the dependency cycle is created. There is an issue open (#139) to implement this logic in the Java code as well.

To implement a feedback loop in Sodium, you must send the next value of the feedback loop through in a new transaction. I have added tests to the C# repository showing that the current way will break and a way to get around the dependency cycle.
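
The idea of pushing the fed-back value through a new transaction can be pictured with a toy model. The sketch below is plain Java and does not use Sodium; the class DeferredFeedback and all of its members are hypothetical names, and the queue only stands in for "a later transaction". In Sodium itself, later comments in this thread point to Operational.defer() for this purpose.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DeferredFeedback {
    private final int threshold;
    private int pool = 0;
    private final List<Integer> submissions = new ArrayList<>();
    // Work queued here runs after the current step, like a follow-up transaction.
    private final Deque<Runnable> pending = new ArrayDeque<>();

    DeferredFeedback(int threshold) {
        this.threshold = threshold;
    }

    // First step: the input only adds to the pool. If the threshold is met,
    // the submission is scheduled rather than applied in the same step.
    void add(int amount) {
        pool += amount;
        if (pool >= threshold) {
            final int submitted = pool;
            pending.add(() -> submit(submitted));
        }
        drain();
    }

    // Follow-up step: record the submission and deduct it exactly once.
    private void submit(int amount) {
        submissions.add(amount);
        pool -= amount;
    }

    // Run queued follow-up steps one at a time, in order.
    private void drain() {
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
        }
    }

    int pool() { return pool; }
    List<Integer> submissions() { return submissions; }

    public static void main(String[] args) {
        DeferredFeedback model = new DeferredFeedback(10);
        model.add(10);
        System.out.println(model.submissions()); // prints [10]
        System.out.println(model.pool());        // prints 0
    }
}
```

Because the deduction happens in its own step, it cannot be applied twice by the same step that produced it, which is the property the original test expected.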

Author

greyson commented Mar 7, 2018

Ouch. I depend heavily on at least three loops like this one, though this was the only one giving me issues (as far as I am aware). I guess I'll have to go back through the codebase and use some strategic additions of Operational.defer().

Thank you for the reply.

greyson closed this as completed Mar 7, 2018
Author

greyson commented Mar 7, 2018

Is it necessary to have the listener? How would you keep that from getting garbage collected, other than passing it up from the (very) deep parts of your code?

jam40jeff added a commit that referenced this issue Mar 7, 2018
@jam40jeff
Contributor

You're right, I should have used Operational.Defer, which would not have forced me to manage the listener manually. I have updated the test.


ziriax commented Mar 8, 2018

I don't understand. Isn't the only reason CellLoop and StreamLoop exist to allow dependency cycles? What kinds of cycles are not allowed, then?

Contributor

clinuxrulz commented Mar 8, 2018

Generally, ones where a cell or stream depends recursively on itself at the same instant in time. Sort of like ones that make no sense mathematically, see below.
E.g.

CellLoop<Integer> cla = new CellLoop<>();
Cell<Integer> ca = cla.map((Integer a) -> a + 1);
cla.loop(ca);

Is completely invalid. It's like saying to someone in math class that x = x + 1, now solve for x. No solution.

It probably helps to think of CellLoop<> and StreamLoop<> as providing forward references to things you need to use.

E.g.

StreamLoop<Integer> sla = new StreamLoop<>();
Cell<Integer> ca = sla.hold(0);
sla.loop(sEvent.snapshot(ca).map((Integer a) -> a + 1));

Is completely valid.

And

CellLoop<Integer> cla = new CellLoop<>();
// CellLoop.loop() takes a Cell, so the snapshot stream is held to turn it into one.
cla.loop(sEvent.snapshot(cla).map((Integer a) -> a + 1).hold(0));

Is completely valid. They do not go against any math laws.

Long story short: if you know how the ranks work internally, anything that causes the rank to skyrocket to infinity is invalid.
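
The rank rule can be pictured as a cycle check over same-instant dependencies. The sketch below is a simplified model in plain Java, not Sodium's actual rank implementation; the class RankCheck and the node names are hypothetical. It assumes that map creates a same-instant dependency, while hold and a snapshot of a cell read the previous value and so contribute no same-instant edge.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RankCheck {
    // deps maps each node to the nodes it depends on within the same instant.
    // Returns false if a same-instant cycle exists, i.e. no finite rank can be assigned.
    static boolean hasFiniteRanks(Map<String, List<String>> deps) {
        Set<String> done = new HashSet<>();
        Set<String> inProgress = new HashSet<>();
        for (String node : deps.keySet()) {
            if (!visit(node, deps, done, inProgress)) {
                return false;
            }
        }
        return true;
    }

    // Depth-first search; reaching a node that is still in progress means a cycle.
    private static boolean visit(String node, Map<String, List<String>> deps,
                                 Set<String> done, Set<String> inProgress) {
        if (done.contains(node)) {
            return true;
        }
        if (!inProgress.add(node)) {
            return false;
        }
        for (String dep : deps.getOrDefault(node, Collections.<String>emptyList())) {
            if (!visit(dep, deps, done, inProgress)) {
                return false;
            }
        }
        inProgress.remove(node);
        done.add(node);
        return true;
    }

    public static void main(String[] args) {
        // cla.loop(cla.map(a -> a + 1)): cla and ca depend on each other at the same instant.
        Map<String, List<String>> invalid = new HashMap<>();
        invalid.put("cla", Arrays.asList("ca"));
        invalid.put("ca", Arrays.asList("cla"));
        System.out.println(hasFiniteRanks(invalid)); // prints false

        // sla.loop(sEvent.snapshot(ca).map(...)) with ca = sla.hold(0):
        // in this model only sEvent is a same-instant dependency of sla.
        Map<String, List<String>> valid = new HashMap<>();
        valid.put("sla", Arrays.asList("sEvent"));
        valid.put("ca", Collections.<String>emptyList());
        valid.put("sEvent", Collections.<String>emptyList());
        System.out.println(hasFiniteRanks(valid)); // prints true
    }
}
```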
