Skip to content

Commit 8864e93

Browse files
committed
feat: pipe example
1 parent 801b0f9 commit 8864e93

File tree

4 files changed

+168
-50
lines changed

4 files changed

+168
-50
lines changed

package/collaboration/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export * from "./batch";
2+
export * from "./pipe";
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import {
2+
EventEmitter,
3+
Pipe,
4+
// ChainPipeContext,
5+
ChainPipeOptions,
6+
chainPipes,
7+
PipeOptions,
8+
} from "@idealeap/gwt"; // 请替换成你的模块导入方式
9+
10+
test("Pipe", async () => {
11+
// 创建一个事件广播器实例用于测试
12+
const globalEmitter = new EventEmitter();
13+
14+
// // 创建一个全局的上下文,用于保存各个pipe的输出
15+
// const globalContext: ChainPipeContext = {
16+
// stepResults: new Map(),
17+
// emitter: globalEmitter,
18+
// };
19+
20+
// 定义一个进度跟踪函数
21+
const progressTracker: ChainPipeOptions = {
22+
onProgress: (completed, total) => {
23+
console.log(`Progress: ${completed}/${total}`);
24+
},
25+
};
26+
27+
// 第一个 Pipe: 前处理输入,使其加 1
28+
const pipe1Options: PipeOptions<number, number> = {
29+
id: "pipe1",
30+
description: "Increment by 1",
31+
preProcess: (input) => {
32+
return input + 1;
33+
},
34+
};
35+
const pipe1 = new Pipe<number, number>((input) => input * 2, pipe1Options);
36+
37+
// 第二个 Pipe: 依赖于第一个 Pipe 的结果
38+
const pipe2Options: PipeOptions<number, number> = {
39+
id: "pipe2",
40+
description: "Square the input",
41+
dependencies: ["pipe1"],
42+
postProcess: (result) => {
43+
return result * result;
44+
},
45+
};
46+
const pipe2 = new Pipe<number, number>((input) => input * 3, pipe2Options);
47+
48+
// 为 stepComplete 事件添加监听器
49+
globalEmitter.on(
50+
"stepComplete",
51+
(result: any, step: any, totalSteps: any, stepResults: any) => {
52+
console.log(
53+
`Step ${step} of ${totalSteps} completed with result: ${result}`,
54+
);
55+
console.log("All results so far: ", stepResults);
56+
},
57+
);
58+
59+
// 为 error 事件添加监听器
60+
globalEmitter.on("error", (error: any) => {
61+
console.log(`Error: ${error}`);
62+
});
63+
64+
// 开始执行管道
65+
await (async () => {
66+
try {
67+
await chainPipes([pipe1, pipe2], 1, progressTracker);
68+
} catch (error) {
69+
console.error(`Failed to chain pipes: ${String(error)}`);
70+
}
71+
})().catch((error) => {
72+
console.error(`Failed to chain pipes: ${error}`);
73+
});
74+
});
75+
76+
test("Pipe with batchDecorator", async () => {
77+
// 创建 Pipes
78+
const pipe1Options: PipeOptions<number, number> = {
79+
id: "pipe1",
80+
description: "Multiply by 2",
81+
preProcess: (input) => {
82+
console.log(`Preprocessing in pipe1: ${input}`);
83+
return input + 1;
84+
},
85+
postProcess: (result) => {
86+
console.log(`Postprocessing in pipe1: ${result}`);
87+
return result - 1;
88+
},
89+
onError: (error) => {
90+
console.log(`Error in pipe1: ${error}`);
91+
},
92+
};
93+
94+
const pipe1 = new Pipe<number, number>((input) => {
95+
return input * 2;
96+
}, pipe1Options);
97+
98+
const pipe2Options: PipeOptions<number, number> = {
99+
id: "pipe2",
100+
description: "Multiply by 3 and add dependency result",
101+
dependencies: ["pipe1"],
102+
};
103+
104+
const pipe2 = new Pipe<number, number>((input, context) => {
105+
const dependencyResult = context.stepResults.get("pipe1") || 0;
106+
return (input + dependencyResult) * 3;
107+
}, pipe2Options);
108+
109+
// 创建 EventEmitter
110+
const globalEmitter = new EventEmitter();
111+
112+
// 添加事件监听器
113+
globalEmitter.on(
114+
"stepComplete",
115+
(result: any, step: any, totalSteps: any) => {
116+
console.log(`Step ${step} of ${totalSteps} completed. Result: ${result}`);
117+
},
118+
);
119+
120+
globalEmitter.on("error", (error: any) => {
121+
console.log(`An error occurred: ${error}`);
122+
});
123+
124+
// ChainPipeOptions
125+
const chainOptions: ChainPipeOptions = {
126+
onProgress: (completed, total) => {
127+
console.log(`Progress: ${completed} of ${total} steps completed.`);
128+
},
129+
};
130+
131+
async function runPipes() {
132+
// const context: ChainPipeContext = {
133+
// stepResults: new Map(),
134+
// emitter: globalEmitter,
135+
// };
136+
137+
try {
138+
const results = await chainPipes([pipe1, pipe2], 1, chainOptions);
139+
console.log(`Final step results: `, results);
140+
} catch (error) {
141+
console.log(`Failed to run pipes: ${error as string}`);
142+
}
143+
}
144+
145+
// 执行测试
146+
await runPipes();
147+
});

package/collaboration/pipe/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "./pipe";

package/collaboration/pipe/pipe.ts

Lines changed: 19 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
type AsyncOrSync<T> = Promise<T> | T;
1+
export type AsyncOrSync<T> = Promise<T> | T;
22

3-
// 使用一个简单的事件广播器
4-
class EventEmitter {
3+
export class EventEmitter {
54
private events: Record<string, any[]> = {};
65

76
on(event: string, listener: any) {
@@ -16,24 +15,25 @@ class EventEmitter {
1615
}
1716
}
1817

19-
interface PipeOptions<T, R> {
18+
export interface PipeOptions<T, R> {
2019
id: string;
2120
description: string;
2221
dependencies?: string[];
23-
onData?: (input: T, context: ChainPipeContext) => AsyncOrSync<T>;
22+
preProcess?: (input: T, context: ChainPipeContext) => AsyncOrSync<T>;
23+
postProcess?: (result: R, context: ChainPipeContext) => AsyncOrSync<R>;
2424
onError?: (error: any) => void;
2525
}
2626

27-
interface ChainPipeContext {
27+
export interface ChainPipeContext {
2828
stepResults: Map<string, any>;
2929
emitter: EventEmitter;
3030
}
3131

32-
interface ChainPipeOptions {
32+
export interface ChainPipeOptions {
3333
onProgress?: (completed: number, total: number) => void;
3434
}
3535

36-
class Pipe<T, R> extends EventEmitter {
36+
export class Pipe<T, R> extends EventEmitter {
3737
constructor(
3838
private callback: (input: T, context: ChainPipeContext) => AsyncOrSync<R>,
3939
public options: PipeOptions<T, R>,
@@ -56,24 +56,28 @@ class Pipe<T, R> extends EventEmitter {
5656
}
5757
}
5858

59-
const processedInput = this.options.onData
60-
? await Promise.resolve(this.options.onData(input, context))
59+
const preProcessedInput = this.options.preProcess
60+
? await Promise.resolve(this.options.preProcess(input, context))
6161
: input;
6262

6363
const result = await Promise.resolve(
64-
this.callback(processedInput, context),
64+
this.callback(preProcessedInput, context),
6565
);
6666

67-
context.stepResults.set(this.options.id, result);
67+
const postProcessedResult = this.options.postProcess
68+
? await Promise.resolve(this.options.postProcess(result, context))
69+
: result;
70+
71+
context.stepResults.set(this.options.id, postProcessedResult);
6872
context.emitter.emit(
6973
"stepComplete",
70-
result,
74+
postProcessedResult,
7175
step,
7276
totalSteps,
7377
context.stepResults,
7478
);
7579

76-
return result;
80+
return postProcessedResult;
7781
} catch (error) {
7882
this.options.onError?.(error);
7983
context.emitter.emit("error", error);
@@ -82,7 +86,7 @@ class Pipe<T, R> extends EventEmitter {
8286
}
8387
}
8488

85-
async function chainPipes(
89+
export async function chainPipes(
8690
pipes: Pipe<any, any>[],
8791
input: any,
8892
options?: ChainPipeOptions,
@@ -107,38 +111,3 @@ async function chainPipes(
107111

108112
return context.stepResults;
109113
}
110-
111-
const pipe1 = new Pipe<number, number>(
112-
(input) => {
113-
return input + 1;
114-
},
115-
{ id: "pipe1", description: "Increment" },
116-
);
117-
118-
const pipe2 = new Pipe<number, number>(
119-
(input) => {
120-
return input * 2;
121-
},
122-
{ id: "pipe2", description: "Multiply by 2" },
123-
);
124-
125-
// 订阅事件
126-
pipe1.on("stepComplete", (result: any, step: any, totalSteps: any) => {
127-
console.log(`Step ${step}/${totalSteps} completed with result ${result}`);
128-
});
129-
130-
pipe2.on("stepComplete", (result: any, step: any, totalSteps: any) => {
131-
console.log(`Step ${step}/${totalSteps} completed with result ${result}`);
132-
});
133-
134-
const run = async () => {
135-
const results = await chainPipes([pipe1, pipe2], 1, {
136-
onProgress: (completed, total) => {
137-
console.log(`${completed}/${total} steps completed.`);
138-
},
139-
});
140-
141-
console.log("All pipes completed:", results);
142-
};
143-
144-
run().catch((error) => console.error(error));

0 commit comments

Comments
 (0)