Skip to content

Working on issues #540

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 14 commits into from
May 22, 2025
Merged
763 changes: 763 additions & 0 deletions components/devices/!class.interface.js

Large diffs are not rendered by default.

363 changes: 60 additions & 303 deletions components/devices/class.interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const { randomUUID } = require("crypto");

const timeout = require("../../helper/timeout.js");
const promisfy = require("../../helper/promisify.js");
const { isMainThread, parentPort } = require("worker_threads");
const MessagePortStream = require("../../system/worker/class.messageportstream.js");

const PENDING_BRIDGES = new Set();
const WEBSOCKET_SERVER = new Map();
Expand Down Expand Up @@ -130,307 +132,6 @@ module.exports = class Interface {
*
* @link https://nodejs.org/dist/latest-v16.x/docs/api/http.html#new-agentoptions
*/
/*
// *OLD* function, see #329
httpAgent(options) {

options = Object.assign({
keepAlive: true,
//maxSockets: 1,
keepAliveMsecs: 3000, // use this as websocket ping/pong value to detect broken connections?
}, options);

// stream nc tcp socket
// https://stackoverflow.com/a/33514724/5781499
let agent = new Agent(options);

// use interface stream as socket
// createConnection returns duplex stream
// https://nodejs.org/dist/latest-v14.x/docs/api/http.html#http_agent_createconnection_options_callback
agent.createConnection = (options, cb) => {

let input = new PassThrough();
let output = new PassThrough();


this.stream.pipe(input, { end: false });
output.pipe(this.stream, { end: false });


let socket = Duplex.from({
readable: input,
writable: output
});

// when multiple reuqests are done parallal, sometimes a AbortedErr is thrown
// see #329 for details
// TODO: Check if the upstream is drained, and perform requests in series
// As "quick fix" till a solution is found for #312 catch the trown error
socket.on("error", (err) => {
console.log("Catched error on http.agent.createConnection", err);
this.stream.destroy();
});

/*
[socket, this.stream, input, output].forEach((stream) => {
let cleanup = finished(stream, (err) => {

console.log("Socket duplex stream ended", err);

let chunk;

while (null !== (chunk = input.read())) {
console.log(`>>>>>> Read ${chunk.length} bytes of data...`);
}

while (null !== (chunk = output.read())) {
console.log(`>>>>>> Read ${chunk.length} bytes of data...`);
}

input.removeAllListeners();
output.removeAllListeners();

this.stream.unpipe(input);
output.unpipe(this.stream);

cleanup();

});
});
*


// TODO implement other socket functions?!
//if (process.env.NODE_ENV !== "production") {
socket.ref = (...args) => { console.log("socket.ref called", ...args); };
socket.unref = (...args) => { console.log("socket.unref called", ...args); };
socket.setKeepAlive = (...args) => { console.log("socket.setKeepAlive called", ...args); };
socket.setTimeout = (...args) => { console.log("socket.setTimeout called", ...args); };
socket.setNoDelay = (...args) => { console.log("socket.setNoDelay called", ...args); };
// socket.remoteAddress=this.settings.host
// socket.remotePort=this.settings.port
//}

//return socket;
cb(null, socket);

};

return agent;

}
*/


// NEW VERSION, fix for #329
/*
httpAgent(options = {}) {

if (this.cachedAgent) {
return this.cachedAgent;
}

let agent = new Agent({
keepAlive: true,
maxSockets: 1,
...options
});

//let settings = this.settings;

/*
// added for testing a solution for #411
// does nothing/not work, but feels like can be useful in the future
// see:
// - https://nodejs.org/docs/latest/api/http.html#agentkeepsocketalivesocket
// - https://nodejs.org/docs/latest/api/http.html#agentkeepsocketalivesocket
agent.keepSocketAlive = (socket) => {
console.log("agent.keepSocketAlive called");
return true;
};

agent.reuseSocket = (socket, request) => {
console.log("agent.reuseSocket called");
};
*

agent.createConnection = ({ headers = {} }) => {

//console.log(`############## Create connection to tcp://${host}:${port}`);

// cleanup, could be possible be piped from previous "connections"
this.stream.unpipe();

/*
// check if passed host/port matches interface settings?
if (host != settings.host || port != settings.port) {

let msg = "host/port for interface missmatch, expected:\r\n";
msg += `\thost = ${host}; got = ${settings.host}\r\n`;
msg += `\tport = ${settings.port}; got = ${settings.port}`;

throw new Error(msg);

}
*

//let readable = new PassThrough();
//let writable = new PassThrough();

// convert headers key/values to lowercase
// the string conversion prevents a error thrown for numbers
// this happens for websocket requests, where e.g. "sec-websocket-version=13"
// see snipp below "detect websocket connection with set headers"
headers = Object.keys(headers).reduce((obj, key) => {
obj[key.toLowerCase()] = `${headers[key]}`.toLowerCase();
return obj;
}, {});


let readable = new Transform({
transform(chunk, enc, cb) {

//console.log("[incoming]", chunk);

// temp fix for #343
// this is not the prefered fix for this issue
// it should be handled on "stream/socket" level instead
// the issue above occoured with a "shelly 1pm" and parallel requests to /status /ota /settings
// NOTE: what if the body contains json that has a `connection: close` property/key/value?

// detect websocket connection with set headers, fix #411
// agent.protocol is never "ws" regardless of the url used in requests
// temp solution, more like a hotfix than a final solution
if (agent.protocol === "http:" && !(headers?.upgrade === "websocket" && headers?.connection === "upgrade")) {
chunk = chunk.toString().replace(/connection:\s?close\r\n/i, "connection: keep-alive\r\n");
}

this.push(chunk);
cb();

}
});

let writable = new Transform({
transform(chunk, enc, cb) {

//console.log("[outgoing]", chunk);

this.push(chunk);
cb();

}
});


// TODO Implement "auto-drain" when no upstream is attached -> Move this "lower", e.g. before ws upstream?
/*
let writable = new Transform({
transform(chunk, enc, cb) {

debugger;

//console.log("this.stream",);
console.error(">>>> Write data, flowing?", str.upstream ? true : false, settings.host);

if (str.upstream) {
this.push(chunk);
} else {
while (this.read() !== null) {
// do nothing with writen input data
// empty readable queue
}
}

cb();

}
});
*


let stream = new Duplex.from({
readable,
writable
});

stream.destroy = () => {
//console.log("socket.destroy();", args);
};

stream.ref = () => {
//console.log("socket.unref();", args);
};

stream.unref = () => {
//console.log("socket.unref();", args);
};

stream.setKeepAlive = () => {
//console.log("socket.setKeepAlive()", args);
};

stream.setTimeout = () => {
//console.log("socket.setTimeout();", args);
};

stream.setNoDelay = () => {
//console.log("socket.setNotDelay();", args);
};

this.stream.pipe(readable, { end: false });
writable.pipe(this.stream, { end: false });

return stream;

};

/*
agent.createConnection = () => {

let readable = new PassThrough();
let writable = new PassThrough();

let stream = new Duplex.from({
readable,
writable
});

stream.destroy = () => { };
stream.ref = () => { };
stream.unref = () => { };
stream.setKeepAlive = () => { };
stream.setTimeout = () => { };
stream.setNoDelay = () => { };

Interface.socket({
iface: this,
events: this.scope.events
}, (err, socket) => {
if (err) {

stream.emit("error", err);

} else {

writable.pipe(socket)
socket.pipe(readable);

}
});

return stream;

};
*

this.cachedAgent = agent;
return agent;

}
*/


httpAgent(options = {}) {

options = Object.assign({
Expand Down Expand Up @@ -693,6 +394,7 @@ module.exports = class Interface {
};
}

// TODO: Rename to parseBridgeResponse
static parseBridgeRequest(request, cb) {
return ({ uuid, iface, type, socket, stream }) => {
// TODO: remove `socket` property
Expand Down Expand Up @@ -720,8 +422,63 @@ module.exports = class Interface {
process.nextTick(cb, socket, request);
});

events.emit("socket", request);
events.on("socket", handler);
// check if in worker trhead
// if yes: post message to main thread
// listen for response
// if no: do nothign

if (!isMainThread) {

let handleMessage = (data) => {
if (data.component === "devices" && data.event === "socket" && data.uuid === request.uuid && data.type === "response" && data.port) {

// data.event = socket
// data.uuid = request uuid
// data.iface = iface mongodb id
// data.type = reponse
// data.port = message port

// remove message handler
parentPort.off("message", handleMessage);

// create stream from message port
// which is used as the underlaying "socket"
let stream = new MessagePortStream(data.port);

// call handler
// NOTE: emit "socket" event?
// to notify other threads/things that a response was send
// but without the stream/message port, this has no real benefit
Reflect.apply(handler, this, [{
uuid: data.uuid,
iface: data.iface,
type: "response",
socket: true,
stream
}]);


}
};

parentPort.on("message", handleMessage);

// tell main we are waiting for a socket
parentPort.postMessage({
component: "devices",
event: "socket",
request
});

// forward message via events
events.emit("socket", request);

} else {

events.emit("socket", request);
events.on("socket", handler);

}

return function removeHandler() {

Expand Down
Loading