Skip to content

Commit 9fc2f44

Browse files
Fix duplicate baggage headers in Celery integration introduced in SDK 2.0 (#2993)
This fixes the accumulating baggage headers problem found when dogfooding SDK 2.0 on sentry.io. It reverts the refactoring of header manipulation that was added in 2.0.0rc4 back to what is present in the current 1.x branch. This PR uses the working code from 1.x and then extracts the header-manipulating code into `_update_celery_task_headers()` to make it more readable and also testable. It also adds a couple of tests to make sure we do not change the behavior by accident in the future. --------- Co-authored-by: Ivana Kellyerova <ivana.kellyerova@sentry.io>
1 parent 70e336b commit 9fc2f44

File tree

3 files changed

+235
-80
lines changed

3 files changed

+235
-80
lines changed

sentry_sdk/integrations/celery/__init__.py

Lines changed: 66 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -155,17 +155,77 @@ def event_processor(event, hint):
155155
return event_processor
156156

157157

158-
def _wrap_apply_async(f):
159-
# type: (F) -> F
158+
def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
159+
# type: (dict[str, Any], Optional[Span], bool) -> dict[str, Any]
160160
"""
161-
Apply_async is always called to put a task in the queue. This is called by the
162-
celery client (for example the Django project or the Celery Beat process)
161+
Updates the headers of the Celery task with the tracing information
162+
and eventually Sentry Crons monitoring information for beat tasks.
163163
"""
164+
updated_headers = original_headers.copy()
165+
with capture_internal_exceptions():
166+
headers = {}
167+
if span is not None:
168+
headers = dict(
169+
Scope.get_current_scope().iter_trace_propagation_headers(span=span)
170+
)
171+
if monitor_beat_tasks:
172+
headers.update(
173+
{
174+
"sentry-monitor-start-timestamp-s": "%.9f"
175+
% _now_seconds_since_epoch(),
176+
}
177+
)
178+
179+
if headers:
180+
existing_baggage = updated_headers.get(BAGGAGE_HEADER_NAME)
181+
sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)
182+
183+
combined_baggage = sentry_baggage or existing_baggage
184+
if sentry_baggage and existing_baggage:
185+
combined_baggage = "{},{}".format(
186+
existing_baggage,
187+
sentry_baggage,
188+
)
189+
190+
updated_headers.update(headers)
191+
if combined_baggage:
192+
updated_headers[BAGGAGE_HEADER_NAME] = combined_baggage
193+
194+
# https://github.com/celery/celery/issues/4875
195+
#
196+
# Need to setdefault the inner headers too since other
197+
# tracing tools (dd-trace-py) also employ this exact
198+
# workaround and we don't want to break them.
199+
updated_headers.setdefault("headers", {}).update(headers)
200+
if combined_baggage:
201+
updated_headers["headers"][BAGGAGE_HEADER_NAME] = combined_baggage
202+
203+
# Add the Sentry options potentially added in `sentry_apply_entry`
204+
# to the headers (done when auto-instrumenting Celery Beat tasks)
205+
for key, value in updated_headers.items():
206+
if key.startswith("sentry-"):
207+
updated_headers["headers"][key] = value
208+
209+
return updated_headers
164210

211+
212+
def _wrap_apply_async(f):
213+
# type: (F) -> F
165214
@wraps(f)
166215
@ensure_integration_enabled(CeleryIntegration, f)
167216
def apply_async(*args, **kwargs):
168217
# type: (*Any, **Any) -> Any
218+
# Note: kwargs can contain headers=None, so no setdefault!
219+
# Unsure which backend though.
220+
kwarg_headers = kwargs.get("headers") or {}
221+
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
222+
propagate_traces = kwarg_headers.pop(
223+
"sentry-propagate-traces", integration.propagate_traces
224+
)
225+
226+
if not propagate_traces:
227+
return f(*args, **kwargs)
228+
169229
task = args[0]
170230

171231
# Do not create a span when the task is a Celery Beat task
@@ -177,82 +237,9 @@ def apply_async(*args, **kwargs):
177237
) # type: Union[Span, NoOpMgr]
178238

179239
with span_mgr as span:
180-
incoming_headers = kwargs.get("headers") or {}
181-
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
182-
183-
# If Sentry Crons monitoring for Celery Beat tasks is enabled
184-
# add start timestamp of task,
185-
if integration is not None and integration.monitor_beat_tasks:
186-
incoming_headers.update(
187-
{
188-
"sentry-monitor-start-timestamp-s": "%.9f"
189-
% _now_seconds_since_epoch(),
190-
}
191-
)
192-
193-
# Propagate Sentry trace information into the Celery task if desired
194-
default_propagate_traces = (
195-
integration.propagate_traces if integration is not None else True
196-
)
197-
propagate_traces = incoming_headers.pop(
198-
"sentry-propagate-traces", default_propagate_traces
240+
kwargs["headers"] = _update_celery_task_headers(
241+
kwarg_headers, span, integration.monitor_beat_tasks
199242
)
200-
201-
if propagate_traces:
202-
with capture_internal_exceptions():
203-
sentry_trace_headers = dict(
204-
Scope.get_current_scope().iter_trace_propagation_headers(
205-
span=span
206-
)
207-
)
208-
# Set Sentry trace data in the headers of the Celery task
209-
if sentry_trace_headers:
210-
# Make sure we don't overwrite existing baggage
211-
incoming_baggage = incoming_headers.get(BAGGAGE_HEADER_NAME)
212-
sentry_baggage = sentry_trace_headers.get(BAGGAGE_HEADER_NAME)
213-
214-
combined_baggage = sentry_baggage or incoming_baggage
215-
if sentry_baggage and incoming_baggage:
216-
combined_baggage = "{},{}".format(
217-
incoming_baggage,
218-
sentry_baggage,
219-
)
220-
221-
# Set Sentry trace data to the headers of the Celery task
222-
incoming_headers.update(sentry_trace_headers)
223-
224-
if combined_baggage:
225-
incoming_headers[BAGGAGE_HEADER_NAME] = combined_baggage
226-
227-
# Set sentry trace data also to the inner headers of the Celery task
228-
# https://github.com/celery/celery/issues/4875
229-
#
230-
# Need to setdefault the inner headers too since other
231-
# tracing tools (dd-trace-py) also employ this exact
232-
# workaround and we don't want to break them.
233-
incoming_headers.setdefault("headers", {}).update(
234-
sentry_trace_headers
235-
)
236-
if combined_baggage:
237-
incoming_headers["headers"][
238-
BAGGAGE_HEADER_NAME
239-
] = combined_baggage
240-
241-
# Add the Sentry options potentially added in `sentry_sdk.integrations.beat.sentry_apply_entry`
242-
# to the inner headers (done when auto-instrumenting Celery Beat tasks)
243-
# https://github.com/celery/celery/issues/4875
244-
#
245-
# Need to setdefault the inner headers too since other
246-
# tracing tools (dd-trace-py) also employ this exact
247-
# workaround and we don't want to break them.
248-
incoming_headers.setdefault("headers", {})
249-
for key, value in incoming_headers.items():
250-
if key.startswith("sentry-"):
251-
incoming_headers["headers"][key] = value
252-
253-
# Run the task (with updated headers in kwargs)
254-
kwargs["headers"] = incoming_headers
255-
256243
return f(*args, **kwargs)
257244

258245
return apply_async # type: ignore

tests/integrations/celery/test_celery.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,13 @@ def dummy_task(self, x, y):
514514
# in the monkey patched version of `apply_async`
515515
# in `sentry_sdk/integrations/celery.py::_wrap_apply_async()`
516516
result = dummy_task.apply_async(args=(1, 0), headers=sentry_crons_setup)
517-
assert result.get() == sentry_crons_setup
517+
518+
expected_headers = sentry_crons_setup.copy()
519+
# Newly added headers
520+
expected_headers["sentry-trace"] = mock.ANY
521+
expected_headers["baggage"] = mock.ANY
522+
523+
assert result.get() == expected_headers
518524

519525

520526
def test_baggage_propagation(init_celery):
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
import pytest
2+
3+
from unittest import mock
4+
5+
from sentry_sdk.integrations.celery import _update_celery_task_headers
6+
import sentry_sdk
7+
8+
9+
BAGGAGE_VALUE = (
10+
"sentry-trace_id=771a43a4192642f0b136d5159a501700,"
11+
"sentry-public_key=49d0f7386ad645858ae85020e393bef3,"
12+
"sentry-sample_rate=0.1337,"
13+
"custom=value"
14+
)
15+
16+
SENTRY_TRACE_VALUE = "771a43a4192642f0b136d5159a501700-1234567890abcdef-1"
17+
18+
19+
@pytest.mark.parametrize("monitor_beat_tasks", [True, False, None, "", "bla", 1, 0])
20+
def test_monitor_beat_tasks(monitor_beat_tasks):
21+
headers = {}
22+
span = None
23+
24+
updated_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
25+
26+
assert headers == {} # left unchanged
27+
28+
if monitor_beat_tasks:
29+
assert updated_headers == {
30+
"headers": {"sentry-monitor-start-timestamp-s": mock.ANY},
31+
"sentry-monitor-start-timestamp-s": mock.ANY,
32+
}
33+
else:
34+
assert updated_headers == headers
35+
36+
37+
@pytest.mark.parametrize("monitor_beat_tasks", [True, False, None, "", "bla", 1, 0])
38+
def test_monitor_beat_tasks_with_headers(monitor_beat_tasks):
39+
headers = {
40+
"blub": "foo",
41+
"sentry-something": "bar",
42+
}
43+
span = None
44+
45+
updated_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
46+
47+
if monitor_beat_tasks:
48+
assert updated_headers == {
49+
"blub": "foo",
50+
"sentry-something": "bar",
51+
"headers": {
52+
"sentry-monitor-start-timestamp-s": mock.ANY,
53+
"sentry-something": "bar",
54+
},
55+
"sentry-monitor-start-timestamp-s": mock.ANY,
56+
}
57+
else:
58+
assert updated_headers == headers
59+
60+
61+
def test_span_with_transaction(sentry_init):
62+
sentry_init(enable_tracing=True)
63+
headers = {}
64+
65+
with sentry_sdk.start_transaction(name="test_transaction") as transaction:
66+
with sentry_sdk.start_span(op="test_span") as span:
67+
updated_headers = _update_celery_task_headers(headers, span, False)
68+
69+
assert updated_headers["sentry-trace"] == span.to_traceparent()
70+
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
71+
assert updated_headers["baggage"] == transaction.get_baggage().serialize()
72+
assert (
73+
updated_headers["headers"]["baggage"]
74+
== transaction.get_baggage().serialize()
75+
)
76+
77+
78+
def test_span_with_no_transaction(sentry_init):
79+
sentry_init(enable_tracing=True)
80+
headers = {}
81+
82+
with sentry_sdk.start_span(op="test_span") as span:
83+
updated_headers = _update_celery_task_headers(headers, span, False)
84+
85+
assert updated_headers["sentry-trace"] == span.to_traceparent()
86+
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
87+
assert "baggage" not in updated_headers.keys()
88+
assert "baggage" not in updated_headers["headers"].keys()
89+
90+
91+
def test_custom_span(sentry_init):
92+
sentry_init(enable_tracing=True)
93+
span = sentry_sdk.tracing.Span()
94+
headers = {}
95+
96+
with sentry_sdk.start_transaction(name="test_transaction"):
97+
updated_headers = _update_celery_task_headers(headers, span, False)
98+
99+
assert updated_headers["sentry-trace"] == span.to_traceparent()
100+
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
101+
assert "baggage" not in updated_headers.keys()
102+
assert "baggage" not in updated_headers["headers"].keys()
103+
104+
105+
def test_span_with_transaction_custom_headers(sentry_init):
106+
sentry_init(enable_tracing=True)
107+
headers = {
108+
"baggage": BAGGAGE_VALUE,
109+
"sentry-trace": SENTRY_TRACE_VALUE,
110+
}
111+
112+
with sentry_sdk.start_transaction(name="test_transaction") as transaction:
113+
with sentry_sdk.start_span(op="test_span") as span:
114+
updated_headers = _update_celery_task_headers(headers, span, False)
115+
116+
assert updated_headers["sentry-trace"] == span.to_traceparent()
117+
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
118+
# This is probably the cause for https://github.com/getsentry/sentry-python/issues/2916
119+
# If incoming baggage includes sentry data, we should not concatenate a new baggage value to it
120+
# but just keep the incoming sentry baggage values and concatenate new third-party items to the baggage
121+
# I have some code somewhere where I have implemented this.
122+
assert (
123+
updated_headers["baggage"]
124+
== headers["baggage"] + "," + transaction.get_baggage().serialize()
125+
)
126+
assert (
127+
updated_headers["headers"]["baggage"]
128+
== headers["baggage"] + "," + transaction.get_baggage().serialize()
129+
)
130+
131+
132+
def test_span_with_no_transaction_custom_headers(sentry_init):
133+
sentry_init(enable_tracing=True)
134+
headers = {
135+
"baggage": BAGGAGE_VALUE,
136+
"sentry-trace": SENTRY_TRACE_VALUE,
137+
}
138+
139+
with sentry_sdk.start_span(op="test_span") as span:
140+
updated_headers = _update_celery_task_headers(headers, span, False)
141+
142+
assert updated_headers["sentry-trace"] == span.to_traceparent()
143+
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
144+
assert updated_headers["baggage"] == headers["baggage"]
145+
assert updated_headers["headers"]["baggage"] == headers["baggage"]
146+
147+
148+
def test_custom_span_custom_headers(sentry_init):
149+
sentry_init(enable_tracing=True)
150+
span = sentry_sdk.tracing.Span()
151+
headers = {
152+
"baggage": BAGGAGE_VALUE,
153+
"sentry-trace": SENTRY_TRACE_VALUE,
154+
}
155+
156+
with sentry_sdk.start_transaction(name="test_transaction"):
157+
updated_headers = _update_celery_task_headers(headers, span, False)
158+
159+
assert updated_headers["sentry-trace"] == span.to_traceparent()
160+
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
161+
assert updated_headers["baggage"] == headers["baggage"]
162+
assert updated_headers["headers"]["baggage"] == headers["baggage"]

0 commit comments

Comments
 (0)