Skip to content

Commit 8fd8f1d

Browse files
clshortfuseMylesBorins
authored andcommitted
http2: use and support non-empty DATA frame with END_STREAM flag
Adds support for reading from a stream where the final frame is a non-empty DATA frame with the END_STREAM flag set, instead of hanging waiting for another frame. When writing to a stream, uses a END_STREAM flag on final DATA frame instead of adding an empty DATA frame. BREAKING: http2 client now expects servers to properly support END_STREAM flag Fixes: #31309 Fixes: #33891 Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback
1 parent a16f0f4 commit 8fd8f1d

6 files changed

+191
-47
lines changed

lib/internal/http2/core.js

+82-22
Original file line numberDiff line numberDiff line change
@@ -1136,6 +1136,7 @@ class Http2Session extends EventEmitter {
11361136
streams: new Map(),
11371137
pendingStreams: new Set(),
11381138
pendingAck: 0,
1139+
shutdownWritableCalled: false,
11391140
writeQueueSize: 0,
11401141
originSet: undefined
11411142
};
@@ -1702,6 +1703,26 @@ function afterShutdown(status) {
17021703
stream[kMaybeDestroy]();
17031704
}
17041705

1706+
function shutdownWritable(callback) {
1707+
const handle = this[kHandle];
1708+
if (!handle) return callback();
1709+
const state = this[kState];
1710+
if (state.shutdownWritableCalled) {
1711+
// Backport v14.x: Session required for debugging stream object
1712+
// debugStreamObj(this, 'shutdownWritable() already called');
1713+
return callback();
1714+
}
1715+
state.shutdownWritableCalled = true;
1716+
1717+
const req = new ShutdownWrap();
1718+
req.oncomplete = afterShutdown;
1719+
req.callback = callback;
1720+
req.handle = handle;
1721+
const err = handle.shutdown(req);
1722+
if (err === 1) // synchronous finish
1723+
return afterShutdown.call(req, 0);
1724+
}
1725+
17051726
function finishSendTrailers(stream, headersList) {
17061727
// The stream might be destroyed and in that case
17071728
// there is nothing to do.
@@ -1962,10 +1983,48 @@ class Http2Stream extends Duplex {
19621983

19631984
let req;
19641985

1986+
let waitingForWriteCallback = true;
1987+
let waitingForEndCheck = true;
1988+
let writeCallbackErr;
1989+
let endCheckCallbackErr;
1990+
const done = () => {
1991+
if (waitingForEndCheck || waitingForWriteCallback) return;
1992+
const err = writeCallbackErr || endCheckCallbackErr;
1993+
// writeGeneric does not destroy on error and
1994+
// we cannot enable autoDestroy,
1995+
// so make sure to destroy on error.
1996+
if (err) {
1997+
this.destroy(err);
1998+
}
1999+
cb(err);
2000+
};
2001+
const writeCallback = (err) => {
2002+
waitingForWriteCallback = false;
2003+
writeCallbackErr = err;
2004+
done();
2005+
};
2006+
const endCheckCallback = (err) => {
2007+
waitingForEndCheck = false;
2008+
endCheckCallbackErr = err;
2009+
done();
2010+
};
2011+
// Shutdown write stream right after last chunk is sent
2012+
// so final DATA frame can include END_STREAM flag
2013+
process.nextTick(() => {
2014+
if (writeCallbackErr ||
2015+
!this._writableState.ending ||
2016+
this._writableState.buffered.length ||
2017+
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
2018+
return endCheckCallback();
2019+
// Backport v14.x: Session required for debugging stream object
2020+
// debugStreamObj(this, 'shutting down writable on last write');
2021+
shutdownWritable.call(this, endCheckCallback);
2022+
});
2023+
19652024
if (writev)
1966-
req = writevGeneric(this, data, cb);
2025+
req = writevGeneric(this, data, writeCallback);
19672026
else
1968-
req = writeGeneric(this, data, encoding, cb);
2027+
req = writeGeneric(this, data, encoding, writeCallback);
19692028

19702029
trackWriteState(this, req.bytes);
19712030
}
@@ -1979,21 +2038,13 @@ class Http2Stream extends Duplex {
19792038
}
19802039

19812040
_final(cb) {
1982-
const handle = this[kHandle];
19832041
if (this.pending) {
19842042
this.once('ready', () => this._final(cb));
1985-
} else if (handle !== undefined) {
1986-
debugStreamObj(this, '_final shutting down');
1987-
const req = new ShutdownWrap();
1988-
req.oncomplete = afterShutdown;
1989-
req.callback = cb;
1990-
req.handle = handle;
1991-
const err = handle.shutdown(req);
1992-
if (err === 1) // synchronous finish
1993-
return afterShutdown.call(req, 0);
1994-
} else {
1995-
cb();
2043+
return;
19962044
}
2045+
// Backport v14.x: Session required for debugging stream object
2046+
// debugStreamObj(this, 'shutting down writable on _final');
2047+
shutdownWritable.call(this, cb);
19972048
}
19982049

19992050
_read(nread) {
@@ -2098,11 +2149,20 @@ class Http2Stream extends Duplex {
20982149
debugStream(this[kID] || 'pending', session[kType], 'destroying stream');
20992150

21002151
const state = this[kState];
2101-
const sessionCode = session[kState].goawayCode ||
2102-
session[kState].destroyCode;
2103-
const code = err != null ?
2104-
sessionCode || NGHTTP2_INTERNAL_ERROR :
2105-
state.rstCode || sessionCode;
2152+
const sessionState = session[kState];
2153+
const sessionCode = sessionState.goawayCode || sessionState.destroyCode;
2154+
2155+
// If a stream has already closed successfully, there is no error
2156+
// to report from this stream, even if the session has errored.
2157+
// This can happen if the stream was already in process of destroying
2158+
// after a successful close, but the session had a error between
2159+
// this stream's close and destroy operations.
2160+
// Previously, this always overrode a successful close operation code
2161+
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
2162+
const code = (err != null ?
2163+
(sessionCode || NGHTTP2_INTERNAL_ERROR) :
2164+
(this.closed ? this.rstCode : sessionCode)
2165+
);
21062166
const hasHandle = handle !== undefined;
21072167

21082168
if (!this.closed)
@@ -2111,13 +2171,13 @@ class Http2Stream extends Duplex {
21112171

21122172
if (hasHandle) {
21132173
handle.destroy();
2114-
session[kState].streams.delete(id);
2174+
sessionState.streams.delete(id);
21152175
} else {
2116-
session[kState].pendingStreams.delete(this);
2176+
sessionState.pendingStreams.delete(this);
21172177
}
21182178

21192179
// Adjust the write queue size for accounting
2120-
session[kState].writeQueueSize -= state.writeQueueSize;
2180+
sessionState.writeQueueSize -= state.writeQueueSize;
21212181
state.writeQueueSize = 0;
21222182

21232183
// RST code 8 not emitted as an error as its used by clients to signify

src/node_http2.cc

+7-6
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen,
732732
// quite expensive. This is a potential performance optimization target later.
733733
ssize_t Http2Session::ConsumeHTTP2Data() {
734734
CHECK_NOT_NULL(stream_buf_.base);
735-
CHECK_LT(stream_buf_offset_, stream_buf_.len);
735+
CHECK_LE(stream_buf_offset_, stream_buf_.len);
736736
size_t read_len = stream_buf_.len - stream_buf_offset_;
737737

738738
// multiple side effects.
@@ -753,11 +753,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
753753
CHECK_GT(ret, 0);
754754
CHECK_LE(static_cast<size_t>(ret), read_len);
755755

756-
if (static_cast<size_t>(ret) < read_len) {
757-
// Mark the remainder of the data as available for later consumption.
758-
stream_buf_offset_ += ret;
759-
return ret;
760-
}
756+
// Mark the remainder of the data as available for later consumption.
757+
// Even if all bytes were received, a paused stream may delay the
758+
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
759+
stream_buf_offset_ += ret;
760+
return ret;
761761
}
762762

763763
// We are done processing the current input chunk.
@@ -1093,6 +1093,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
10931093
if (session->is_write_in_progress()) {
10941094
CHECK(session->is_reading_stopped());
10951095
session->set_receive_paused();
1096+
Debug(session, "receive paused");
10961097
return NGHTTP2_ERR_PAUSE;
10971098
}
10981099

test/parallel/test-http2-misbehaving-multiplex.js

+39-17
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Flags: --expose-internals
33

44
const common = require('../common');
5+
const assert = require('assert');
56

67
if (!common.hasCrypto)
78
common.skip('missing crypto');
@@ -13,16 +14,36 @@ const h2test = require('../common/http2');
1314
let client;
1415

1516
const server = h2.createServer();
17+
let gotFirstStreamId1;
1618
server.on('stream', common.mustCall((stream) => {
1719
stream.respond();
1820
stream.end('ok');
1921

20-
// The error will be emitted asynchronously
21-
stream.on('error', common.expectsError({
22-
constructor: NghttpError,
23-
code: 'ERR_HTTP2_ERROR',
24-
message: 'Stream was already closed or invalid'
25-
}));
22+
// Http2Server should be fast enough to respond to and close
23+
// the first streams with ID 1 and ID 3 without errors.
24+
25+
// Test for errors in 'close' event to ensure no errors on some streams.
26+
stream.on('error', () => {});
27+
stream.on('close', (err) => {
28+
if (stream.id === 1) {
29+
if (gotFirstStreamId1) {
30+
// We expect our outgoing frames to fail on Stream ID 1 the second time
31+
// because a stream with ID 1 was already closed before.
32+
common.expectsError({
33+
constructor: NghttpError,
34+
code: 'ERR_HTTP2_ERROR',
35+
message: 'Stream was already closed or invalid'
36+
});
37+
return;
38+
}
39+
gotFirstStreamId1 = true;
40+
}
41+
assert.strictEqual(err, undefined);
42+
});
43+
44+
// Stream ID 5 should never reach the server
45+
assert.notStrictEqual(stream.id, 5);
46+
2647
}, 2));
2748

2849
server.on('session', common.mustCall((session) => {
@@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {
3556

3657
const settings = new h2test.SettingsFrame();
3758
const settingsAck = new h2test.SettingsFrame(true);
38-
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
39-
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
40-
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
41-
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
59+
// HeadersFrame(id, payload, padding, END_STREAM)
60+
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
61+
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
62+
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
4263

4364
server.listen(0, () => {
4465
client = net.connect(server.address().port, () => {
4566
client.write(h2test.kClientMagic, () => {
4667
client.write(settings.data, () => {
4768
client.write(settingsAck.data);
48-
// This will make it ok.
49-
client.write(head1.data, () => {
50-
// This will make it ok.
51-
client.write(head2.data, () => {
69+
// Stream ID 1 frame will make it OK.
70+
client.write(id1.data, () => {
71+
// Stream ID 3 frame will make it OK.
72+
client.write(id3.data, () => {
73+
// A second Stream ID 1 frame should fail.
5274
// This will cause an error to occur because the client is
5375
// attempting to reuse an already closed stream. This must
5476
// cause the server session to be torn down.
55-
client.write(head3.data, () => {
56-
// This won't ever make it to the server
57-
client.write(head4.data);
77+
client.write(id1.data, () => {
78+
// This Stream ID 5 frame will never make it to the server
79+
client.write(id5.data);
5880
});
5981
});
6082
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const assert = require('assert');
7+
const http2 = require('http2');
8+
9+
const { PerformanceObserver } = require('perf_hooks');
10+
11+
const server = http2.createServer();
12+
13+
server.on('stream', (stream, headers) => {
14+
stream.respond({
15+
'content-type': 'text/html',
16+
':status': 200
17+
});
18+
switch (headers[':path']) {
19+
case '/singleEnd':
20+
stream.end('OK');
21+
break;
22+
case '/sequentialEnd':
23+
stream.write('OK');
24+
stream.end();
25+
break;
26+
case '/delayedEnd':
27+
stream.write('OK', () => stream.end());
28+
break;
29+
}
30+
});
31+
32+
function testRequest(path, targetFrameCount, callback) {
33+
const obs = new PerformanceObserver((list, observer) => {
34+
const entry = list.getEntries()[0];
35+
if (entry.name !== 'Http2Session') return;
36+
if (entry.type !== 'client') return;
37+
assert.strictEqual(entry.framesReceived, targetFrameCount);
38+
observer.disconnect();
39+
callback();
40+
});
41+
obs.observe({ entryTypes: ['http2'] });
42+
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
43+
const req = client.request({ ':path': path });
44+
req.resume();
45+
req.end();
46+
req.on('end', () => client.close());
47+
});
48+
}
49+
50+
// SETTINGS => SETTINGS => HEADERS => DATA
51+
const MIN_FRAME_COUNT = 4;
52+
53+
server.listen(0, () => {
54+
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
55+
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
56+
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
57+
server.close();
58+
});
59+
});
60+
});
61+
});

test/parallel/test-http2-padding-aligned.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair');
2626
// The lengths of the expected writes... note that this is highly
2727
// sensitive to how the internals are implemented.
2828
const serverLengths = [24, 9, 9, 32];
29-
const clientLengths = [9, 9, 48, 9, 1, 21, 1, 16];
29+
const clientLengths = [9, 9, 48, 9, 1, 21, 1];
3030

3131
// Adjust for the 24-byte preamble and two 9-byte settings frames, and
3232
// the result must be equally divisible by 8

test/parallel/test-http2-perf_hooks.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => {
3030
break;
3131
case 'client':
3232
assert.strictEqual(entry.streamCount, 1);
33-
assert.strictEqual(entry.framesReceived, 8);
33+
assert.strictEqual(entry.framesReceived, 7);
3434
break;
3535
default:
3636
assert.fail('invalid Http2Session type');

0 commit comments

Comments
 (0)