Commit

Merge branch 'main' into add_generation_config
DarkLight1337 authored Dec 16, 2024
2 parents d9fdb3b + 17138af commit 0708124
Showing 12 changed files with 443 additions and 42 deletions.
(Two changed files could not be displayed.)
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -114,6 +114,7 @@ Documentation
usage/engine_args
usage/env_vars
usage/usage_stats
usage/disagg_prefill

.. toctree::
:maxdepth: 1
4 changes: 2 additions & 2 deletions docs/source/serving/deploying_with_k8s.rst
@@ -162,7 +162,7 @@ To test the deployment, run the following ``curl`` command:
curl http://mistral-7b.default.svc.cluster.local/v1/completions \
-H "Content-Type: application/json" \
-d '{
"model": "facebook/opt-125m",
"model": "mistralai/Mistral-7B-Instruct-v0.3",
"prompt": "San Francisco is a",
"max_tokens": 7,
"temperature": 0
@@ -172,4 +172,4 @@ If the service is correctly deployed, you should receive a response from the vLL

Conclusion
----------
Deploying vLLM with Kubernetes allows for efficient scaling and management of ML models leveraging GPU resources. By following the steps outlined above, you should be able to set up and test a vLLM deployment within your Kubernetes cluster. If you encounter any issues or have suggestions, please feel free to contribute to the documentation.
69 changes: 69 additions & 0 deletions docs/source/usage/disagg_prefill.rst
@@ -0,0 +1,69 @@
.. _disagg_prefill:

Disaggregated prefilling (experimental)
=======================================

This page introduces the disaggregated prefilling feature in vLLM. This feature is experimental and subject to change.

Why disaggregated prefilling?
-----------------------------

Two main reasons:

* **Tuning time-to-first-token (TTFT) and inter-token-latency (ITL) separately**. Disaggregated prefilling runs the prefill and decode phases of LLM inference in separate vLLM instances. This gives you the flexibility to assign different parallel strategies (e.g. ``tp`` and ``pp``) to tune TTFT without affecting ITL, or vice versa.
* **Controlling tail ITL**. Without disaggregated prefilling, vLLM may schedule prefill jobs during the decoding of another request, which raises tail latency. Disaggregated prefilling removes this interference and gives you control over tail ITL. Chunked prefill with a well-chosen chunk size can achieve the same goal, but in practice the right chunk size is hard to determine, so disaggregated prefilling is a more reliable way to control tail ITL.

.. note::
Disaggregated prefill DOES NOT improve throughput.

Usage example
-------------

Please refer to ``examples/disaggregated_prefill.sh`` for an example of how to use disaggregated prefilling.


Benchmarks
----------

Please refer to ``benchmarks/disagg_benchmarks/`` for disaggregated prefilling benchmarks.


Development
-----------

We implement disaggregated prefilling by running two vLLM instances: one for prefill (the prefill instance) and one for decode (the decode instance). A connector then transfers the prefill KV caches and results from the prefill instance to the decode instance.

All of the disaggregated prefilling implementation lives under ``vllm/distributed/kv_transfer``.
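
The package layout mirrors the key abstractions described below (directory names are taken from the imports in this commit; the one-line notes are editorial summaries)::

    vllm/distributed/kv_transfer/
        kv_connector/       # Connector implementations (e.g. SimpleConnector)
        kv_lookup_buffer/   # LookupBuffer implementations (e.g. SimpleBuffer)
        kv_pipe/            # Pipe implementations (PyNcclPipe, MooncakePipe)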

Key abstractions for disaggregated prefilling:

* **Connector**: A Connector allows the **kv consumer** to retrieve the KV caches of a batch of requests from the **kv producer**.
* **LookupBuffer**: A LookupBuffer provides two APIs: ``insert`` and ``drop_select``. Their semantics are similar to SQL: ``insert`` adds a KV cache to the buffer, and ``drop_select`` returns the KV cache matching the given condition and drops it from the buffer.
* **Pipe**: A single-direction FIFO pipe for tensor transmission. It supports ``send_tensor`` and ``recv_tensor``.

.. note::
    ``insert`` is a non-blocking operation, while ``drop_select`` is a blocking operation (see the interface sketch below).
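
To make the division of labor concrete, here is a minimal sketch of the ``Pipe`` and ``LookupBuffer`` interfaces (a ``Connector`` is then built on top of them). The method names follow the descriptions above, but the exact signatures and type hints are simplifying assumptions rather than vLLM's actual base classes.

.. code-block:: python

    from abc import ABC, abstractmethod
    from typing import List, Optional

    import torch


    class Pipe(ABC):
        """Single-direction FIFO pipe for tensor transmission (sketch)."""

        @abstractmethod
        def send_tensor(self, tensor: Optional[torch.Tensor]) -> None:
            """Non-blocking send; ``None`` can signal an empty slot."""

        @abstractmethod
        def recv_tensor(self) -> Optional[torch.Tensor]:
            """Blocking receive, in FIFO order."""


    class LookupBuffer(ABC):
        """KV cache buffer with SQL-like insert/drop_select semantics."""

        @abstractmethod
        def insert(self, input_tokens: torch.Tensor, roi: torch.Tensor,
                   key: torch.Tensor, value: torch.Tensor,
                   hidden: torch.Tensor) -> None:
            """Non-blocking: add a KV cache entry to the buffer."""

        @abstractmethod
        def drop_select(
                self, input_tokens: torch.Tensor,
                roi: torch.Tensor) -> List[Optional[torch.Tensor]]:
            """Blocking: return the entry matching the query and drop it."""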

Here is a figure illustrating how the three abstractions above are organized:

.. image:: /assets/usage/disagg_prefill/abstraction.jpg
:alt: Disaggregated prefilling abstractions

The workflow of disaggregated prefilling is as follows:

.. image:: /assets/usage/disagg_prefill/overview.jpg
:alt: Disaggregated prefilling workflow

In the figure, ``buffer`` corresponds to the ``insert`` API of the LookupBuffer, and ``drop_select`` corresponds to its ``drop_select`` API. A toy sketch of this producer/consumer flow is shown below.
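
As a toy illustration of this flow (plain Python rather than vLLM code), the sketch below mimics the prefill instance inserting a KV cache into the buffer while the decode instance blocks on ``drop_select`` until a match arrives:

.. code-block:: python

    import queue
    import threading

    import torch

    # Stand-in for the LookupBuffer: put() is non-blocking,
    # get() blocks until the producer has inserted an entry.
    buffer: queue.Queue = queue.Queue()

    def prefill_instance():
        input_tokens = torch.tensor([1, 2, 3])
        kv_cache = torch.randn(3, 8)           # stand-in for real KV data
        buffer.put((input_tokens, kv_cache))   # "insert": non-blocking

    def decode_instance():
        input_tokens, kv_cache = buffer.get()  # "drop_select": blocking
        print("received KV for tokens", input_tokens.tolist())

    t = threading.Thread(target=prefill_instance)
    t.start()
    decode_instance()
    t.join()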


Third-party contributions
-------------------------

Disaggregated prefilling depends heavily on the underlying infrastructure, so vLLM relies on third-party connectors for production-level disaggregated prefilling (the vLLM team actively reviews and merges PRs for new third-party connectors).

We recommend three ways to implement a connector:

* **Fully-customized connector**: Implement your own ``Connector`` and call third-party libraries to send and receive KV caches, among other things (such as editing vLLM's model input to perform customized prefilling). This approach gives you the most control, but at the risk of being incompatible with future vLLM versions.
* **Database-like connector**: Implement your own ``LookupBuffer`` and support the ``insert`` and ``drop_select`` APIs just like SQL.
* **Distributed P2P connector**: Implement your own ``Pipe`` and support the ``send_tensor`` and ``recv_tensor`` APIs, just like ``torch.distributed``. A minimal sketch is shown below.
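
As a sketch of the third approach, the hypothetical in-process ``Pipe`` below uses a ``queue.Queue`` where a real connector would use a network transport (e.g. ``torch.distributed`` send/recv, RDMA, or TCP); only the two-method API is meant to match the description above:

.. code-block:: python

    import queue
    from typing import Optional

    import torch

    class InProcessPipe:
        """Toy single-direction FIFO pipe for tensor transmission."""

        def __init__(self) -> None:
            self._queue: queue.Queue = queue.Queue()

        def send_tensor(self, tensor: Optional[torch.Tensor]) -> None:
            # Non-blocking enqueue; ``None`` can signal an empty slot.
            self._queue.put(tensor)

        def recv_tensor(self) -> Optional[torch.Tensor]:
            # Blocking dequeue, preserving send order (FIFO).
            return self._queue.get()

    pipe = InProcessPipe()
    pipe.send_tensor(torch.ones(2, 4))
    print(pipe.recv_tensor().shape)  # torch.Size([2, 4])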
7 changes: 4 additions & 3 deletions vllm/config.py
@@ -2226,13 +2226,14 @@ def from_cli(cls, cli_value: str) -> "KVTransferConfig":
return KVTransferConfig.model_validate_json(cli_value)

def model_post_init(self, __context: Any) -> None:
supported_kv_connector = ["PyNcclConnector", "MooncakeConnector"]
if all([
self.kv_connector is not None,
self.kv_connector != "PyNcclConnector"
self.kv_connector is not None, self.kv_connector
not in supported_kv_connector
]):
raise ValueError(f"Unsupported kv_connector: {self.kv_connector}. "
f"Supported connectors are "
f"`PyNcclConnector`.")
f"{supported_kv_connector}.")

if self.kv_role is not None and self.kv_role not in [
"kv_producer", "kv_consumer", "kv_both"
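
For reference, the validation above can be exercised through the `from_cli` classmethod shown at the top of this hunk. The JSON keys and their allowed values come straight from this diff; assuming this exact string is what `--kv-transfer-config` receives on the command line is an untested inference:

    from vllm.config import KVTransferConfig

    # kv_connector must be one of ["PyNcclConnector", "MooncakeConnector"]
    # and kv_role one of ["kv_producer", "kv_consumer", "kv_both"],
    # per the checks above.
    config = KVTransferConfig.from_cli(
        '{"kv_connector": "PyNcclConnector", "kv_role": "kv_producer"}')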
3 changes: 2 additions & 1 deletion vllm/distributed/kv_transfer/kv_connector/factory.py
@@ -11,7 +11,8 @@ class KVConnectorFactory:
@staticmethod
def create_connector(rank: int, local_rank: int,
config: "VllmConfig") -> KVConnectorBase:
if config.kv_transfer_config.kv_connector == 'PyNcclConnector':
supported_kv_connector = ["PyNcclConnector", "MooncakeConnector"]
if config.kv_transfer_config.kv_connector in supported_kv_connector:
from .simple_connector import SimpleConnector
return SimpleConnector(rank, local_rank, config)
else:
101 changes: 74 additions & 27 deletions vllm/distributed/kv_transfer/kv_connector/simple_connector.py
Expand Up @@ -2,7 +2,8 @@
Simple KV Cache Connector for Distributed Machine Learning Inference
The SimpleConnector transfers KV caches between prefill vLLM worker (KV cache
producer) and decode vLLM worker (KV cache consumer) using PyNcclPipe.
producer) and decode vLLM worker (KV cache consumer) using PyNcclPipe or
MooncakePipe.
But the logic can be extended to support other pipe and lookup buffer.
"""
@@ -15,7 +16,6 @@
from vllm.distributed.kv_transfer.kv_connector.base import KVConnectorBase
from vllm.distributed.kv_transfer.kv_lookup_buffer.simple_buffer import (
SimpleBuffer)
from vllm.distributed.kv_transfer.kv_pipe.pynccl_pipe import PyNcclPipe
from vllm.logger import init_logger
from vllm.sequence import IntermediateTensors

@@ -36,32 +36,66 @@ def __init__(

self.config = config.kv_transfer_config

logger.info("Initializing PyNcclConfig under kv_transfer_config %s",
if self.config.kv_connector == "PyNcclConnector":
from vllm.distributed.kv_transfer.kv_pipe.pynccl_pipe import (
PyNcclPipe)
logger.info(
"Initializing PyNcclConfig under kv_transfer_config %s",
self.config)
elif self.config.kv_connector == "MooncakeConnector":
# Check if MOONCAKE_CONFIG_PATH is set
import os
use_mooncake_distributed_pipe = os.getenv(
'MOONCAKE_CONFIG_PATH') is not None

if not use_mooncake_distributed_pipe:
raise ValueError(
"To use MooncakeConnector, you need to pass the ENV: "
"'MOONCAKE_CONFIG_PATH=/path/to/mooncake_config.json'.")
else:
from vllm.distributed.kv_transfer.kv_pipe.mooncake_pipe import ( # noqa: E501
MooncakePipe)
logger.info(
"Initializing MooncakeConfig under kv_transfer_config %s",
self.config)

self.lookup_buffer_size = self.config.kv_buffer_size

self.producer_buffer: Optional[SimpleBuffer] = None
self.consumer_buffer: Optional[SimpleBuffer] = None

self.producer_data_pipe: Union[PyNcclPipe, MooncakePipe]
self.consumer_data_pipe: Union[PyNcclPipe, MooncakePipe]
self.producer_signal_pipe: Union[PyNcclPipe, MooncakePipe]
self.consumer_signal_pipe: Union[PyNcclPipe, MooncakePipe]

# 2 pipes for every rank in the world
port_offset_base = 2 * rank

# In disaggregated prefill, the prefill vLLM only uses send pipe
# and the decode vLLM only uses recv pipe
if self.config.is_kv_producer:

self.producer_data_pipe = PyNcclPipe(
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base,
)
self.producer_signal_pipe = PyNcclPipe(
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base + 1,
device="cpu",
)
if self.config.kv_connector == "PyNcclConnector":
self.producer_data_pipe = PyNcclPipe(
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base,
)
self.producer_signal_pipe = PyNcclPipe(
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base + 1,
device="cpu",
)
elif self.config.kv_connector == "MooncakeConnector":
self.producer_data_pipe = MooncakePipe(
local_rank=local_rank,
config=self.config,
)
# We only need to initialize MooncakePipe once
self.producer_signal_pipe = self.producer_data_pipe

self.producer_buffer = SimpleBuffer(self.producer_signal_pipe,
self.producer_data_pipe,
self.config.kv_buffer_size)
@@ -70,17 +104,25 @@ def __init__(

# the current vLLM instance is KV consumer, so it needs to connect
# its recv pipe to the send pipe of KV producer
self.consumer_data_pipe = PyNcclPipe(
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base,
)
self.consumer_signal_pipe = PyNcclPipe(
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base + 1,
device="cpu",
)
if self.config.kv_connector == "PyNcclConnector":
self.consumer_data_pipe = PyNcclPipe(
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base,
)
self.consumer_signal_pipe = PyNcclPipe(
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base + 1,
device="cpu",
)
elif self.config.kv_connector == "MooncakeConnector":
self.consumer_data_pipe = MooncakePipe(
local_rank=local_rank,
config=self.config,
)
self.consumer_signal_pipe = self.consumer_data_pipe

self.consumer_buffer = SimpleBuffer(
self.consumer_signal_pipe,
self.consumer_data_pipe,
@@ -260,6 +302,11 @@ def recv_kv_caches_and_hidden_states(

def close(self):
self.producer_data_pipe.close()
self.producer_signal_pipe.close()
self.consumer_data_pipe.close()
self.consumer_signal_pipe.close()
if self.config.kv_connector == "PyNcclConnector":
self.producer_signal_pipe.close()
self.consumer_signal_pipe.close()
elif self.config.kv_connector == "MooncakeConnector":
# MooncakePipe reuses data_pipe for signal_pipe, so we only have to
# close the data_pipe.
pass
(Diffs for the remaining changed files did not load.)