chore: Add logging message when core.task_success_overtime is exceeded #47449

Merged 1 commit on Mar 6, 2025

Conversation

kacpermuda
Contributor

The overtime mechanism for auxiliary processes (such as listeners) that are still running after task success was introduced in #39890. However, when it triggers, the resulting stack trace is hard to read and does not make clear what actually happened. This PR does not modify the stack trace itself, because the termination mechanism covers more than this specific case. Instead, I’ve added a warning message that is logged when the overtime mechanism is the cause: it explains why the process was killed and gives guidance on how to prevent it.

unfriendly stacktrace
[2025-03-06T08:45:20.648+0000] {local_task_job_runner.py:346} WARNING - State of this instance has been externally set to success. Terminating instance.
[2025-03-06T08:45:20.649+0000] {local_task_job_runner.py:245} INFO - ::endgroup::
[2025-03-06T08:45:20.652+0000] {process_utils.py:132} INFO - Sending 15 to group 503. PIDs of all processes in the group: [836, 503]
[2025-03-06T08:45:20.652+0000] {process_utils.py:87} INFO - Sending the signal 15 to group 503
[2025-03-06T08:45:20.653+0000] {taskinstance.py:3094} ERROR - Received SIGTERM. Terminating subprocesses.
[2025-03-06T08:45:20.665+0000] {taskinstance.py:3095} ERROR - Stacktrace: 
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 56, in scheduler
    run_command_with_daemon_option(
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 47, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/usr/local/lib/python3.12/site-packages/astronomer/airflow/version_check/plugin.py", line 38, in run_before
    fn(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 990, in _execute
    executor.start()
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 392, in start
    self.impl.start()
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 330, in start
    worker.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 80, in run
    return super().run()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 211, in do_work
    self.execute_work(key=key, command=command)
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 100, in execute_work
    state = self._execute_work_in_fork(command)
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 142, in _execute_work_in_fork
    args.func(args)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
    return _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 322, in _run_task_by_local_task_job
    ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/local_task_job_runner.py", line 171, in _execute
    self.task_runner.start()
  File "/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 55, in start
    self.process = self._start_by_fork()
  File "/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3006, in _run_raw_task
    return _run_raw_task(
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 368, in _run_raw_task
    get_listener_manager().hook.on_task_instance_success(
  File "/usr/local/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
    return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
  File "/usr/local/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/usr/local/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
    res = hook_impl.function(*args)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py", line 259, in on_task_instance_success
    self._on_task_instance_success(task_instance, task.dag, task_instance.dag_run, task)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py", line 326, in _on_task_instance_success
    self._execute(on_success, "on_success", use_fork=True)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py", line 452, in _execute
    self._fork_execute(callable, callable_name)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py", line 472, in _fork_execute
    process.wait(conf.execution_timeout())
  File "/usr/local/lib/python3.12/site-packages/psutil/__init__.py", line 1355, in wait
    self._exitcode = self._proc.wait(timeout)
  File "/usr/local/lib/python3.12/site-packages/psutil/_pslinux.py", line 1717, in wrapper
    return fun(self, *args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/psutil/_pslinux.py", line 1949, in wait
    return _psposix.wait_pid(self.pid, timeout, self._name)
  File "/usr/local/lib/python3.12/site-packages/psutil/_psposix.py", line 146, in wait_pid
    interval = sleep(interval)
  File "/usr/local/lib/python3.12/site-packages/psutil/_psposix.py", line 124, in sleep
    _sleep(interval)
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3095, in signal_handler
    self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack()))
[2025-03-06T08:45:22.020+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=503, status='terminated', exitcode=0, started='08:43:49') (503) terminated with exit code 0
[2025-03-06T08:45:22.022+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=836, status='terminated', started='08:44:54') (836) terminated with exit code None
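
To illustrate the idea behind the change, here is a minimal sketch of an overtime check that logs an explanatory warning before terminating. It is not the code merged in this PR: the function name `terminate_if_overtime`, its parameters, and the message wording are all hypothetical, and only the option name `core.task_success_overtime` comes from the PR title.

```python
import logging
from typing import Callable

log = logging.getLogger("task_success_overtime_sketch")


def terminate_if_overtime(
    seconds_since_success: float,
    task_success_overtime: float,
    terminate: Callable[[], None],
) -> bool:
    """Warn and terminate once the post-success grace period has run out.

    Hypothetical helper for illustration only; not Airflow's implementation.
    Returns True if termination was triggered, False otherwise.
    """
    if seconds_since_success <= task_success_overtime:
        # Still inside the grace period: let auxiliary work keep running.
        return False

    # Explain the cause up front so the SIGTERM stack trace that follows
    # is easier to interpret.
    log.warning(
        "Task succeeded %.1f s ago but its process is still running, which "
        "exceeds core.task_success_overtime (%s s); terminating it. "
        "This can happen when, for example, a listener is still doing work. "
        "To prevent it, increase core.task_success_overtime or shorten that work.",
        seconds_since_success,
        task_success_overtime,
    )
    terminate()
    return True


# Usage: 25 s have passed since success and the allowed overtime is 20 s.
terminated = []
terminate_if_overtime(25.0, 20.0, lambda: terminated.append(True))
```

With a warning like this logged first, a reader of the trace above can tell that the SIGTERM came from the overtime limit and not from some other external state change. Following Airflow's usual `AIRFLOW__{SECTION}__{KEY}` convention, the option can presumably also be set through the `AIRFLOW__CORE__TASK_SUCCESS_OVERTIME` environment variable.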



@kacpermuda kacpermuda requested review from ashb and XD-DENG as code owners March 6, 2025 13:22
@potiuk potiuk merged commit 91abe01 into apache:main Mar 6, 2025
44 checks passed
@kacpermuda kacpermuda deleted the chore-task-success-overtime-log branch March 7, 2025 07:24