diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 5c2c62965fe..d56e1618c74 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -2125,7 +2125,8 @@ def _process_allocate_resource_requests(): if not is_success: if not self.enable_decode_cache_task: - task.error_msg = "Not enough resources" + if not task.get("error_msg", None): + task.error_msg = "Not enough resources" self.split_connector.send_cache_info_to_prefill([task]) self.llm_logger.warning(f"D has failed to send cache infos for task {task.request_id}") processed_indices.append(idx) diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 9b9c78d6a59..8c9b56c7299 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -1587,6 +1587,13 @@ def preallocate_resource_in_d(self, request: Request): ) // self.config.cache_config.block_size + self.config.cache_config.enc_dec_block_num with self.lock: + if request.request_id in self.requests: + llm_logger.warning( + f"PD reschedule: request {request.request_id} already exists in D, " + f"rejecting to avoid KV cache conflict" + ) + request.error_msg = "Duplicate request id in decode" + return False if len(self.waiting) > 0: return False if self.available_batch() == 0: