13
13
# limitations under the License.
14
14
import os
15
15
import re
16
+ import weakref
16
17
17
18
from sqlalchemy .event import ( # pylint: disable=no-name-in-module
18
19
listen ,
@@ -42,7 +43,7 @@ def _normalize_vendor(vendor):
42
43
43
44
44
45
def _wrap_create_async_engine (
45
- tracer , connections_usage , enable_commenter = False
46
+ tracer , connections_usage , enable_commenter = False , commenter_options = None
46
47
):
47
48
# pylint: disable=unused-argument
48
49
def _wrap_create_async_engine_internal (func , module , args , kwargs ):
@@ -51,20 +52,32 @@ def _wrap_create_async_engine_internal(func, module, args, kwargs):
51
52
"""
52
53
engine = func (* args , ** kwargs )
53
54
EngineTracer (
54
- tracer , engine .sync_engine , connections_usage , enable_commenter
55
+ tracer ,
56
+ engine .sync_engine ,
57
+ connections_usage ,
58
+ enable_commenter ,
59
+ commenter_options ,
55
60
)
56
61
return engine
57
62
58
63
return _wrap_create_async_engine_internal
59
64
60
65
61
- def _wrap_create_engine (tracer , connections_usage , enable_commenter = False ):
66
+ def _wrap_create_engine (
67
+ tracer , connections_usage , enable_commenter = False , commenter_options = None
68
+ ):
62
69
def _wrap_create_engine_internal (func , _module , args , kwargs ):
63
70
"""Trace the SQLAlchemy engine, creating an `EngineTracer`
64
71
object that will listen to SQLAlchemy events.
65
72
"""
66
73
engine = func (* args , ** kwargs )
67
- EngineTracer (tracer , engine , connections_usage , enable_commenter )
74
+ EngineTracer (
75
+ tracer ,
76
+ engine ,
77
+ connections_usage ,
78
+ enable_commenter ,
79
+ commenter_options ,
80
+ )
68
81
return engine
69
82
70
83
return _wrap_create_engine_internal
@@ -99,11 +112,11 @@ def __init__(
99
112
commenter_options = None ,
100
113
):
101
114
self .tracer = tracer
102
- self .engine = engine
103
115
self .connections_usage = connections_usage
104
116
self .vendor = _normalize_vendor (engine .name )
105
117
self .enable_commenter = enable_commenter
106
118
self .commenter_options = commenter_options if commenter_options else {}
119
+ self ._engine_attrs = _get_attributes_from_engine (engine )
107
120
self ._leading_comment_remover = re .compile (r"^/\*.*?\*/" )
108
121
109
122
self ._register_event_listener (
@@ -118,23 +131,11 @@ def __init__(
118
131
self ._register_event_listener (engine , "checkin" , self ._pool_checkin )
119
132
self ._register_event_listener (engine , "checkout" , self ._pool_checkout )
120
133
121
- def _get_connection_string (self ):
122
- drivername = self .engine .url .drivername or ""
123
- host = self .engine .url .host or ""
124
- port = self .engine .url .port or ""
125
- database = self .engine .url .database or ""
126
- return f"{ drivername } ://{ host } :{ port } /{ database } "
127
-
128
- def _get_pool_name (self ):
129
- if self .engine .pool .logging_name is not None :
130
- return self .engine .pool .logging_name
131
- return self ._get_connection_string ()
132
-
133
134
def _add_idle_to_connection_usage (self , value ):
134
135
self .connections_usage .add (
135
136
value ,
136
137
attributes = {
137
- "pool.name" : self ._get_pool_name () ,
138
+ ** self ._engine_attrs ,
138
139
"state" : "idle" ,
139
140
},
140
141
)
@@ -143,7 +144,7 @@ def _add_used_to_connection_usage(self, value):
143
144
self .connections_usage .add (
144
145
value ,
145
146
attributes = {
146
- "pool.name" : self ._get_pool_name () ,
147
+ ** self ._engine_attrs ,
147
148
"state" : "used" ,
148
149
},
149
150
)
@@ -169,12 +170,21 @@ def _pool_checkout(
169
170
@classmethod
170
171
def _register_event_listener (cls , target , identifier , func , * args , ** kw ):
171
172
listen (target , identifier , func , * args , ** kw )
172
- cls ._remove_event_listener_params .append ((target , identifier , func ))
173
+ cls ._remove_event_listener_params .append (
174
+ (weakref .ref (target ), identifier , func )
175
+ )
173
176
174
177
@classmethod
175
178
def remove_all_event_listeners (cls ):
176
- for remove_params in cls ._remove_event_listener_params :
177
- remove (* remove_params )
179
+ for (
180
+ weak_ref_target ,
181
+ identifier ,
182
+ func ,
183
+ ) in cls ._remove_event_listener_params :
184
+ # Remove an event listener only if saved weak reference points to an object
185
+ # which has not been garbage collected
186
+ if weak_ref_target () is not None :
187
+ remove (weak_ref_target (), identifier , func )
178
188
cls ._remove_event_listener_params .clear ()
179
189
180
190
def _operation_name (self , db_name , statement ):
@@ -300,3 +310,22 @@ def _get_attributes_from_cursor(vendor, cursor, attrs):
300
310
if info .port :
301
311
attrs [SpanAttributes .NET_PEER_PORT ] = int (info .port )
302
312
return attrs
313
+
314
+
315
+ def _get_connection_string (engine ):
316
+ drivername = engine .url .drivername or ""
317
+ host = engine .url .host or ""
318
+ port = engine .url .port or ""
319
+ database = engine .url .database or ""
320
+ return f"{ drivername } ://{ host } :{ port } /{ database } "
321
+
322
+
323
+ def _get_attributes_from_engine (engine ):
324
+ """Set metadata attributes of the database engine"""
325
+ attrs = {}
326
+
327
+ attrs ["pool.name" ] = getattr (
328
+ getattr (engine , "pool" , None ), "logging_name" , None
329
+ ) or _get_connection_string (engine )
330
+
331
+ return attrs
0 commit comments