
[BugFix] Fix several bugs in the request interruption and inference termination functionality#6278

Open
qwes5s5 wants to merge 2 commits into PaddlePaddle:develop from qwes5s5:fix_request_disconnect

Conversation

@qwes5s5
Collaborator

@qwes5s5 qwes5s5 commented Jan 29, 2026

Motivation

💡 If this PR is a Cherry Pick, the PR title needs to follow the format by adding the [Cherry-Pick] label at the very beginning and appending the original PR ID at the end. For example, [Cherry-Pick][CI] Add check trigger and logic(#5191)


The current request-interruption and inference-termination flow has issues in both centralized and PD-disaggregated modes. These bugs can cause resource leaks, service instability, and request hangs.

Modifications

  • Construct and dispatch ScheduledAbortTask to workers to distinguish it from ScheduledPreemptTask.
  • Implement differential handling based on the request's role and state within the ResourceManager to mitigate resource leaks.
  • Optimize the overall performance of inference interruption in PD-separated mode, ensuring requests are interrupted and resources are reclaimed across P-nodes, D-nodes, and during transmission.
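The abort/preempt distinction the first bullet describes can be sketched as follows. This is a minimal illustration, not the actual FastDeploy source: the names `ScheduledAbortTask` and `RequestType` come from the PR, but the enum layout and the `handle_task` helper are assumptions.

```python
from dataclasses import dataclass
from enum import Enum


# Hypothetical mirror of the request task types touched by this PR.
# ABORT is the new type introduced here; PREEMPTED already existed.
class RequestType(Enum):
    PREFILL = 0
    DECODE = 1
    PREEMPTED = 2
    EXTEND = 3
    ABORT = 4


@dataclass
class ScheduledAbortTask:
    """Dispatched to workers so an abort can be told apart from a preempt:
    an abort frees resources for good, a preempt re-queues the request."""

    request_id: str
    task_type: RequestType = RequestType.ABORT


def handle_task(task) -> str:
    # Worker-side dispatch: both paths set stop flags, but only a
    # preempted request goes back to the waiting queue for rescheduling.
    if task.task_type is RequestType.ABORT:
        return "stop and release resources"
    if task.task_type is RequestType.PREEMPTED:
        return "stop and reschedule later"
    return "run"
```

The point of the separate task type is that the worker no longer has to infer intent from context: the type alone decides whether resources are reclaimed permanently or handed back to the scheduler.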

Usage or Command

Accuracy Tests

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code; run pre-commit before committing.
  • Add unit tests. If there are no unit tests, please explain why in this PR.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

@paddle-bot

paddle-bot bot commented Jan 29, 2026

Thanks for your contribution!

@codecov-commenter

codecov-commenter commented Jan 29, 2026

Codecov Report

❌ Patch coverage is 35.59322% with 114 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@c4abb01). Learn more about missing BASE report.

Files with missing lines Patch % Lines
fastdeploy/engine/common_engine.py 16.96% 90 Missing and 3 partials ⚠️
fastdeploy/splitwise/splitwise_connector.py 0.00% 9 Missing ⚠️
fastdeploy/scheduler/local_scheduler.py 40.00% 5 Missing and 1 partial ⚠️
fastdeploy/engine/sched/resource_manager_v1.py 85.71% 0 Missing and 3 partials ⚠️
fastdeploy/scheduler/dp_scheduler.py 80.00% 1 Missing and 1 partial ⚠️
fastdeploy/output/token_processor.py 90.90% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #6278   +/-   ##
==========================================
  Coverage           ?   67.57%           
==========================================
  Files              ?      391           
  Lines              ?    52867           
  Branches           ?     8244           
==========================================
  Hits               ?    35727           
  Misses             ?    14523           
  Partials           ?     2617           
Flag Coverage Δ
GPU 67.57% <35.59%> (?)


@qwes5s5 qwes5s5 changed the title [BugFix] Fix several bugs in PR #5320 [BugFix] Fix several bugs in the request interruption and inference termination functionality Feb 11, 2026
Contributor

Copilot AI left a comment


Pull request overview

This PR fixes several issues in the inference-request abort (ABORT) and inference-termination flow under both centralized and PD (prefill/decode) disaggregated modes, avoiding resource leaks, service instability, and request hangs.

Changes:

  • Introduce the ABORT type and ScheduledAbortTask to distinguish "terminate inference and release resources" from "PREEMPT: yield resources and reschedule".
  • Add abort-specific handling and resource-reclamation logic along the Engine/ResourceManager/Connector paths.
  • Fix the recycle-cursor update and expired-request recycling logic in the Local/DP schedulers, and add debug logging.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 11 comments.

File Description
fastdeploy/worker/gpu_model_runner.py On the worker side, distinguish preempt/abort tasks in logging and write stop flags
fastdeploy/splitwise/splitwise_connector.py Add a fast abort-exit path while prefill waits for decode resource allocation
fastdeploy/scheduler/local_scheduler.py Fix how ids_read_cursor is updated during recycle and rework expired-request recycling
fastdeploy/scheduler/dp_scheduler.py Apply the same recycle fix as local_scheduler to the DP scenario
fastdeploy/output/token_processor.py Adjust the timing of recycling and abort-set cleanup in the abort negative-token case, and handle aborts while waiting for prefill recycling
fastdeploy/engine/sched/resource_manager_v1.py Add ScheduledAbortTask / abort_recycle_resource and adjust add_request_in_p behavior
fastdeploy/engine/request.py Add RequestType.ABORT
fastdeploy/engine/common_engine.py Unify abort-request handling in _process_abort_task and add abort skip/recycle logic in several places

task = self.resource_manager._prepare_abort_task(req)
self.engine_worker_queue.put_tasks(([task], self.resource_manager.real_bsz))
self.llm_logger.info(
f"a runing task need to be aborted, put abort task in engine worker queue, req_id: {req_id}"

Copilot AI Feb 24, 2026


Typo in the log message: "a runing task" should be "a running task"; the misspelling hurts log searchability and readability.

Suggested change
f"a runing task need to be aborted, put abort task in engine worker queue, req_id: {req_id}"
f"a running task need to be aborted, put abort task in engine worker queue, req_id: {req_id}"

Comment on lines +2269 to +2271
]
)
self.llm_logger.info(f"waiting req aborted, put a fininsed result in scheduler, req_id: {req_id}")

Copilot AI Feb 24, 2026


Typo in the log message: "fininsed result" should be "finished result"; the misspelling hurts log searchability and readability.

Comment on lines 217 to +221
while self.current_request_ids[task.request_id] == "init":
time.sleep(0.001)
if task.request_id in self.resource_manager.abort_req_ids_set:
del self.current_request_ids[task.request_id]
return False, "task is aborted"

Copilot AI Feb 24, 2026


In the `while self.current_request_ids[task.request_id] == "init":` loop, the abort path also calls del self.current_request_ids[task.request_id] directly; if the receiver thread or another branch has already removed the key, this raises a KeyError again. Consider using pop(..., None) as at the start of the function, and guarding access to self.current_request_ids[task.request_id] with .get() or an existence check to avoid a KeyError under the race.
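The race-free deletion this comment suggests can be sketched as follows; the dict name mirrors the diff, while the wrapper function is purely illustrative.

```python
# Stand-in for the connector's per-request state map.
current_request_ids = {"req-1": "init"}


def clear_entry(request_ids: dict, req_id: str) -> None:
    # pop with a default is idempotent: a second caller (e.g. the
    # receiver thread) that already removed the key does not raise,
    # unlike `del request_ids[req_id]`.
    request_ids.pop(req_id, None)


clear_entry(current_request_ids, "req-1")
clear_entry(current_request_ids, "req-1")  # no KeyError on the repeat
```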

Comment on lines +1179 to +1180
if len(req.block_tables) != 0:
self._free_blocks(req)

Copilot AI Feb 24, 2026


abort_recycle_resource only calls _free_blocks(req) when len(req.block_tables) != 0. But _free_blocks is also responsible for reclaiming extend tables (using_extend_tables_req_id) and cleaning up the related maps; if block_tables is already empty while the request is still in extend mode, the extend blocks and state are never reclaimed, which can leak resources. Consider changing the condition to "any releasable resource exists" (e.g. block_tables is non-empty or request_id is in using_extend_tables_req_id), or call _free_blocks(req) unconditionally and let it handle the empty-list case internally.

Suggested change
if len(req.block_tables) != 0:
self._free_blocks(req)
self._free_blocks(req)

Comment on lines +2247 to +2258
if req_id in self.resource_manager.requests:
req = self.resource_manager.requests[req_id]
if req in self.resource_manager.running:
task = self.resource_manager._prepare_abort_task(req)
self.engine_worker_queue.put_tasks(([task], self.resource_manager.real_bsz))
self.llm_logger.info(
f"a runing task need to be aborted, put abort task in engine worker queue, req_id: {req_id}"
)
elif req in self.resource_manager.waiting:
self.llm_logger.info(f"a waiting task need to be aborted, req_id: {req_id}")
self.resource_manager.waiting.remove(req)
self.resource_manager.abort_recycle_resource(req)

Copilot AI Feb 24, 2026


_process_abort_task checks membership in resource_manager.running / waiting / to_be_rescheduled_request_id_set and mutates them (e.g. waiting.remove(...)) without holding ResourceManagerV1's lock. These structures are also read and written under with self.lock: in the scheduling thread, so the unlocked access here invites races: for example, the object can be moved or removed after the membership check succeeds, making remove throw, or the state can become inconsistent. Consider wrapping the reads and writes of these shared structures in with self.resource_manager.lock: and, where possible, reusing the locking inside abort_recycle_resource instead of manipulating the deque/list directly from outside.
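A lock-guarded version of the membership checks could look like the sketch below. The `lock`/`running`/`waiting` attribute names mirror the diff; `ResourceManagerStub` and `process_abort` are hypothetical stand-ins, not FastDeploy code.

```python
import threading
from collections import deque


class ResourceManagerStub:
    # Stand-in for ResourceManagerV1: the real class also guards
    # running/waiting with self.lock inside its scheduling loop.
    def __init__(self):
        self.lock = threading.Lock()
        self.running: list = []
        self.waiting: deque = deque()


def process_abort(rm: ResourceManagerStub, req) -> str:
    # Hold the same lock the scheduler thread uses, so the membership
    # test and the removal happen atomically with respect to it.
    with rm.lock:
        if req in rm.running:
            return "dispatch abort task to worker"
        if req in rm.waiting:
            rm.waiting.remove(req)
            return "recycle resources directly"
    return "not tracked"
```

The key property is that the check and the mutation sit inside one critical section; checking outside the lock and removing inside it would leave the same race open.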

Comment on lines +209 to +211
if task.request_id in self.resource_manager.abort_req_ids_set:
del self.current_request_ids[task.request_id]
return False, "task is aborted"

Copilot AI Feb 24, 2026


The abort check here calls del self.current_request_ids[task.request_id] directly, but when task.disaggregate_info is None (send_splitwise_tasks skips the task and never writes to current_request_ids), or when the key was already removed elsewhere, this raises a KeyError and the prefill thread exits abnormally. Consider using pop(task.request_id, None), or at least moving the task.disaggregate_info is None early return ahead of the abort check, and making the deletion idempotent.

Comment on lines 156 to +162
for i, expired_id in enumerate(expired_ids):
self.requests.pop(expired_id, None)
self.responses.pop(expired_id, None)
self.ids.pop(i)
scheduler_logger.debug(
f"expired request: {expired_id} in ids: {self.ids.index(expired_id)} should be recycled, ids_read_cursor:{self.ids_read_cursor}"
)
self.ids.pop(self.ids.index(expired_id))

Copilot AI Feb 24, 2026


When recycling expired requests, expired_ids is collected as an in-order prefix of self.ids (the loop breaks at the first non-expired entry), so the first len(expired_ids) items of self.ids can be deleted in one batch. The current per-item self.ids.index(expired_id) lookup plus pop is O(n^2) and performs the lookup twice (once for the debug log, once for the pop). Consider a slice deletion (updating ids_read_cursor accordingly) and avoiding .index() in the debug log.
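The batch deletion this comment proposes can be sketched with a simplified stand-in for the scheduler state; the function and its signature are illustrative, only the `ids` / `expired_ids` / `ids_read_cursor` names come from the diff.

```python
def recycle_expired_prefix(ids: list, expired_ids: list, ids_read_cursor: int):
    # expired_ids was collected from the head of ids (the scan breaks at
    # the first non-expired entry), so one slice deletion replaces N
    # separate index lookups and pops.
    n = len(expired_ids)
    del ids[:n]
    # The read cursor indexes into ids, so shift it back by the removed
    # count (clamped at zero).
    return ids, max(0, ids_read_cursor - n)


ids, cursor = recycle_expired_prefix(["a", "b", "c", "d"], ["a", "b"], 3)
```

Slice deletion is O(n) overall, and because the expired entries are always a prefix, no per-entry `.index()` search is needed even for logging: the position of `expired_ids[i]` is simply `i`.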

Comment on lines +111 to 116
self.scheduler_logger.debug(
f"expired request: {expired_id} in ids: {self.ids.index(expired_id)} should be recycled, ids_read_cursor:{self.ids_read_cursor}"
)
self.ids.pop(self.ids.index(expired_id))

if len(expired_ids) > 0:

Copilot AI Feb 24, 2026


Same as in LocalScheduler: expired_ids is an in-order prefix of self.ids, so self.ids[:len(expired_ids)] can be deleted directly. Using self.ids.index(...) then pop for each expired_id adds O(n^2) lookup overhead and computes the index twice (debug + pop). Consider a batch slice deletion to reduce overhead and simplify the logic.

Suggested change
self.scheduler_logger.debug(
f"expired request: {expired_id} in ids: {self.ids.index(expired_id)} should be recycled, ids_read_cursor:{self.ids_read_cursor}"
)
self.ids.pop(self.ids.index(expired_id))
if len(expired_ids) > 0:
# expired_ids is the prefix of self.ids, so the index equals i
self.scheduler_logger.debug(
f"expired request: {expired_id} in ids: {i} should be recycled, ids_read_cursor:{self.ids_read_cursor}"
)
if len(expired_ids) > 0:
# Remove the expired prefix from ids in one batch to avoid O(n^2) deletions
del self.ids[: len(expired_ids)]

Comment on lines +2302 to +2314
else:
self.scheduler.put_results(
[
RequestOutput(
request_id=req_id,
finished=True,
error_code=499,
error_msg=f"Your request with request_id:{req_id} is aborted.",
)
]
)
if self.cfg.scheduler_config.splitwise_role == "mixed":
self.resource_manager.abort_req_ids_set.remove(req_id)

Copilot AI Feb 24, 2026


When req_id is not in resource_manager.requests (resources not yet allocated / still in the scheduler queue), this branch only calls put_results without removing the request from the scheduler, and under non-mixed roles it never cleans abort_req_ids_set. As a result, an already-aborted request can still be picked up by a later scheduler.get_requests() and enter resource allocation, wasting resources and confusing request state, while abort_req_ids_set can grow without bound. Consider explicitly recycling/cancelling the request on the scheduler side in this branch (e.g. via the existing _recycle(req_id) or a public cancel API) and, after emitting the termination response, removing it from abort_req_ids_set idempotently with discard (at minimum in every branch that will never dispatch a worker abort task).
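The idempotent cleanup this comment asks for is exactly what `set.discard` provides; the set name mirrors the diff, the wrapper is illustrative.

```python
# Stand-in for the resource manager's pending-abort id set.
abort_req_ids_set = {"req-1"}


def finish_abort(abort_ids: set, req_id: str) -> None:
    # discard is a no-op when the id is already gone, so every branch
    # that finishes an abort can call it without a membership check;
    # set.remove() would raise KeyError on the second call.
    abort_ids.discard(req_id)


finish_abort(abort_req_ids_set, "req-1")
finish_abort(abort_req_ids_set, "req-1")  # safe on repeat, unlike remove()
```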

Comment on lines +777 to 784
else: # preempted and abort task
if request.task_type.value == RequestType.PREEMPTED.value:
logger.info(f"Handle preempted request {request} at idx {idx}")
else:
logger.info(f"Handle aborted request {request} at idx {idx}")
self.share_inputs["preempted_idx"][idx : idx + 1, :] = 1
self.share_inputs["block_tables"][idx : idx + 1, :] = -1
self.share_inputs["stop_flags"][idx : idx + 1] = True

Copilot AI Feb 24, 2026


This else branch treats every non-PREFILL/DECODE task as preempt/abort and sets stop_flags, but the scheduler side also produces RequestType.EXTEND (ScheduledExtendBlocksTask). If an EXTEND task reaches this branch, it is wrongly handled as a termination, so the block-table extension never takes effect and inference may even be stopped by mistake. Consider adding an explicit RequestType.EXTEND branch (e.g. writing extend_block_tables into the shared inputs and updating the corresponding state), or guaranteeing upstream that EXTEND never enters this function and adding an assertion here to prevent silent errors.
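Making the EXTEND case explicit, and failing loudly on anything unexpected, could be sketched as below. The enum mirrors the task types named in this PR; the `dispatch` handler is illustrative, not the actual gpu_model_runner code.

```python
from enum import Enum


class RequestType(Enum):
    PREFILL = 0
    DECODE = 1
    PREEMPTED = 2
    EXTEND = 3
    ABORT = 4


def dispatch(task_type: RequestType) -> str:
    if task_type in (RequestType.PREFILL, RequestType.DECODE):
        return "schedule tokens"
    if task_type is RequestType.EXTEND:
        # Extend the block tables instead of stopping the request.
        return "extend block tables"
    if task_type in (RequestType.PREEMPTED, RequestType.ABORT):
        # Only these two types should set stop flags.
        return "set stop flags"
    # Fail loudly rather than silently treating a new type as a stop.
    raise AssertionError(f"unhandled task type: {task_type}")
```

Enumerating the terminating types explicitly (instead of a catch-all else) means a future task type raises immediately instead of silently stopping inference.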

