Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[V1][Core] Use weakref.finalize instead of atexit #11242

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import atexit
import os
import weakref
from typing import List, Optional

import msgspec
Expand Down Expand Up @@ -165,15 +165,9 @@ def __init__(
ready_path=ready_path, # type: ignore[misc]
**kwargs,
)
atexit.register(self.shutdown)
self._finalizer = weakref.finalize(self, self.shutdown)

def shutdown(self):
# During final garbage collection in process shutdown, atexit may be
# None.
if atexit:
# in case shutdown gets called via __del__ first
atexit.unregister(self.shutdown)

# Shut down the zmq context.
self.ctx.destroy(linger=0)

Expand All @@ -197,9 +191,6 @@ def shutdown(self):
os.remove(socket_file)
self.proc_handle = None

def __del__(self):
self.shutdown()


class SyncMPClient(MPClient):
"""Synchronous client for multi-proc EngineCore."""
Expand Down
10 changes: 3 additions & 7 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import atexit
import os
import pickle
import signal
import sys
import time
import weakref
from dataclasses import dataclass
from enum import Enum, auto
from multiprocessing.process import BaseProcess
Expand Down Expand Up @@ -37,7 +37,7 @@ class MultiprocExecutor(Executor):
def __init__(self, vllm_config: VllmConfig) -> None:
# Call self.shutdown at exit to clean up
# and ensure workers will be terminated.
atexit.register(self.shutdown)
self._finalizer = weakref.finalize(self, self.shutdown)

self.vllm_config = vllm_config
self.parallel_config = vllm_config.parallel_config
Expand Down Expand Up @@ -195,14 +195,10 @@ def _cleanup_sockets(self):
os.remove(socket_path)

def shutdown(self):
if atexit:
# in case shutdown was called explicitly, we don't need to call it
# again
atexit.unregister(self.shutdown)
"""Properly shut down the executor and its workers"""
if getattr(self, 'shutting_down', False):
self.shutting_down = True
for w in self.workers: #TODO: not sure if needed
for w in self.workers:
w.worker_response_mq = None
self._ensure_worker_termination()

Expand Down
Loading