Skip to content

Commit a647ffb

Browse files
committed
test: add common/udppair utility
Extracted from the QUIC PR. This adds a utility used to deterministically test UDP traffic. It is currently only used by the experimental QUIC implementation. Separated out on request to make review easier.
1 parent 5bb4d01 commit a647ffb

File tree

8 files changed

+343
-0
lines changed

8 files changed

+343
-0
lines changed

node.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@
566566
'src/js_native_api_v8_internals.h',
567567
'src/js_stream.cc',
568568
'src/json_utils.cc',
569+
'src/js_udp_wrap.cc',
569570
'src/module_wrap.cc',
570571
'src/node.cc',
571572
'src/node_api.cc',

src/async_wrap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ namespace node {
5151
V(HTTPINCOMINGMESSAGE) \
5252
V(HTTPCLIENTREQUEST) \
5353
V(JSSTREAM) \
54+
V(JSUDPWRAP) \
5455
V(MESSAGEPORT) \
5556
V(PIPECONNECTWRAP) \
5657
V(PIPESERVERWRAP) \

src/js_udp_wrap.cc

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
#include "udp_wrap.h"
2+
#include "async_wrap-inl.h"
3+
#include "node_errors.h"
4+
#include "node_sockaddr-inl.h"
5+
6+
namespace node {
7+
8+
using errors::TryCatchScope;
9+
using v8::Array;
10+
using v8::Context;
11+
using v8::FunctionCallbackInfo;
12+
using v8::FunctionTemplate;
13+
using v8::HandleScope;
14+
using v8::Int32;
15+
using v8::Local;
16+
using v8::Object;
17+
using v8::String;
18+
using v8::Value;
19+
20+
// JSUDPWrap is a testing utility used by test/common/udppair.js
21+
// to simulate UDP traffic deterministically in Node.js tests.
22+
class JSUDPWrap final : public UDPWrapBase, public AsyncWrap {
23+
public:
24+
JSUDPWrap(Environment* env, Local<Object> obj);
25+
26+
int RecvStart() override;
27+
int RecvStop() override;
28+
ssize_t Send(uv_buf_t* bufs,
29+
size_t nbufs,
30+
const sockaddr* addr) override;
31+
SocketAddress GetPeerName() override;
32+
SocketAddress GetSockName() override;
33+
AsyncWrap* GetAsyncWrap() override { return this; }
34+
35+
static void New(const FunctionCallbackInfo<Value>& args);
36+
static void EmitReceived(const FunctionCallbackInfo<Value>& args);
37+
static void OnSendDone(const FunctionCallbackInfo<Value>& args);
38+
static void OnAfterBind(const FunctionCallbackInfo<Value>& args);
39+
40+
static void Initialize(Local<Object> target,
41+
Local<Value> unused,
42+
Local<Context> context,
43+
void* priv);
44+
SET_NO_MEMORY_INFO()
45+
SET_MEMORY_INFO_NAME(JSUDPWrap)
46+
SET_SELF_SIZE(JSUDPWrap)
47+
};
48+
49+
JSUDPWrap::JSUDPWrap(Environment* env, Local<Object> obj)
50+
: AsyncWrap(env, obj, PROVIDER_JSUDPWRAP) {
51+
MakeWeak();
52+
53+
obj->SetAlignedPointerInInternalField(
54+
kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
55+
}
56+
57+
int JSUDPWrap::RecvStart() {
58+
HandleScope scope(env()->isolate());
59+
Context::Scope context_scope(env()->context());
60+
TryCatchScope try_catch(env());
61+
Local<Value> value;
62+
int32_t value_int = UV_EPROTO;
63+
if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) ||
64+
!value->Int32Value(env()->context()).To(&value_int)) {
65+
if (try_catch.HasCaught() && !try_catch.HasTerminated())
66+
errors::TriggerUncaughtException(env()->isolate(), try_catch);
67+
}
68+
return value_int;
69+
}
70+
71+
int JSUDPWrap::RecvStop() {
72+
HandleScope scope(env()->isolate());
73+
Context::Scope context_scope(env()->context());
74+
TryCatchScope try_catch(env());
75+
Local<Value> value;
76+
int32_t value_int = UV_EPROTO;
77+
if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) ||
78+
!value->Int32Value(env()->context()).To(&value_int)) {
79+
if (try_catch.HasCaught() && !try_catch.HasTerminated())
80+
errors::TriggerUncaughtException(env()->isolate(), try_catch);
81+
}
82+
return value_int;
83+
}
84+
85+
ssize_t JSUDPWrap::Send(uv_buf_t* bufs,
86+
size_t nbufs,
87+
const sockaddr* addr) {
88+
HandleScope scope(env()->isolate());
89+
Context::Scope context_scope(env()->context());
90+
TryCatchScope try_catch(env());
91+
Local<Value> value;
92+
int64_t value_int = UV_EPROTO;
93+
size_t total_len = 0;
94+
95+
MaybeStackBuffer<Local<Value>, 16> buffers(nbufs);
96+
for (size_t i = 0; i < nbufs; i++) {
97+
buffers[i] = Buffer::Copy(env(), bufs[i].base, bufs[i].len)
98+
.ToLocalChecked();
99+
total_len += bufs[i].len;
100+
}
101+
102+
Local<Value> args[] = {
103+
listener()->CreateSendWrap(total_len)->object(),
104+
Array::New(env()->isolate(), buffers.out(), nbufs),
105+
AddressToJS(env(), addr)
106+
};
107+
108+
if (!MakeCallback(env()->onwrite_string(), arraysize(args), args)
109+
.ToLocal(&value) ||
110+
!value->IntegerValue(env()->context()).To(&value_int)) {
111+
if (try_catch.HasCaught() && !try_catch.HasTerminated())
112+
errors::TriggerUncaughtException(env()->isolate(), try_catch);
113+
}
114+
return value_int;
115+
}
116+
117+
SocketAddress JSUDPWrap::GetPeerName() {
118+
SocketAddress ret;
119+
CHECK(SocketAddress::New(AF_INET, "127.0.0.1", 1337, &ret));
120+
return ret;
121+
}
122+
123+
SocketAddress JSUDPWrap::GetSockName() {
124+
SocketAddress ret;
125+
CHECK(SocketAddress::New(AF_INET, "127.0.0.1", 1337, &ret));
126+
return ret;
127+
}
128+
129+
void JSUDPWrap::New(const FunctionCallbackInfo<Value>& args) {
130+
Environment* env = Environment::GetCurrent(args);
131+
CHECK(args.IsConstructCall());
132+
new JSUDPWrap(env, args.Holder());
133+
}
134+
135+
void JSUDPWrap::EmitReceived(const FunctionCallbackInfo<Value>& args) {
136+
JSUDPWrap* wrap;
137+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
138+
Environment* env = wrap->env();
139+
140+
ArrayBufferViewContents<char> buffer(args[0]);
141+
const char* data = buffer.data();
142+
int len = buffer.length();
143+
144+
CHECK(args[1]->IsInt32()); // family
145+
CHECK(args[2]->IsString()); // address
146+
CHECK(args[3]->IsInt32()); // port
147+
CHECK(args[4]->IsInt32()); // flags
148+
int family = args[1].As<Int32>()->Value() == 4 ? AF_INET : AF_INET6;
149+
Utf8Value address(env->isolate(), args[2]);
150+
int port = args[3].As<Int32>()->Value();
151+
int flags = args[3].As<Int32>()->Value();
152+
153+
sockaddr_storage addr;
154+
CHECK_EQ(sockaddr_for_family(family, *address, port, &addr), 0);
155+
156+
// Repeatedly ask the stream's owner for memory, copy the data that we
157+
// just read from JS into those buffers and emit them as reads.
158+
while (len != 0) {
159+
uv_buf_t buf = wrap->listener()->OnAlloc(len);
160+
ssize_t avail = len;
161+
if (static_cast<ssize_t>(buf.len) < avail)
162+
avail = buf.len;
163+
164+
memcpy(buf.base, data, avail);
165+
data += avail;
166+
len -= avail;
167+
wrap->listener()->OnRecv(
168+
avail, buf, reinterpret_cast<sockaddr*>(&addr), flags);
169+
}
170+
}
171+
172+
void JSUDPWrap::OnSendDone(const FunctionCallbackInfo<Value>& args) {
173+
JSUDPWrap* wrap;
174+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
175+
176+
CHECK(args[0]->IsObject());
177+
CHECK(args[1]->IsInt32());
178+
ReqWrap<uv_udp_send_t>* req_wrap;
179+
ASSIGN_OR_RETURN_UNWRAP(&req_wrap, args[0].As<Object>());
180+
int status = args[1].As<Int32>()->Value();
181+
182+
wrap->listener()->OnSendDone(req_wrap, status);
183+
}
184+
185+
void JSUDPWrap::OnAfterBind(const FunctionCallbackInfo<Value>& args) {
186+
JSUDPWrap* wrap;
187+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
188+
189+
wrap->listener()->OnAfterBind();
190+
}
191+
192+
void JSUDPWrap::Initialize(Local<Object> target,
193+
Local<Value> unused,
194+
Local<Context> context,
195+
void* priv) {
196+
Environment* env = Environment::GetCurrent(context);
197+
198+
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
199+
Local<String> js_udp_wrap_string =
200+
FIXED_ONE_BYTE_STRING(env->isolate(), "JSUDPWrap");
201+
t->SetClassName(js_udp_wrap_string);
202+
t->InstanceTemplate()
203+
->SetInternalFieldCount(UDPWrapBase::kUDPWrapBaseField + 1);
204+
t->Inherit(AsyncWrap::GetConstructorTemplate(env));
205+
206+
UDPWrapBase::AddMethods(env, t);
207+
env->SetProtoMethod(t, "emitReceived", EmitReceived);
208+
env->SetProtoMethod(t, "onSendDone", OnSendDone);
209+
env->SetProtoMethod(t, "onAfterBind", OnAfterBind);
210+
211+
target->Set(env->context(),
212+
js_udp_wrap_string,
213+
t->GetFunction(context).ToLocalChecked()).Check();
214+
}
215+
216+
217+
} // namespace node
218+
219+
NODE_MODULE_CONTEXT_AWARE_INTERNAL(js_udp_wrap, node::JSUDPWrap::Initialize)

src/node_binding.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
V(http_parser) \
5353
V(inspector) \
5454
V(js_stream) \
55+
V(js_udp_wrap) \
5556
V(messaging) \
5657
V(module_wrap) \
5758
V(native_module) \

src/udp_wrap.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,11 @@ class UDPWrap final : public HandleWrap,
215215
v8::Local<v8::Object> current_send_req_wrap_;
216216
};
217217

218+
int sockaddr_for_family(int address_family,
219+
const char* address,
220+
const unsigned short port,
221+
sockaddr_storage* addr);
222+
218223
} // namespace node
219224

220225
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

test/common/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,19 @@ listener to process `'beforeExit'`. If a file needs to be left open until
943943
Node.js completes, use a child process and call `refresh()` only in the
944944
parent.
945945

946+
## UDP pair helper
947+
948+
The `common/udppair` module exports a function `makeUDPPair` and a class
949+
`FakeUDPWrap`.
950+
951+
`FakeUDPWrap` emits `'send'` events when data is to be sent on it, and provides
952+
an `emitReceived()` API for actin as if data has been received on it.
953+
954+
`makeUDPPair` returns an object `{ clientSide, serverSide }` where each side
955+
is an `FakeUDPWrap` connected to the other side.
956+
957+
There is no difference between cient or server side beyond their names.
958+
946959
## WPT Module
947960

948961
### `harness`

test/common/udppair.js

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/* eslint-disable node-core/require-common-first, node-core/required-modules */
2+
'use strict';
3+
const { internalBinding } = require('internal/test/binding');
4+
const { JSUDPWrap } = internalBinding('js_udp_wrap');
5+
const EventEmitter = require('events');
6+
7+
// FakeUDPWrap is a testing utility that emulates a UDP connection
8+
// for the sake of making UDP tests more deterministic.
9+
class FakeUDPWrap extends EventEmitter {
10+
constructor() {
11+
super();
12+
13+
this._handle = new JSUDPWrap();
14+
15+
this._handle.onreadstart = () => this._startReading();
16+
this._handle.onreadstop = () => this._stopReading();
17+
this._handle.onwrite =
18+
(wrap, buffers, addr) => this._write(wrap, buffers, addr);
19+
this._handle.getsockname = (obj) => {
20+
Object.assign(obj, { address: '127.0.0.1', family: 'IPv4', port: 1337 });
21+
return 0;
22+
};
23+
24+
this.reading = false;
25+
this.bufferedReceived = [];
26+
this.emitBufferedImmediate = null;
27+
}
28+
29+
_emitBuffered = () => {
30+
if (!this.reading) return;
31+
if (this.bufferedReceived.length > 0) {
32+
this.emitReceived(this.bufferedReceived.shift());
33+
this.emitBufferedImmediate = setImmediate(this._emitBuffered);
34+
} else {
35+
this.emit('wantRead');
36+
}
37+
};
38+
39+
_startReading() {
40+
this.reading = true;
41+
this.emitBufferedImmediate = setImmediate(this._emitBuffered);
42+
}
43+
44+
_stopReading() {
45+
this.reading = false;
46+
clearImmediate(this.emitBufferedImmediate);
47+
}
48+
49+
_write(wrap, buffers, addr) {
50+
this.emit('send', { buffers, addr });
51+
setImmediate(() => this._handle.onSendDone(wrap, 0));
52+
}
53+
54+
afterBind() {
55+
this._handle.onAfterBind();
56+
}
57+
58+
emitReceived(info) {
59+
if (!this.reading) {
60+
this.bufferedReceived.push(info);
61+
return;
62+
}
63+
64+
const {
65+
buffers,
66+
addr: {
67+
family = 4,
68+
address = '127.0.0.1',
69+
port = 1337,
70+
},
71+
flags = 0
72+
} = info;
73+
74+
let familyInt;
75+
switch (family) {
76+
case 'IPv4': familyInt = 4; break;
77+
case 'IPv6': familyInt = 6; break;
78+
default: throw new Error('bad family');
79+
}
80+
81+
for (const buffer of buffers) {
82+
this._handle.emitReceived(buffer, familyInt, address, port, flags);
83+
}
84+
}
85+
}
86+
87+
function makeUDPPair() {
88+
const serverSide = new FakeUDPWrap();
89+
const clientSide = new FakeUDPWrap();
90+
91+
serverSide.on('send',
92+
(chk) => setImmediate(() => clientSide.emitReceived(chk)));
93+
clientSide.on('send',
94+
(chk) => setImmediate(() => serverSide.emitReceived(chk)));
95+
96+
return { serverSide, clientSide };
97+
}
98+
99+
module.exports = {
100+
FakeUDPWrap,
101+
makeUDPPair
102+
};

test/sequential/test-async-wrap-getasyncid.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const { getSystemErrorName } = require('util');
4545
delete providers.STREAMPIPE;
4646
delete providers.MESSAGEPORT;
4747
delete providers.WORKER;
48+
delete providers.JSUDPWRAP;
4849
if (!common.isMainThread)
4950
delete providers.INSPECTORJSBINDING;
5051
delete providers.KEYPAIRGENREQUEST;

0 commit comments

Comments
 (0)