Skip to content

Commit f26482c

Browse files
committed
Report the size of served artifacts
closes #4602
1 parent 91d1587 commit f26482c

File tree

5 files changed

+124
-83
lines changed

5 files changed

+124
-83
lines changed

CHANGES/4602.feature

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added metrics reporting the size of served artifacts.

docs/admin/learn/architecture.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ Collector here](https://opentelemetry.io/docs/collector/).
179179

180180
- Access to every API endpoint (an HTTP method, target URL, status code, and user agent).
181181
- Access to every requested package (an HTTP method, target URL, status code, and user agent).
182-
- Disk usage within a specific domain (total used disk space and the reference to the domain).
182+
- Disk usage within a specific domain (total used disk space and the reference to a domain). Currently disabled.
183+
- The size of served artifacts (total count of served data and the reference to a domain).
183184

184185
The information above is sent to the collector in the form of spans and metrics. Thus, the data is
185186
emitted either based on the user interaction with the system or on a regular basis. Consult

pulpcore/app/models/domain.py

-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from opentelemetry.metrics import Observation
2-
31
from django.core.files.storage import default_storage
42
from django.db import models
53
from django_lifecycle import hook, BEFORE_DELETE, BEFORE_UPDATE
@@ -86,16 +84,3 @@ class Meta:
8684
permissions = [
8785
("manage_roles_domain", "Can manage role assignments on domain"),
8886
]
89-
90-
91-
def disk_usage_callback(domain):
92-
from pulpcore.app.models import Artifact
93-
from pulpcore.app.util import get_url
94-
95-
options = yield # noqa
96-
while True:
97-
artifacts = Artifact.objects.filter(pulp_domain=domain).only("size")
98-
total_size = artifacts.aggregate(size=models.Sum("size", default=0))["size"]
99-
options = yield [ # noqa
100-
Observation(total_size, {"pulp_href": get_url(domain), "domain_name": domain.name})
101-
]

pulpcore/app/util.py

+49-45
Original file line numberDiff line numberDiff line change
@@ -516,53 +516,18 @@ def cache_key(base_path):
516516
return base_path
517517

518518

519-
class DomainMetricsEmitterBuilder:
520-
"""A builder class that initializes an emitter for recording domain's metrics.
519+
class MetricsEmitter:
520+
"""
521+
A builder class that initializes an emitter.
521522
522523
If Open Telemetry is enabled, the builder configures a real emitter capable of sending data to
523-
the collector. Otherwise, a no-op emitter is initialized. The real emitter utilizes the global
524-
settings to send metrics.
524+
the collector. Otherwise, a no-op emitter is initialized. The real emitter may utilize the
525+
global settings to send metrics.
525526
526527
By default, the emitter sends data to the collector every 60 seconds. Adjust the environment
527528
variable OTEL_METRIC_EXPORT_INTERVAL accordingly if needed.
528529
"""
529530

530-
class _DomainMetricsEmitter:
531-
def __init__(self, domain):
532-
self.domain = domain
533-
self.meter = metrics.get_meter(f"domain.{domain.name}.meter")
534-
self.instrument = self._init_emitting_total_size()
535-
536-
def _init_emitting_total_size(self):
537-
return self.meter.create_observable_gauge(
538-
name="disk_usage",
539-
description="The total disk size by domain.",
540-
callbacks=[self._disk_usage_callback()],
541-
unit="Bytes",
542-
)
543-
544-
def _disk_usage_callback(self):
545-
try:
546-
with PGAdvisoryLock(STORAGE_METRICS_LOCK):
547-
from pulpcore.app.models import Artifact
548-
549-
options = yield # noqa
550-
551-
while True:
552-
artifacts = Artifact.objects.filter(pulp_domain=self.domain).only("size")
553-
total_size = artifacts.aggregate(size=Sum("size", default=0))["size"]
554-
options = yield [ # noqa
555-
metrics.Observation(
556-
total_size,
557-
{
558-
"pulp_href": get_url(self.domain),
559-
"domain_name": self.domain.name,
560-
},
561-
)
562-
]
563-
except AdvisoryLockError:
564-
yield
565-
566531
class _NoopEmitter:
567532
def __call__(self, *args, **kwargs):
568533
return self
@@ -571,19 +536,58 @@ def __getattr__(self, *args, **kwargs):
571536
return self
572537

573538
@classmethod
574-
def build(cls, domain):
575-
otel_enabled = os.getenv("PULP_OTEL_ENABLED")
576-
if otel_enabled == "true" and settings.DOMAIN_ENABLED:
577-
return cls._DomainMetricsEmitter(domain)
539+
def build(cls, *args, **kwargs):
540+
otel_enabled = os.getenv("PULP_OTEL_ENABLED", "").lower() == "true"
541+
if otel_enabled and settings.DOMAIN_ENABLED:
542+
return cls(*args, **kwargs)
578543
else:
579544
return cls._NoopEmitter()
580545

581546

547+
class DomainMetricsEmitter(MetricsEmitter):
548+
"""A builder class that initializes an emitter for recording domain's metrics."""
549+
550+
def __init__(self, domain):
551+
self.domain = domain
552+
self.meter = metrics.get_meter(f"domain.{domain.name}.meter")
553+
self.instrument = self._init_emitting_total_size()
554+
555+
def _init_emitting_total_size(self):
556+
return self.meter.create_observable_gauge(
557+
name="disk_usage",
558+
description="The total disk size by domain.",
559+
callbacks=[self._disk_usage_callback()],
560+
unit="Bytes",
561+
)
562+
563+
def _disk_usage_callback(self):
564+
try:
565+
with PGAdvisoryLock(STORAGE_METRICS_LOCK):
566+
from pulpcore.app.models import Artifact
567+
568+
options = yield # noqa
569+
570+
while True:
571+
artifacts = Artifact.objects.filter(pulp_domain=self.domain).only("size")
572+
total_size = artifacts.aggregate(size=Sum("size", default=0))["size"]
573+
options = yield [ # noqa
574+
metrics.Observation(
575+
total_size,
576+
{
577+
"pulp_href": get_url(self.domain),
578+
"domain_name": self.domain.name,
579+
},
580+
)
581+
]
582+
except AdvisoryLockError:
583+
yield
584+
585+
582586
def init_domain_metrics_exporter():
583587
from pulpcore.app.models.domain import Domain
584588

585589
for domain in Domain.objects.all():
586-
DomainMetricsEmitterBuilder.build(domain)
590+
DomainMetricsEmitter.build(domain)
587591

588592

589593
class PGAdvisoryLock:

pulpcore/content/handler.py

+72-22
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
from multidict import CIMultiDict
44
import os
55
import re
6+
import socket
67
from gettext import gettext as _
8+
from functools import lru_cache
79

810
from aiohttp.client_exceptions import ClientResponseError
911
from aiohttp.web import FileResponse, StreamResponse, HTTPOk
@@ -21,6 +23,8 @@
2123

2224
import django
2325

26+
from opentelemetry import metrics
27+
2428
from pulpcore.constants import STORAGE_RESPONSE_MAP
2529
from pulpcore.responses import ArtifactResponse
2630

@@ -49,7 +53,11 @@
4953
RemoteArtifact,
5054
)
5155
from pulpcore.app import mime_types # noqa: E402: module level not at top of file
52-
from pulpcore.app.util import get_domain, cache_key # noqa: E402: module level not at top of file
56+
from pulpcore.app.util import ( # noqa: E402: module level not at top of file
57+
MetricsEmitter,
58+
get_domain,
59+
cache_key,
60+
)
5361

5462
from pulpcore.exceptions import UnsupportedDigestValidationError # noqa: E402
5563

@@ -59,6 +67,11 @@
5967
log = logging.getLogger(__name__)
6068

6169

70+
@lru_cache(maxsize=1)
71+
def _get_content_app_name():
72+
return f"{os.getpid()}@{socket.gethostname()}"
73+
74+
6275
class PathNotResolved(HTTPNotFound):
6376
"""
6477
The path could not be resolved to a published file.
@@ -154,6 +167,20 @@ class Handler:
154167

155168
distribution_model = None
156169

170+
class ArtifactsSizeCounter(MetricsEmitter):
171+
def __init__(self):
172+
self.meter = metrics.get_meter("artifacts.size.meter")
173+
self.counter = self.meter.create_counter(
174+
"artifacts.size.counter",
175+
unit="Bytes",
176+
description="Counts the size of served artifacts",
177+
)
178+
179+
def add(self, amount, attributes):
180+
self.counter.add(amount, attributes)
181+
182+
artifacts_size_counter = ArtifactsSizeCounter.build()
183+
157184
@staticmethod
158185
def _reset_db_connection():
159186
"""
@@ -960,13 +987,37 @@ def _set_params_from_headers(hdrs, storage_domain):
960987
params[STORAGE_RESPONSE_MAP[storage_domain][a_key]] = hdrs[a_key]
961988
return params
962989

990+
def _build_url(**kwargs):
991+
filename = os.path.basename(content_artifact.relative_path)
992+
content_disposition = f"attachment;filename={filename}"
993+
994+
headers["Content-Disposition"] = content_disposition
995+
parameters = _set_params_from_headers(headers, domain.storage_class)
996+
storage_url = storage.url(artifact_name, parameters=parameters, **kwargs)
997+
998+
return URL(storage_url, encoded=True)
999+
9631000
artifact_file = content_artifact.artifact.file
9641001
artifact_name = artifact_file.name
965-
filename = os.path.basename(content_artifact.relative_path)
966-
content_disposition = f"attachment;filename={filename}"
9671002
domain = get_domain()
9681003
storage = domain.get_storage()
9691004

1005+
content_length = artifact_file.size
1006+
1007+
try:
1008+
range_start, range_stop = request.http_range.start, request.http_range.stop
1009+
if range_start or range_stop:
1010+
if range_stop and artifact_file.size and range_stop > artifact_file.size:
1011+
start = 0 if range_start is None else range_start
1012+
content_length = artifact_file.size - start
1013+
elif range_stop:
1014+
content_length = range_stop - range_start
1015+
except ValueError:
1016+
size = artifact_file.size or "*"
1017+
raise HTTPRequestRangeNotSatisfiable(headers={"Content-Range": f"bytes */{size}"})
1018+
1019+
self._report_served_artifact_size(content_length)
1020+
9701021
if domain.storage_class == "pulpcore.app.models.storage.FileSystem":
9711022
path = storage.path(artifact_name)
9721023
if not os.path.exists(path):
@@ -975,25 +1026,12 @@ def _set_params_from_headers(hdrs, storage_domain):
9751026
elif not domain.redirect_to_object_storage:
9761027
return ArtifactResponse(content_artifact.artifact, headers=headers)
9771028
elif domain.storage_class == "storages.backends.s3boto3.S3Boto3Storage":
978-
headers["Content-Disposition"] = content_disposition
979-
parameters = _set_params_from_headers(headers, domain.storage_class)
980-
url = URL(
981-
artifact_file.storage.url(
982-
artifact_name, parameters=parameters, http_method=request.method
983-
),
984-
encoded=True,
985-
)
986-
raise HTTPFound(url)
987-
elif domain.storage_class == "storages.backends.azure_storage.AzureStorage":
988-
headers["Content-Disposition"] = content_disposition
989-
parameters = _set_params_from_headers(headers, domain.storage_class)
990-
url = URL(artifact_file.storage.url(artifact_name, parameters=parameters), encoded=True)
991-
raise HTTPFound(url)
992-
elif domain.storage_class == "storages.backends.gcloud.GoogleCloudStorage":
993-
headers["Content-Disposition"] = content_disposition
994-
parameters = _set_params_from_headers(headers, domain.storage_class)
995-
url = URL(artifact_file.storage.url(artifact_name, parameters=parameters), encoded=True)
996-
raise HTTPFound(url)
1029+
raise HTTPFound(_build_url(http_method=request.method))
1030+
elif domain.storage_class in (
1031+
"storages.backends.azure_storage.AzureStorage",
1032+
"storages.backends.gcloud.GoogleCloudStorage",
1033+
):
1034+
raise HTTPFound(_build_url())
9971035
else:
9981036
raise NotImplementedError()
9991037

@@ -1111,6 +1149,11 @@ async def finalize():
11111149
downloader.finalize = finalize
11121150
download_result = await downloader.run()
11131151

1152+
if content_length := response.headers.get("Content-Length"):
1153+
self._report_served_artifact_size(int(content_length))
1154+
else:
1155+
self._report_served_artifact_size(size)
1156+
11141157
if save_artifact and remote.policy != Remote.STREAMED:
11151158
await asyncio.shield(
11161159
sync_to_async(self._save_artifact)(download_result, remote_artifact, request)
@@ -1120,3 +1163,10 @@ async def finalize():
11201163
if response.status == 404:
11211164
raise HTTPNotFound()
11221165
return response
1166+
1167+
def _report_served_artifact_size(self, size):
1168+
attributes = {
1169+
"domain_name": get_domain().name,
1170+
"content_app_name": _get_content_app_name(),
1171+
}
1172+
self.artifacts_size_counter.add(size, attributes)

0 commit comments

Comments
 (0)