Skip to content

Commit d098554

Browse files
committed
fix: clean up handles in worker_threads environments to prevent aborting
When using in `worker_threads` Workers, Node.js will abort with messages like: ``` uv loop at [0x00000000] has open handles... ``` when workers are exiting or are terminated with active `DgramSocket` or `SeqpacketSocket`. To fix this issue, we need clean up all active uv_handles in hooks registered by `napi_add_env_cleanup_hook`.
1 parent bd3d460 commit d098554

17 files changed

+188
-39
lines changed

.github/workflows/CI.yml

+16-18
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,30 @@ name: CI
22
env:
33
DEBUG: napi:*
44
APP_NAME: node-unix-socket
5-
MACOSX_DEPLOYMENT_TARGET: '10.13'
6-
'on':
5+
MACOSX_DEPLOYMENT_TARGET: "10.13"
6+
"on":
77
push:
88
branches:
99
- master
1010
tags-ignore:
11-
- '**'
11+
- "**"
1212
paths-ignore:
13-
- '**/*.md'
13+
- "**/*.md"
1414
- LICENSE
15-
- '**/*.gitignore'
15+
- "**/*.gitignore"
1616
- .editorconfig
1717
- docs/**
1818
pull_request: null
1919
workflow_dispatch:
2020
inputs:
2121
publish:
22-
description: 'Publish the npm package.'
22+
description: "Publish the npm package."
2323
required: true
2424
type: boolean
2525
default: false
2626
jobs:
2727
build:
28-
if: '!contains(github.event.head_commit.message, ''skip ci'')'
28+
if: "!contains(github.event.head_commit.message, 'skip ci')"
2929
strategy:
3030
fail-fast: false
3131
matrix:
@@ -54,7 +54,7 @@ jobs:
5454
yarn build --target x86_64-unknown-linux-gnu &&
5555
strip *.node
5656
- host: ubuntu-latest
57-
target: 'x86_64-unknown-linux-musl'
57+
target: "x86_64-unknown-linux-musl"
5858
docker: ghcr.io/napi-rs/napi-rs/nodejs-rust:lts-alpine
5959
build: yarn build
6060
- host: ubuntu-latest
@@ -140,14 +140,14 @@ jobs:
140140
if: ${{ matrix.settings.docker }}
141141
with:
142142
image: ${{ matrix.settings.docker }}
143-
options: '-v ${{ env.HOME }}/.cargo/git:/root/.cargo/git -v ${{ env.HOME }}/.cargo/registry:/root/.cargo/registry -v ${{ github.workspace }}:/build -w /build'
143+
options: "-v ${{ env.HOME }}/.cargo/git:/root/.cargo/git -v ${{ env.HOME }}/.cargo/registry:/root/.cargo/registry -v ${{ github.workspace }}:/build -w /build"
144144
run: ${{ matrix.settings.build }}
145145
- name: Build
146146
run: ${{ matrix.settings.build }}
147147
if: ${{ !matrix.settings.docker }}
148148
shell: bash
149149
- name: Upload artifact
150-
uses: actions/upload-artifact@v2
150+
uses: actions/upload-artifact@v3
151151
with:
152152
name: bindings-${{ matrix.settings.target }}
153153
path: ${{ env.APP_NAME }}.*.node
@@ -160,10 +160,9 @@ jobs:
160160
fail-fast: false
161161
matrix:
162162
node:
163-
- '12'
164-
- '14'
165-
- '16'
166-
- '18'
163+
- "18"
164+
- "20"
165+
- "22"
167166
runs-on: ubuntu-latest
168167
steps:
169168
- uses: actions/checkout@v3
@@ -201,10 +200,9 @@ jobs:
201200
- host: macos-latest
202201
target: x86_64-apple-darwin
203202
node:
204-
- '12'
205-
- '14'
206-
- '16'
207-
- '18'
203+
- "18"
204+
- "20"
205+
- "22"
208206
runs-on: ${{ matrix.settings.host }}
209207
steps:
210208
- uses: actions/checkout@v3

.npmignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ yarn.lock
1010
*.node
1111

1212
check.py
13-
resouce
13+
resource
1414
scripts
1515
.devcontainer
1616
__test__

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# CHANGELOG
22

3+
## 0.2.6
4+
fix: clean up handles in worker_threads environments to prevent aborting
5+
36
## 0.2.5
47
fix: close the socket if connecting failed
58

__test__/create_socket.js

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
const { DgramSocket } = require('../js')
2+
const fs = require('fs')
3+
const path = require('path')
4+
5+
const serverPath = path.resolve(__dirname, './.tmp/worker_server.sock')
6+
try {
7+
fs.unlinkSync(serverPath)
8+
} catch (err) {
9+
//
10+
}
11+
const socket = new DgramSocket(serverPath);
12+
socket.bind(serverPath)

__test__/dgram.spec.ts

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
// TODO add tests for worker_threads
21
import * as path from 'path';
32
import * as fs from 'fs';
43
import * as os from 'os';
4+
import * as workerThreads from 'worker_threads'
55
import { DgramSocket } from '../js/index';
6-
import { kTmp, silently, createDefer, kServerPath } from './util';
6+
import { kTmp, silently, createDefer, kServerPath, wait } from './util';
77

88
const kClientPath = path.resolve(kTmp, './client.sock');
99
const kInvalidPath = path.resolve(kTmp, './A_PATH_THAT_DOESNT_EXIST');
1010

11-
const emptyFn = () => {};
11+
const emptyFn = () => { };
1212

1313
describe('DgramSocket', () => {
1414
beforeAll(() => {
@@ -308,4 +308,10 @@ describe('DgramSocket', () => {
308308
client.close()
309309
await p
310310
});
311+
312+
it('should not abort in worker_threads', async () => {
313+
const worker = new workerThreads.Worker(path.resolve(__dirname, './create_socket.js'))
314+
await wait(1000)
315+
worker.terminate()
316+
});
311317
});

__test__/socket.spec.ts

+12-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as net from 'net'
2-
import { createReuseportFd as createFd, closeFd } from '../js/index'
2+
import { createReuseportFd as createFd, closeFd } from '../js/index'
33
import { hasIPv6 } from './util'
44

55
describe('tcp', () => {
@@ -8,11 +8,13 @@ describe('tcp', () => {
88
const host = '0.0.0.0'
99
let port = 0;
1010

11-
async function createServer() {
11+
async function createServer(): Promise<net.Server> {
1212
const fd = createFd(port, host);
1313

1414
const server = await new Promise<net.Server>((resolve, reject) => {
15-
const server = net.createServer()
15+
const server = net.createServer({
16+
noDelay: true,
17+
})
1618

1719
server.listen({
1820
fd,
@@ -26,14 +28,14 @@ describe('tcp', () => {
2628
return server
2729
}
2830

29-
const servers = [];
31+
const servers: net.Server[] = [];
3032
for (let i = 0; i < 5; i += 1) {
3133
const server = await createServer()
3234
servers.push(server);
3335
}
3436

3537
const pList = servers.map(server => {
36-
return new Promise((resolve, reject) => {
38+
return new Promise<Buffer>((resolve, reject) => {
3739
server.once('connection', (socket) => {
3840
socket.on('data', buf => {
3941
resolve(buf)
@@ -55,15 +57,17 @@ describe('tcp', () => {
5557
servers.forEach(server => server.close());
5658
})
5759

58-
if (hasIPv6()) {
60+
if (hasIPv6()) {
5961
it('should work with ipv6', async () => {
6062
const host = '::1'
6163
let port = 0;
6264

6365
const fd = createFd(port, host);
6466

6567
const server = await new Promise<net.Server>((resolve, reject) => {
66-
const server = net.createServer()
68+
const server = net.createServer({
69+
noDelay: true,
70+
})
6771

6872
server.listen({
6973
fd,
@@ -72,7 +76,7 @@ describe('tcp', () => {
7276
})
7377
})
7478
port = (server.address() as any).port
75-
const p = new Promise((resolve, reject) => {
79+
const p = new Promise<Buffer>((resolve, reject) => {
7680
server.once('connection', (socket) => {
7781
socket.on('data', buf => {
7882
resolve(buf)

js/addon.d.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
export function socketNewSoReuseportFd(domain: string, port: number, ip: string): number
77
export function socketClose(fd: number): void
8+
export function initCleanupHook(): void
89
export class SeqpacketSocketWrap {
910
constructor(ee: object, fd?: number | undefined | null)
1011
init(thisObj: object): void

js/index.ts

+8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
import * as workerThreads from 'worker_threads'
2+
import { initCleanupHook } from './addon'
3+
14
export { SendCb, DgramSocket } from './dgram'
25
export { NotifyCb, SeqpacketSocket, SeqpacketServer } from './seqpacket'
36
export { createReuseportFd, closeFd } from './socket'
7+
8+
// Node.js will abort when threads are termiated if we don't clean up uv handles.
9+
if (!workerThreads.isMainThread) {
10+
initCleanupHook()
11+
}

js/seqpacket.ts

+7
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,13 @@ export class SeqpacketSocket extends EventEmitter {
291291
this.wrap.close();
292292
}
293293

294+
/**
295+
* Alias of "destory".
296+
*/
297+
close() {
298+
return this.destroy();
299+
}
300+
294301
/**
295302
* For test only
296303
* @ignore

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "node-unix-socket",
3-
"version": "0.2.5",
3+
"version": "0.2.6",
44
"main": "js/index.js",
55
"types": "js/index.d.ts",
66
"author": {
@@ -39,7 +39,7 @@
3939
"devDependencies": {
4040
"@napi-rs/cli": "^2.10.3",
4141
"@types/jest": "^27.5.0",
42-
"@types/node": "^17.0.31",
42+
"@types/node": "^22.7.6",
4343
"jest": "^27.5.1",
4444
"ts-jest": "^27.1.4",
4545
"typedoc": "^0.22.15",

src/dgram.rs

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::util::{
1313
addr_to_string, buf_into_vec, check_emit, error, get_err, i8_slice_into_u8_slice,
1414
resolve_libc_err, resolve_uv_err, set_clo_exec, set_non_block, socket_addr_to_string,
1515
};
16+
use crate::uv_handle::{insert_handle, remove_handle};
1617

1718
#[allow(dead_code)]
1819
fn string_from_i8_slice(slice: &[i8]) -> Result<String> {
@@ -65,6 +66,7 @@ impl DgramSocketWrap {
6566
handle.data = std::ptr::null_mut() as *mut _;
6667
handle
6768
}));
69+
insert_handle(unsafe { mem::transmute(handle) })?;
6870

6971
Ok(Self {
7072
fd,
@@ -297,6 +299,7 @@ impl DgramSocketWrap {
297299
unsafe {
298300
let handle = mem::transmute(self.handle);
299301
sys::uv_close(handle, Some(on_close));
302+
remove_handle(handle)?;
300303
};
301304

302305
// release Ref<JsFunction> in msg_queue

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ mod seqpacket;
77
mod dgram;
88
mod util;
99
mod socket;
10+
mod uv_handle;

src/seqpacket.rs

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::util::{
77
addr_to_string, buf_into_vec, error, get_err, resolve_libc_err, resolve_uv_err, set_clo_exec,
88
set_non_block, socket_addr_to_string, uv_err_msg,
99
};
10+
use crate::uv_handle::{insert_handle, remove_handle};
1011
use libc::{sockaddr, sockaddr_un, EAGAIN, EINTR, EINVAL, ENOBUFS, EWOULDBLOCK};
1112
use napi::{Env, JsBuffer, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result};
1213
use nix::errno::errno;
@@ -97,6 +98,7 @@ impl SeqpacketSocketWrap {
9798
handle.data = std::ptr::null_mut();
9899
handle
99100
}));
101+
insert_handle(unsafe { mem::transmute(handle) })?;
100102

101103
let uv_loop = get_loop(&env)?;
102104
resolve_uv_err(unsafe { sys::uv_poll_init(uv_loop, handle, fd) })?;
@@ -137,6 +139,7 @@ impl SeqpacketSocketWrap {
137139

138140
unsafe {
139141
sys::uv_close(self.handle as *mut _, Some(on_close));
142+
remove_handle(mem::transmute(self.handle))?;
140143
};
141144

142145
// release msg_queue

src/socket.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use std::str::FromStr;
44

55
use crate::util::{error, get_err, resolve_libc_err, resolve_uv_err};
66
use libc::{c_void, sockaddr_storage, sockaddr_un};
7-
use napi::{Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref, Result, bindgen_prelude::FromNapiValue};
7+
use napi::{
8+
bindgen_prelude::FromNapiValue, Env, JsFunction, JsNumber, JsObject, JsString, JsUnknown, Ref,
9+
Result,
10+
};
811
use uv_sys::sys;
912

1013
pub(crate) fn get_loop(env: &Env) -> Result<*mut sys::uv_loop_t> {

src/util.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use std::ffi::CStr;
22
use std::intrinsics::transmute;
33
use std::mem;
44

5-
use libc::{sockaddr, sockaddr_un, c_char};
6-
use napi::{self, Error, JsBuffer, Result, JsFunction, JsObject};
5+
use libc::{c_char, sockaddr, sockaddr_un};
6+
use napi::{self, Error, JsBuffer, JsFunction, JsObject, Result};
77
use nix::errno::Errno;
88
use nix::fcntl::{fcntl, FcntlArg, OFlag};
99
use uv_sys::sys;
@@ -37,8 +37,8 @@ pub(crate) fn socket_addr_to_string(fd: i32) -> Result<String> {
3737
Ok(addr_to_string(&addr))
3838
}
3939

40-
pub(crate) fn error(msg: String) -> Error {
41-
Error::new(napi::Status::Unknown, msg)
40+
pub(crate) fn error<T: ToString>(item: T) -> Error {
41+
Error::new(napi::Status::Unknown, item.to_string())
4242
}
4343

4444
pub(crate) fn nix_err(err: Errno) -> Error {
@@ -51,7 +51,8 @@ pub(crate) fn uv_err_msg(errno: i32) -> String {
5151
let ret = CStr::from_ptr(ret);
5252
ret
5353
.to_str()
54-
.map_err(|_| error("parsing cstr failed".to_string())).unwrap()
54+
.map_err(|_| error("parsing cstr failed".to_string()))
55+
.unwrap()
5556
.to_string()
5657
};
5758

0 commit comments

Comments
 (0)