1
1
import os
2
2
import io
3
3
import re
4
+ import sys
4
5
import threading
5
6
import random
6
7
import time
57
58
)
58
59
59
60
61
+ def get_code_location (stacklevel ):
62
+ try :
63
+ frm = sys ._getframe (stacklevel + 3 )
64
+ except Exception :
65
+ return None
66
+ return {
67
+ "line" : frm .f_lineno ,
68
+ "module" : frm .f_globals .get ("__name__" ),
69
+ "filename" : frm .f_code .co_filename ,
70
+ "function" : frm .f_code .co_name ,
71
+ }
72
+
73
+
60
74
@contextmanager
61
75
def recursion_protection ():
62
76
# type: () -> Generator[bool, None, None]
@@ -314,6 +328,7 @@ def __init__(
314
328
):
315
329
# type: (...) -> None
316
330
self .buckets = {} # type: Dict[int, Any]
331
+ self .code_locations = {} # type: Dict[BucketKey, Any]
317
332
self ._buckets_total_weight = 0
318
333
self ._capture_func = capture_func
319
334
self ._lock = Lock ()
@@ -409,6 +424,7 @@ def add(
409
424
unit , # type: MeasurementUnit
410
425
tags , # type: Optional[MetricTags]
411
426
timestamp = None , # type: Optional[Union[float, datetime]]
427
+ stacklevel = 1 , # type: int
412
428
):
413
429
# type: (...) -> None
414
430
if not self ._ensure_thread () or self ._flusher is None :
@@ -441,6 +457,12 @@ def add(
441
457
442
458
self ._buckets_total_weight += metric .weight - previous_weight
443
459
460
+ # Store code location once per bucket
461
+ if bucket_key not in self .code_locations :
462
+ loc = get_code_location (stacklevel )
463
+ if loc is not None :
464
+ self .code_locations [bucket_key ] = loc
465
+
444
466
# Given the new weight we consider whether we want to force flush.
445
467
self ._consider_force_flush ()
446
468
@@ -536,6 +558,7 @@ def incr(
536
558
unit = "none" , # type: MeasurementUnit
537
559
tags = None , # type: Optional[MetricTags]
538
560
timestamp = None , # type: Optional[Union[float, datetime]]
561
+ stacklevel = 1 , # type: int
539
562
):
540
563
# type: (...) -> None
541
564
"""Increments a counter."""
@@ -552,6 +575,7 @@ def __init__(
552
575
timestamp , # type: Optional[Union[float, datetime]]
553
576
value , # type: Optional[float]
554
577
unit , # type: DurationUnit
578
+ stacklevel # type: int
555
579
):
556
580
# type: (...) -> None
557
581
self .key = key
@@ -560,6 +584,7 @@ def __init__(
560
584
self .value = value
561
585
self .unit = unit
562
586
self .entered = None # type: Optional[float]
587
+ self .stacklevel = stacklevel
563
588
564
589
def _validate_invocation (self , context ):
565
590
# type: (str) -> None
@@ -579,7 +604,7 @@ def __exit__(self, exc_type, exc_value, tb):
579
604
aggregator , tags = _get_aggregator_and_update_tags (self .key , self .tags )
580
605
if aggregator is not None :
581
606
elapsed = TIMING_FUNCTIONS [self .unit ]() - self .entered # type: ignore
582
- aggregator .add ("d" , self .key , elapsed , self .unit , tags , self .timestamp )
607
+ aggregator .add ("d" , self .key , elapsed , self .unit , tags , self .timestamp , self . stacklevel )
583
608
584
609
def __call__ (self , f ):
585
610
# type: (Any) -> Any
@@ -589,7 +614,8 @@ def __call__(self, f):
589
614
def timed_func (* args , ** kwargs ):
590
615
# type: (*Any, **Any) -> Any
591
616
with timing (
592
- key = self .key , tags = self .tags , timestamp = self .timestamp , unit = self .unit
617
+ key = self .key , tags = self .tags , timestamp = self .timestamp , unit = self .unit ,
618
+ stacklevel = self .stacklevel + 1
593
619
):
594
620
return f (* args , ** kwargs )
595
621
@@ -602,6 +628,7 @@ def timing(
602
628
unit = "second" , # type: DurationUnit
603
629
tags = None , # type: Optional[MetricTags]
604
630
timestamp = None , # type: Optional[Union[float, datetime]]
631
+ stacklevel = 1 , # type: int
605
632
):
606
633
# type: (...) -> _Timing
607
634
"""Emits a distribution with the time it takes to run the given code block.
@@ -615,8 +642,8 @@ def timing(
615
642
if value is not None :
616
643
aggregator , tags = _get_aggregator_and_update_tags (key , tags )
617
644
if aggregator is not None :
618
- aggregator .add ("d" , key , value , unit , tags , timestamp )
619
- return _Timing (key , tags , timestamp , value , unit )
645
+ aggregator .add ("d" , key , value , unit , tags , timestamp , stacklevel )
646
+ return _Timing (key , tags , timestamp , value , unit , stacklevel )
620
647
621
648
622
649
def distribution (
@@ -625,12 +652,13 @@ def distribution(
625
652
unit = "none" , # type: MeasurementUnit
626
653
tags = None , # type: Optional[MetricTags]
627
654
timestamp = None , # type: Optional[Union[float, datetime]]
655
+ stacklevel = 1 , # type: int
628
656
):
629
657
# type: (...) -> None
630
658
"""Emits a distribution."""
631
659
aggregator , tags = _get_aggregator_and_update_tags (key , tags )
632
660
if aggregator is not None :
633
- aggregator .add ("d" , key , value , unit , tags , timestamp )
661
+ aggregator .add ("d" , key , value , unit , tags , timestamp , stacklevel )
634
662
635
663
636
664
def set (
@@ -639,12 +667,13 @@ def set(
639
667
unit = "none" , # type: MeasurementUnit
640
668
tags = None , # type: Optional[MetricTags]
641
669
timestamp = None , # type: Optional[Union[float, datetime]]
670
+ stacklevel = 1 , # type: int
642
671
):
643
672
# type: (...) -> None
644
673
"""Emits a set."""
645
674
aggregator , tags = _get_aggregator_and_update_tags (key , tags )
646
675
if aggregator is not None :
647
- aggregator .add ("s" , key , value , unit , tags , timestamp )
676
+ aggregator .add ("s" , key , value , unit , tags , timestamp , stacklevel )
648
677
649
678
650
679
def gauge (
@@ -653,9 +682,10 @@ def gauge(
653
682
unit = "none" , # type: MetricValue
654
683
tags = None , # type: Optional[MetricTags]
655
684
timestamp = None , # type: Optional[Union[float, datetime]]
685
+ stacklevel = 1 , # type: int
656
686
):
657
687
# type: (...) -> None
658
688
"""Emits a gauge."""
659
689
aggregator , tags = _get_aggregator_and_update_tags (key , tags )
660
690
if aggregator is not None :
661
- aggregator .add ("g" , key , value , unit , tags , timestamp )
691
+ aggregator .add ("g" , key , value , unit , tags , timestamp , stacklevel )
0 commit comments