@@ -136,18 +136,52 @@ def _set_connection_attributes(span, conn):
136
136
span .set_attribute (key , value )
137
137
138
138
139
+ def _build_span_name (instance , cmd_args ):
140
+ if len (cmd_args ) > 0 and cmd_args [0 ]:
141
+ name = cmd_args [0 ]
142
+ else :
143
+ name = instance .connection_pool .connection_kwargs .get ("db" , 0 )
144
+ return name
145
+
146
+
147
+ def _build_span_meta_data_for_pipeline (instance ):
148
+ try :
149
+ command_stack = (
150
+ instance .command_stack
151
+ if hasattr (instance , "command_stack" )
152
+ else instance ._command_stack
153
+ )
154
+
155
+ cmds = [
156
+ _format_command_args (c .args if hasattr (c , "args" ) else c [0 ])
157
+ for c in command_stack
158
+ ]
159
+ resource = "\n " .join (cmds )
160
+
161
+ span_name = " " .join (
162
+ [
163
+ (c .args [0 ] if hasattr (c , "args" ) else c [0 ][0 ])
164
+ for c in command_stack
165
+ ]
166
+ )
167
+ except (AttributeError , IndexError ):
168
+ command_stack = []
169
+ resource = ""
170
+ span_name = ""
171
+
172
+ return command_stack , resource , span_name
173
+
174
+
175
+ # pylint: disable=R0915
139
176
def _instrument (
140
177
tracer ,
141
178
request_hook : _RequestHookT = None ,
142
179
response_hook : _ResponseHookT = None ,
143
180
):
144
181
def _traced_execute_command (func , instance , args , kwargs ):
145
182
query = _format_command_args (args )
183
+ name = _build_span_name (instance , args )
146
184
147
- if len (args ) > 0 and args [0 ]:
148
- name = args [0 ]
149
- else :
150
- name = instance .connection_pool .connection_kwargs .get ("db" , 0 )
151
185
with tracer .start_as_current_span (
152
186
name , kind = trace .SpanKind .CLIENT
153
187
) as span :
@@ -163,31 +197,11 @@ def _traced_execute_command(func, instance, args, kwargs):
163
197
return response
164
198
165
199
def _traced_execute_pipeline (func , instance , args , kwargs ):
166
- try :
167
- command_stack = (
168
- instance .command_stack
169
- if hasattr (instance , "command_stack" )
170
- else instance ._command_stack
171
- )
172
-
173
- cmds = [
174
- _format_command_args (
175
- c .args if hasattr (c , "args" ) else c [0 ],
176
- )
177
- for c in command_stack
178
- ]
179
- resource = "\n " .join (cmds )
180
-
181
- span_name = " " .join (
182
- [
183
- (c .args [0 ] if hasattr (c , "args" ) else c [0 ][0 ])
184
- for c in command_stack
185
- ]
186
- )
187
- except (AttributeError , IndexError ):
188
- command_stack = []
189
- resource = ""
190
- span_name = ""
200
+ (
201
+ command_stack ,
202
+ resource ,
203
+ span_name ,
204
+ ) = _build_span_meta_data_for_pipeline (instance )
191
205
192
206
with tracer .start_as_current_span (
193
207
span_name , kind = trace .SpanKind .CLIENT
@@ -232,32 +246,72 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
232
246
"ClusterPipeline.execute" ,
233
247
_traced_execute_pipeline ,
234
248
)
249
+
250
+ async def _async_traced_execute_command (func , instance , args , kwargs ):
251
+ query = _format_command_args (args )
252
+ name = _build_span_name (instance , args )
253
+
254
+ with tracer .start_as_current_span (
255
+ name , kind = trace .SpanKind .CLIENT
256
+ ) as span :
257
+ if span .is_recording ():
258
+ span .set_attribute (SpanAttributes .DB_STATEMENT , query )
259
+ _set_connection_attributes (span , instance )
260
+ span .set_attribute ("db.redis.args_length" , len (args ))
261
+ if callable (request_hook ):
262
+ request_hook (span , instance , args , kwargs )
263
+ response = await func (* args , ** kwargs )
264
+ if callable (response_hook ):
265
+ response_hook (span , instance , response )
266
+ return response
267
+
268
+ async def _async_traced_execute_pipeline (func , instance , args , kwargs ):
269
+ (
270
+ command_stack ,
271
+ resource ,
272
+ span_name ,
273
+ ) = _build_span_meta_data_for_pipeline (instance )
274
+
275
+ with tracer .start_as_current_span (
276
+ span_name , kind = trace .SpanKind .CLIENT
277
+ ) as span :
278
+ if span .is_recording ():
279
+ span .set_attribute (SpanAttributes .DB_STATEMENT , resource )
280
+ _set_connection_attributes (span , instance )
281
+ span .set_attribute (
282
+ "db.redis.pipeline_length" , len (command_stack )
283
+ )
284
+ response = await func (* args , ** kwargs )
285
+ if callable (response_hook ):
286
+ response_hook (span , instance , response )
287
+ return response
288
+
235
289
if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
236
290
wrap_function_wrapper (
237
291
"redis.asyncio" ,
238
292
f"{ redis_class } .execute_command" ,
239
- _traced_execute_command ,
293
+ _async_traced_execute_command ,
240
294
)
241
295
wrap_function_wrapper (
242
296
"redis.asyncio.client" ,
243
297
f"{ pipeline_class } .execute" ,
244
- _traced_execute_pipeline ,
298
+ _async_traced_execute_pipeline ,
245
299
)
246
300
wrap_function_wrapper (
247
301
"redis.asyncio.client" ,
248
302
f"{ pipeline_class } .immediate_execute_command" ,
249
- _traced_execute_command ,
303
+ _async_traced_execute_command ,
250
304
)
251
305
if redis .VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION :
252
306
wrap_function_wrapper (
253
307
"redis.asyncio.cluster" ,
254
308
"RedisCluster.execute_command" ,
255
- _traced_execute_command ,
309
+ _async_traced_execute_command ,
256
310
)
257
311
wrap_function_wrapper (
258
312
"redis.asyncio.cluster" ,
259
313
"ClusterPipeline.execute" ,
260
- _traced_execute_pipeline ,
314
+ _async_traced_execute_pipeline ,
261
315
)
262
316
263
317
0 commit comments