@@ -25,7 +25,7 @@ import {
25
25
import type { Topology } from '../sdam/topology' ;
26
26
import type { ClientSession } from '../sessions' ;
27
27
import { TimeoutContext } from '../timeout' ;
28
- import { squashError , supportsRetryableWrites } from '../utils' ;
28
+ import { supportsRetryableWrites } from '../utils' ;
29
29
import { AbstractOperation , Aspect } from './operation' ;
30
30
31
31
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES . IllegalOperation ;
@@ -46,10 +46,9 @@ type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<
46
46
* not provided.
47
47
*
48
48
* The expectation is that this function:
49
- * - Connects the MongoClient if it has not already been connected
49
+ * - Connects the MongoClient if it has not already been connected, see { @link autoConnect}
50
50
* - Creates a session if none is provided and cleans up the session it creates
51
- * - Selects a server based on readPreference or various factors
52
- * - Retries an operation if it fails for certain errors, see {@link retryOperation}
51
+ * - Tries an operation and retries under certain conditions, see {@link tryOperation}
53
52
*
54
53
* @typeParam T - The operation's type
55
54
* @typeParam TResult - The type of the operation's result, calculated from T
@@ -66,23 +65,7 @@ export async function executeOperation<
66
65
throw new MongoRuntimeError ( 'This method requires a valid operation instance' ) ;
67
66
}
68
67
69
- if ( client . topology == null ) {
70
- // Auto connect on operation
71
- if ( client . s . hasBeenClosed ) {
72
- throw new MongoNotConnectedError ( 'Client must be connected before running operations' ) ;
73
- }
74
- client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] = true ;
75
- try {
76
- await client . connect ( ) ;
77
- } finally {
78
- delete client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] ;
79
- }
80
- }
81
-
82
- const { topology } = client ;
83
- if ( topology == null ) {
84
- throw new MongoRuntimeError ( 'client.connect did not create a topology but also did not throw' ) ;
85
- }
68
+ const topology = await autoConnect ( client ) ;
86
69
87
70
// The driver sessions spec mandates that we implicitly create sessions for operations
88
71
// that are not explicitly provided with a session.
@@ -105,17 +88,10 @@ export async function executeOperation<
105
88
) ;
106
89
}
107
90
108
- timeoutContext ??= TimeoutContext . create ( {
109
- serverSelectionTimeoutMS : client . s . options . serverSelectionTimeoutMS ,
110
- waitQueueTimeoutMS : client . s . options . waitQueueTimeoutMS ,
111
- timeoutMS : operation . options . timeoutMS
112
- } ) ;
113
-
114
91
const readPreference = operation . readPreference ?? ReadPreference . primary ;
115
92
const inTransaction = ! ! session ?. inTransaction ( ) ;
116
93
117
94
const hasReadAspect = operation . hasAspect ( Aspect . READ_OPERATION ) ;
118
- const hasWriteAspect = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
119
95
120
96
if (
121
97
inTransaction &&
@@ -131,6 +107,84 @@ export async function executeOperation<
131
107
session . unpin ( ) ;
132
108
}
133
109
110
+ timeoutContext ??= TimeoutContext . create ( {
111
+ serverSelectionTimeoutMS : client . s . options . serverSelectionTimeoutMS ,
112
+ waitQueueTimeoutMS : client . s . options . waitQueueTimeoutMS ,
113
+ timeoutMS : operation . options . timeoutMS
114
+ } ) ;
115
+
116
+ try {
117
+ return await tryOperation ( operation , {
118
+ topology,
119
+ timeoutContext,
120
+ session,
121
+ readPreference
122
+ } ) ;
123
+ } finally {
124
+ if ( session ?. owner != null && session . owner === owner ) {
125
+ await session . endSession ( ) ;
126
+ }
127
+ }
128
+ }
129
+
130
+ /**
131
+ * Connects a client if it has not yet been connected
132
+ * @internal
133
+ */
134
+ async function autoConnect ( client : MongoClient ) : Promise < Topology > {
135
+ if ( client . topology == null ) {
136
+ if ( client . s . hasBeenClosed ) {
137
+ throw new MongoNotConnectedError ( 'Client must be connected before running operations' ) ;
138
+ }
139
+ client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] = true ;
140
+ try {
141
+ await client . connect ( ) ;
142
+ if ( client . topology == null ) {
143
+ throw new MongoRuntimeError (
144
+ 'client.connect did not create a topology but also did not throw'
145
+ ) ;
146
+ }
147
+ return client . topology ;
148
+ } finally {
149
+ delete client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] ;
150
+ }
151
+ }
152
+ return client . topology ;
153
+ }
154
+
155
+ /** @internal */
156
+ type RetryOptions = {
157
+ session : ClientSession | undefined ;
158
+ readPreference : ReadPreference ;
159
+ topology : Topology ;
160
+ timeoutContext : TimeoutContext ;
161
+ } ;
162
+
163
+ /**
164
+ * Executes an operation and retries as appropriate
165
+ * @internal
166
+ *
167
+ * @remarks
168
+ * Implements behaviour described in [Retryable Reads](https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md) and [Retryable
169
+ * Writes](https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.md) specification
170
+ *
171
+ * This function:
172
+ * - performs initial server selection
173
+ * - attempts to execute an operation
174
+ * - retries the operation if it meets the criteria for a retryable read or a retryable write
175
+ *
176
+ * @typeParam T - The operation's type
177
+ * @typeParam TResult - The type of the operation's result, calculated from T
178
+ *
179
+ * @param operation - The operation to execute
180
+ * */
181
+ async function tryOperation <
182
+ T extends AbstractOperation < TResult > ,
183
+ TResult = ResultTypeFromOperation < T >
184
+ > (
185
+ operation : T ,
186
+ { topology, timeoutContext, session, readPreference } : RetryOptions
187
+ ) : Promise < TResult > {
134
188
let selector : ReadPreference | ServerSelector ;
135
189
136
190
if ( operation . hasAspect ( Aspect . MUST_SELECT_SAME_SERVER ) ) {
@@ -146,31 +200,15 @@ export async function executeOperation<
146
200
selector = readPreference ;
147
201
}
148
202
149
- const server = await topology . selectServer ( selector , {
203
+ let server = await topology . selectServer ( selector , {
150
204
session,
151
205
operationName : operation . commandName ,
152
206
timeoutContext
153
207
} ) ;
154
208
155
- if ( session == null ) {
156
- // No session also means it is not retryable, early exit
157
- return await operation . execute ( server , undefined , timeoutContext ) ;
158
- }
159
-
160
- if ( ! operation . hasAspect ( Aspect . RETRYABLE ) ) {
161
- // non-retryable operation, early exit
162
- try {
163
- return await operation . execute ( server , session , timeoutContext ) ;
164
- } finally {
165
- if ( session ?. owner != null && session . owner === owner ) {
166
- try {
167
- await session . endSession ( ) ;
168
- } catch ( error ) {
169
- squashError ( error ) ;
170
- }
171
- }
172
- }
173
- }
209
+ const hasReadAspect = operation . hasAspect ( Aspect . READ_OPERATION ) ;
210
+ const hasWriteAspect = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
211
+ const inTransaction = session ?. inTransaction ( ) ?? false ;
174
212
175
213
const willRetryRead = topology . s . options . retryReads && ! inTransaction && operation . canRetryRead ;
176
214
@@ -180,108 +218,76 @@ export async function executeOperation<
180
218
supportsRetryableWrites ( server ) &&
181
219
operation . canRetryWrite ;
182
220
183
- const willRetry = ( hasReadAspect && willRetryRead ) || ( hasWriteAspect && willRetryWrite ) ;
221
+ const willRetry =
222
+ operation . hasAspect ( Aspect . RETRYABLE ) &&
223
+ session != null &&
224
+ ( ( hasReadAspect && willRetryRead ) || ( hasWriteAspect && willRetryWrite ) ) ;
184
225
185
- if ( hasWriteAspect && willRetryWrite ) {
226
+ if ( hasWriteAspect && willRetryWrite && session != null ) {
186
227
operation . options . willRetryWrite = true ;
187
228
session . incrementTransactionNumber ( ) ;
188
229
}
189
230
190
- try {
191
- return await operation . execute ( server , session , timeoutContext ) ;
192
- } catch ( operationError ) {
193
- if ( willRetry && operationError instanceof MongoError ) {
194
- return await retryOperation ( operation , operationError , {
195
- session,
196
- topology,
197
- selector,
198
- previousServer : server . description ,
199
- timeoutContext
200
- } ) ;
201
- }
202
- throw operationError ;
203
- } finally {
204
- if ( session ?. owner != null && session . owner === owner ) {
205
- try {
206
- await session . endSession ( ) ;
207
- } catch ( error ) {
208
- squashError ( error ) ;
209
- }
210
- }
211
- }
212
- }
231
+ // TODO(NODE-6231): implement infinite retry within CSOT timeout here
232
+ const maxTries = willRetry ? 2 : 1 ;
233
+ let previousOperationError : MongoError | undefined ;
234
+ let previousServer : ServerDescription | undefined ;
213
235
214
- /** @internal */
215
- type RetryOptions = {
216
- session : ClientSession ;
217
- topology : Topology ;
218
- selector : ReadPreference | ServerSelector ;
219
- previousServer : ServerDescription ;
220
- timeoutContext : TimeoutContext ;
221
- } ;
236
+ // TODO(NODE-6231): implement infinite retry within CSOT timeout here
237
+ for ( let tries = 0 ; tries < maxTries ; tries ++ ) {
238
+ if ( previousOperationError ) {
239
+ if ( hasWriteAspect && previousOperationError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
240
+ throw new MongoServerError ( {
241
+ message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
242
+ errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
243
+ originalError : previousOperationError
244
+ } ) ;
245
+ }
222
246
223
- async function retryOperation <
224
- T extends AbstractOperation < TResult > ,
225
- TResult = ResultTypeFromOperation < T >
226
- > (
227
- operation : T ,
228
- originalError : MongoError ,
229
- { session, topology, selector, previousServer, timeoutContext } : RetryOptions
230
- ) : Promise < TResult > {
231
- const isWriteOperation = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
232
- const isReadOperation = operation . hasAspect ( Aspect . READ_OPERATION ) ;
233
-
234
- if ( isWriteOperation && originalError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
235
- throw new MongoServerError ( {
236
- message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
237
- errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
238
- originalError
239
- } ) ;
240
- }
247
+ if ( hasWriteAspect && ! isRetryableWriteError ( previousOperationError ) )
248
+ throw previousOperationError ;
241
249
242
- if ( isWriteOperation && ! isRetryableWriteError ( originalError ) ) {
243
- throw originalError ;
244
- }
250
+ if ( hasReadAspect && ! isRetryableReadError ( previousOperationError ) )
251
+ throw previousOperationError ;
245
252
246
- if ( isReadOperation && ! isRetryableReadError ( originalError ) ) {
247
- throw originalError ;
248
- }
253
+ if (
254
+ previousOperationError instanceof MongoNetworkError &&
255
+ operation . hasAspect ( Aspect . CURSOR_CREATING ) &&
256
+ session != null &&
257
+ session . isPinned &&
258
+ ! session . inTransaction ( )
259
+ ) {
260
+ session . unpin ( { force : true , forceClear : true } ) ;
261
+ }
249
262
250
- if (
251
- originalError instanceof MongoNetworkError &&
252
- session . isPinned &&
253
- ! session . inTransaction ( ) &&
254
- operation . hasAspect ( Aspect . CURSOR_CREATING )
255
- ) {
256
- // If we have a cursor and the initial command fails with a network error,
257
- // we can retry it on another connection. So we need to check it back in, clear the
258
- // pool for the service id, and retry again.
259
- session . unpin ( { force : true , forceClear : true } ) ;
260
- }
263
+ server = await topology . selectServer ( selector , {
264
+ session,
265
+ operationName : operation . commandName ,
266
+ previousServer
267
+ } ) ;
261
268
262
- // select a new server, and attempt to retry the operation
263
- const server = await topology . selectServer ( selector , {
264
- session,
265
- operationName : operation . commandName ,
266
- previousServer,
267
- timeoutContext
268
- } ) ;
269
+ if ( hasWriteAspect && ! supportsRetryableWrites ( server ) ) {
270
+ throw new MongoUnexpectedServerResponseError (
271
+ 'Selected server does not support retryable writes'
272
+ ) ;
273
+ }
274
+ }
269
275
270
- if ( isWriteOperation && ! supportsRetryableWrites ( server ) ) {
271
- throw new MongoUnexpectedServerResponseError (
272
- 'Selected server does not support retryable writes'
273
- ) ;
274
- }
276
+ try {
277
+ return await operation . execute ( server , session , timeoutContext ) ;
278
+ } catch ( operationError ) {
279
+ if ( ! ( operationError instanceof MongoError ) ) throw operationError ;
275
280
276
- try {
277
- return await operation . execute ( server , session , timeoutContext ) ;
278
- } catch ( retryError ) {
279
- if (
280
- retryError instanceof MongoError &&
281
- retryError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
282
- ) {
283
- throw originalError ;
281
+ if (
282
+ previousOperationError != null &&
283
+ operationError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
284
+ ) {
285
+ throw previousOperationError ;
286
+ }
287
+ previousServer = server . description ;
288
+ previousOperationError = operationError ;
284
289
}
285
- throw retryError ;
286
290
}
291
+
292
+ throw previousOperationError ;
287
293
}
0 commit comments