From f5a72c4e61e6e761172abe66f842663aa073e2c4 Mon Sep 17 00:00:00 2001 From: xiaoguoguo626807 Date: Thu, 11 Jun 2026 14:59:37 +0800 Subject: [PATCH] support ep --- fastdeploy/config.py | 2 +- .../models/paddleformers/base_fleet.py | 47 ++++++------------- 2 files changed, 15 insertions(+), 34 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index dfb1e3c530d..f7cd4a5000d 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -319,7 +319,7 @@ def _post_init(self): if self.runner_type == "generate" and not is_generative_model: if is_multimodal_model: pass - elif self.model_impl in ("auto", "paddleformers"): + elif self.model_impl in ("auto", "paddleformers", "paddlefleet"): # Skip check for auto/paddleformers - may fallback to paddleformers which supports any model pass else: diff --git a/fastdeploy/model_executor/models/paddleformers/base_fleet.py b/fastdeploy/model_executor/models/paddleformers/base_fleet.py index bc7540121a8..fc93a723372 100644 --- a/fastdeploy/model_executor/models/paddleformers/base_fleet.py +++ b/fastdeploy/model_executor/models/paddleformers/base_fleet.py @@ -290,8 +290,6 @@ def __init__(self, fd_config: "FDConfig", **kwargs): # Assign parallel config from fd_config.parallel_config to paddleformers_config parallel_config = fd_config.parallel_config - # parallel_config.tensor_parallel_size = 1 - # parallel_config.expert_parallel_size = 2 self.paddleformers_config.data_parallel_size = parallel_config.data_parallel_size self.paddleformers_config.tensor_model_parallel_size = parallel_config.tensor_parallel_size self.paddleformers_config.sequence_parallel = parallel_config.sequence_parallel @@ -305,6 +303,8 @@ def __init__(self, fd_config: "FDConfig", **kwargs): # self.paddleformers_config.moe_grouped_gemm = True self.paddleformers_config.moe_token_dispatcher_type = "deepep" # self.paddleformers_config.use_cpu_initialization = True + self.paddleformers_config.use_cpu_initialization = True + self.paddleformers_config.perform_initialization = False self.paddleformers_config.gated_attention = getattr(self.paddleformers_config, "use_gated_attn", False) if getattr(self.paddleformers_config, "multi_latent_attention", False): self.paddleformers_config.qk_head_dim = ( @@ -396,6 +396,16 @@ def _init_paddlefleet_parallel_state(self, fd_config) -> None: "mp", ], } + # Reset parallel state so that PaddleFleet's fleet.init can reinitialize + # with the correct EP topology instead of reusing FastDeploy's. + import paddle.distributed.fleet.base.topology as tp_mod + import paddle.distributed.parallel_helper as ph + + # 1) Reset hybrid parallel group so _init_hybrid_parallel_env runs again + tp_mod._HYBRID_PARALLEL_GROUP = None + # 2) Reset parallel context so init_parallel_env runs again + ph.__parallel_ctx__clz__ = None + fleet.init(is_collective=True, strategy=strategy) logger.info( f"Initialized PaddleFleet parallel_state via initialize_fleet " @@ -405,40 +415,11 @@ def _init_paddlefleet_parallel_state(self, fd_config) -> None: f"sp={parallel_config.sequence_parallel})" ) - import paddle.distributed as dist from paddlefleet import parallel_state - hcg = fleet.get_hybrid_communicate_group() - expected_tp_size = parallel_config.tensor_parallel_size - - # Check if we need to initialize or reinitialize TP group - need_init = False if parallel_state._TENSOR_MODEL_PARALLEL_GROUP is None: - need_init = True - reason = "TP group not initialized" - else: - # Check if current TP group size matches expected - current_tp_group = parallel_state._TENSOR_MODEL_PARALLEL_GROUP - current_tp_size = getattr(current_tp_group, "nranks", None) - if current_tp_size is None: - current_tp_size = getattr(current_tp_group, "world_size", None) - if current_tp_size != expected_tp_size: - need_init = True - reason = f"TP group size mismatch: current={current_tp_size}, expected={expected_tp_size}" - - if need_init: - logger.warning(f"{reason}, reinitializing TP group with size={expected_tp_size}") - if expected_tp_size == 1: - # Single process TP group - create manually - current_rank = dist.get_rank() - tp_ranks = [current_rank] - default_pg = dist.new_group(ranks=tp_ranks) - parallel_state._TENSOR_MODEL_PARALLEL_GROUP = default_pg - parallel_state._TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = tp_ranks - logger.info(f"Reinitialized TP group with size=1, rank={current_rank}, ranks={tp_ranks}") - else: - # Multiple processes - use hcg's mp group - parallel_state.initialize_model_parallel(hcg) + hcg = fleet.get_hybrid_communicate_group() + parallel_state.initialize_model_parallel(hcg) from paddlefleet.tensor_parallel.random import ( model_parallel_cuda_manual_seed,