From bd1b369eec3beead618a2dcfd330002d4f015166 Mon Sep 17 00:00:00 2001 From: Alex Hultman Date: Fri, 23 Aug 2024 21:49:42 +0200 Subject: [PATCH] New WorkerThread.js apporoach using addChildApp --- examples/WorkerThreads.js | 50 ++++++++++++++++++++++++++++-------- src/AppWrapper.h | 53 +++++++++++++++++++++++++++++++++++++++ uWebSockets | 2 +- 3 files changed, 93 insertions(+), 12 deletions(-) diff --git a/examples/WorkerThreads.js b/examples/WorkerThreads.js index b06be08d..9db83ddf 100644 --- a/examples/WorkerThreads.js +++ b/examples/WorkerThreads.js @@ -1,33 +1,61 @@ -/* This example spawns two worker threads, each with their own - * server listening to the same port (Linux feature). */ +/* This example shows two different approaches to multi-core load balancing. + * The first approach (the oldest) requires Linux and will only work on Linux. + * This approach listens to port 4000 on all CPUs. That's it. That's all you do. + * Listening to the same port from many worker threads will work on Linux. + * The second approach will work on all platforms; you set up a main acceptorApp and register all child apps + * (worker apps) with it. The acceptorApp will listen to port 9001 and move sockets in round-robin fashion to + * the registered child apps. + * Note that, in this example we only create 2 worker threads. Ideally you should create as many as there are CPUs + * in your system. But by only creating 2 here, it is simple to see the perf. gain on a system of 4 cores, as you can then + * run the client side on the remaining 2 cores without interfering with the server side. */ const uWS = require('../dist/uws.js'); const port = 9001; -const { Worker, isMainThread, threadId } = require('worker_threads'); +const { Worker, isMainThread, threadId, parentPort } = require('worker_threads'); const os = require('os'); if (isMainThread) { + + /* The acceptorApp only listens, but must be SSL if worker apps are SSL and likewise opposite */ + const acceptorApp = uWS./*SSL*/App({ + key_file_name: 'misc/key.pem', + cert_file_name: 'misc/cert.pem', + passphrase: '1234' + }).listen(port, (token) => { + if (token) { + console.log('Listening to port ' + port + ' from thread ' + threadId + ' as main acceptor'); + } else { + console.log('Failed to listen to port ' + port + ' from thread ' + threadId); + } + }); + /* Main thread loops over all CPUs */ /* In this case we only spawn two (hardcoded) */ /*os.cpus()*/[0, 1].forEach(() => { + /* Spawn a new thread running this source file */ - new Worker(__filename); + new Worker(__filename).on("message", (workerAppDescriptor) => { + acceptorApp.addChildAppDescriptor(workerAppDescriptor); + }); }); /* I guess main thread joins by default? */ } else { /* Here we are inside a worker thread */ - const app = uWS.SSLApp({ + const app = uWS./*SSL*/App({ key_file_name: 'misc/key.pem', cert_file_name: 'misc/cert.pem', passphrase: '1234' }).get('/*', (res, req) => { res.end('Hello Worker!'); - }).listen(port, (token) => { - if (token) { - console.log('Listening to port ' + port + ' from thread ' + threadId); - } else { - console.log('Failed to listen to port ' + port + ' from thread ' + threadId); - } + }).listen(4000, (token) => { + if (token) { + console.log('Listening to port ' + 4000 + ' from thread ' + threadId); + } else { + console.log('Failed to listen to port ' + 4000 + ' from thread ' + threadId); + } }); + + /* The worker sends back its descriptor to the main acceptor */ + parentPort.postMessage(app.getDescriptor()); } diff --git a/src/AppWrapper.h b/src/AppWrapper.h index 363d8ac5..71fa3877 100644 --- a/src/AppWrapper.h +++ b/src/AppWrapper.h @@ -713,6 +713,54 @@ std::pair readOptionsObject(const FunctionCallb return {options, true}; } +template +void uWS_App_addChildApp(const FunctionCallbackInfo &args) { + APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0); + + Isolate *isolate = args.GetIsolate(); + + double descriptor = args[0]->NumberValue(isolate->GetCurrentContext()).ToChecked(); + + + APP *receivingApp;// = (APP *) args[0]->ToObject(isolate->GetCurrentContext()).ToLocalChecked()->GetAlignedPointerFromInternalField(0); + + memcpy(&receivingApp, &descriptor, sizeof(receivingApp)); + + /* Todo: check the class type of args[0] must match class type of args.Holder() */ + //if (args[0]) + + //std::cout << "addChildApp: " << receivingApp << std::endl; + + app->addChildApp(receivingApp); + + args.GetReturnValue().Set(args.Holder()); +} + +template +void uWS_App_getDescriptor(const FunctionCallbackInfo &args) { + APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0); + + Isolate *isolate = args.GetIsolate(); + + static_assert(sizeof(double) >= sizeof(app)); + + //static thread_local std::unordered_set> persistentApps; + + UniquePersistent *persistentApp = new UniquePersistent; + persistentApp->Reset(args.GetIsolate(), args.Holder()); + + //persistentApps.emplace(persistentApp); + + double descriptor = 0; + memcpy(&descriptor, &app, sizeof(app)); + + //std::cout << "getDescriptor: " << app << std::endl; + + //std::cout << "Loop: " << app->getLoop() << std::endl; + + args.GetReturnValue().Set(Number::New(isolate, descriptor)); +} + template void uWS_App_addServerName(const FunctionCallbackInfo &args) { APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0); @@ -920,6 +968,11 @@ void uWS_App(const FunctionCallbackInfo &args) { appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "listen_unix", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_listen_unix, args.Data())); appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "filter", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_filter, args.Data())); + /* load balancing */ + appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "addChildAppDescriptor", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_addChildApp, args.Data())); + appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getDescriptor", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_getDescriptor, args.Data())); + + /* ws, listen */ appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "ws", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_ws, args.Data())); appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "publish", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_publish, args.Data())); diff --git a/uWebSockets b/uWebSockets index 9bbf161a..9cca5d68 160000 --- a/uWebSockets +++ b/uWebSockets @@ -1 +1 @@ -Subproject commit 9bbf161a72db2a38a3741005d43801fb1f65cbbb +Subproject commit 9cca5d68e0ac7153d8250445c076a9ffc3d58dae