摘要 :给一个”一问一答”的自主 Agent 框架加上完整任务生命周期管理,涉及 SQLite WAL 状态存储、EventBus 事件驱动、方法论自动重启、以及任务认领强制——三个子系统的全链路集成。
背景 GenericAgent 是一个具备工具调用能力的自主 Agent 框架。它原本只有”一问一答”的执行模式,缺少任务的生命周期管理 ——创建、认领、执行、评审、完成。这导致了几个痛点:
无法追踪长任务 :执行到一半中断就丢失上下文
没有执行纪律 :Agent 可以随意调用工具,不需要对任何”任务”负责
出了问题无法恢复 :没有重启语义,失败就是永久失败
今天的任务是:给它加上完整的任务引擎,并且让”对抗式解题法”方法论检查能与任务系统联动。
架构总览 三个子系统协同工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ┌─────────────────────────────────────────────────────┐ │ Agent Loop │ ├─────────────────────────────────────────────────────┤ │ Plugins │ │ ┌──────────────────┐ ┌────────────────────────┐ │ │ │ adversarial_ │ │ task_enforcer │ │ │ │ solver │ │ (P2: 任务认领强制) │ │ │ └────────┬─────────┘ └───────────┬────────────┘ │ │ ▼ ▼ │ │ ┌────────────────────────────────────────────────┐ │ │ │ task_engine 包 │ │ │ │ TaskManager → TaskStorage → SQLite/WAL │ │ │ │ EventBus → Monitor → 日志/通知 │ │ │ └────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘
Part 1: task_engine 核心包 数据模型 任务有 6 个状态,一个严格的状态机:
1 2 3 4 PENDING → CLAIMED → IN_PROGRESS → REVIEW → COMPLETED ↓ (失败) FAILED CLAIMED → CANCELLED (by creator)
1 2 3 4 5 6 7 8 class TaskState (Enum ): PENDING = "pending" CLAIMED = "claimed" IN_PROGRESS = "in_progress" REVIEW = "review" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled"
存储层 TaskStorage 基于 SQLite + WAL 模式 ,保证跨进程安全:
1 2 3 4 5 6 7 8 9 10 11 12 13 class TaskStorage : def __init__ (self, db_path: str = None ): self .db_path = db_path or _DEFAULT_DB self ._init_db() def _init_db (self ): with self ._connect() as conn: conn.executescript(""" PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000; CREATE TABLE IF NOT EXISTS tasks (...); CREATE TABLE IF NOT EXISTS task_events (...); """ )
几个关键设计:
BEGIN IMMEDIATE 事务模式,防止写冲突
busy_timeout=5000,5 秒重试窗口
事件日志表 task_events 记录所有状态变更
TaskManager 高层 API,封装完整生命周期:
1 2 3 4 5 6 class TaskManager : def create_task (self, description, creator ) -> Task def claim_task (self, task_id, assignee ) -> Task def transition_task (self, task_id, target_status, actor ) -> Task def cancel_task (self, task_id, actor ) -> Task def find_tasks (self, status=None , assignee=None ) -> List [Task]
EventBus & Monitor 事件驱动架构,解耦任务引擎和外部系统:
1 2 3 4 5 6 7 8 task_event_bus.subscribe("task.created" , console_notifier) task_event_bus.subscribe("task.completed" , scheduled_cleanup) class DispatcherMonitor : def check_stale_tasks (self ): def heartbeat (self, task_id, agent_name ):
注册在 tools_schema_cn.json 中:
工具
功能
task_create
创建任务(PENDING)
task_list
列出待认领 / 已认领任务
task_claim
认领任务(PENDING→CLAIMED)
task_transition
推进状态(CLAIMED→REVIEW→COMPLETED)
task_cancel
取消任务
在 ga.py 中实现为 5 个 do_task_*() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 def do_task_create (self, args ): task = self .task_manager.create_task( description=args['description' ], creator=self .agent_name ) return f"✅ 任务创建成功: {task.id } " def do_task_claim (self, args ): task = self .task_manager.claim_task( task_id=args['task_id' ], assignee=self .agent_name )
Part 2: 对抗式解题法 + 拦截重启逻辑 问题 原来的对抗式解题法拦截器有一个硬伤——达到最大重试次数后直接终止任务 ,Agent 就卡在那里不动了。
解决方案:自动重启(最多 3 次) 1 2 3 4 5 6 7 8 9 10 每轮回答 → _score_compliance() 打分 ├─ Score ≥ 2(充分方法论引用)→ ✅ 通过,重置计数器 ├─ Score = 0(完全遗漏)→ 🚫 阻塞,retry_count+1 └─ Score = 1(关键词填充)→ 🚫 阻塞,weak_count+1 当 retry_count ≥ 2 或 weak_count ≥ 3: ├─ restart_count ≤ 3 → 🔄 自动重启 │ 清空工作记忆,重置计数器,重新执行 └─ restart_count > 3 → ⛔ 永久终止 引导 ask_user 汇报
关键代码逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 restart_count = getattr (_tls, 'adversarial_restarts' , 0 ) + 1 _tls.adversarial_restarts = restart_count _tls.adversarial_retries = 0 if restart_count <= 3 : corrective_prompt = ( f"已达到最大方法论重试次数,系统已自动重启任务。\n" f"原任务:{raw_query} \n" f"请在回答中展示完整的方法论执行计划..." ) else : corrective_prompt = "任务已终止,请 ask_user 汇报情况。"
阻塞消息优化 原来重启时只显示两行”任务自动重启中”,没有上下文。现在每条消息都包含原因、要求和原任务:
场景
消息头
内容
方法遗漏
⛔ 方法遗漏
原因 + 4 步要求 + 原任务
引用不足
⛔ 方法引用不足
原因(关键词填充)+ 4 步要求 + 原任务
重启
🔄 任务自动重启(第N/3次)
原因 + 具体问题 + 要求 + 原任务
终止
⛔ 任务已终止
原因 + ask_user 指引
Part 3: task_enforcer 插件 功能 在 tool_before 钩子中检查:如果 Agent 没有认领任何激活的任务,就阻止执行工具调用 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @register('tool_before' ) def _enforce_claim (ctx ): """Block execution tools if no active task is claimed.""" name = ctx.get('name' , '' ) if name not in _EXECUTION_TOOLS: return if _has_active_task(db_path, caller): return return { '_tool_blocked' : True , '_tool_block_msg' : ( f"⛔ Agent '{caller} ' 没有激活的任务。\n" "请先 task_claim 认领一个任务..." ), }
两个修复(原本的 bug)
register() 定义但没调用 :函数写好了,但模块级没有执行它——钩子从未注册
_get_db_path() 只在导入时检查环境变量 :改为运行时检查,支持动态设置
两个拦截器(adversarial_solver + task_enforcer)共存验证:
1 2 3 4 PASS 1: adversarial 先拦截方法论 PASS 2: task_enforcer 拦截无任务执行 PASS 3: 有任务时 code_run 正常通过 PASS 4: web_scan 总是放行(只读)
关于”重启 agent”的陷阱 在开发过程中发现一个问题:当用户说”重启agent”时,scripts/agent_resume.sh 只重启了后台调度/自治服务,但当前对话进程的 TLS(线程本地存储)计数器没有被清空 。
结果是:对抗式解题法的 adversarial_retries 计数器从之前累积的会话中延续,可能直接触发终止条件。
教训 :重启agent 需要同时重置插件 TLS 状态,否则”重启语义”不完整。后续可以加一个 _on_session_reset 钩子来统一清理。
提交一览 1 2 3 ba2c5a5 fix: blocked messages — 4 scenarios with reason+requirements 0c7503a fix: task_enforcer — register hooks + runtime env lookup 71e7cef feat: task engine integration + restart logic (+3064 lines)
总结 今天的集成工作中最核心的决策:
SQLite WAL 模式 作为跨进程状态存储——简单可靠,无需额外基础设施
事件驱动架构 (EventBus)——解耦任务引擎和监控/通知
可插拔钩子系统 (hooks)——方法论检查和任务强制通过相同机制扩展
自动重启 代替永久终止——给 Agent 自我纠正的机会,而不是死胡同
运行时环境变量 ——避免导入时冻结配置,支持动态切换
下一步可以做的:
任务看板 Web UI(在 fsapp.py 基础上)
更细粒度的任务治理(超时自动失败、deadline 机制)
任务间依赖和 DAG 调度