@@ -86,7 +86,9 @@ class SocketClient {
86
86
///
87
87
/// If this instance is disposed, this method does nothing.
88
88
Future <void > _connect () async {
89
- if (_connectionStateController.isClosed) return ;
89
+ if (_connectionStateController.isClosed) {
90
+ return ;
91
+ }
90
92
91
93
_connectionStateController.value = SocketConnectionState .CONNECTING ;
92
94
print ('Connecting to websocket: $url ...' );
@@ -105,15 +107,17 @@ class SocketClient {
105
107
.map <GraphQLSocketMessage >(_parseSocketMessage);
106
108
107
109
if (config.inactivityTimeout != null ) {
108
- _keepAliveSubscription = _connectionKeepAlive
109
- .timeout (config.inactivityTimeout, onTimeout: (event) {
110
- print (
111
- "Haven't received keep alive message for ${config .inactivityTimeout .inSeconds } seconds. Disconnecting.." );
112
- event.close ();
113
- _socket.close (WebSocketStatus .goingAway);
114
- _connectionStateController.value =
115
- SocketConnectionState .NOT_CONNECTED ;
116
- }).listen (null );
110
+ _keepAliveSubscription = _connectionKeepAlive.timeout (
111
+ config.inactivityTimeout,
112
+ onTimeout: (EventSink <ConnectionKeepAlive > event) {
113
+ print (
114
+ "Haven't received keep alive message for ${config .inactivityTimeout .inSeconds } seconds. Disconnecting.." );
115
+ event.close ();
116
+ _socket.close (WebSocketStatus .goingAway);
117
+ _connectionStateController.value =
118
+ SocketConnectionState .NOT_CONNECTED ;
119
+ },
120
+ ).listen (null );
117
121
}
118
122
119
123
_messageSubscription = _messageStream.listen (
@@ -139,7 +143,9 @@ class SocketClient {
139
143
_keepAliveSubscription? .cancel ();
140
144
_messageSubscription? .cancel ();
141
145
142
- if (_connectionStateController.isClosed) return ;
146
+ if (_connectionStateController.isClosed) {
147
+ return ;
148
+ }
143
149
144
150
if (_connectionStateController.value != SocketConnectionState .NOT_CONNECTED )
145
151
_connectionStateController.value = SocketConnectionState .NOT_CONNECTED ;
@@ -149,9 +155,12 @@ class SocketClient {
149
155
print (
150
156
'Scheduling to connect in ${config .delayBetweenReconnectionAttempts .inSeconds } seconds...' );
151
157
152
- _reconnectTimer = Timer (config.delayBetweenReconnectionAttempts, () {
153
- _connect ();
154
- });
158
+ _reconnectTimer = Timer (
159
+ config.delayBetweenReconnectionAttempts,
160
+ () {
161
+ _connect ();
162
+ },
163
+ );
155
164
} else {
156
165
Timer .run (() => _connect ());
157
166
}
@@ -238,35 +247,51 @@ class SocketClient {
238
247
.take (1 );
239
248
240
249
final Observable <SocketConnectionState > waitForConnectedState = addTimeout
241
- ? waitForConnectedStateWithoutTimeout
242
- .timeout (config.queryAndMutationTimeout, onTimeout: (e) {
243
- print ('Connection timed out.' );
244
- response.addError (TimeoutException ('Connection timed out.' ));
245
- e.close ();
246
- response.close ();
247
- })
250
+ ? waitForConnectedStateWithoutTimeout.timeout (
251
+ config.queryAndMutationTimeout,
252
+ onTimeout: (EventSink <SocketConnectionState > event) {
253
+ print ('Connection timed out.' );
254
+ response.addError (TimeoutException ('Connection timed out.' ));
255
+ event.close ();
256
+ response.close ();
257
+ },
258
+ )
248
259
: waitForConnectedStateWithoutTimeout;
249
260
250
261
sub = waitForConnectedState.listen ((_) {
251
262
final Stream <GraphQLSocketMessage > dataErrorComplete =
252
- _messageStream.where ((GraphQLSocketMessage message) {
253
- if (message is SubscriptionData ) return message.id == id;
254
- if (message is SubscriptionError ) return message.id == id;
255
- if (message is SubscriptionComplete ) return message.id == id;
256
- return false ;
257
- }).takeWhile ((_) => ! response.isClosed);
263
+ _messageStream.where (
264
+ (GraphQLSocketMessage message) {
265
+ if (message is SubscriptionData ) {
266
+ return message.id == id;
267
+ }
268
+
269
+ if (message is SubscriptionError ) {
270
+ return message.id == id;
271
+ }
272
+
273
+ if (message is SubscriptionComplete ) {
274
+ return message.id == id;
275
+ }
276
+
277
+ return false ;
278
+ },
279
+ ).takeWhile ((_) => ! response.isClosed);
258
280
259
281
final Stream <GraphQLSocketMessage > subscriptionComplete = addTimeout
260
282
? dataErrorComplete
261
283
.where ((GraphQLSocketMessage message) =>
262
284
message is SubscriptionComplete )
263
285
.take (1 )
264
- .timeout (config.queryAndMutationTimeout, onTimeout: (e) {
265
- print ('Request timed out.' );
266
- response.addError (TimeoutException ('Request timed out.' ));
267
- e.close ();
268
- response.close ();
269
- })
286
+ .timeout (
287
+ config.queryAndMutationTimeout,
288
+ onTimeout: (EventSink <GraphQLSocketMessage > event) {
289
+ print ('Request timed out.' );
290
+ response.addError (TimeoutException ('Request timed out.' ));
291
+ event.close ();
292
+ response.close ();
293
+ },
294
+ )
270
295
: dataErrorComplete
271
296
.where ((GraphQLSocketMessage message) =>
272
297
message is SubscriptionComplete )
0 commit comments