Skip to content

Commit 68e8824

Browse files
committed
refactor aggregate method and fix bucket index
1 parent bfaec2d commit 68e8824

File tree

3 files changed

+25
-19
lines changed

3 files changed

+25
-19
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,11 @@ def __init__(
9595
self._reservoir = reservoir_factory()
9696
self._previous_point = None
9797

98-
def aggregate(
99-
self, measurement: Measurement, should_sample_exemplar: bool = True
98+
@abstractmethod
99+
def aggregate(self, measurement: Measurement, should_sample_exemplar: bool = True
100100
) -> None:
101-
if should_sample_exemplar:
102-
self._reservoir.offer(
103-
measurement.value,
104-
measurement.time_unix_nano,
105-
measurement.attributes,
106-
measurement.context,
107-
)
108-
101+
pass
102+
109103
@abstractmethod
110104
def collect(
111105
self,
@@ -119,7 +113,14 @@ def _collect_exemplars(self) -> Sequence[Exemplar]:
119113
self._attributes
120114
)
121115

122-
116+
def sample_exemplar(self, measurement: Measurement) -> None:
117+
self._reservoir.offer(
118+
measurement.value,
119+
measurement.time_unix_nano,
120+
measurement.attributes,
121+
measurement.context,
122+
)
123+
123124
class _DropAggregation(_Aggregation):
124125
def aggregate(
125126
self, measurement: Measurement, should_sample_exemplar: bool = True
@@ -164,8 +165,10 @@ def aggregate(
164165
self._value = 0
165166

166167
self._value = self._value + measurement.value
167-
168-
super().aggregate(measurement, should_sample_exemplar)
168+
169+
if should_sample_exemplar:
170+
self.sample_exemplar(measurement)
171+
169172

170173
def collect(
171174
self,
@@ -390,8 +393,8 @@ def aggregate(
390393
):
391394
with self._lock:
392395
self._value = measurement.value
393-
394-
super().aggregate(measurement, should_sample_exemplar)
396+
if should_sample_exemplar:
397+
self.sample_exemplar(measurement)
395398

396399
def collect(
397400
self,
@@ -491,7 +494,8 @@ def aggregate(
491494

492495
self._value[bisect_left(self._boundaries, measurement_value)] += 1
493496

494-
super().aggregate(measurement, should_sample_exemplar)
497+
if should_sample_exemplar:
498+
self.sample_exemplar(measurement)
495499

496500
def collect(
497501
self,
@@ -790,7 +794,8 @@ def aggregate(
790794
# in _ExplicitBucketHistogramAggregation.aggregate
791795
value.increment_bucket(bucket_index)
792796

793-
super().aggregate(measurement, should_sample_exemplar)
797+
if should_sample_exemplar:
798+
self.sample_exemplar(measurement)
794799

795800
def collect(
796801
self,

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ def _find_bucket_index(
204204
ctx: Context,
205205
) -> int:
206206
if self._measurements_seen < self._size:
207-
return self._measurements_seen
207+
self._measurements_seen += 1
208+
return self._measurements_seen - 1
208209

209210
index = randrange(0, self._measurements_seen)
210211
self._measurements_seen += 1

opentelemetry-sdk/tests/metrics/test_aggregation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ def test_observable_gauge(self):
648648
class TestExemplarsFromAggregations(TestCase):
649649

650650
def test_collection_simple_fixed_size_reservoir(self):
651-
exemplar_reservoir_factory = lambda: SimpleFixedSizeExemplarReservoir(size=5)
651+
exemplar_reservoir_factory = lambda: SimpleFixedSizeExemplarReservoir(size=3)
652652
synchronous_sum_aggregation = _SumAggregation(
653653
Mock(),
654654
True,

0 commit comments

Comments
 (0)