Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fastdeploy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def _post_init(self):
if self.runner_type == "generate" and not is_generative_model:
if is_multimodal_model:
pass
elif self.model_impl in ("auto", "paddleformers"):
elif self.model_impl in ("auto", "paddleformers", "paddlefleet"):
# Skip check for auto/paddleformers - may fallback to paddleformers which supports any model
pass
else:
Expand Down
47 changes: 14 additions & 33 deletions fastdeploy/model_executor/models/paddleformers/base_fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,6 @@ def __init__(self, fd_config: "FDConfig", **kwargs):

# Assign parallel config from fd_config.parallel_config to paddleformers_config
parallel_config = fd_config.parallel_config
# parallel_config.tensor_parallel_size = 1
# parallel_config.expert_parallel_size = 2
self.paddleformers_config.data_parallel_size = parallel_config.data_parallel_size
self.paddleformers_config.tensor_model_parallel_size = parallel_config.tensor_parallel_size
self.paddleformers_config.sequence_parallel = parallel_config.sequence_parallel
Expand All @@ -305,6 +303,8 @@ def __init__(self, fd_config: "FDConfig", **kwargs):
# self.paddleformers_config.moe_grouped_gemm = True
self.paddleformers_config.moe_token_dispatcher_type = "deepep"
# self.paddleformers_config.use_cpu_initialization = True
self.paddleformers_config.use_cpu_initialization = True
self.paddleformers_config.perform_initialization = False
self.paddleformers_config.gated_attention = getattr(self.paddleformers_config, "use_gated_attn", False)
if getattr(self.paddleformers_config, "multi_latent_attention", False):
self.paddleformers_config.qk_head_dim = (
Expand Down Expand Up @@ -396,6 +396,16 @@ def _init_paddlefleet_parallel_state(self, fd_config) -> None:
"mp",
],
}
# Reset parallel state so that PaddleFleet's fleet.init can reinitialize
# with the correct EP topology instead of reusing FastDeploy's.
import paddle.distributed.fleet.base.topology as tp_mod
import paddle.distributed.parallel_helper as ph

# 1) Reset hybrid parallel group so _init_hybrid_parallel_env runs again
tp_mod._HYBRID_PARALLEL_GROUP = None
# 2) Reset parallel context so init_parallel_env runs again
ph.__parallel_ctx__clz__ = None

fleet.init(is_collective=True, strategy=strategy)
logger.info(
f"Initialized PaddleFleet parallel_state via initialize_fleet "
Expand All @@ -405,40 +415,11 @@ def _init_paddlefleet_parallel_state(self, fd_config) -> None:
f"sp={parallel_config.sequence_parallel})"
)

import paddle.distributed as dist
from paddlefleet import parallel_state

hcg = fleet.get_hybrid_communicate_group()
expected_tp_size = parallel_config.tensor_parallel_size

# Check if we need to initialize or reinitialize TP group
need_init = False
if parallel_state._TENSOR_MODEL_PARALLEL_GROUP is None:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug 这里只在 _TENSOR_MODEL_PARALLEL_GROUP is None 时才用新的 HCG 初始化 PaddleFleet TP group,会把已有但拓扑已过期的 TP group 保留下来。

本函数前面只重置了 paddle.distributed.fleet.base.topology._HYBRID_PARALLEL_GROUPpaddle.distributed.parallel_helper.__parallel_ctx__clz__,没有清理 paddlefleet.parallel_state._TENSOR_MODEL_PARALLEL_GROUP。如果该全局已经由旧的 FastDeploy/Fleet 拓扑初始化过,fleet.init() 重建 HCG 后这里会直接跳过 parallel_state.initialize_model_parallel(hcg),PaddleFleet 的 Column/RowParallelLinear 仍按旧 TP group 分片,和新的 EP/TP 拓扑不一致。

建议修复方式:在重新 fleet.init() 前同步清理 PaddleFleet parallel_state 中的 TP group/global ranks,或保留旧代码里的 group size/topology mismatch 检查;发现当前 group 与 parallel_config.tensor_parallel_size 或新 HCG 不一致时,必须用 fleet.get_hybrid_communicate_group() 重新调用 parallel_state.initialize_model_parallel(hcg)

need_init = True
reason = "TP group not initialized"
else:
# Check if current TP group size matches expected
current_tp_group = parallel_state._TENSOR_MODEL_PARALLEL_GROUP
current_tp_size = getattr(current_tp_group, "nranks", None)
if current_tp_size is None:
current_tp_size = getattr(current_tp_group, "world_size", None)
if current_tp_size != expected_tp_size:
need_init = True
reason = f"TP group size mismatch: current={current_tp_size}, expected={expected_tp_size}"

if need_init:
logger.warning(f"{reason}, reinitializing TP group with size={expected_tp_size}")
if expected_tp_size == 1:
# Single process TP group - create manually
current_rank = dist.get_rank()
tp_ranks = [current_rank]
default_pg = dist.new_group(ranks=tp_ranks)
parallel_state._TENSOR_MODEL_PARALLEL_GROUP = default_pg
parallel_state._TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = tp_ranks
logger.info(f"Reinitialized TP group with size=1, rank={current_rank}, ranks={tp_ranks}")
else:
# Multiple processes - use hcg's mp group
parallel_state.initialize_model_parallel(hcg)
hcg = fleet.get_hybrid_communicate_group()
parallel_state.initialize_model_parallel(hcg)

from paddlefleet.tensor_parallel.random import (
model_parallel_cuda_manual_seed,
Expand Down
Loading