@@ -58,38 +58,9 @@ class Connection extends EventEmitter {
58
58
/* ignore errors, listen to `close` instead */
59
59
} ) ;
60
60
61
- stream . on ( 'close' , ( ) => {
62
- if ( this . closed ) {
63
- return ;
64
- }
65
-
66
- this . closed = true ;
67
- this [ kQueue ] . forEach ( op =>
68
- op . cb ( new MongoNetworkError ( `connection ${ this . id } to ${ this . address } closed` ) )
69
- ) ;
70
- this [ kQueue ] . clear ( ) ;
71
-
72
- this . emit ( 'close' ) ;
73
- } ) ;
74
-
75
- stream . on ( 'timeout' , ( ) => {
76
- if ( this . closed ) {
77
- return ;
78
- }
79
-
80
- stream . destroy ( ) ;
81
- this . closed = true ;
82
- this [ kQueue ] . forEach ( op =>
83
- op . cb (
84
- new MongoNetworkTimeoutError ( `connection ${ this . id } to ${ this . address } timed out` , {
85
- beforeHandshake : this [ kIsMaster ] == null
86
- } )
87
- )
88
- ) ;
89
-
90
- this [ kQueue ] . clear ( ) ;
91
- this . emit ( 'close' ) ;
92
- } ) ;
61
+ this [ kMessageStream ] . on ( 'error' , error => this . handleIssue ( { destroy : error } ) ) ;
62
+ stream . on ( 'close' , ( ) => this . handleIssue ( { isClose : true } ) ) ;
63
+ stream . on ( 'timeout' , ( ) => this . handleIssue ( { isTimeout : true , destroy : true } ) ) ;
93
64
94
65
// hook the message stream up to the passed in stream
95
66
stream . pipe ( this [ kMessageStream ] ) ;
@@ -132,6 +103,39 @@ class Connection extends EventEmitter {
132
103
this [ kLastUseTime ] = now ( ) ;
133
104
}
134
105
106
+ /**
107
+ * @param {{ isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error } } issue
108
+ */
109
+ handleIssue ( issue ) {
110
+ if ( this . closed ) {
111
+ return ;
112
+ }
113
+
114
+ if ( issue . destroy ) {
115
+ this [ kStream ] . destroy ( typeof issue . destroy === 'boolean' ? undefined : issue . destroy ) ;
116
+ }
117
+
118
+ this . closed = true ;
119
+
120
+ for ( const idAndOp of this [ kQueue ] ) {
121
+ const op = idAndOp [ 1 ] ;
122
+ if ( issue . isTimeout ) {
123
+ op . cb (
124
+ new MongoNetworkTimeoutError ( `connection ${ this . id } to ${ this . address } timed out` , {
125
+ beforeHandshake : ! ! this [ kIsMaster ]
126
+ } )
127
+ ) ;
128
+ } else if ( issue . isClose ) {
129
+ op . cb ( new MongoNetworkError ( `connection ${ this . id } to ${ this . address } closed` ) ) ;
130
+ } else {
131
+ op . cb ( typeof issue . destroy === 'boolean' ? undefined : issue . destroy ) ;
132
+ }
133
+ }
134
+
135
+ this [ kQueue ] . clear ( ) ;
136
+ this . emit ( 'close' ) ;
137
+ }
138
+
135
139
destroy ( options , callback ) {
136
140
if ( typeof options === 'function' ) {
137
141
callback = options ;
0 commit comments