[BugFix] Fix several bugs in the request interruption and inference termination functionality #6278
qwes5s5 wants to merge 2 commits into PaddlePaddle:develop
Conversation
Thanks for your contribution!
Codecov Report

❌ Patch coverage report; additional details and impacted files:

```
@@          Coverage Diff           @@
##           develop     #6278   +/- ##
=======================================
  Coverage        ?     67.57%
=======================================
  Files           ?        391
  Lines           ?      52867
  Branches        ?       8244
=======================================
  Hits            ?      35727
  Misses          ?      14523
  Partials        ?       2617
```
Pull request overview
This PR fixes several issues in the request interruption (ABORT) and inference termination flows, in both the centralized and the PD (prefill/decode) disaggregated modes, to avoid resource leaks, service instability, and hung requests.
Changes:
- Introduce an ABORT request type and ScheduledAbortTask to distinguish "terminate inference and release resources" from "PREEMPT: yield resources and reschedule".
- Add abort-specific handling and resource-recycling logic along the Engine/ResourceManager/Connector paths.
- Fix the recycle-cursor update and expired-request recycling logic in the Local/DP schedulers, and add debug logs.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| fastdeploy/worker/gpu_model_runner.py | Distinguish preempt/abort tasks in worker-side logging and write the stop flag for both |
| fastdeploy/splitwise/splitwise_connector.py | Add a fast abort exit while prefill waits for decode resource allocation |
| fastdeploy/scheduler/local_scheduler.py | Fix how ids_read_cursor is updated on recycle and rework the expired-request recycling |
| fastdeploy/scheduler/dp_scheduler.py | Apply the same recycle fix as local_scheduler for the DP case |
| fastdeploy/output/token_processor.py | Adjust recycling and abort-set cleanup timing for the abort negative-token case, and handle abort while waiting for prefill recycling |
| fastdeploy/engine/sched/resource_manager_v1.py | Add ScheduledAbortTask / abort_recycle_resource and adjust add_request_in_p behavior |
| fastdeploy/engine/request.py | Add RequestType.ABORT |
| fastdeploy/engine/common_engine.py | Unify abort-request handling in _process_abort_task and add abort skip/recycle logic in several places |
```python
task = self.resource_manager._prepare_abort_task(req)
self.engine_worker_queue.put_tasks(([task], self.resource_manager.real_bsz))
self.llm_logger.info(
    f"a runing task need to be aborted, put abort task in engine worker queue, req_id: {req_id}"
)
```
Typo in the log message: `a runing task` should be `a running task`, to keep the log searchable and readable.
```diff
-f"a runing task need to be aborted, put abort task in engine worker queue, req_id: {req_id}"
+f"a running task need to be aborted, put abort task in engine worker queue, req_id: {req_id}"
```
```python
    ]
)
self.llm_logger.info(f"waiting req aborted, put a fininsed result in scheduler, req_id: {req_id}")
```
Typo in the log message: `fininsed result` should be `finished result`, to keep the log searchable and readable.
```python
while self.current_request_ids[task.request_id] == "init":
    time.sleep(0.001)
    if task.request_id in self.resource_manager.abort_req_ids_set:
        del self.current_request_ids[task.request_id]
        return False, "task is aborted"
```
In the `while self.current_request_ids[task.request_id] == "init":` loop, the abort branch also calls `del self.current_request_ids[task.request_id]` directly; if the receiver thread or another branch has already deleted that key, this raises a KeyError again. Suggest using `pop(..., None)` as at the start of the function, and guarding the access to `self.current_request_ids[task.request_id]` with `.get()` or an existence check to avoid a KeyError under races.
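A minimal sketch of the race and the idempotent alternative, with illustrative names rather than the actual FastDeploy state:

```python
# Two paths may both try to drop the same key, so deletion must be
# idempotent. `current_request_ids` here is a stand-in for the real map.
current_request_ids = {"req-1": "init"}

# Path A (e.g. a receiver thread) removes the entry first.
current_request_ids.pop("req-1", None)

# Path B: `del current_request_ids["req-1"]` would raise KeyError now,
# but pop(..., None) is a safe no-op on an already-removed key.
current_request_ids.pop("req-1", None)

# Reading with .get() likewise avoids KeyError in the loop condition.
state = current_request_ids.get("req-1")  # None once removed
```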
```python
if len(req.block_tables) != 0:
    self._free_blocks(req)
```
`abort_recycle_resource` only calls `_free_blocks(req)` when `len(req.block_tables) != 0`. But `_free_blocks` is also responsible for recycling extend tables (`using_extend_tables_req_id`) and cleaning up the related maps; if `block_tables` is already empty while the request is still in extend mode, the extend blocks and their state are never recycled, which can leak resources. Suggest changing the condition to "any resource still needs to be freed" (e.g. `block_tables` is non-empty or the `request_id` is in `using_extend_tables_req_id`), or calling `_free_blocks(req)` unconditionally and letting it handle the empty-list case.
```diff
-if len(req.block_tables) != 0:
-    self._free_blocks(req)
+self._free_blocks(req)
```
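An illustrative sketch (not FastDeploy's actual implementation) of a free routine that releases both block tables and extend tables and is a safe no-op when there is nothing to release, so callers can invoke it unconditionally:

```python
class Resources:
    def __init__(self):
        self.free_list = []
        self.using_extend_tables_req_id = {}  # req_id -> extend blocks

    def free_blocks(self, req_id, block_tables):
        # Empty list: extend() is a no-op, so no special-casing is needed.
        self.free_list.extend(block_tables)
        block_tables.clear()
        # Recycle extend tables too, even when block_tables was empty;
        # pop(..., []) makes this idempotent.
        extend = self.using_extend_tables_req_id.pop(req_id, [])
        self.free_list.extend(extend)

rm = Resources()
rm.using_extend_tables_req_id["r1"] = [7, 8]
rm.free_blocks("r1", [])  # block_tables empty, extend blocks still freed
```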
```python
if req_id in self.resource_manager.requests:
    req = self.resource_manager.requests[req_id]
    if req in self.resource_manager.running:
        task = self.resource_manager._prepare_abort_task(req)
        self.engine_worker_queue.put_tasks(([task], self.resource_manager.real_bsz))
        self.llm_logger.info(
            f"a runing task need to be aborted, put abort task in engine worker queue, req_id: {req_id}"
        )
    elif req in self.resource_manager.waiting:
        self.llm_logger.info(f"a waiting task need to be aborted, req_id: {req_id}")
        self.resource_manager.waiting.remove(req)
        self.resource_manager.abort_recycle_resource(req)
```
`_process_abort_task` performs membership checks on `resource_manager.running` / `waiting` / `to_be_rescheduled_request_id_set` and mutates them (e.g. `waiting.remove(...)`) without holding ResourceManagerV1's lock. These structures are also read and written under `with self.lock:` in the scheduling thread, so the unlocked access here is racy: an object can be moved or deleted after the membership check passes, making `remove` throw or leaving inconsistent state. Suggest wrapping the reads and writes of these shared structures in `with self.resource_manager.lock:`, and preferring the lock protection already inside `abort_recycle_resource` over manipulating the deque/list directly from outside.
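An illustrative sketch of the point: the check-then-remove must sit inside the same critical section the scheduler uses, otherwise the element can move between the membership test and the `remove()`. Names here are illustrative, not the real ResourceManagerV1 API:

```python
import threading
from collections import deque

class Manager:
    def __init__(self):
        self.lock = threading.Lock()
        self.waiting = deque(["req-1", "req-2"])

    def abort(self, req_id):
        # Both the membership check and the mutation are inside the
        # critical section, so a scheduler thread cannot interleave
        # between them and make remove() raise ValueError.
        with self.lock:
            if req_id in self.waiting:
                self.waiting.remove(req_id)
                return True
            return False

m = Manager()
m.abort("req-1")  # removed under the lock
m.abort("req-1")  # second call safely reports "already gone"
```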
```python
if task.request_id in self.resource_manager.abort_req_ids_set:
    del self.current_request_ids[task.request_id]
    return False, "task is aborted"
```
The abort check here calls `del self.current_request_ids[task.request_id]` directly, but when `task.disaggregate_info` is None (`send_splitwise_tasks` skips the request and never writes it into `current_request_ids`), or the key has already been deleted by other logic, this raises a KeyError and crashes the prefill thread. Suggest changing it to `pop(task.request_id, None)`, or at least moving the `task.disaggregate_info is None` fast return ahead of the abort check, and making the deletion idempotent.
```python
for i, expired_id in enumerate(expired_ids):
    self.requests.pop(expired_id, None)
    self.responses.pop(expired_id, None)
    self.ids.pop(i)
    scheduler_logger.debug(
        f"expired request: {expired_id} in ids: {self.ids.index(expired_id)} should be recycled, ids_read_cursor:{self.ids_read_cursor}"
    )
    self.ids.pop(self.ids.index(expired_id))
```
When recycling expired requests, `expired_ids` is collected in order as a prefix from the head of `self.ids` (the loop breaks at the first non-expired entry), so the first `len(expired_ids)` items of `self.ids` can be deleted in one batch. Looking each one up with `self.ids.index(expired_id)` and then popping it is O(n^2) and does the lookup twice (once for the debug log, once for the pop). Suggest deleting by slice (and updating `ids_read_cursor` accordingly), and avoiding the `.index()` call in the debug log.
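A minimal sketch of the suggested batch deletion, with illustrative data in place of the scheduler's real state: since `expired_ids` is a prefix of `ids`, one slice deletion replaces the repeated `index()`/`pop()` calls.

```python
ids = ["r1", "r2", "r3", "r4", "r5"]
expired_ids = ["r1", "r2"]  # always a prefix of ids
ids_read_cursor = 4

# The prefix position of expired_ids[i] is simply i, so a debug log can
# use the loop index instead of calling ids.index() (an O(n) scan).
del ids[: len(expired_ids)]          # one O(n) batch deletion overall
ids_read_cursor -= len(expired_ids)  # keep the read cursor consistent
```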
```python
self.scheduler_logger.debug(
    f"expired request: {expired_id} in ids: {self.ids.index(expired_id)} should be recycled, ids_read_cursor:{self.ids_read_cursor}"
)
self.ids.pop(self.ids.index(expired_id))

if len(expired_ids) > 0:
```
Same as in LocalScheduler: `expired_ids` is a prefix collected in order from the head of `self.ids`, so `self.ids[:len(expired_ids)]` can be deleted directly. Using `self.ids.index(...)` plus a pop per expired_id introduces O(n^2) lookup cost and computes each index twice (debug log + pop). Suggest a batch slice deletion to cut the overhead and simplify the logic.
```diff
-    self.scheduler_logger.debug(
-        f"expired request: {expired_id} in ids: {self.ids.index(expired_id)} should be recycled, ids_read_cursor:{self.ids_read_cursor}"
-    )
-    self.ids.pop(self.ids.index(expired_id))
+    # expired_ids is the prefix of self.ids, so the index equals i
+    self.scheduler_logger.debug(
+        f"expired request: {expired_id} in ids: {i} should be recycled, ids_read_cursor:{self.ids_read_cursor}"
+    )
 if len(expired_ids) > 0:
+    # Remove the expired prefix from ids in one batch to avoid O(n^2) deletions
+    del self.ids[: len(expired_ids)]
```
```python
else:
    self.scheduler.put_results(
        [
            RequestOutput(
                request_id=req_id,
                finished=True,
                error_code=499,
                error_msg=f"Your request with request_id:{req_id} is aborted.",
            )
        ]
    )
    if self.cfg.scheduler_config.splitwise_role == "mixed":
        self.resource_manager.abort_req_ids_set.remove(req_id)
```
When `req_id` is not in `resource_manager.requests` (resources not yet allocated / still queued in the scheduler), this branch only calls `put_results` without removing the request from the scheduler; and in non-mixed roles it never cleans up `abort_req_ids_set`. An aborted request can therefore still be picked up by a later `scheduler.get_requests()` and enter resource allocation, wasting resources and corrupting request state, while `abort_req_ids_set` can grow without bound. Suggest explicitly recycling/cancelling the request on the scheduler side in this branch (e.g. via the existing `_recycle(req_id)` or a public cancel API), and removing it from `abort_req_ids_set` idempotently with `discard` once the termination response is done (at least in every branch that will not dispatch a worker abort task).
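A small sketch of why `discard` is preferable for the cleanup: it is idempotent, so every terminating branch can call it without first checking membership. The set name mirrors the one in the PR; the request id is illustrative.

```python
abort_req_ids_set = {"req-1"}

abort_req_ids_set.discard("req-1")  # removes the element
abort_req_ids_set.discard("req-1")  # safe no-op on a missing element
# By contrast, abort_req_ids_set.remove("req-1") here would raise
# KeyError, so remove() forces every caller to know who cleaned up first.
```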
```python
else:  # preempted and abort task
    if request.task_type.value == RequestType.PREEMPTED.value:
        logger.info(f"Handle preempted request {request} at idx {idx}")
    else:
        logger.info(f"Handle aborted request {request} at idx {idx}")
    self.share_inputs["preempted_idx"][idx : idx + 1, :] = 1
    self.share_inputs["block_tables"][idx : idx + 1, :] = -1
    self.share_inputs["stop_flags"][idx : idx + 1] = True
```
This `else` branch treats every task that is not PREFILL/DECODE as preempt/abort and sets `stop_flags`, but the scheduler side also produces `RequestType.EXTEND` (ScheduledExtendBlocksTask). If an EXTEND task reaches this branch, it is wrongly handled as a termination task, so extending the block tables never takes effect and inference may even be stopped by mistake. Suggest adding an explicit branch for `RequestType.EXTEND` (e.g. write `extend_block_tables` into the shared inputs and update the corresponding state), or guaranteeing upstream that EXTEND never enters this function, plus an assertion here to prevent silent errors.
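A sketch of the suggested shape of the dispatch, with a self-contained stand-in enum (the real `RequestType` lives in fastdeploy/engine/request.py): each task type is handled explicitly and anything unexpected trips an assertion instead of falling into a catch-all stop branch.

```python
from enum import Enum

class RequestType(Enum):  # stand-in for the real enum
    PREFILL = 0
    DECODE = 1
    PREEMPTED = 2
    ABORT = 3
    EXTEND = 4

def dispatch(task_type):
    if task_type in (RequestType.PREFILL, RequestType.DECODE):
        return "run"
    if task_type in (RequestType.PREEMPTED, RequestType.ABORT):
        return "stop"    # set stop_flags, clear block tables
    if task_type is RequestType.EXTEND:
        return "extend"  # write extend_block_tables, do NOT stop
    # Fail loudly on anything new instead of silently stopping it.
    raise AssertionError(f"unexpected task type: {task_type}")

dispatch(RequestType.EXTEND)  # no longer swallowed by the else branch
```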
Motivation
The current request-interruption and inference-termination flow has issues in both centralized and PD-disaggregated modes. These bugs can cause resource leaks, service instability, and request hangs.
Modifications
Usage or Command
Accuracy Tests
Checklist