Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,11 +1264,17 @@ def issue_write_back_storage_task(self, task: WriteStorageTask, is_sync=True):

def wait_write_storage_task(self, req_id):
"""
Sync write back task
Wait for the write-back task to finish, using a timeout so the main process is not blocked indefinitely.
"""
if req_id in self.task_write_back_event:
self.task_write_back_event[req_id].wait()
del self.task_write_back_event[req_id]
timeout = envs.FD_AS_WAIT_TIMEOUT
success = self.task_write_back_event[req_id].wait(timeout=timeout)
if not success:
logger.error(
f"wait_write_storage_task: write back to storage timed out after {timeout}s, "
f"req_id: {req_id}, skipping to avoid blocking scheduling"
)
self.task_write_back_event.pop(req_id, None)

def issue_prefetch_storage_task(self, task: ReadStorageTask, is_sync=True):
"""
Expand All @@ -1292,10 +1298,16 @@ def wait_prefetch_storage_task(self, req_id):
if req_id not in self.task_prefetch_event:
return None

self.task_prefetch_event[req_id].wait()
storage_block_ids = self.storage_prefetch_block_ids[req_id]
del self.task_prefetch_event[req_id]
del self.storage_prefetch_block_ids[req_id]
timeout = envs.FD_AS_WAIT_TIMEOUT
success = self.task_prefetch_event[req_id].wait(timeout=timeout)
if not success:
logger.error(
f"wait_prefetch_storage_task: prefetch from storage timed out after {timeout}s, "
f"req_id: {req_id}, skipping to avoid blocking scheduling"
)
storage_block_ids = self.storage_prefetch_block_ids.get(req_id, [])
self.task_prefetch_event.pop(req_id, None)
self.storage_prefetch_block_ids.pop(req_id, None)

This comment was marked as outdated.

return storage_block_ids

def free_nodes_directly(self, node):
Expand Down
2 changes: 2 additions & 0 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ def _validate_split_kv_size(value: int) -> int:
"FD_ENABLE_SWAP_SPACE_CLEARING": lambda: int(os.getenv("FD_ENABLE_SWAP_SPACE_CLEARING", "0")),
# AS-only flush mode: AttentionStore only reports cache index without storing actual data.
"FD_AS_ONLY_FLUSH": lambda: bool(int(os.getenv("FD_AS_ONLY_FLUSH", "0"))),
# Timeout (in seconds) to wait for AS write-back/prefetch tasks to complete before giving up and skipping them.
"FD_AS_WAIT_TIMEOUT": lambda: float(os.getenv("FD_AS_WAIT_TIMEOUT", "45")),
# enable return text, used when FD_ENABLE_INTERNAL_ADAPTER=1
"FD_ENABLE_RETURN_TEXT": lambda: bool(int(os.getenv("FD_ENABLE_RETURN_TEXT", "0"))),
# Used to truncate the string inserted during thinking when reasoning in a model. (</think> for ernie-45-vl, \n</think>\n\n for ernie-x1)
Expand Down
Loading