Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Switch to callbacks from streams for ping handling (#137)
Browse files Browse the repository at this point in the history
* Switch to callbacks from streams for ping handling

* Debugging

* Revert "Debugging"

This reverts commit 910c1e1.

* Revert "Switch to callbacks from streams for ping handling"

This reverts commit c4f4730.

* Fix by using `hasListener`

* Rev version

* Rerun checks
  • Loading branch information
mosuem authored Jan 8, 2024
1 parent 6980f2c commit 1849a26
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.3.0

- Only send updates on frames and pings being received when there are listeners, as to not fill up memory.

## 2.2.0

- Transform headers to lowercase.
Expand Down
12 changes: 7 additions & 5 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ abstract class Connection {

final StreamController<int> _pingReceived = StreamController<int>();

final StreamController<void> _receivedFrame = StreamController<void>();
final StreamController<void> _frameReceived = StreamController<void>();

/// Future which completes when the first SETTINGS frame is received from
/// the peer.
Expand Down Expand Up @@ -183,7 +183,7 @@ abstract class Connection {
// Setup handlers.
_settingsHandler = SettingsHandler(_hpackContext.encoder, _frameWriter,
acknowledgedSettings, peerSettings);
_pingHandler = PingHandler(_frameWriter, _pingReceived.sink);
_pingHandler = PingHandler(_frameWriter, _pingReceived);

var settings = _decodeSettings(settingsObject);

Expand Down Expand Up @@ -344,7 +344,9 @@ abstract class Connection {
frame.decodedHeaders =
_hpackContext.decoder.decode(frame.headerBlockFragment);
}
_receivedFrame.add(null);
if (_frameReceived.hasListener) {
_frameReceived.add(null);
}

// Handle the frame as either a connection or a stream frame.
if (frame.header.streamId == 0) {
Expand Down Expand Up @@ -483,7 +485,7 @@ class ClientConnection extends Connection implements ClientTransportConnection {
Stream<int> get onPingReceived => _pingReceived.stream;

@override
Stream<void> get onFrameReceived => _receivedFrame.stream;
Stream<void> get onFrameReceived => _frameReceived.stream;
}

class ServerConnection extends Connection implements ServerTransportConnection {
Expand All @@ -505,5 +507,5 @@ class ServerConnection extends Connection implements ServerTransportConnection {
Stream<int> get onPingReceived => _pingReceived.stream;

@override
Stream<void> get onFrameReceived => _receivedFrame.stream;
Stream<void> get onFrameReceived => _frameReceived.stream;
}
16 changes: 11 additions & 5 deletions lib/src/ping/ping_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ class PingHandler extends Object with TerminatableMixin {
final FrameWriter _frameWriter;
final Map<int, Completer> _remainingPings = {};
final Sink<int>? pingReceived;
final bool Function() isListeningToPings;
int _nextId = 1;

PingHandler(this._frameWriter, [this.pingReceived]);
PingHandler(this._frameWriter, StreamController<int> pingStream)
: pingReceived = pingStream.sink,
isListeningToPings = (() => pingStream.hasListener);

@override
void onTerminated(Object? error) {
var values = _remainingPings.values.toList();
final remainingPings = _remainingPings.values.toList();
_remainingPings.clear();
for (var value in values) {
value.completeError(error ?? 'Unspecified error');
for (final ping in remainingPings) {
ping.completeError(
error ?? 'Remaining ping completed with unspecified error');
}
}

Expand All @@ -37,7 +41,9 @@ class PingHandler extends Object with TerminatableMixin {
}

if (!frame.hasAckFlag) {
pingReceived?.add(frame.opaqueData);
if (isListeningToPings()) {
pingReceived?.add(frame.opaqueData);
}
_frameWriter.writePingFrame(frame.opaqueData, ack: true);
} else {
var c = _remainingPings.remove(frame.opaqueData);
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: http2
version: 2.2.0
version: 2.3.0
description: A HTTP/2 implementation in Dart.
repository: https://github.com/dart-lang/http2

Expand Down
41 changes: 28 additions & 13 deletions test/src/ping/ping_handler_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void main() {
group('ping-handler', () {
test('successful-ping', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

var p1 = pingHandler.ping();
var p2 = pingHandler.ping();
Expand All @@ -37,7 +37,7 @@ void main() {

test('successful-ack-to-remote-ping', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

var header = FrameHeader(8, FrameType.PING, 0, 0);
pingHandler.processPingFrame(PingFrame(header, 1));
Expand All @@ -53,7 +53,7 @@ void main() {

test('ping-unknown-opaque-data', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

var future = pingHandler.ping();
verify(writer.writePingFrame(1)).called(1);
Expand All @@ -73,7 +73,7 @@ void main() {

test('terminate-ping-handler', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

pingHandler.terminate('hello world');
expect(
Expand All @@ -86,7 +86,7 @@ void main() {

test('ping-non-zero-stream-id', () async {
var writer = FrameWriterMock();
var pingHandler = PingHandler(writer);
var pingHandler = instantiateHandler(writer);

var header = FrameHeader(8, FrameType.PING, PingFrame.FLAG_ACK, 1);
expect(() => pingHandler.processPingFrame(PingFrame(header, 1)),
Expand All @@ -95,17 +95,32 @@ void main() {
});

test('receiving-ping-calls-stream', () async {
var writer = FrameWriterMock();
var streamController = StreamController<int>();
var pingHandler = PingHandler(writer, streamController.sink);
final pings = <int>[];

var header = FrameHeader(8, FrameType.PING, 0, 0);
pingHandler.processPingFrame(PingFrame(header, 1));
var header2 = FrameHeader(8, FrameType.PING, 0, 0);
pingHandler.processPingFrame(PingFrame(header2, 2));
await expectLater(streamController.stream, emitsInOrder([1, 2]));
final writer = FrameWriterMock();
final pingStream = StreamController<int>()
..stream.listen((event) => pings.add(event));

PingHandler(writer, pingStream)
..processPingFrame(PingFrame(
FrameHeader(8, FrameType.PING, 0, 0),
1,
))
..processPingFrame(PingFrame(
FrameHeader(8, FrameType.PING, 0, 0),
2,
));

await pingStream.close();

expect(pings, [1, 2]);
});
});
}

PingHandler instantiateHandler(FrameWriterMock writer) {
StreamController<int> controller = StreamController();
return PingHandler(writer, controller);
}

class FrameWriterMock extends Mock implements FrameWriter {}

0 comments on commit 1849a26

Please # to comment.