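"""Tests for the vLLM V1 EngineCore: exercises the basic request lifecycle
(add_request / step) and request aborts against a small Llama model."""
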
import time
import uuid

import pytest
from transformers import AutoTokenizer

from vllm import SamplingParams
from vllm.engine.arg_utils import EngineArgs
from vllm.platforms import current_platform
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine import EngineCoreRequest
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.engine.core import EngineCore

if not current_platform.is_cuda():
    pytest.skip(reason="V1 currently only supported on CUDA.",
                allow_module_level=True)

MODEL_NAME = "meta-llama/Llama-3.2-1B-Instruct"
TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME)
PROMPT = "Hello my name is Robert and I love quantization kernels"
PROMPT_TOKENS = TOKENIZER(PROMPT).input_ids


def make_request() -> EngineCoreRequest:
    # Build a minimal request with a fresh string request id each call.
    return EngineCoreRequest(
        request_id=str(uuid.uuid4()),
        prompt=PROMPT,
        prompt_token_ids=PROMPT_TOKENS,
        mm_data=None,
        mm_placeholders=None,
        mm_processor_kwargs=None,
        sampling_params=SamplingParams(),
        eos_token_id=None,
        arrival_time=time.time(),
        lora_request=None,
    )


def test_engine_core(monkeypatch):

    with monkeypatch.context() as m:
        m.setenv("VLLM_USE_V1", "1")

        """Set up the EngineCore."""
        engine_args = EngineArgs(model=MODEL_NAME)
        vllm_config = engine_args.create_engine_config(
            usage_context=UsageContext.UNKNOWN_CONTEXT)
        executor_class = AsyncLLM._get_executor_cls(vllm_config)

        engine_core = EngineCore(vllm_config=vllm_config,
                                 executor_class=executor_class,
                                 usage_context=UsageContext.UNKNOWN_CONTEXT)
"""Test basic request lifecycle."""
# First request.
engine_core.add_request(make_request())
assert len(engine_core.scheduler.waiting) == 1
assert len(engine_core.scheduler.running) == 0
_ = engine_core.step()
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 1
# Second request.
engine_core.add_request(make_request())
assert len(engine_core.scheduler.waiting) == 1
assert len(engine_core.scheduler.running) == 1
_ = engine_core.step()
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 2
# Add two requests in a row.
engine_core.add_request(make_request())
engine_core.add_request(make_request())
assert len(engine_core.scheduler.waiting) == 2
assert len(engine_core.scheduler.running) == 2
_ = engine_core.step()
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 4
# Loop through until they are all done.
while len(engine_core.step()) > 0:
pass
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 0
"""Test abort cycle."""
# Basic abort.
req = make_request()
request_id = req.request_id
engine_core.add_request(req)
assert len(engine_core.scheduler.waiting) == 1
assert len(engine_core.scheduler.running) == 0
_ = engine_core.step()
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 1
engine_core.abort_requests([request_id])
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 0
# Add, step, abort 1 of the 3.
req0 = make_request()
req1 = make_request()
req2 = make_request()
engine_core.add_request(req0)
engine_core.add_request(req1)
assert len(engine_core.scheduler.waiting) == 2
assert len(engine_core.scheduler.running) == 0
_ = engine_core.step()
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 2
engine_core.add_request(req2)
assert len(engine_core.scheduler.waiting) == 1
assert len(engine_core.scheduler.running) == 2
_ = engine_core.step()
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 3
# Abort just one.
engine_core.abort_requests([req1.request_id])
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 2
_ = engine_core.step()
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 2
# Abort the other requests at the same time.
engine_core.abort_requests([req2.request_id, req0.request_id])
assert len(engine_core.scheduler.waiting) == 0
assert len(engine_core.scheduler.running) == 0
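
# Usage note (assumptions: this file sits in a vLLM checkout with test
# dependencies installed, a CUDA GPU is available, and the gated
# meta-llama/Llama-3.2-1B-Instruct checkpoint is accessible). The VLLM_USE_V1
# env var is set by the monkeypatch inside the test, so a plain invocation
# suffices:
#
#   pytest test_engine_core.py -v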