Agentic Task Execution:深入

这篇深入文章扩展 Agent 循环概览。它关注 Agent 实际如何从用户请求推进到模型事件、工具调用、工具结果、重试和最终输出。

这里的实现细节使用 Codex 和 Claw 作为参考来源。Claw 是本仓库中的本地 Claude-Code-like Rust 实现,而不是 Anthropic 的闭源代码库。

ReAct Pattern 的运行时形态

经典模式是 ReAct:reason、act、observe、repeat。

def react_loop(task):
    observation = {"user_task": task}

    while True:
        thought_and_action = model(observation)

        if thought_and_action.final_answer:
            return thought_and_action.final_answer

        result = execute_tool(thought_and_action.tool_call)
        observation = {
            "previous": observation,
            "tool_result": result,
        }

真实 Agent runtimes 会加入若干约束:

  • 模型输出以流的形式到达。
  • 工具调用可能在流结束前到达。
  • 一些工具可以并行;修改型工具必须串行。
  • 权限检查可能暂停或拒绝执行。
  • 上下文可能超过模型窗口。
  • 用户可以取消。
  • UI 需要在最终答案前收到进度事件。

Codex Execution Trace

Codex 使用面向 turn 的运行时。一个 turn 不只是一次模型调用;它可以包含模型流、多个工具调用、工具结果记录、压缩,以及是否再次采样的决策。

1. 准备 Turn

async def prepare_codex_turn(session, user_input):
    turn = TurnContext(
        model=session.config.model,
        cwd=session.cwd,
        sandbox_policy=session.config.sandbox,
        approval_policy=session.config.approval,
        cancellation=session.new_cancellation_token(),
    )

    await session.record_user_input(user_input)
    await maybe_run_pre_sampling_compaction(session, turn)
    return turn

turn context 会冻结重要运行时设置,让模型调用和工具调用共享同一份策略视图。

2. 构建 Prompt Input

Codex 不会直接发送原始存储历史。它向 history manager 请求 prompt-ready items。

def build_prompt_input(session, turn):
    history = session.history.clone()
    prompt_items = history.for_prompt(
        model=turn.model,
        supported_modalities=turn.model.capabilities,
    )

    return {
        "instructions": session.base_instructions,
        "input": prompt_items,
        "tools": turn.tool_router.model_visible_tools(),
    }

这里可以移除 unsupported items,过滤 images,并规范化 tool-call/tool-result 配对。

3. 流式处理模型响应

采样期间,Codex 处理 text deltas、reasoning deltas、completed output items、token usage 和 final completion events。

async def try_run_sampling_request(client_session, request, runtime):
    in_flight_tools = []
    needs_follow_up = False

    async for event in client_session.stream(request):
        if event.kind == "text_delta":
            runtime.emit_text_delta(event.text)

        elif event.kind == "reasoning_delta":
            runtime.emit_reasoning_delta(event.text)

        elif event.kind == "output_item_done":
            tool_call = runtime.router.build_tool_call(event.item)
            if tool_call:
                future = runtime.handle_tool_call(tool_call)
                in_flight_tools.append(future)
                needs_follow_up = True
            else:
                runtime.record_assistant_item(event.item)

        elif event.kind == "completed":
            runtime.record_usage(event.usage)
            break

    await drain_in_flight(in_flight_tools)
    return SamplingResult(needs_follow_up=needs_follow_up)

关键细节是 completed output item 可以在模型流仍活跃时启动工具 future。turn 仍会在结束前等待所有 tool futures。

4. 控制并行工具执行

Codex 使用 runtime gate,让 parallel-safe tools 可以重叠,修改型工具排他运行。

class ToolParallelGate:
    def __init__(self):
        self.lock = ReaderWriterLock()

    async def run(self, tool_call, handler):
        if handler.supports_parallel(tool_call):
            async with self.lock.read():
                return await handler.run(tool_call)

        async with self.lock.write():
            return await handler.run(tool_call)

这样 file reads、image views 或其他安全操作可以重叠,而 shell commands 和 patch operations 避免彼此踩踏。

5. 通过 Orchestrator 路由执行

对执行敏感的工具,Codex 会经过 orchestrator:

async def execute_codex_tool(tool, request, turn):
    approval = compute_approval_requirement(request, turn.policy)

    if approval.requires_user:
        decision = await turn.session.ask_for_approval(request)
        if not decision.allowed:
            return denied(decision.reason)

    sandbox = select_sandbox(turn.sandbox_policy, request)
    result = await tool.run(request, sandbox=sandbox)

    if result.sandbox_denied and turn.policy.can_retry_without_sandbox:
        retry = await turn.session.ask_for_approval("retry unsandboxed")
        if retry.allowed:
            return await tool.run(request, sandbox=None)

    return result

orchestrator 是 permission policy、sandbox policy、network approval、hooks 和 retry behavior 汇合的位置。

6. 决定是否继续

async def finish_codex_sampling(session, result):
    if result.needs_follow_up:
        return "continue"

    if result.hit_token_limit:
        await compact_history(session)
        return "continue"

    stop_hook = await run_stop_hooks(session)
    if stop_hook.injected_message:
        session.record_user_input(stop_hook.message)
        return "continue"

    return "stop"

最终答案只是可能出口之一。工具调用、压缩和 hooks 都可能把循环送回另一次模型采样。

Claw Execution Trace

Claw 的 ConversationRuntime 有更直接的循环。它把 session messages 发给 provider,接收 assistant events,构建 assistant message,执行 tool uses,追加结果并重复。

1. 构建运行时依赖

def build_claw_runtime(config, workspace):
    session = Session.new(workspace)
    prompt = SystemPromptBuilder(config, workspace).render()
    api_client = ProviderClient.from_config(config.model)
    tools = GlobalToolRegistry.from_config(config)
    policy = PermissionPolicy.from_config(config.permission_mode)

    return ConversationRuntime(
        session=session,
        system_prompt=prompt,
        api_client=api_client,
        tools=tools,
        permission_policy=policy,
    )

运行时拥有主要移动部件,因此可以从单个对象追踪控制流。

2. 运行一个 Conversation Turn

async def run_claw_turn(runtime, user_text):
    runtime.session.push_user_text(user_text)

    while runtime.iteration_count < runtime.max_iterations:
        request = {
            "system_prompt": runtime.system_prompt,
            "messages": runtime.session.messages,
            "tools": runtime.tools.specs(),
        }

        events = await runtime.api_client.stream(request)
        assistant = build_assistant_message(events)
        runtime.session.push_assistant(assistant)

        tool_uses = assistant.tool_uses()
        if not tool_uses:
            break

        for tool_use in tool_uses:
            result = await execute_claw_tool_use(runtime, tool_use)
            runtime.session.push_tool_result(tool_use.id, result)

    if runtime.should_auto_compact():
        runtime.session.compact()

与 Codex 不同,可见的 Claw runtime 不以模型流内的 in-flight futures 为中心。它在 assistant events 收集成消息后处理 tool uses。

3. 授权并执行工具

async def execute_claw_tool_use(runtime, tool_use):
    await runtime.hooks.pre_tool(tool_use)

    required_mode = classify_required_mode(tool_use)
    decision = await runtime.permission_policy.authorize(
        tool_use,
        required_mode=required_mode,
    )

    if decision.allowed:
        result = await runtime.tools.execute(tool_use)
    else:
        result = denied_tool_result(decision.reason)

    await runtime.hooks.post_tool(tool_use, result)
    return result

权限拒绝会作为工具结果返回,让模型可以调整,而不是丢失对话状态。

4. 需要时压缩

def claw_should_compact(runtime):
    threshold = runtime.compaction.input_token_threshold
    return runtime.usage.cumulative_input_tokens > threshold


def claw_compact(session):
    recent = keep_recent_messages_without_breaking_tool_pairs(session.messages)
    summary = summarize_older_messages(session.messages, excluding=recent)
    session.messages = [system_summary(summary), *recent]

目标不是完美总结。目标是为下一次模型调用保留足够 operational state。

时序对比

步骤 Codex Claw
用户输入 记录进 session/thread history 推入 session messages
Prompt preparation History 投影成 prompt-ready response items Session messages 与构建好的 system prompt 一起发送
Stream processing 处理 text、reasoning、output items、completion、usage Provider events 规范化为 assistant events
Tool start time output item 完成时即可启动 tool future assistant message 构建后运行 tool use
Tool parallelism Parallel-safe tools 通过 lock gate 重叠 核心运行时顺序处理 tool uses
Tool result sync sampling 结束前 drain in-flight tools 执行后追加每个 result
Continue condition Follow-up flag、compaction、hooks、API state 更多 tool uses 或 max iteration/compaction logic

取消

Cancellation 必须停止模型流和工具执行。

async def cancellable_tool_run(tool, args, cancellation):
    tool_task = create_task(tool.run(args))
    cancel_task = create_task(cancellation.wait())

    done = await wait_first(tool_task, cancel_task)
    if done is cancel_task:
        tool_task.cancel()
        return aborted_tool_result()

    return tool_task.result()

Codex 在 turn 和工具路径中都使用 cancellation tokens。Claw 更简单的运行时在 caller wiring 完成时,也可以通过 API client 和 tool executor interfaces 传播取消。

恢复模式

Transport Recovery

async def recover_transport(client, request):
    for attempt in range(client.retry_budget):
        try:
            return await client.stream(request)
        except RetryableTransportError:
            await sleep(backoff(attempt))

    if client.can_switch_transport():
        client.switch_transport()
        return await client.stream(request)

    raise RuntimeError("transport failed")

Codex 有显式 stream retry 和 transport fallback behavior。Claw 的 provider abstraction 对 conversation loop 隐藏 provider-specific streaming details。

Context Recovery

async def recover_context(session, error):
    if error.kind == "context_too_large":
        compact_history(session)
        return "retry"

    if session.token_pressure_high():
        compact_history(session)
        return "continue"

    return "fail"

两个系统都需要在对话不可用前压缩。

Permission Recovery

async def recover_permission_denial(history, tool_call, denial):
    history.append_tool_result(tool_call.id, {
        "ok": False,
        "error": "permission denied",
        "reason": denial.reason,
    })
    return "let_model_choose_alternative"

被拒绝的工具应该变成 observations。模型可以选择 read-only alternative,询问用户,或解释 blocker。

为什么 Tool Pair Integrity 重要

Provider APIs 期望工具结果对应之前的工具调用。如果 compactor 丢掉 pair 的一侧,下一个请求可能无效。

def preserve_tool_pairs(items):
    kept = []
    pending_tool_ids = set()

    for item in items:
        if item.is_tool_call:
            kept.append(item)
            pending_tool_ids.add(item.id)

        elif item.is_tool_result:
            if item.tool_call_id in pending_tool_ids:
                kept.append(item)
                pending_tool_ids.remove(item.tool_call_id)

        else:
            kept.append(item)

    return kept

Codex 在 history normalization 中处理这一点。Claw 的 compaction 避免拆分 tool-use/tool-result pairs。

实用要点

  • 即使模型流、工具执行和历史记录有重叠,也要把它们视作不同阶段。
  • 当运行时能保留顺序时,尽早启动安全工具。
  • 除非工具契约证明不会冲突,否则序列化修改型工具。
  • 让权限拒绝对模型可见。
  • 尽可能在 provider errors 前压缩。
  • 保持 cancellation paths 显式。
  • 在每次历史转换中保留 tool-call/tool-result pairs。

源码锚点

对 Codex,阅读 turn.rsstream_events_utils.rsparallel.rsorchestrator.rshistory.rs。对 Claw,阅读 conversation.rsclient.rstools/lib.rspermissions.rscompact.rs