Merged
1 change: 1 addition & 0 deletions vllm_ascend/ops/common_fused_moe.py
@@ -192,6 +192,7 @@ def __init__(self, *args, **kwargs):
                os.R_OK):
            self.expert_load_balancer = ExpertLoadBalancer(
                self.expert_map_path, self.global_num_experts)
+            self.expert_load_balancer.check_expert_map_tensor()
            self.global_redundant_expert_num = (
                self.expert_load_balancer.get_global_redundant_expert_num())
        try:
28 changes: 23 additions & 5 deletions vllm_ascend/ops/expert_load_balancer.py
@@ -3,29 +3,31 @@
from typing import Dict, List

import torch
+import torch.distributed as dist


class ExpertLoadBalancer(object):

    def __init__(self, expert_map_path, global_expert_num):
        self.expert_map_path = expert_map_path
        self.global_expert_num = global_expert_num
+        self.tensor_data = []
        self.expert_map_tensor, self.layers_num, self.ranks_num = (
            self._expert_file_to_tensor())
+        self.expert_placement_map = self.generate_expert_placement_map()

    def _expert_file_to_tensor(self):
        with open(self.expert_map_path, "r") as f:
            data = json.load(f)
            layers_num = data["moe_layer_count"]
            gpus_num = data["layer_list"][0]["device_count"]

-        tensor_data = []
        for layer in data["layer_list"]:
            device_data = []
            for device in layer["device_list"]:
                device_data.append(device["device_expert"])
-            tensor_data.append(device_data)
-        expert_map_tensor = torch.tensor(tensor_data, dtype=torch.int32)
+            self.tensor_data.append(device_data)
+        expert_map_tensor = torch.tensor(self.tensor_data, dtype=torch.int32)
        return expert_map_tensor, layers_num, gpus_num

    def generate_index_dicts(self, tensor_2d):
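
For reference, a minimal expert map file that _expert_file_to_tensor above could parse might look like the sketch below. The values are invented, and only the keys the method actually reads (moe_layer_count, layer_list, device_count, device_list, device_expert) are shown; real maps may carry additional fields.

# Hypothetical expert_map.json contents: 1 MoE layer, 2 devices, 5 logical
# experts, with one redundant copy of expert 0 placed on device 1.
import json

expert_map = {
    "moe_layer_count": 1,
    "layer_list": [
        {
            "device_count": 2,
            "device_list": [
                {"device_expert": [0, 1, 2]},
                {"device_expert": [3, 4, 0]},
            ],
        }
    ],
}

with open("expert_map.json", "w") as f:
    json.dump(expert_map, f, indent=4)

With such a map and global_expert_num=5, get_global_redundant_expert_num() would report 3 * 2 - 5 = 1 redundant expert. Note that every device_expert list must have the same length so the data can be packed into a single int32 tensor.
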
@@ -81,8 +83,7 @@ def generate_log2phy_expert_map(self, layer_id):
        return log2phy_map

    def get_rank_placement_map(self, layer_id, rank_id):
-        expert_placement_map = self.generate_expert_placement_map()
-        layer_expert_map = expert_placement_map[layer_id]
+        layer_expert_map = self.expert_placement_map[layer_id]
        rank_expert_map = layer_expert_map[rank_id].to(
            torch.npu.current_device())
        rank_local_expert_num = torch.sum(torch.ne(rank_expert_map, -1)).item()
@@ -97,3 +98,20 @@ def get_global_redundant_expert_num(self):
            len(self.expert_map_tensor[0][0]) * self.ranks_num -
            self.global_expert_num)
        return global_redundant_expert_num
+
+    def check_expert_map_tensor(self):
+        if dist.is_initialized():
+            try:
+                rank = dist.get_rank()
+                world_size = dist.get_world_size()
+                all_expert_maps = [None for _ in range(world_size)]
+                dist.all_gather_object(all_expert_maps, self.tensor_data)
+                for rank_id, expert_map_tensor in enumerate(all_expert_maps):
+                    if self.tensor_data != expert_map_tensor:
+                        raise ValueError(
+                            f"The expert map of rank{rank} is not equal to rank{rank_id}"
+                        )
+                return True
+            except Exception as e:
+                raise ValueError(
+                    f"The expert maps of all ranks are inconsistency: {e}")
Comment on lines +102 to +117
Contributor

critical

This method has a critical bug: it references self.tensor_data, which is not an attribute of the ExpertLoadBalancer class. This will cause an AttributeError at runtime. Based on the class's __init__ method, you probably meant to use self.expert_map_tensor.

Additionally, comparing tensors with != is not suitable for checking equality in a boolean context, as it performs an element-wise comparison and returns a boolean tensor.
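
As a minimal illustration of that element-wise behaviour (with made-up values):

import torch

a = torch.tensor([1, 2, 3])
b = torch.tensor([1, 9, 3])
print(a != b)             # tensor([False,  True, False]) -- element-wise result
print(torch.equal(a, b))  # False -- a single bool, safe to use in an if-check
# Using `if a != b:` directly raises "RuntimeError: Boolean value of Tensor
# with more than one element is ambiguous".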

To fix this, I suggest converting the tensor to a list before gathering and comparing the lists. This approach seems to align with your use of dist.all_gather_object.

I've also corrected a typo ('inconsistency' -> 'inconsistent') and improved f-string formatting for better readability.

Suggested change
-    def check_expert_map_tensor(self):
-        if dist.is_initialized():
-            try:
-                rank = dist.get_rank()
-                world_size = dist.get_world_size()
-                all_expert_maps = [None for _ in range(world_size)]
-                dist.all_gather_object(all_expert_maps, self.tensor_data)
-                for rank_id, expert_map_tensor in enumerate(all_expert_maps):
-                    if self.tensor_data != expert_map_tensor:
-                        raise ValueError(
-                            f"The expert map of rank{rank} is not equal to rank{rank_id}"
-                        )
-                return True
-            except Exception as e:
-                raise ValueError(
-                    f"The expert maps of all ranks are inconsistency: {e}")
+    def check_expert_map_tensor(self):
+        if dist.is_initialized():
+            try:
+                rank = dist.get_rank()
+                world_size = dist.get_world_size()
+                all_expert_maps = [None for _ in range(world_size)]
+                current_map_list = self.expert_map_tensor.tolist()
+                dist.all_gather_object(all_expert_maps, current_map_list)
+                for rank_id, other_map_list in enumerate(all_expert_maps):
+                    if current_map_list != other_map_list:
+                        raise ValueError(
+                            f"The expert map of rank {rank} is not equal to rank {rank_id}"
+                        )
+                return True
+            except Exception as e:
+                raise ValueError(
+                    f"The expert maps of all ranks are inconsistent: {e}")

1 change: 1 addition & 0 deletions vllm_ascend/torchair/ops/torchair_fused_moe.py
@@ -1042,6 +1042,7 @@ def __init__(
                os.R_OK):
            self.expert_load_balancer = ExpertLoadBalancer(
                self.expert_map_path, self.global_num_experts)
+            self.expert_load_balancer.check_expert_map_tensor()
            self.global_redundant_expert_num = (
                self.expert_load_balancer.get_global_redundant_expert_num())
        try: