摘要:给一个”一问一答”的自主 Agent 框架加上完整任务生命周期管理,涉及 SQLite WAL 状态存储、EventBus 事件驱动、方法论自动重启、以及任务认领强制——三个子系统的全链路集成。


背景

GenericAgent 是一个具备工具调用能力的自主 Agent 框架。它原本只有”一问一答”的执行模式,缺少任务的生命周期管理——创建、认领、执行、评审、完成。这导致了几个痛点:

  1. 无法追踪长任务:执行到一半中断就丢失上下文
  2. 没有执行纪律:Agent 可以随意调用工具,不需要对任何”任务”负责
  3. 出了问题无法恢复:没有重启语义,失败就是永久失败

今天的任务是:给它加上完整的任务引擎,并且让”对抗式解题法”方法论检查能与任务系统联动。


架构总览

三个子系统协同工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────────────────┐
│ Agent Loop │
├─────────────────────────────────────────────────────┤
│ Plugins │
│ ┌──────────────────┐ ┌────────────────────────┐ │
│ │ adversarial_ │ │ task_enforcer │ │
│ │ solver │ │ (P2: 任务认领强制) │ │
│ └────────┬─────────┘ └───────────┬────────────┘ │
│ ▼ ▼ │
│ ┌────────────────────────────────────────────────┐ │
│ │ task_engine 包 │ │
│ │ TaskManager → TaskStorage → SQLite/WAL │ │
│ │ EventBus → Monitor → 日志/通知 │ │
│ └────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘

Part 1: task_engine 核心包

数据模型

任务有 6 个状态,一个严格的状态机:

1
2
3
4
PENDING → CLAIMED → IN_PROGRESS → REVIEW → COMPLETED
↓ (失败)
FAILED
CLAIMED → CANCELLED (by creator)
1
2
3
4
5
6
7
8
class TaskState(Enum):
PENDING = "pending" # 刚创建,等待认领
CLAIMED = "claimed" # 已被 agent 认领
IN_PROGRESS = "in_progress" # 正在执行
REVIEW = "review" # 待评审
COMPLETED = "completed" # 完成
FAILED = "failed" # 执行失败
CANCELLED = "cancelled" # 取消

存储层

TaskStorage 基于 SQLite + WAL 模式,保证跨进程安全:

1
2
3
4
5
6
7
8
9
10
11
12
13
class TaskStorage:
def __init__(self, db_path: str = None):
self.db_path = db_path or _DEFAULT_DB
self._init_db()

def _init_db(self):
with self._connect() as conn:
conn.executescript("""
PRAGMA journal_mode=WAL;
PRAGMA busy_timeout=5000;
CREATE TABLE IF NOT EXISTS tasks (...);
CREATE TABLE IF NOT EXISTS task_events (...);
""")

几个关键设计:

  • BEGIN IMMEDIATE 事务模式,防止写冲突
  • busy_timeout=5000,5 秒重试窗口
  • 事件日志表 task_events 记录所有状态变更

TaskManager

高层 API,封装完整生命周期:

1
2
3
4
5
6
class TaskManager:
def create_task(self, description, creator) -> Task
def claim_task(self, task_id, assignee) -> Task
def transition_task(self, task_id, target_status, actor) -> Task
def cancel_task(self, task_id, actor) -> Task
def find_tasks(self, status=None, assignee=None) -> List[Task]

EventBus & Monitor

事件驱动架构,解耦任务引擎和外部系统:

1
2
3
4
5
6
7
8
# event_bus.py
task_event_bus.subscribe("task.created", console_notifier)
task_event_bus.subscribe("task.completed", scheduled_cleanup)

# monitor.py
class DispatcherMonitor:
def check_stale_tasks(self): # 检测超时任务
def heartbeat(self, task_id, agent_name): # 保活

5 个 Tools 暴露给 Agent

注册在 tools_schema_cn.json 中:

工具 功能
task_create 创建任务(PENDING)
task_list 列出待认领 / 已认领任务
task_claim 认领任务(PENDING→CLAIMED)
task_transition 推进状态(CLAIMED→REVIEW→COMPLETED)
task_cancel 取消任务

ga.py 中实现为 5 个 do_task_*() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
def do_task_create(self, args):
task = self.task_manager.create_task(
description=args['description'],
creator=self.agent_name
)
return f"✅ 任务创建成功: {task.id}"

def do_task_claim(self, args):
task = self.task_manager.claim_task(
task_id=args['task_id'],
assignee=self.agent_name
)

Part 2: 对抗式解题法 + 拦截重启逻辑

问题

原来的对抗式解题法拦截器有一个硬伤——达到最大重试次数后直接终止任务,Agent 就卡在那里不动了。

解决方案:自动重启(最多 3 次)

1
2
3
4
5
6
7
8
9
10
每轮回答 → _score_compliance() 打分
├─ Score ≥ 2(充分方法论引用)→ ✅ 通过,重置计数器
├─ Score = 0(完全遗漏)→ 🚫 阻塞,retry_count+1
└─ Score = 1(关键词填充)→ 🚫 阻塞,weak_count+1

当 retry_count ≥ 2 或 weak_count ≥ 3:
├─ restart_count ≤ 3 → 🔄 自动重启
│ 清空工作记忆,重置计数器,重新执行
└─ restart_count > 3 → ⛔ 永久终止
引导 ask_user 汇报

关键代码逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
restart_count = getattr(_tls, 'adversarial_restarts', 0) + 1
_tls.adversarial_restarts = restart_count
_tls.adversarial_retries = 0 # 重置重试计数

if restart_count <= 3:
corrective_prompt = (
f"已达到最大方法论重试次数,系统已自动重启任务。\n"
f"原任务:{raw_query}\n"
f"请在回答中展示完整的方法论执行计划..."
)
else:
corrective_prompt = "任务已终止,请 ask_user 汇报情况。"

阻塞消息优化

原来重启时只显示两行”任务自动重启中”,没有上下文。现在每条消息都包含原因、要求和原任务:

场景 消息头 内容
方法遗漏 ⛔ 方法遗漏 原因 + 4 步要求 + 原任务
引用不足 ⛔ 方法引用不足 原因(关键词填充)+ 4 步要求 + 原任务
重启 🔄 任务自动重启(第N/3次) 原因 + 具体问题 + 要求 + 原任务
终止 ⛔ 任务已终止 原因 + ask_user 指引

Part 3: task_enforcer 插件

功能

tool_before 钩子中检查:如果 Agent 没有认领任何激活的任务,就阻止执行工具调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@register('tool_before')
def _enforce_claim(ctx):
"""Block execution tools if no active task is claimed."""
name = ctx.get('name', '')
if name not in _EXECUTION_TOOLS:
return # read-only tools (web_scan) pass through

if _has_active_task(db_path, caller):
return # 有任务 → 放行

return {
'_tool_blocked': True,
'_tool_block_msg': (
f"⛔ Agent '{caller}' 没有激活的任务。\n"
"请先 task_claim 认领一个任务..."
),
}

两个修复(原本的 bug)

  1. register() 定义但没调用:函数写好了,但模块级没有执行它——钩子从未注册
  2. _get_db_path() 只在导入时检查环境变量:改为运行时检查,支持动态设置

两个拦截器(adversarial_solver + task_enforcer)共存验证:

1
2
3
4
PASS 1: adversarial 先拦截方法论
PASS 2: task_enforcer 拦截无任务执行
PASS 3: 有任务时 code_run 正常通过
PASS 4: web_scan 总是放行(只读)

关于”重启 agent”的陷阱

在开发过程中发现一个问题:当用户说”重启agent”时,scripts/agent_resume.sh 只重启了后台调度/自治服务,但当前对话进程的 TLS(线程本地存储)计数器没有被清空

结果是:对抗式解题法的 adversarial_retries 计数器从之前累积的会话中延续,可能直接触发终止条件。

教训重启agent 需要同时重置插件 TLS 状态,否则”重启语义”不完整。后续可以加一个 _on_session_reset 钩子来统一清理。


提交一览

1
2
3
ba2c5a5 fix: blocked messages — 4 scenarios with reason+requirements
0c7503a fix: task_enforcer — register hooks + runtime env lookup
71e7cef feat: task engine integration + restart logic (+3064 lines)

总结

今天的集成工作中最核心的决策:

  1. SQLite WAL 模式作为跨进程状态存储——简单可靠,无需额外基础设施
  2. 事件驱动架构(EventBus)——解耦任务引擎和监控/通知
  3. 可插拔钩子系统(hooks)——方法论检查和任务强制通过相同机制扩展
  4. 自动重启代替永久终止——给 Agent 自我纠正的机会,而不是死胡同
  5. 运行时环境变量——避免导入时冻结配置,支持动态切换

下一步可以做的:

  • 任务看板 Web UI(在 fsapp.py 基础上)
  • 更细粒度的任务治理(超时自动失败、deadline 机制)
  • 任务间依赖和 DAG 调度