# tracing.py (forked from vllm-project/vllm)

import os
from typing import Mapping, Optional

from vllm.logger import init_logger
from vllm.utils import run_once

TRACE_HEADERS = ["traceparent", "tracestate"]

logger = init_logger(__name__)

_is_otel_imported = False
otel_import_error_traceback: Optional[str] = None
try:
    from opentelemetry.context.context import Context
    from opentelemetry.sdk.environment_variables import (
        OTEL_EXPORTER_OTLP_TRACES_PROTOCOL)
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.semconv_ai import SpanAttributes as BaseSpanAttributes
    from opentelemetry.trace import SpanKind, Tracer, set_tracer_provider
    from opentelemetry.trace.propagation.tracecontext import (
        TraceContextTextMapPropagator)
    _is_otel_imported = True
except ImportError:
    # Capture and format the traceback to provide detailed context for the
    # import error. Only the string representation of the error is retained
    # to avoid memory leaks.
    # See https://github.com/vllm-project/vllm/pull/7266#discussion_r1707395458
    import traceback
    otel_import_error_traceback = traceback.format_exc()

    # Fallback stubs so that the type annotations below still resolve when
    # OpenTelemetry is not installed.
    class Context:  # type: ignore
        pass

    class BaseSpanAttributes:  # type: ignore
        pass

    class SpanKind:  # type: ignore
        pass

    class Tracer:  # type: ignore
        pass


def is_otel_available() -> bool:
    return _is_otel_imported


def init_tracer(instrumenting_module_name: str,
                otlp_traces_endpoint: str) -> Optional[Tracer]:
    """Set up a global tracer provider that exports spans over OTLP and
    return a tracer for the given instrumenting module."""
    if not is_otel_available():
        raise ValueError(
            "OpenTelemetry is not available. Unable to initialize "
            "a tracer. Ensure OpenTelemetry packages are installed. "
            f"Original error:\n{otel_import_error_traceback}")
    trace_provider = TracerProvider()

    span_exporter = get_span_exporter(otlp_traces_endpoint)
    trace_provider.add_span_processor(BatchSpanProcessor(span_exporter))
    set_tracer_provider(trace_provider)

    tracer = trace_provider.get_tracer(instrumenting_module_name)
    return tracer
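

# A minimal usage sketch; the module name and endpoint below are
# illustrative assumptions, not values mandated by this module:
#
#     tracer = init_tracer("vllm.llm_engine", "http://localhost:4317")
#     if tracer is not None:
#         with tracer.start_as_current_span(
#                 "llm_request", kind=SpanKind.SERVER) as span:
#             span.set_attribute(SpanAttributes.LLM_REQUEST_ID, "example-id")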


def get_span_exporter(endpoint):
    protocol = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "grpc")
    if protocol == "grpc":
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
            OTLPSpanExporter)
    elif protocol == "http/protobuf":
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
            OTLPSpanExporter)  # type: ignore
    else:
        raise ValueError(
            f"Unsupported OTLP protocol '{protocol}' is configured")

    return OTLPSpanExporter(endpoint=endpoint)
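

# The export protocol is selected through the standard OpenTelemetry
# environment variable; for example (shell syntax, illustrative value):
#
#     export OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf
#
# When the variable is unset, the gRPC exporter is used.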


def extract_trace_context(
        headers: Optional[Mapping[str, str]]) -> Optional[Context]:
    if is_otel_available():
        headers = headers or {}
        return TraceContextTextMapPropagator().extract(headers)
    else:
        return None
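

# Sketch of parenting a local span on an upstream trace; the header value is
# the canonical W3C Trace Context example, and `tracer` is assumed to come
# from init_tracer above:
#
#     headers = {"traceparent":
#                "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
#     ctx = extract_trace_context(headers)
#     with tracer.start_as_current_span("child_span", context=ctx):
#         ...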


def extract_trace_headers(headers: Mapping[str, str]) -> Mapping[str, str]:
    """Return only the W3C trace context headers present in `headers`."""
    return {h: headers[h] for h in TRACE_HEADERS if h in headers}


class SpanAttributes(BaseSpanAttributes):
    # The following span attribute names are added here because they are
    # missing from the Semantic Conventions for LLM.
    LLM_REQUEST_ID = "gen_ai.request.id"
    LLM_REQUEST_N = "gen_ai.request.n"
    LLM_USAGE_NUM_SEQUENCES = "gen_ai.usage.num_sequences"
    LLM_LATENCY_TIME_IN_QUEUE = "gen_ai.latency.time_in_queue"
    LLM_LATENCY_TIME_TO_FIRST_TOKEN = "gen_ai.latency.time_to_first_token"
    LLM_LATENCY_E2E = "gen_ai.latency.e2e"
    LLM_LATENCY_TIME_IN_SCHEDULER = "gen_ai.latency.time_in_scheduler"
    # Time taken in the forward pass for this request across all workers.
    LLM_LATENCY_TIME_IN_MODEL_FORWARD = "gen_ai.latency.time_in_model_forward"
    # Time taken in the model execute function. This includes the model
    # forward pass, block/sync across workers, CPU-GPU sync time, and
    # sampling time.
    LLM_LATENCY_TIME_IN_MODEL_EXECUTE = "gen_ai.latency.time_in_model_execute"
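

# Example of recording one of these attributes on an active span (the value
# here is a hypothetical latency in seconds):
#
#     span.set_attribute(SpanAttributes.LLM_LATENCY_TIME_TO_FIRST_TOKEN, 0.42)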


def contains_trace_headers(headers: Mapping[str, str]) -> bool:
    return any(h in headers for h in TRACE_HEADERS)


@run_once
def log_tracing_disabled_warning() -> None:
    logger.warning(
        "Received a request with trace context but tracing is disabled")