diff --git a/lib/internal/main/worker_thread.js b/lib/internal/main/worker_thread.js index 9ae04e288fc70c..c256f10aef9573 100644 --- a/lib/internal/main/worker_thread.js +++ b/lib/internal/main/worker_thread.js @@ -29,6 +29,7 @@ const { messageTypes: { // Messages that may be received by workers LOAD_SCRIPT, + PARENT_LOOP_START, // Messages that may be posted from workers UP_AND_RUNNING, ERROR_MESSAGE, @@ -159,6 +160,8 @@ port.on('message', (message) => { const CJSLoader = require('internal/modules/cjs/loader'); CJSLoader.Module.runMain(filename); } + } else if (message.type === PARENT_LOOP_START) { + require('internal/worker').setParentEventLoopStartTime(message.value); } else if (message.type === STDIO_PAYLOAD) { const { stream, chunks } = message; ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => { diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 3cc589c996703c..151eb530c044f7 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -27,6 +27,7 @@ const { const EventEmitter = require('events'); const assert = require('internal/assert'); const path = require('path'); +const { setImmediate } = require('timers'); const { internalEventLoopUtilization } = require('internal/perf/event_loop_utilization'); @@ -60,6 +61,13 @@ const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url'); const { kEmptyObject } = require('internal/util'); const { validateArray } = require('internal/validators'); +const { + constants: { + NODE_PERFORMANCE_MILESTONE_LOOP_START, + }, + milestones, +} = internalBinding('performance'); + const { ownsProcessState, isMainThread, @@ -70,7 +78,8 @@ const { kMaxOldGenerationSizeMb, kCodeRangeSizeMb, kStackSizeMb, - kTotalResourceLimitCount + kTotalResourceLimitCount, + parentLoopIdleTime, } = internalBinding('worker'); const kHandle = Symbol('kHandle'); @@ -83,6 +92,7 @@ const kOnErrorMessage = Symbol('kOnErrorMessage'); const kParentSideStdio = Symbol('kParentSideStdio'); const kLoopStartTime = Symbol('kLoopStartTime'); const kIsOnline = Symbol('kIsOnline'); +const kSendLoopStart = Symbol('kSendLoopStart'); const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV'); let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { @@ -265,6 +275,13 @@ class Worker extends EventEmitter { this[kHandle].startThread(); process.nextTick(() => process.emit('worker', this)); + // Send current thread loopStart to the worker. In case the loop has not yet + // started, send it after the poll phase of the loop has completed. + if (milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] === -1) { + setImmediate(() => this[kSendLoopStart]()); + } else { + this[kSendLoopStart](); + } if (workerThreadsChannel.hasSubscribers) { workerThreadsChannel.publish({ worker: this, @@ -346,6 +363,13 @@ class Worker extends EventEmitter { } } + [kSendLoopStart]() { + this[kPort]?.postMessage({ + type: messageTypes.PARENT_LOOP_START, + value: milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] / 1e6 + }); + } + postMessage(...args) { if (this[kPublicPort] === null) return; @@ -494,6 +518,24 @@ function eventLoopUtilization(util1, util2) { ); } +let parentEventLoopStartTime = -1; +function setParentEventLoopStartTime(time) { + parentEventLoopStartTime = time; +} + +function parentEventLoopUtilization(util1, util2) { + if (parentEventLoopStartTime === -1) { + return { idle: 0, active: 0, utilization: 0 }; + } + + return internalEventLoopUtilization( + parentEventLoopStartTime, + parentLoopIdleTime(), + util1, + util2, + ); +} + module.exports = { ownsProcessState, isMainThread, @@ -505,4 +547,6 @@ module.exports = { assignEnvironmentData, threadId, Worker, + setParentEventLoopStartTime, + parentEventLoopUtilization, }; diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 61f9a5363716a8..9373e32e3d4739 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -89,7 +89,8 @@ const messageTypes = { ERROR_MESSAGE: 'errorMessage', STDIO_PAYLOAD: 'stdioPayload', STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData', - LOAD_SCRIPT: 'loadScript' + LOAD_SCRIPT: 'loadScript', + PARENT_LOOP_START: 'parentLoopStart', }; // We have to mess with the MessagePort prototype a bit, so that a) we can make diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 9d702fa2883447..19e27dbb1444ac 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -7,7 +7,8 @@ const { setEnvironmentData, getEnvironmentData, threadId, - Worker + Worker, + parentEventLoopUtilization, } = require('internal/worker'); const { @@ -38,4 +39,9 @@ module.exports = { BroadcastChannel, setEnvironmentData, getEnvironmentData, + parent: isMainThread ? null : { + performance: { + eventLoopUtilization: parentEventLoopUtilization, + }, + }, }; diff --git a/src/node_worker.cc b/src/node_worker.cc index 6b0ca484ace83a..ff00f74aebcfed 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -56,6 +56,7 @@ Worker::Worker(Environment* env, per_isolate_opts_(per_isolate_opts), exec_argv_(exec_argv), platform_(env->isolate_data()->platform()), + parent_loop_(env->event_loop()), thread_id_(AllocateEnvironmentThreadId()), env_vars_(env_vars), snapshot_data_(snapshot_data) { @@ -867,6 +868,14 @@ void Worker::LoopStartTime(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(loop_start_time / 1e6); } +void Worker::ParentLoopIdleTime(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(!env->is_main_thread()); + Worker* w = env->worker_context(); + uint64_t idle_time = uv_metrics_idle_time(w->parent_loop_); + args.GetReturnValue().Set(1.0 * idle_time / 1e6); +} + namespace { // Return the MessagePort that is global for this Environment and communicates @@ -923,6 +932,7 @@ void InitWorker(Local target, } SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort); + SetMethod(context, target, "parentLoopIdleTime", Worker::ParentLoopIdleTime); target ->Set(env->context(), @@ -969,6 +979,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Worker::TakeHeapSnapshot); registry->Register(Worker::LoopIdleTime); registry->Register(Worker::LoopStartTime); + registry->Register(Worker::ParentLoopIdleTime); } } // anonymous namespace diff --git a/src/node_worker.h b/src/node_worker.h index dcb58d13e0e6f9..9a5defad41137f 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -75,6 +75,7 @@ class Worker : public AsyncWrap { static void TakeHeapSnapshot(const v8::FunctionCallbackInfo& args); static void LoopIdleTime(const v8::FunctionCallbackInfo& args); static void LoopStartTime(const v8::FunctionCallbackInfo& args); + static void ParentLoopIdleTime(const v8::FunctionCallbackInfo&); private: bool CreateEnvMessagePort(Environment* env); @@ -91,6 +92,8 @@ class Worker : public AsyncWrap { std::unique_ptr inspector_parent_handle_; + uv_loop_t* parent_loop_; + // This mutex protects access to all variables listed below it. mutable Mutex mutex_; diff --git a/test/parallel/test-worker-parent-performance-eventlooputil.js b/test/parallel/test-worker-parent-performance-eventlooputil.js new file mode 100644 index 00000000000000..69fe61550c5d3b --- /dev/null +++ b/test/parallel/test-worker-parent-performance-eventlooputil.js @@ -0,0 +1,54 @@ +'use strict'; + +const { mustCall } = require('../common'); + +const TIMEOUT = 10; +const SPIN_DUR = 50; + +const assert = require('assert'); +const { Worker, parent, workerData } = require('worker_threads'); + +// Do not use isMainThread directly, otherwise the test would time out in case +// it's started inside of another worker thread. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = '1'; + const i32arr = new Int32Array(new SharedArrayBuffer(4)); + const w = new Worker(__filename, { workerData: i32arr }); + w.on('online', mustCall(() => { + Atomics.wait(i32arr, 0, 0); + + const t = Date.now(); + while (Date.now() - t < SPIN_DUR); + + Atomics.store(i32arr, 0, 0); + Atomics.notify(i32arr, 0); + Atomics.wait(i32arr, 0, 0); + })); +} else { + setTimeout(() => { + const { eventLoopUtilization } = parent.performance; + const i32arr = workerData; + const elu1 = eventLoopUtilization(); + + Atomics.store(i32arr, 0, 1); + Atomics.notify(i32arr, 0); + Atomics.wait(i32arr, 0, 1); + + const elu2 = eventLoopUtilization(elu1); + const elu3 = eventLoopUtilization(); + const elu4 = eventLoopUtilization(elu3, elu1); + + assert.strictEqual(elu2.idle, 0); + assert.strictEqual(elu4.idle, 0); + assert.strictEqual(elu2.utilization, 1); + assert.strictEqual(elu4.utilization, 1); + assert.strictEqual(elu3.active - elu1.active, elu4.active); + assert.ok(elu2.active > SPIN_DUR - 10, `${elu2.active} <= ${SPIN_DUR - 10}`); + assert.ok(elu2.active < elu4.active, `${elu2.active} >= ${elu4.active}`); + assert.ok(elu3.active > elu2.active, `${elu3.active} <= ${elu2.active}`); + assert.ok(elu3.active > elu4.active, `${elu3.active} <= ${elu4.active}`); + + Atomics.store(i32arr, 0, 1); + Atomics.notify(i32arr, 0); + }, TIMEOUT); +}