Skip to content

Commit acfe4e7

Browse files
committed
wip: completing tests
1 parent 9761519 commit acfe4e7

File tree

6 files changed

+38
-78
lines changed

6 files changed

+38
-78
lines changed

src/WorkerManager.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { WorkerFactory } from './types.js';
22
import Logger from '@matrixai/logger';
33
import { CreateDestroy, ready } from '@matrixai/async-init/CreateDestroy.js';
4-
import { WorkerTaskInformation } from './types.js';
4+
import { type WorkerTaskInput } from './types.js';
55
import * as errors from './errors.js';
66
import WorkerPool from './WorkerPool.js';
77

@@ -63,12 +63,12 @@ class WorkerManager {
6363
}
6464

6565
@ready(new errors.ErrorWorkerManagerDestroyed())
66-
public async call(task: WorkerTaskInformation): Promise<unknown> {
66+
public async call(task: WorkerTaskInput): Promise<unknown> {
6767
return await this.queue(task);
6868
}
6969

7070
@ready(new errors.ErrorWorkerManagerDestroyed())
71-
public queue(task: WorkerTaskInformation): Promise<unknown> {
71+
public queue(task: WorkerTaskInput): Promise<unknown> {
7272
return new Promise((resolve, reject) => {
7373
this.pool.runTask(task, (result, error) => {
7474
if (error != null) return reject(error);

src/WorkerPool.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class WorkerTask extends AsyncResource {
1919
this.callback = callback;
2020
}
2121

22-
public done(result, error: Error) {
22+
public done(result: unknown, error: Error) {
2323
this.runInAsyncScope(this.callback, null, result, error);
2424
this.emitDestroy();
2525
}
@@ -96,7 +96,7 @@ class WorkerPool {
9696
this.freeWorkers.push(worker);
9797
this.$workerFreed.next();
9898
};
99-
const errorHandler = (e) => {
99+
const errorHandler = (e: Error) => {
100100
workerError = e;
101101
if (worker[taskInfoSymbol]) worker[taskInfoSymbol].done(undefined, e);
102102
else this.$workerError.next(e);

src/expose.ts

-6
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,9 @@ function expose(workerManifest: WorkerManifest) {
3535
}
3636
parentPort!.postMessage({ data: result.data }, result.transferList);
3737
};
38-
const handleMessageError = (e) => {
39-
console.error('Message error!', e);
40-
// Do nothing for now
41-
};
4238
parentPort!.on('message', handleMessage);
43-
parentPort!.on('messageerror', handleMessageError);
4439
parentPort!.once('close', () => {
4540
parentPort!.off('message', handleMessage);
46-
parentPort!.off('messageerror', handleMessageError);
4741
});
4842
}
4943

src/worker.ts

+5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ const worker = {
2828
});
2929
return { data: undefined };
3030
},
31+
transferBuffer: async (data: ArrayBuffer) => {
32+
const buffer = Buffer.from(data, 0, data.byteLength);
33+
buffer.fill(0xf);
34+
return { data, transferList: [data] };
35+
},
3136
} satisfies WorkerManifest;
3237

3338
expose(worker);

tests/WorkerManager.test.ts

+28-64
Original file line numberDiff line numberDiff line change
@@ -105,68 +105,32 @@ describe('WorkerManager', () => {
105105
await workerManager.settled();
106106
await workerManager.destroy();
107107
});
108-
// Test('zero-copy buffer transfer', async () => {
109-
// workerManager = await WorkerManager.createWorkerManager(
110-
// {
111-
// workerFactory,
112-
// cores: 1,
113-
// logger,
114-
// },
115-
// );
116-
// const buffer = await workerManager.call(async (w) => {
117-
// // Start with a Node Buffer that is "pooled"
118-
// const inputBuffer = Buffer.from('hello 1');
119-
// // Slice copy out the ArrayBuffer
120-
// const input = inputBuffer.buffer.slice(
121-
// inputBuffer.byteOffset,
122-
// inputBuffer.byteOffset + inputBuffer.byteLength,
123-
// );
124-
// // When the underlying ArrayBuffer is detached
125-
// // this Buffer's byteLength will also become 0
126-
// const inputBuffer_ = Buffer.from(input);
127-
// expect(inputBuffer_.byteLength).toBe(input.byteLength);
128-
// // Zero-copy transfer moves "ownership"
129-
// // input is detached from main thread
130-
// // output is detached from worker thread
131-
// const output = await w.transferBuffer(Transfer(input));
132-
// // Detached ArrayBuffers have byte lengths of 0
133-
// expect(input.byteLength).toBe(0);
134-
// expect(inputBuffer_.byteLength).toBe(0);
135-
// // Zero-copy wrap to use Node Buffer API
136-
// const outputBuffer = Buffer.from(output);
137-
// return outputBuffer;
138-
// });
139-
// expect(buffer).toEqual(Buffer.from('hello 2'));
140-
// await workerManager.destroy();
141-
// });
142-
// test('scratch', async () => {
143-
// console.log('start');
144-
// console.log(
145-
// 'Hello from main',
146-
// nodeWorkers.isMainThread,
147-
// nodeWorkers.threadId,
148-
// );
149-
//
150-
// const script = `
151-
// const nodeWorkers = require("node:worker_threads");
152-
// console.log("Hello from worker!: ", nodeWorkers.isMainThread, nodeWorkers.threadId);
153-
// nodeWorkers.parentPort.on('message', v => {
154-
// console.log('message', v);
155-
// nodeWorkers.parentPort.postMessage(v);
156-
// });
157-
// `;
158-
// await new Promise((resolve, reject) => {
159-
// const worker = new nodeWorkers.Worker(script, { eval: true });
160-
// worker.on('message', (v) => {
161-
// logger.warn(`message: ${v}`);
162-
// worker.terminate();
163-
// });
164-
// worker.on('error', reject);
165-
// worker.on('exit', resolve);
166-
// worker.postMessage('some message!');
167-
// });
168-
// console.log('stop');
169-
// });
170-
});
108+
test('zero-copy buffer transfer', async () => {
109+
workerManager = await WorkerManager.createWorkerManager({
110+
workerFactory,
111+
cores: 1,
112+
logger,
113+
});
114+
// Creating a new buffer
115+
const inputBuffer = Buffer.from('hello 1');
116+
// Extracting the underlying ArrayBuffer
117+
const input = inputBuffer.buffer.slice(
118+
inputBuffer.byteOffset,
119+
inputBuffer.byteOffset + inputBuffer.byteLength,
120+
);
121+
// Making call with transfer
122+
const output = await workerManager.call({
123+
type: 'transferBuffer',
124+
data: input,
125+
transferList: [input],
126+
});
127+
// The input ArrayBuffer is detached so the length is now 0
128+
expect(input.byteLength).toBe(0);
129+
// The output should be filled with 0xF
130+
expect(Buffer.from(output as ArrayBuffer)).toEqual(
131+
Buffer.alloc(inputBuffer.byteLength, 0xf),
132+
);
171133

172-
test('asd', async () => {});
134+
await workerManager.destroy();
135+
});
136+
});

tests/WorkerPool.test.ts

-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ import * as errors from '#errors.js';
77

88
const dirname = url.fileURLToPath(new URL('.', import.meta.url));
99

10-
// TODO: zero copy of data
11-
// TODO: apply the async resource.
12-
1310
describe('WorkerPool', () => {
1411
let pool: WorkerPool;
1512
const workerFactory: WorkerFactory = () => {

0 commit comments

Comments
 (0)