Skip to content

Use new scopes API in Celery integration. #2851

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 46 additions & 56 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import time
from functools import wraps

import sentry_sdk
from sentry_sdk.api import continue_trace
from sentry_sdk.consts import OP
from sentry_sdk.crons import capture_checkin, MonitorStatus
from sentry_sdk.hub import Hub
from sentry_sdk import isolation_scope
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations.logging import ignore_logger
Expand All @@ -15,6 +15,7 @@
from sentry_sdk.utils import (
capture_internal_exceptions,
event_from_exception,
ensure_integration_enabled,
logger,
match_regex_list,
reraise,
Expand Down Expand Up @@ -147,17 +148,13 @@ def __exit__(self, exc_type, exc_value, traceback):
def _wrap_apply_async(f):
# type: (F) -> F
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def apply_async(*args, **kwargs):
# type: (*Any, **Any) -> Any
hub = Hub.current
integration = hub.get_integration(CeleryIntegration)

if integration is None:
return f(*args, **kwargs)

# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
kwarg_headers = kwargs.get("headers") or {}
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
propagate_traces = kwarg_headers.pop(
"sentry-propagate-traces", integration.propagate_traces
)
Expand All @@ -173,15 +170,15 @@ def apply_async(*args, **kwargs):
task = args[0]

span_mgr = (
hub.start_span(op=OP.QUEUE_SUBMIT_CELERY, description=task.name)
sentry_sdk.start_span(op=OP.QUEUE_SUBMIT_CELERY, description=task.name)
if not task_started_from_beat
else NoOpMgr()
) # type: Union[Span, NoOpMgr]

with span_mgr as span:
with capture_internal_exceptions():
headers = (
dict(hub.iter_trace_propagation_headers(span))
dict(Scope.get_current_scope().iter_trace_propagation_headers(span))
if span is not None
else {}
)
Expand Down Expand Up @@ -240,12 +237,9 @@ def _wrap_tracer(task, f):
# Also because in Celery 3, signal dispatch returns early if one handler
# crashes.
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def _inner(*args, **kwargs):
# type: (*Any, **Any) -> Any
hub = Hub.current
if hub.get_integration(CeleryIntegration) is None:
return f(*args, **kwargs)

with isolation_scope() as scope:
scope._name = "celery"
scope.clear_breadcrumbs()
Expand All @@ -268,7 +262,7 @@ def _inner(*args, **kwargs):
if transaction is None:
return f(*args, **kwargs)

with hub.start_transaction(
with sentry_sdk.start_transaction(
transaction,
custom_sampling_context={
"celery_job": {
Expand Down Expand Up @@ -339,34 +333,31 @@ def event_processor(event, hint):

def _capture_exception(task, exc_info):
# type: (Any, ExcInfo) -> None
hub = Hub.current

if hub.get_integration(CeleryIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(CeleryIntegration) is None:
return

if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS):
# ??? Doesn't map to anything
_set_status(hub, "aborted")
_set_status("aborted")
return

_set_status(hub, "internal_error")
_set_status("internal_error")

if hasattr(task, "throws") and isinstance(exc_info[1], task.throws):
return

# If an integration is there, a client has to be there.
client = hub.client # type: Any

event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={"type": "celery", "handled": False},
)

hub.capture_event(event, hint=hint)
sentry_sdk.capture_event(event, hint=hint)


def _set_status(hub, status):
# type: (Hub, str) -> None
def _set_status(status):
# type: (str) -> None
with capture_internal_exceptions():
scope = Scope.get_current_scope()
if scope.span is not None:
Expand All @@ -388,9 +379,11 @@ def sentry_workloop(*args, **kwargs):
return old_workloop(*args, **kwargs)
finally:
with capture_internal_exceptions():
hub = Hub.current
if hub.get_integration(CeleryIntegration) is not None:
hub.flush()
if (
sentry_sdk.get_client().get_integration(CeleryIntegration)
is not None
):
sentry_sdk.flush()

Worker.workloop = sentry_workloop

Expand Down Expand Up @@ -487,6 +480,7 @@ def _patch_beat_apply_entry():
# type: () -> None
original_apply_entry = Scheduler.apply_entry

@ensure_integration_enabled(CeleryIntegration, original_apply_entry)
def sentry_apply_entry(*args, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to do this for the new redbeat wrapper too once we update sentry-sdk-2.0 with current master. (Just a note for later.)

# type: (*Any, **Any) -> None
scheduler, schedule_entry = args
Expand All @@ -495,42 +489,38 @@ def sentry_apply_entry(*args, **kwargs):
celery_schedule = schedule_entry.schedule
monitor_name = schedule_entry.name

hub = Hub.current
integration = hub.get_integration(CeleryIntegration)
if integration is None:
return original_apply_entry(*args, **kwargs)

integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
if match_regex_list(monitor_name, integration.exclude_beat_tasks):
return original_apply_entry(*args, **kwargs)

with hub.configure_scope() as scope:
# When tasks are started from Celery Beat, make sure each task has its own trace.
scope.set_new_propagation_context()
# When tasks are started from Celery Beat, make sure each task has its own trace.
scope = Scope.get_isolation_scope()
scope.set_new_propagation_context()

monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)
is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)

check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})
check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})

# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers
# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers

return original_apply_entry(*args, **kwargs)
return original_apply_entry(*args, **kwargs)

Scheduler.apply_entry = sentry_apply_entry

Expand Down
7 changes: 5 additions & 2 deletions tests/integrations/celery/test_celery_beat_crons.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ def test_exclude_beat_tasks_option(
fake_integration = MagicMock()
fake_integration.exclude_beat_tasks = exclude_beat_tasks

fake_client = MagicMock()
fake_client.get_integration.return_value = fake_integration

fake_schedule_entry = MagicMock()
fake_schedule_entry.name = task_name

Expand All @@ -416,8 +419,8 @@ def test_exclude_beat_tasks_option(
"sentry_sdk.integrations.celery.Scheduler", fake_scheduler
) as Scheduler: # noqa: N806
with mock.patch(
"sentry_sdk.integrations.celery.Hub.current.get_integration",
return_value=fake_integration,
"sentry_sdk.integrations.celery.sentry_sdk.get_client",
return_value=fake_client,
):
with mock.patch(
"sentry_sdk.integrations.celery._get_monitor_config",
Expand Down