diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index 70c8b20de68..7ba7bde35c2 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -1264,11 +1264,17 @@ def issue_write_back_storage_task(self, task: WriteStorageTask, is_sync=True): def wait_write_storage_task(self, req_id): """ - Sync write back task + Sync write back task with timeout to prevent blocking the main process indefinitely. """ if req_id in self.task_write_back_event: - self.task_write_back_event[req_id].wait() - del self.task_write_back_event[req_id] + timeout = envs.FD_AS_WAIT_TIMEOUT + success = self.task_write_back_event[req_id].wait(timeout=timeout) + if not success: + logger.error( + f"wait_write_storage_task: write back to storage timed out after {timeout}s, " + f"req_id: {req_id}, skipping to avoid blocking scheduling" + ) + self.task_write_back_event.pop(req_id, None) def issue_prefetch_storage_task(self, task: ReadStorageTask, is_sync=True): """ @@ -1292,10 +1298,16 @@ def wait_prefetch_storage_task(self, req_id): if req_id not in self.task_prefetch_event: return None - self.task_prefetch_event[req_id].wait() - storage_block_ids = self.storage_prefetch_block_ids[req_id] - del self.task_prefetch_event[req_id] - del self.storage_prefetch_block_ids[req_id] + timeout = envs.FD_AS_WAIT_TIMEOUT + success = self.task_prefetch_event[req_id].wait(timeout=timeout) + if not success: + logger.error( + f"wait_prefetch_storage_task: prefetch from storage timed out after {timeout}s, " + f"req_id: {req_id}, skipping to avoid blocking scheduling" + ) + storage_block_ids = self.storage_prefetch_block_ids.get(req_id, []) + self.task_prefetch_event.pop(req_id, None) + self.storage_prefetch_block_ids.pop(req_id, None) return storage_block_ids def free_nodes_directly(self, node): diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 5fd73e1962b..38680d18824 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -154,6 +154,8 @@ def _validate_split_kv_size(value: int) -> int: "FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")), # AS-only flush mode: AttentionStore only reports cache index without storing actual data. "FD_AS_ONLY_FLUSH": lambda: bool(int(os.getenv("FD_AS_ONLY_FLUSH", "0"))), + # Timeout (seconds) for waiting AS write-back/prefetch tasks to complete before skipping. + "FD_AS_WAIT_TIMEOUT": lambda: float(os.getenv("FD_AS_WAIT_TIMEOUT", "45")), # enable return text, used when FD_ENABLE_INTERNAL_ADAPTER=1 "FD_ENABLE_RETURN_TEXT": lambda: bool(int(os.getenv("FD_ENABLE_RETURN_TEXT", "0"))), # Used to truncate the string inserted during thinking when reasoning in a model. ( for ernie-45-vl, \n\n\n for ernie-x1)