36 changes: 30 additions & 6 deletions vllm_ascend/worker/model_runner_v1.py
@@ -45,6 +45,8 @@
from vllm.compilation.counter import compilation_counter
from vllm.compilation.monitor import set_cudagraph_capturing_enabled
from vllm.config import CUDAGraphMode, VllmConfig, get_layers_from_vllm_config
+from vllm.distributed.ec_transfer import (get_ec_transfer,
+                                          has_ec_transfer)

Check failure on line 49 in vllm_ascend/worker/model_runner_v1.py (GitHub Actions / lint / pre-commit): Cannot find implementation or library stub for module named "vllm.distributed.ec_transfer" [import-not-found]
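This check, together with the matching failures for `vllm.v1.worker.ec_connector_model_runner_mixin` below and for worker_v1.py, most likely means the vllm version pinned in CI does not ship the EC-transfer modules yet. Assuming the import is valid against a newer vllm, one way to keep pre-commit green in the meantime is a per-line mypy suppression (a sketch, not necessarily the fix the maintainers would pick):

from vllm.distributed.ec_transfer import (  # type: ignore[import-not-found]
    get_ec_transfer, has_ec_transfer)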
from vllm.distributed import tensor_model_parallel_all_gather
from vllm.distributed.kv_transfer import (get_kv_transfer_group,
has_kv_transfer_group)
@@ -96,20 +98,22 @@
MambaSpec, MLAAttentionSpec,
UniformTypeKVCacheSpecs)
# yapf: enable
-from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, AsyncModelRunnerOutput,
-                             DraftTokenIds, LogprobsTensors, ModelRunnerOutput,
-                             PoolerOutput)
+from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, ECConnectorOutput,
+                             AsyncModelRunnerOutput, DraftTokenIds, LogprobsTensors,
+                             ModelRunnerOutput,
+                             PoolerOutput, make_empty_encoder_model_runner_output)
from vllm.v1.pool.metadata import PoolingMetadata
from vllm.v1.sample.metadata import SamplingMetadata
from vllm.v1.spec_decode.metadata import SpecDecodeMetadata
from vllm.v1.spec_decode.ngram_proposer import NgramProposer
from vllm.v1.utils import CpuGpuBuffer

Check failure on line 109 in vllm_ascend/worker/model_runner_v1.py (GitHub Actions / lint / pre-commit): Cannot find implementation or library stub for module named "vllm.v1.worker.ec_connector_model_runner_mixin" [import-not-found]
from vllm.v1.worker.kv_connector_model_runner_mixin import KVConnectorOutput
from vllm.v1.worker.lora_model_runner_mixin import LoRAModelRunnerMixin
from vllm.v1.worker.utils import (AttentionGroup, bind_kv_cache,
gather_mm_placeholders,
sanity_check_mm_encoder_outputs,
scatter_mm_placeholders)
+from vllm.v1.worker.ec_connector_model_runner_mixin import ECConnectorModelRunnerMixin

import vllm_ascend.envs as envs_ascend
from vllm_ascend.ascend_config import get_ascend_config
@@ -280,7 +284,7 @@
return output


-class NPUModelRunner(LoRAModelRunnerMixin):
+class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin):

def __init__(self, vllm_config: VllmConfig, device: torch.device):
self.vllm_config = vllm_config
@@ -816,6 +820,11 @@

req_ids_to_add.append(req_id)

+# If this rank is an EC transfer producer,
+# skip updating the states of KV cache blocks.
+if has_ec_transfer() and get_ec_transfer().is_producer:
+    return

# Update the states of the running/resumed requests.
is_last_rank = get_pp_group().is_last_rank
req_data = scheduler_output.scheduled_cached_reqs
@@ -1774,8 +1783,12 @@
# _prepare_inputs may reorder the batch, so we must gather
# multi-modal outputs after that to ensure the correct order
if self.is_multimodal_model:
-# Run the multimodal encoder if any.
-self._execute_mm_encoder(scheduler_output)
+with self.maybe_get_ec_connector_output(
+        scheduler_output,

Check failure on line 1787 in vllm_ascend/worker/model_runner_v1.py (GitHub Actions / lint / pre-commit): Ruff (F841) vllm_ascend/worker/model_runner_v1.py:1787:18: Local variable `ec_connector_output` is assigned to but never used
+        encoder_cache=self.encoder_cache,
+) as ec_connector_output:
+    # Run the multimodal encoder if any.
+    self._execute_mm_encoder(scheduler_output)
Comment on lines +1790 to +1791 (Contributor, critical):

On a consumer rank in a disaggregated setup, `maybe_get_ec_connector_output` is responsible for receiving the encoder outputs and populating `self.encoder_cache`. However, the subsequent call to `_execute_mm_encoder` re-runs the encoder and overwrites the received embeddings. This is a correctness bug that defeats the purpose of receiving the embeddings, and it also introduces significant performance overhead.

Producer ranks are handled earlier in `execute_model` and do not reach this code path, so this block runs for consumer ranks and for non-disaggregated setups, yet the encoder should only be executed in a non-disaggregated setup. A consumer rank can be identified by `has_ec_transfer()` being true.

I suggest adding a condition so the encoder only runs when not in a disaggregated setup (i.e., when `has_ec_transfer()` is false).

Suggested change
-    # Run the multimodal encoder if any.
-    self._execute_mm_encoder(scheduler_output)
+    # In a disaggregated setup, consumer ranks receive encoder outputs
+    # and should not run the encoder.
+    # In a non-disaggregated setup, we need to run the encoder.
+    # Producer ranks are handled in `execute_model` and do not reach here.
+    if not has_ec_transfer():
+        # Run the multimodal encoder if any.
+        self._execute_mm_encoder(scheduler_output)
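If this guard is adopted, the Ruff F841 failure flagged above could be fixed in the same change by entering the context manager without binding its value; a minimal sketch, assuming `ec_connector_output` is genuinely unused on this path (if the value is actually needed later, e.g. to attach to the model runner output, using it would resolve F841 by itself):

with self.maybe_get_ec_connector_output(
        scheduler_output,
        encoder_cache=self.encoder_cache,
):
    if not has_ec_transfer():
        # Run the multimodal encoder if any.
        self._execute_mm_encoder(scheduler_output)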


# NOTE(woosuk): To unify token ids and soft tokens (vision
# embeddings), we always use embeddings (rather than token ids)
@@ -2447,6 +2460,14 @@
) -> Union[ModelRunnerOutput, AsyncModelRunnerOutput, IntermediateTensors]:
with ProfileExecuteDuration().capture_async("prepare input"):
self._update_states(scheduler_output)
+if has_ec_transfer() and get_ec_transfer().is_producer:
+    with self.maybe_get_ec_connector_output(
+            scheduler_output,

Check failure on line 2465 in vllm_ascend/worker/model_runner_v1.py (GitHub Actions / lint / pre-commit): Ruff (F841) vllm_ascend/worker/model_runner_v1.py:2465:22: Local variable `ec_connector_output` is assigned to but never used
+            encoder_cache=self.encoder_cache,
+    ) as ec_connector_output:
+        self._execute_mm_encoder(scheduler_output)
+        return make_empty_encoder_model_runner_output(scheduler_output)
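The second F841 failure could be resolved the same way on this producer fast path, again assuming the connector output object itself is not consumed here:

if has_ec_transfer() and get_ec_transfer().is_producer:
    with self.maybe_get_ec_connector_output(
            scheduler_output,
            encoder_cache=self.encoder_cache,
    ):
        # Producer ranks only run the encoder; the EC connector ships
        # the embeddings, and the empty output lets scheduling proceed.
        self._execute_mm_encoder(scheduler_output)
        return make_empty_encoder_model_runner_output(scheduler_output)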

if not scheduler_output.total_num_scheduled_tokens:
if not has_kv_transfer_group():
logger.debug(
@@ -3873,6 +3894,9 @@
format. Layers that do not need KV cache are not included.
"""

+if has_ec_transfer() and get_ec_transfer().is_producer:
+    return {}
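Presumably an EC producer rank only runs the multimodal encoder and never executes the attention layers, so it has no KV cache layers to report; returning an empty spec keeps KV cache initialization from allocating anything on such ranks.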

block_size = self.vllm_config.cache_config.block_size
use_mla = self.vllm_config.model_config.use_mla
use_sparse = self.use_sparse
2 changes: 2 additions & 0 deletions vllm_ascend/worker/worker_v1.py
@@ -29,6 +29,7 @@
from vllm.config import VllmConfig
from vllm.distributed import (ensure_model_parallel_initialized,
init_distributed_environment)
+from vllm.distributed.ec_transfer import ensure_ec_transfer_initialized

Check failure on line 32 in vllm_ascend/worker/worker_v1.py (GitHub Actions / lint / pre-commit): Cannot find implementation or library stub for module named "vllm.distributed.ec_transfer" [import-not-found]
from vllm.distributed.kv_transfer import ensure_kv_transfer_initialized
from vllm.distributed.parallel_state import get_pp_group, get_tp_group
from vllm.logger import logger
@@ -413,6 +414,7 @@
self.parallel_config.decode_context_parallel_size)
init_ascend_model_parallel(self.parallel_config)
ensure_kv_transfer_initialized(self.vllm_config)
+ensure_ec_transfer_initialized(self.vllm_config)

def _init_profiler(self):
# Torch profiler. Enabled and configured through env vars: