|
| 1 | +import { DownstreamHandler, Inlet, Outlet, Shape, SingleOutputStage } from './stage' |
| 2 | +import { range } from 'lodash' |
| 3 | +import { complexFlow, Flow, FlowShape } from './flow' |
| 4 | +import { Graph } from './graph' |
| 5 | +import { Source } from './source' |
| 6 | + |
| 7 | +export abstract class FanInShape<O> implements Shape { |
| 8 | + |
| 9 | + outputs: Outlet<O>[] |
| 10 | + inputs: Inlet<any>[] |
| 11 | + |
| 12 | + constructor(public output: Outlet<O>) { |
| 13 | + this.outputs = [output] |
| 14 | + } |
| 15 | +} |
| 16 | + |
| 17 | +export class FanInShape2<I1, I2, O> implements Shape { |
| 18 | + |
| 19 | + outputs: Outlet<O>[] |
| 20 | + inputs: Inlet<any>[] |
| 21 | + |
| 22 | + constructor(public in1: Inlet<I1>, public in2: Inlet<I2>, public output: Outlet<O>) { |
| 23 | + this.inputs = [in1, in2] |
| 24 | + this.outputs = [output] |
| 25 | + } |
| 26 | +} |
| 27 | + |
| 28 | +export class FanInShape3<I1, I2, I3, O> implements Shape { |
| 29 | + |
| 30 | + outputs: Outlet<O>[] |
| 31 | + inputs: Inlet<any>[] |
| 32 | + |
| 33 | + constructor(public in1: Inlet<I1>, |
| 34 | + public in2: Inlet<I2>, |
| 35 | + public in3: Inlet<I3>, |
| 36 | + public output: Outlet<O>) { |
| 37 | + this.inputs = [in1, in2, in3] |
| 38 | + this.outputs = [output] |
| 39 | + } |
| 40 | +} |
| 41 | + |
| 42 | +export class UniformFanInShape<I, O> implements FanInShape<O> { |
| 43 | + |
| 44 | + outputs: Outlet<any>[] |
| 45 | + |
| 46 | + constructor(public inputs: Inlet<I>[], public output: Outlet<O>) { |
| 47 | + this.outputs = [output] |
| 48 | + } |
| 49 | +} |
| 50 | + |
| 51 | +export function fanInFlow<A>(source: Source<A>, graphFactory: (size: number) => Graph<UniformFanInShape<A, A>, void>): Flow<A, A> { |
| 52 | + return complexFlow(b => { |
| 53 | + const s = b.add(source) |
| 54 | + const merge = b.add(graphFactory(2)) |
| 55 | + s.output.wire(merge.inputs[1]) |
| 56 | + return new FlowShape(merge.inputs[0], merge.output) |
| 57 | + }) |
| 58 | +} |
| 59 | + |
| 60 | +export abstract class FanInStage<O, S extends FanInShape<O>> extends SingleOutputStage<O, S> { |
| 61 | + |
| 62 | + finish(): void { |
| 63 | + this.cancel() |
| 64 | + this.complete() |
| 65 | + } |
| 66 | + |
| 67 | + onCancel(): void { |
| 68 | + this.cancel() |
| 69 | + } |
| 70 | + |
| 71 | + cancel(): void { |
| 72 | + this.shape.inputs.forEach(input => { |
| 73 | + if (!input.isClosed()) { |
| 74 | + input.cancel() |
| 75 | + } |
| 76 | + }) |
| 77 | + } |
| 78 | +} |
| 79 | + |
| 80 | +export abstract class UniformFanInStage<I, O> extends FanInStage<O, UniformFanInShape<I, O>> { |
| 81 | + |
| 82 | + shape: UniformFanInShape<I, O> |
| 83 | + |
| 84 | + constructor(protected inCount: number) { |
| 85 | + super() |
| 86 | + const inputs = range(0, inCount).map(i => new Inlet<I>(this.createDownstreamHandler(i))) |
| 87 | + this.shape = new UniformFanInShape(inputs, new Outlet<O>(this)) |
| 88 | + } |
| 89 | + |
| 90 | + protected input(i: number): Inlet<I> { |
| 91 | + return this.shape.inputs[i] |
| 92 | + } |
| 93 | + |
| 94 | + abstract createDownstreamHandler(index: number): DownstreamHandler |
| 95 | +} |
0 commit comments