Skip to content

Commit

Permalink
New WorkerThread.js apporoach using addChildApp
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed Aug 23, 2024
1 parent e322cd8 commit bd1b369
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 12 deletions.
50 changes: 39 additions & 11 deletions examples/WorkerThreads.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,61 @@
/* This example spawns two worker threads, each with their own
* server listening to the same port (Linux feature). */
/* This example shows two different approaches to multi-core load balancing.
* The first approach (the oldest) requires Linux and will only work on Linux.
* This approach listens to port 4000 on all CPUs. That's it. That's all you do.
* Listening to the same port from many worker threads will work on Linux.
* The second approach will work on all platforms; you set up a main acceptorApp and register all child apps
* (worker apps) with it. The acceptorApp will listen to port 9001 and move sockets in round-robin fashion to
* the registered child apps.
* Note that, in this example we only create 2 worker threads. Ideally you should create as many as there are CPUs
* in your system. But by only creating 2 here, it is simple to see the perf. gain on a system of 4 cores, as you can then
* run the client side on the remaining 2 cores without interfering with the server side. */

const uWS = require('../dist/uws.js');
const port = 9001;
const { Worker, isMainThread, threadId } = require('worker_threads');
const { Worker, isMainThread, threadId, parentPort } = require('worker_threads');
const os = require('os');

if (isMainThread) {

/* The acceptorApp only listens, but must be SSL if worker apps are SSL and likewise opposite */
const acceptorApp = uWS./*SSL*/App({
key_file_name: 'misc/key.pem',
cert_file_name: 'misc/cert.pem',
passphrase: '1234'
}).listen(port, (token) => {
if (token) {
console.log('Listening to port ' + port + ' from thread ' + threadId + ' as main acceptor');
} else {
console.log('Failed to listen to port ' + port + ' from thread ' + threadId);
}
});

/* Main thread loops over all CPUs */
/* In this case we only spawn two (hardcoded) */
/*os.cpus()*/[0, 1].forEach(() => {

/* Spawn a new thread running this source file */
new Worker(__filename);
new Worker(__filename).on("message", (workerAppDescriptor) => {
acceptorApp.addChildAppDescriptor(workerAppDescriptor);
});
});

/* I guess main thread joins by default? */
} else {
/* Here we are inside a worker thread */
const app = uWS.SSLApp({
const app = uWS./*SSL*/App({
key_file_name: 'misc/key.pem',
cert_file_name: 'misc/cert.pem',
passphrase: '1234'
}).get('/*', (res, req) => {
res.end('Hello Worker!');
}).listen(port, (token) => {
if (token) {
console.log('Listening to port ' + port + ' from thread ' + threadId);
} else {
console.log('Failed to listen to port ' + port + ' from thread ' + threadId);
}
}).listen(4000, (token) => {
if (token) {
console.log('Listening to port ' + 4000 + ' from thread ' + threadId);
} else {
console.log('Failed to listen to port ' + 4000 + ' from thread ' + threadId);
}
});

/* The worker sends back its descriptor to the main acceptor */
parentPort.postMessage(app.getDescriptor());
}
53 changes: 53 additions & 0 deletions src/AppWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,54 @@ std::pair<uWS::SocketContextOptions, bool> readOptionsObject(const FunctionCallb
return {options, true};
}

template <typename APP>
void uWS_App_addChildApp(const FunctionCallbackInfo<Value> &args) {
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);

Isolate *isolate = args.GetIsolate();

double descriptor = args[0]->NumberValue(isolate->GetCurrentContext()).ToChecked();


APP *receivingApp;// = (APP *) args[0]->ToObject(isolate->GetCurrentContext()).ToLocalChecked()->GetAlignedPointerFromInternalField(0);

memcpy(&receivingApp, &descriptor, sizeof(receivingApp));

/* Todo: check the class type of args[0] must match class type of args.Holder() */
//if (args[0])

//std::cout << "addChildApp: " << receivingApp << std::endl;

app->addChildApp(receivingApp);

args.GetReturnValue().Set(args.Holder());
}

template <typename APP>
void uWS_App_getDescriptor(const FunctionCallbackInfo<Value> &args) {
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);

Isolate *isolate = args.GetIsolate();

static_assert(sizeof(double) >= sizeof(app));

//static thread_local std::unordered_set<UniquePersistent<Object>> persistentApps;

UniquePersistent<Object> *persistentApp = new UniquePersistent<Object>;
persistentApp->Reset(args.GetIsolate(), args.Holder());

//persistentApps.emplace(persistentApp);

double descriptor = 0;
memcpy(&descriptor, &app, sizeof(app));

//std::cout << "getDescriptor: " << app << std::endl;

//std::cout << "Loop: " << app->getLoop() << std::endl;

args.GetReturnValue().Set(Number::New(isolate, descriptor));
}

template <typename APP>
void uWS_App_addServerName(const FunctionCallbackInfo<Value> &args) {
APP *app = (APP *) args.Holder()->GetAlignedPointerFromInternalField(0);
Expand Down Expand Up @@ -920,6 +968,11 @@ void uWS_App(const FunctionCallbackInfo<Value> &args) {
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "listen_unix", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_listen_unix<APP>, args.Data()));
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "filter", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_filter<APP>, args.Data()));

/* load balancing */
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "addChildAppDescriptor", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_addChildApp<APP>, args.Data()));
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getDescriptor", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_getDescriptor<APP>, args.Data()));


/* ws, listen */
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "ws", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_ws<APP>, args.Data()));
appTemplate->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "publish", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, uWS_App_publish<APP>, args.Data()));
Expand Down
2 changes: 1 addition & 1 deletion uWebSockets

0 comments on commit bd1b369

Please # to comment.