Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Handles collections of Flows #51

Merged
merged 3 commits into from
Nov 18, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 26 additions & 28 deletions proter/src/main/scala/com/workflowfm/proter/flows/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ sealed trait Flow {
case _: NoTask => this
case _ => new Or(this, f)
}
def *(i: Int): Flow = new All((for (_ <- 1 to i) yield this.copy()): _*)
def *(i: Int): Flow = Flow.par(for (_ <- 1 to i) yield this.copy())

def +(t: Task): Flow = this + new FlowTask(t)
def >(t: Task): Flow = this > new FlowTask(t)
Expand Down Expand Up @@ -67,20 +67,34 @@ class And(val left: Flow, val right: Flow) extends Flow {
override def copy(): Flow = new And(left.copy(), right.copy())
}

class All(val elements: Flow*) extends Flow {
override def copy(): Flow = new All(elements.map(_.copy()): _*)
}

class Or(val left: Flow, val right: Flow) extends Flow {
override def copy(): Flow = new Or(left.copy(), right.copy())
}

object Flow {
def apply(): NoTask = new NoTask()

def apply(t: Task): FlowTask = new FlowTask(t)
def apply(t: Task*): Flow = Flow.seq(t.map(new FlowTask(_)))

implicit def flowOfTask(t: Task): FlowTask = new FlowTask(t)

/**
* Creates a sequence of a collection of [[Flow]]s.
*
* @see [[Then]]
* @example Flow.seq(Seq(t1,t2,t3)) = t1 > t2 > t3
* @param l The collection of [[Flow]]s
* @return A [[Flow]] that executes the given collection in sequence
*/
def seq(l: Seq[Flow]): Flow = (l.foldRight[Flow](new NoTask()) { (l, r) => new Then(l, r) })

/**
* Creates a parallel [[Flow]] from a collection of [[Flow]]s.
*
* @see [[And]]
* @example Flow.par(Seq(t1,t2,t3)) = t1 | t2 | t3
* @param l The collection of [[Flow]]s
* @return A [[Flow]] that executes the given collection in parallel
*/
def par(l: Seq[Flow]): Flow = (l.foldRight[Flow](new NoTask()) { (l, r) => new And(l, r) })
}

/**
Expand Down Expand Up @@ -169,12 +183,6 @@ class FlowSimulation(
runFlow(f.right, rightCallback)
}

case f: All =>
runFlow(
(f.elements.fold(new NoTask()) { (l, r) => new And(l, r) }),
callback((_, _) => complete(f.id))
)

case f: Or => {
val leftCallback: Callback =
callback((_, _) => (if (tasks.contains(f.right.id)) complete(f.id)))
Expand Down Expand Up @@ -236,7 +244,7 @@ class FlowLookahead(
* however the preconditions of the right branch are that which is returned by the left branch (as opposed to
* the preconditions of this `Then` node). This is because, by our definition, the right branch happens _after_
* the left branch, and hence the preconditions of the right branch is that the left branch is completed.
* 1. If the current node is an And, All, or Or node, it parses all its child nodes normally, since the child
* 1. If the current node is an And, or Or node, it parses all its child nodes normally, since the child
* branches are independent.
*
* Then, depending on the node type, it returns a function and a lookahead structure. The lookahead structure contains
Expand All @@ -253,11 +261,11 @@ class FlowLookahead(
* was already considered when parsing the right branch. For example, if you had a flow [A THEN B THEN C], we want to
* express that A is a prerequisite of B, and B is a prerequisite of C. It is unnecessary to say that A is a prerequisite
* of C since it is already a prerequisite of B.
* 1. If our current node is an And or All, we need to combine the functions of all the child nodes. If all the child nodes
* 1. If our current node is an And, we need to combine the functions of all the child nodes. If all the child nodes
* are "complete" then this node is complete, hence if any child function returns None, this function should also return
* None. Otherwise, the maximum value of the child functions is returned, since tasks that come after an And or an All
* None. Otherwise, the maximum value of the child functions is returned, since tasks that come after an And
* may only begin once all of the previous tasks have finished.
* 1. If our current node is an Or, the child node functions are also combined in a similar way to Ands and Alls, but this
* 1. If our current node is an Or, the child node functions are also combined in a similar way to Ands, but this
* time we return the minimum value of the child functions, since any task that comes after an Or may begin as soon as
* any of the branches in an Or are completed.
*
Expand Down Expand Up @@ -298,16 +306,6 @@ class FlowLookahead(
functions.map(_._2).fold(NoLookahead) { (a, b) => a and b }
)
}
case f: All => {
val functions = f.elements map (parseFlow(_, extraFunction, lookaheadStructure))
(
(m) => {
val results = functions map (_._1(m))
if (results.contains(None)) None else results.max
},
functions.map(_._2).fold(NoLookahead) { (a, b) => a and b }
)
}
case f: Or => {
val functions = Seq(
parseFlow(f.left, extraFunction, lookaheadStructure),
Expand Down