diff --git a/.circleci/config.yml b/.circleci/config.yml index b2738bd34..8d4001762 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -184,7 +184,7 @@ workflows: nightly: triggers: - schedule: - cron: "3 * * * *" + cron: "41 * * * *" filters: branches: only: diff --git a/packages/graphql/lib/src/links/websocket_link/websocket_client.dart b/packages/graphql/lib/src/links/websocket_link/websocket_client.dart index ef0073f38..f7a60f34d 100644 --- a/packages/graphql/lib/src/links/websocket_link/websocket_client.dart +++ b/packages/graphql/lib/src/links/websocket_link/websocket_client.dart @@ -167,6 +167,9 @@ class SocketClient { @visibleForTesting WebSocketChannel socketChannel; + @visibleForTesting + Stream socketStream; + @visibleForTesting void Function(GraphQLSocketMessage) onMessage; @@ -184,19 +187,19 @@ class SocketClient { Response Function(Map) get parse => config.parser.parseResponse; - void disconnectOnKeepAliveTimeout(Stream messages) => - _keepAliveSubscription = - messages.whereType().timeout( - config.inactivityTimeout, - onTimeout: (EventSink event) { - print( - "Haven't received keep alive message for ${config.inactivityTimeout.inSeconds} seconds. Disconnecting..", - ); - event.close(); - socketChannel.sink.close(ws_status.goingAway); - _connectionStateController.add(SocketConnectionState.notConnected); - }, - ).listen(null); + void _disconnectOnKeepAliveTimeout(Stream messages) { + _keepAliveSubscription = messages.whereType().timeout( + config.inactivityTimeout, + onTimeout: (EventSink event) { + print( + "Haven't received keep alive message for ${config.inactivityTimeout.inSeconds} seconds. Disconnecting..", + ); + event.close(); + socketChannel.sink.close(ws_status.goingAway); + _connectionStateController.add(SocketConnectionState.notConnected); + }, + ).listen(null); + } /// Connects to the server. /// @@ -219,10 +222,13 @@ class SocketClient { print('Connected to websocket.'); _write(initOperation); - _messages = socketChannel.graphQLMessageStream; + socketStream = socketChannel.stream.asBroadcastStream(); + _messages = socketStream.map( + GraphQLSocketMessage.parse, + ); if (config.inactivityTimeout != null) { - disconnectOnKeepAliveTimeout(_messages); + _disconnectOnKeepAliveTimeout(_messages); } _messageSubscription = _messages.listen( @@ -449,14 +455,7 @@ class SocketClient { _connectionStateController.stream; } -extension GraphQLWebsocket on WebSocketChannel { - /// Multi-subscription stream of messages from the other endpoint. - /// GraphQLSocketMessage - /// - Stream get graphQLMessageStream => stream - .asBroadcastStream() - .map(GraphQLSocketMessage.parse); -} +extension GraphQLWebsocket on WebSocketChannel {} void _defaultOnStreamError(Object error, StackTrace st) { print('[SocketClient] message stream ecnountered error: $error\n' diff --git a/packages/graphql/test/websocket_test.dart b/packages/graphql/test/websocket_test.dart index 9527ab63e..4ea9cf121 100644 --- a/packages/graphql/test/websocket_test.dart +++ b/packages/graphql/test/websocket_test.dart @@ -167,7 +167,7 @@ void main() { .first; // ignore: unawaited_futures - socketClient.socketChannel.stream + socketClient.socketStream .where((message) => message == expectedMessage) .first .then((_) { @@ -205,14 +205,27 @@ void main() { final subscriptionDataStream = socketClient.subscribe(payload, waitForConnection); + await expectLater( + socketClient.connectionState, + emitsInOrder([ + SocketConnectionState.connecting, + SocketConnectionState.connected, + ]), + ); + socketClient.onConnectionLost(); - await socketClient.connectionState - .where((state) => state == SocketConnectionState.connected) - .first; + await expectLater( + socketClient.connectionState, + emitsInOrder([ + SocketConnectionState.notConnected, + SocketConnectionState.connecting, + SocketConnectionState.connected, + ]), + ); // ignore: unawaited_futures - socketClient.socketChannel.stream + socketClient.socketStream .where((message) => message == expectedMessage) .first .then((_) { @@ -265,7 +278,7 @@ void main() { .where((state) => state == SocketConnectionState.connected) .first; - await expectLater(socketClient.socketChannel.stream.map((s) { + await expectLater(socketClient.socketStream.map((s) { return jsonDecode(s)['payload']; }), emits(initPayload)); }); @@ -297,7 +310,7 @@ void main() { .where((state) => state == SocketConnectionState.connected) .first; - await expectLater(socketClient.socketChannel.stream.map((s) { + await expectLater(socketClient.socketStream.map((s) { return jsonDecode(s)['payload']; }), emits(initPayload)); });