OpenAI Agents SDK - In-Depth Technical Research
Project Overview
Research date: 2026-02-26
Repository: https://github.com/openai/openai-agents-python
License: MIT
Version: v0.10.2
Stars: 19.2k | Forks: 3.2k | Contributors: 219
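1. Project Overview
1.1 Positioning and Evolution
The OpenAI Agents SDK is the production-ready successor to the Swarm framework:
- Swarm (deprecated): an experimental, educational framework for exploring lightweight multi-agent orchestration patterns
- Agents SDK (current): a production-grade framework, actively maintained by the OpenAI team and aimed at real-world applications
Key changes:
| Swarm (2024) | Agents SDK (2025-2026) |
|---|---|
| Educational | Production-ready |
| Basic handoff mechanism | Full handoffs + sessions + tracing |
| Simple examples | 12,775 lines of example code |
| No state management | RunState + Session persistence |
| Experimental sample without official support | Officially maintained (1,135 commits) |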
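1.2 Core Features
Five core primitives:
- Agents: LLMs configured with instructions, tools, handoffs, guardrails, and a model
- Handoffs: a dedicated mechanism for transferring control between agents (not just a plain function call)
- Guardrails: input/output validation and safety checks
- Sessions: automatic conversation-history management (SQLite, Redis, or custom backends)
- Tracing: built-in observability (integrations include Logfire, AgentOps, Braintrust, and others)
Design philosophy:
- ✅ Lightweight over feature-heavy: focus on core primitives and avoid excessive abstraction
- ✅ Provider-agnostic: supports 100+ LLMs through LiteLLM (not tied to OpenAI)
- ✅ Protocol-based extensibility: favors Protocols over inheritance to encourage composition
- ✅ Production-ready: full state management, error handling, streaming, and HITL (Human-in-the-Loop)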
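Protocol-based extensibility relies on structural typing. An object qualifies if it has the right methods, whatever its base class. The following is a minimal sketch with the standard-library `typing.Protocol`. `ModelLike`, `EchoModel`, and `ask` are hypothetical names for illustration, not the SDK's own `Model` interface.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class ModelLike(Protocol):
    """Hypothetical structural interface: any object with this method qualifies."""

    async def get_response(self, prompt: str) -> str: ...


class EchoModel:
    """Does not inherit from ModelLike; it just has a matching method."""

    async def get_response(self, prompt: str) -> str:
        return f"echo: {prompt}"


class NotAModel:
    """Lacks get_response, so it does not satisfy the Protocol."""


async def ask(model: ModelLike, prompt: str) -> str:
    # A type checker accepts any argument whose shape matches ModelLike.
    return await model.get_response(prompt)


if __name__ == "__main__":
    print(asyncio.run(ask(EchoModel(), "hi")))  # echo: hi
```

`runtime_checkable` only checks that the attributes exist. It does not validate signatures or whether a method is async, so most of the benefit comes from static type checking.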
2. In-Depth Code Architecture Analysis
2.1 Directory Layout and Module Responsibilities
```
src/agents/
├── __init__.py                   # Public API exports (469 lines)
├── agent.py                      # Agent class definition (890 lines)
├── run.py                        # Runner & AgentRunner (1623 lines) ★★★
├── run_state.py                  # Serializable HITL state (2384 lines) ★★★
│
├── memory/                       # Session backends ★★
│   ├── session.py                # Session Protocol (150 lines)
│   ├── sqlite_session.py         # SQLite implementation
│   ├── openai_*_session.py       # OpenAI server-side sessions
│   └── ...
│
├── models/                       # Model abstraction layer ★★
│   ├── interface.py              # Model/ModelProvider interfaces (141 lines)
│   ├── openai_responses.py       # OpenAI Responses API
│   ├── openai_chatcompletions.py # Chat Completions API
│   └── multi_provider.py         # Multi-provider support (LiteLLM)
│
├── tool.py                       # Tool definitions (1288 lines) ★★
├── handoffs/                     # Handoff system ★★★
│   ├── __init__.py               # Handoff class (334 lines)
│   └── history.py                # History mapping
│
├── tracing/                      # Tracing infrastructure ★★★
│   ├── traces.py                 # Trace abstraction (533 lines)
│   ├── spans.py                  # Span abstraction
│   ├── processor_interface.py    # TracingProcessor interface (142 lines)
│   └── processors.py             # Built-in processors
│
├── run_internal/                 # Core run loop ★★★★★
│   ├── run_loop.py               # Main orchestration logic (1623+ lines) ★★★★★
│   ├── turn_resolution.py        # Turn handling (1623 lines)
│   ├── tool_execution.py         # Tool execution (~50KB, 1400 lines) ★★★★
│   ├── tool_planning.py          # Tool planning/scheduling
│   ├── session_persistence.py    # Session persistence logic
│   └── streaming.py              # Streaming helpers
│
├── guardrail.py                  # Input/Output guardrails
├── tool_guardrails.py            # Tool-specific guardrails
│
└── extensions/                   # Extension implementations
    ├── memory/
    │   ├── redis_session.py      # Redis backend
    │   ├── sqlalchemy_session.py # SQLAlchemy backend
    │   └── dapr_session.py       # Dapr backend
    └── models/
        └── litellm_model.py      # LiteLLM integration ★★★
```
★ legend:
- ★★★★★ = the core of the core; must be understood in depth
- ★★★ = core mechanism
- ★★ = important extension point
- unmarked = utility/configuration code
2.2 Core Class Relationship Diagram
```
┌────────────────────────────────────────────────────────────────┐
│ Runner                                                         │
│ Static entry points:                                           │
│   • run()          → AgentRunner.run()                         │
│   • run_sync()     → asyncio.run(run())                        │
│   • run_streamed() → AgentRunner.run_streamed()                │
└──────────────────────────┬─────────────────────────────────────┘
                           │
                           ▼
┌────────────────────────────────────────────────────────────────┐
│ AgentRunner                                                    │
│ Core orchestrator (src/agents/run.py:396-1329):                │
│   • run()          - main async loop                           │
│   • run_streamed() - streaming variant                         │
│   • Role: drives the while loop and coordinates subsystems     │
└──────────────────────────┬─────────────────────────────────────┘
                           │
       ┌───────────────────┼───────────────┬───────────────┐
       ▼                   ▼               ▼               ▼
┌──────────────┐    ┌─────────────┐  ┌────────────┐  ┌──────────┐
│ Agent        │    │ RunState    │  │ RunContext │  │ Model    │
│ • name       │    │ Key to HITL │  │ Wrapper    │  │ Protocol │
│ • instruct'ns│    │ • serialize │  │ • context  │  │ based    │
│ • tools      │    │ • resume    │  │ • usage    │  │          │
│ • handoffs   │    │ • approval  │  │ • approvals│  │          │
│ • model      │    │             │  │            │  │          │
│ • guardrails │    │             │  │            │  │          │
└──────┬───────┘    └─────────────┘  └────────────┘  └──────────┘
       │
       ├─────────┬───────────┬────────────┬──────────┐
       ▼         ▼           ▼            ▼          ▼
    ┌──────┐ ┌────────┐ ┌──────────┐ ┌─────────┐ ┌────────┐
    │ Tool │ │ Handoff│ │ Guardrail│ │ Session │ │ Tracing│
    │ func │ │ → Agent│ │ In/Out   │ │ Protocol│ │ Trace  │
    │ tool │ │        │ │ Tool I/O │ │         │ │ Span   │
    └──────┘ └────────┘ └──────────┘ └─────────┘ └────────┘
```
Relationships:
1. Runner → AgentRunner: static delegation
2. AgentRunner → RunState: creates and manages the HITL state
3. RunState → Session: optional persistence backend
4. Agent → Tool/Handoff/Guardrail: composition (one-to-many)
5. Model ← ModelProvider: factory pattern
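Relationships 1 and 5 can be illustrated together. One piece is a static facade that forwards to a shared runner instance. The other is a provider that acts as a factory, turning a model string into a model object. The sketch below is hypothetical and is not the SDK's code. This covers `FakeModel`, `DEFAULT_RUNNER`, and the `litellm/` prefix rule, which is an assumed routing convention.

```python
from dataclasses import dataclass


@dataclass
class FakeModel:
    """Hypothetical stand-in for a model object returned by a provider."""
    backend: str
    name: str


class ModelProvider:
    """Factory: maps a model string to a model object (assumed routing rule)."""

    def get_model(self, model_name: str) -> FakeModel:
        # A "litellm/" prefix selects the LiteLLM backend; anything else goes to OpenAI.
        prefix, sep, rest = model_name.partition("/")
        if sep and prefix == "litellm":
            return FakeModel(backend="litellm", name=rest)
        return FakeModel(backend="openai", name=model_name)


class AgentRunner:
    """Instance that holds the actual orchestration logic."""

    def __init__(self, provider: ModelProvider) -> None:
        self.provider = provider

    def run(self, model_name: str, user_input: str) -> str:
        model = self.provider.get_model(model_name)
        return f"[{model.backend}:{model.name}] {user_input}"


DEFAULT_RUNNER = AgentRunner(ModelProvider())


class Runner:
    """Static facade: the classmethod delegates to a shared AgentRunner."""

    @classmethod
    def run(cls, model_name: str, user_input: str) -> str:
        return DEFAULT_RUNNER.run(model_name, user_input)
```

With static delegation, callers get a simple `Runner.run(...)` entry point, and the behavior can still be swapped by replacing the underlying runner or provider.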
2.3 Key Design Patterns
| Pattern | Use case | Code location |
|---|---|---|
| Protocol-based Design | Session, Model, TracingProcessor | memory/session.py, models/interface.py |
| Factory Pattern | ModelProvider.get_model(), handoff() | models/interface.py:50-80 |
| Strategy Pattern | Interchangeable Session/Model/Tracing implementations | memory/, models/, tracing/ |
| Context Manager | TraceCtxManager, agent_span() | tracing/context.py |
| Observer Pattern | Tracing processors | tracing/processor_interface.py |
| State Machine | NextStep variants | run_internal/turn_resolution.py:20-60 |
| Async Iterator | Streaming responses | run.py:1330+, run_loop.py:374-1042 |
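3. Underlying Implementation Principles
3.1 Core Agent Loop Flow
Entry point: src/agents/run.py:396-1329 (AgentRunner.run()). The listing below is condensed pseudocode; `...` marks elided arguments and code.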
```python
async def run(self, starting_agent, input, **kwargs) -> RunResult:
    # ═══════════════════════════════════════════════════════════
    # Phase 1: initialization (lines 402-634)
    # ═══════════════════════════════════════════════════════════
    # 1.1 Prepare the RunState
    if is_resumed_state:
        run_state = cast(RunState[TContext], input)
        # Restore the conversation settings
        conversation_id, previous_response_id, auto_previous_response_id = \
            apply_resumed_conversation_settings(run_state, ...)
        # Restore the context
        context_wrapper = resolve_resumed_context(run_state, context)
        max_turns = run_state._max_turns
    else:
        run_state = RunState[TContext](...)
    # 1.2 Session integration: history + new input goes to the model;
    #     only the new input items are kept for saving later
    if session:
        input_items, new_items_for_save = await prepare_input_with_session(
            raw_input, session, callback, settings, ...
        )
    # 1.3 Tracing context
    with TraceCtxManager(
        workflow_name=trace_workflow_name,
        trace_id=trace_id,
        group_id=trace_group_id,
        metadata=trace_metadata,
        tracing=trace_config,
        disabled=run_config.tracing_disabled,
    ):
        # ═══════════════════════════════════════════════════════════
        # Phase 2: main loop (lines 637-1304)
        # ═══════════════════════════════════════════════════════════
        while True:
            # ───────────────────────────────────────────────────────
            # 2.1 Resume an interrupted turn
            # ───────────────────────────────────────────────────────
            skip_model_call = False
            if isinstance(run_state._current_step, NextStepInterruption):
                # Resolve the interrupted turn with the recorded approvals (lines 647-836)
                turn_result = await resolve_interrupted_turn(...)
                # This turn needs no new model call
                skip_model_call = True
            # ───────────────────────────────────────────────────────
            # 2.2 Tool preparation
            # ───────────────────────────────────────────────────────
            all_tools = await get_all_tools(current_agent, context_wrapper)
            await initialize_computer_tools(...)
            # ───────────────────────────────────────────────────────
            # 2.3 Create the agent span (tracing)
            # ───────────────────────────────────────────────────────
            current_span = agent_span(
                name=current_agent.name,
                agent=current_agent,
                ...
            )
            current_span.start(mark_as_current=True)
            # ───────────────────────────────────────────────────────
            # 2.4 Turn counting and limit check
            # ───────────────────────────────────────────────────────
            current_turn += 1
            if current_turn > max_turns:
                # max_turns handling (lines 864-951)
                # ... may produce a FinalOutput or an Interruption
                break
            # ───────────────────────────────────────────────────────
            # 2.5 Input guardrails (first turn only)
            # ───────────────────────────────────────────────────────
            if skip_model_call:
                pass  # turn_result already came from resolve_interrupted_turn()
            elif current_turn == 1:
                # Sequential guardrails run before the model call
                sequential_results = await run_input_guardrails(
                    context_wrapper, input_items,
                    guardrails=[g for g in input_guardrails if not g.parallel],
                )
                # Parallel guardrails run concurrently with the model call
                parallel_results, turn_result = await asyncio.gather(
                    run_input_guardrails(
                        context_wrapper, input_items,
                        guardrails=[g for g in input_guardrails if g.parallel],
                    ),
                    run_single_turn(...),
                )
            else:
                # ───────────────────────────────────────────────────
                # 2.6 Execute a single turn
                # ───────────────────────────────────────────────────
                turn_result = await run_single_turn(
                    agent=current_agent,
                    original_input=original_input,
                    input=current_input,
                    all_tools=all_tools,
                    handoffs=handoffs,
                    context_wrapper=context_wrapper,
                    run_state=run_state,
                    hooks=hooks,
                    ...
                )
            # ───────────────────────────────────────────────────────
            # 2.7 Collect the turn's items
            # ───────────────────────────────────────────────────────
            generated_items = turn_result.pre_step_items + turn_result.new_step_items
            session_items.extend(...)
            # ───────────────────────────────────────────────────────
            # 2.8 Dispatch on NextStep
            # ───────────────────────────────────────────────────────
            if isinstance(turn_result.next_step, NextStepFinalOutput):
                # Case A: final output
                final_output = turn_result.next_step.final_output
                # A.1 Run the output guardrails
                for guardrail in output_guardrails:
                    result = await guardrail.func(context_wrapper, final_output)
                    if result.output.tripwire_triggered:
                        # Violation: the run stops with OutputGuardrailTripwireTriggered
                        ...
                # A.2 Lifecycle hook (RunHooks.on_agent_end)
                await hooks.on_agent_end(context_wrapper, current_agent, final_output)
                # A.3 Save to the session (if any): only this run's new items
                if session:
                    await save_result_to_session(
                        session, new_items_for_save, output_items, run_state, ...
                    )
                # A.4 Return the RunResult
                return RunResult(
                    final_output=final_output,
                    context_wrapper=context_wrapper,  # token usage: context_wrapper.usage
                    ...
                )
            elif isinstance(turn_result.next_step, NextStepHandoff):
                # Case B: hand off to a new agent
                # B.1 Switch the agent
                current_agent = turn_result.next_step.new_agent
                # B.2 Update the RunState
                run_state._current_agent = current_agent.name
                # B.3 Finish the current agent span
                current_span.finish(reset_current=True)
                # B.4 Continue the loop with the new agent
                continue
            elif isinstance(turn_result.next_step, NextStepInterruption):
                # Case C: interruption (human input required)
                # C.1 Record the state needed to resume later
                run_state._current_step = turn_result.next_step
                run_state._last_processed_response = turn_result.processed_response
                # C.2 Build the interruption result
                return build_interruption_result(
                    interruptions=turn_result.next_step.interruptions,
                    run_state=run_state,
                    ...
                )
            elif isinstance(turn_result.next_step, NextStepRunAgain):
                # Case D: tools ran and no handoff happened; call the model again
                continue
```
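Step 2.5 runs sequential input guardrails before the model call and parallel ones concurrently with it. The sketch below reproduces that ordering with `asyncio.gather`. `Guardrail`, `TripwireTriggered`, and `first_turn` are hypothetical names, and the SDK's real guardrail result types differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


class TripwireTriggered(Exception):
    """Hypothetical stand-in for the SDK's tripwire exception."""


@dataclass
class Guardrail:
    name: str
    check: Callable[[str], Awaitable[bool]]  # returns True when the tripwire fires
    parallel: bool = True


async def run_guardrails(guardrails: list[Guardrail], text: str) -> None:
    # Run this group of guardrails concurrently and raise for the first tripped one.
    results = await asyncio.gather(*(g.check(text) for g in guardrails))
    for guardrail, tripped in zip(guardrails, results):
        if tripped:
            raise TripwireTriggered(guardrail.name)


async def first_turn(
    guardrails: list[Guardrail],
    text: str,
    call_model: Callable[[str], Awaitable[str]],
) -> str:
    # 1. Sequential guardrails must pass before the model is called at all.
    await run_guardrails([g for g in guardrails if not g.parallel], text)
    # 2. Parallel guardrails run concurrently with the model call.
    _, output = await asyncio.gather(
        run_guardrails([g for g in guardrails if g.parallel], text),
        call_model(text),
    )
    return output
```

`gather()` propagates the first exception, so a tripped parallel guardrail surfaces as an error and the model's output is never used. This is the trade-off of running guardrails in parallel: latency drops, but a model call may be wasted.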
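Key points:
1. Stateless by design: each run() call is independent unless it is resumed through a Session or a RunState.
2. Automatic tracing: a span is recorded for each operation without extra user code.
3. Concurrency: on the first turn, input guardrails marked as parallel run concurrently with the model call (sequential guardrails run before it).
4. Interruptibility: HITL is implemented with NextStepInterruption plus a serializable RunState.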
3.2 Single Turn Execution in Detail
Entry point: src/agents/run_internal/run_loop.py:1044-1623+ (run_single_turn())
```python
async def run_single_turn(...) -> SingleStepResult:
    # ═══════════════════════════════════════════════════════════
    # Phase 1: call the model
    # ═══════════════════════════════════════════════════════════
    new_response = await get_new_response(
        agent=agent,
        system_prompt=system_prompt,
        input=input,
        model_settings=model_settings,
        all_tools=all_tools,
        output_schema=output_schema,
        handoffs=handoffs,
        run_state=run_state,
        ...
    )
    # new_response: ModelResponse(output, usage, response_id)
    # ═══════════════════════════════════════════════════════════
    # Phase 2: process the model response
    # ═══════════════════════════════════════════════════════════
    processed_response = await process_model_response(
        agent=agent,
        new_response=new_response,
        all_tools=all_tools,
        handoffs=handoffs,
        context_wrapper=context_wrapper,
        run_state=run_state,
        ...
    )
    # processed_response (simplified): ProcessedResponse(
    #     run_tools=[ToolCallRun(tool, arguments), ...],
    #     run_handoffs=[HandoffRun(handoff, arguments), ...],
    #     final_output=<set only when there are no tool calls or handoffs>,
    #     ...
    # )
    # ═══════════════════════════════════════════════════════════
    # Phase 3: execute tools and side effects
    # ═══════════════════════════════════════════════════════════
    turn_result = await execute_tools_and_side_effects(
        agent=agent,
        original_input=original_input,
        new_response=new_response,
        processed_response=processed_response,
        hooks=hooks,
        context_wrapper=context_wrapper,
        ...
    )
    # turn_result: SingleStepResult(
    #     next_step=<NextStepFinalOutput | NextStepHandoff | NextStepInterruption | NextStepRunAgain>,
    #     pre_step_items=[...],
    #     new_step_items=[...],
    #     ...
    # )
    return turn_result
```
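Phase 1 in detail: get_new_response() (run_loop.py):
```python
async def get_new_response(...) -> ModelResponse:
    # 1.1 Resolve the system prompt (instructions may be dynamic)
    system_prompt = await resolve_system_prompt(agent, context_wrapper)
    # 1.2 Apply any model-input filter configured for the run
    #     (handoff input_filters are applied earlier, in execute_handoffs())
    filtered_input = await maybe_filter_model_input(input, ...)
    # 1.3 Call the model; the Model implementation converts tools and
    #     handoffs into provider-specific schemas itself
    if streaming:
        # stream_response() returns an async iterator of stream events
        async for event in model.stream_response(
            system_instructions=system_prompt,
            input=filtered_input,
            model_settings=model_settings,
            tools=all_tools,
            output_schema=output_schema,
            handoffs=handoffs,
            ...
        ):
            ...  # forward events; the final event carries the complete response
    else:
        # get_response() already returns a ModelResponse(output, usage, response_id)
        return await model.get_response(
            system_instructions=system_prompt,
            input=filtered_input,
            model_settings=model_settings,
            tools=all_tools,
            output_schema=output_schema,  # structured output
            handoffs=handoffs,
            ...
        )
```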
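Phase 2 in detail: process_model_response() (turn_resolution.py):
```python
async def process_model_response(...) -> ProcessedResponse:
    # 2.1 The model output is a list of Responses API items
    #     (messages, function calls, reasoning, ...)
    output_items = new_response.output
    # 2.2 Build name -> object lookups
    handoff_map = {h.tool_name: h for h in handoffs}
    function_map = {t.name: t for t in all_tools}
    run_tools = []
    run_handoffs = []
    for item in output_items:
        if not isinstance(item, ResponseFunctionToolCall):
            continue  # messages, reasoning, etc. are recorded as run items elsewhere
        # 2.3 Classify: a handoff is a function call whose name matches a handoff tool
        if item.name in handoff_map:
            run_handoffs.append(HandoffRun(
                handoff=handoff_map[item.name],
                arguments_json=item.arguments,
                tool_use_id=item.call_id,
            ))
        elif item.name in function_map:
            run_tools.append(ToolCallRun(
                tool=function_map[item.name],
                arguments_json=item.arguments,
                tool_use_id=item.call_id,
            ))
        else:
            # Unknown tool name: treated as a model error instead of being dropped silently
            raise ModelBehaviorError(f"Tool {item.name} not found in agent {agent.name}")
    # 2.4 Decide whether this is the final output
    if not run_tools and not run_handoffs:
        # No tool calls: this is the final output
        final_output = extract_final_output(output_items, output_schema)
    else:
        final_output = None
    return ProcessedResponse(
        run_tools=run_tools,
        run_handoffs=run_handoffs,
        final_output=final_output,
        ...
    )
```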
Phase 3 in detail: execute_tools_and_side_effects() (tool_execution.py). Hook and approval handling is simplified here:
```python
async def execute_tools_and_side_effects(...) -> SingleStepResult:
    new_step_items = []
    # 3.1 Function tool calls
    if processed_response.run_tools:
        # 3.1.1 Per-call checks before execution: tool input guardrails may
        #       reject a call, and tools that require approval produce an
        #       interruption (HITL) instead of running
        ...
        # 3.1.2 Execute the tool calls concurrently; RunHooks.on_tool_start /
        #       on_tool_end fire around each call, and tool output guardrails
        #       can inspect the results
        tool_results = await execute_function_tool_calls(
            processed_response.run_tools,
            context_wrapper,
            hooks,
            ...
        )
        # 3.1.3 Record the tool results as items for the next model call
        new_step_items.extend(
            [ToolResultItem(tool_use_id=..., content=...) for ... in tool_results]
        )
    # 3.2 Handoffs: after the tools have run, a handoff decides the next step
    if processed_response.run_handoffs:
        return await execute_handoffs(
            processed_response.run_handoffs,
            context_wrapper,
            new_step_items=new_step_items,
            ...
        )
    # 3.3 Tools ran without a handoff: call the model again with the results
    #     (unless the agent's tool_use_behavior turns a tool result into the final output)
    if processed_response.run_tools:
        return SingleStepResult(
            next_step=NextStepRunAgain(),
            new_step_items=new_step_items,
            ...
        )
    # 3.4 No tools and no handoff: final output
    if processed_response.final_output:
        return SingleStepResult(
            next_step=NextStepFinalOutput(final_output=processed_response.final_output),
            ...
        )
```
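Step 3.1 runs independent tool calls concurrently. The following is a minimal sketch of that idea. Each call is wrapped so that a failure becomes an output string for the model instead of aborting the turn, and `asyncio.gather` keeps the results in call order. The registry format and the error text are hypothetical; the SDK has its own failure-handling options.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

ToolFn = Callable[..., Awaitable[Any]]


async def run_tool_calls(
    calls: list[tuple[str, str, str]],  # (call_id, tool_name, arguments_json)
    registry: dict[str, ToolFn],
) -> list[dict]:
    async def run_one(call_id: str, name: str, arguments_json: str) -> dict:
        try:
            result = await registry[name](**json.loads(arguments_json or "{}"))
            output = str(result)
        except Exception as exc:
            # Hypothetical policy: report the failure to the model instead of failing the turn.
            output = f"error: {exc}"
        return {"type": "function_call_output", "call_id": call_id, "output": output}

    # gather() runs the calls concurrently and returns results in the order given.
    return list(await asyncio.gather(*(run_one(*call) for call in calls)))
```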
3.3 Handoff Mechanism in Depth
Core idea: a handoff is exposed to the LLM as a special tool. Invoking it does not produce a tool result for the model to read; it returns a new Agent that takes over the run.
Implementation (simplified): src/agents/handoffs/__init__.py:93-321
```python
@dataclass
class Handoff(Generic[TContext, TAgent]):
    """Handoff definition"""
    tool_name: str                       # e.g., "transfer_to_spanish_agent"
    tool_description: str
    input_json_schema: dict              # JSON schema for the tool input
    on_invoke_handoff: Callable[[RunContextWrapper[Any], str], Awaitable[TAgent]]  # returns the target Agent
    agent_name: str                      # name of the target Agent
    input_filter: HandoffInputFilter | None = None  # optional: filters the history passed to the new Agent

    def to_tool_schema(self) -> dict:
        """Illustrative: the function-tool schema the LLM sees (Chat Completions format)"""
        return {
            "type": "function",
            "function": {
                "name": self.tool_name,
                "description": self.tool_description,
                "parameters": self.input_json_schema,
            }
        }

# ═══════════════════════════════════════════════════════════
# Factory function: handoff() (simplified: no input_type, async callback only)
# ═══════════════════════════════════════════════════════════
def handoff(
    agent: TAgent,
    *,
    tool_name_override: str | None = None,
    tool_description_override: str | None = None,
    on_handoff: Callable[[RunContextWrapper[Any]], Awaitable[None]] | None = None,
    input_filter: HandoffInputFilter | None = None,
) -> Handoff[Any, TAgent]:
    """Create a Handoff"""
    agent_name = agent.name
    # 1. Default tool name; the SDK also normalizes it into a function-style
    #    identifier (lowercase, underscores)
    tool_name = tool_name_override or f"transfer_to_{agent_name}"
    # 2. Default description (wording simplified)
    tool_description = tool_description_override or f"Transfer to the {agent_name} agent"
    # 3. Invocation: run the optional callback, then return the target Agent
    async def on_invoke_handoff(
        context: RunContextWrapper[Any],
        arguments_json: str,
    ) -> TAgent:
        if on_handoff is not None:
            await on_handoff(context)
        return agent
    return Handoff(
        tool_name=tool_name,
        tool_description=tool_description,
        input_json_schema={"type": "object", "properties": {}, "additionalProperties": False},
        on_invoke_handoff=on_invoke_handoff,
        agent_name=agent_name,
        input_filter=input_filter,
    )

# ═══════════════════════════════════════════════════════════
# Handoff execution: execute_handoffs()
# ═══════════════════════════════════════════════════════════
# Location: src/agents/run_internal/turn_resolution.py:268-400+
async def execute_handoffs(...) -> SingleStepResult:
    # 1. Only the first handoff is executed; any extra ones receive a
    #    tool output saying they were ignored
    handoff_run = run_handoffs[0]
    for extra in run_handoffs[1:]:
        new_step_items.append(
            ToolResultItem(tool_use_id=extra.tool_use_id, content="(ignored: only one handoff runs)")
        )
    # 2. Invoke the handoff
    new_agent = await handoff_run.handoff.on_invoke_handoff(
        context_wrapper,
        handoff_run.arguments_json,
    )
    # 3. Record the handoff call and its output
    new_step_items.append(
        HandoffCallItem(
            tool_use_id=handoff_run.tool_use_id,
            tool_name=handoff_run.handoff.tool_name,
            arguments_json=handoff_run.arguments_json,
        )
    )
    new_step_items.append(
        HandoffOutputItem(
            tool_use_id=handoff_run.tool_use_id,
            output=f"Transferred to {new_agent.name}",
        )
    )
    # 4. By default the new Agent receives the full history
    filtered_data = HandoffInputData(
        input_history=original_input,
        pre_handoff_items=tuple(pre_step_items),
        new_items=tuple(new_step_items),
        run_context=context_wrapper,
    )
    # 5. Apply the input_filter (if any); it returns a new HandoffInputData
    if handoff_run.handoff.input_filter:
        filtered_data = await handoff_run.handoff.input_filter(filtered_data)
    # 6. Optional: nest the prior history into a summary (mapper in history.py)
    if nest_history_enabled:
        filtered_data = nest_handoff_history(filtered_data, mapper=...)
    # 7. Return NextStepHandoff with the (possibly filtered) history
    return SingleStepResult(
        next_step=NextStepHandoff(new_agent=new_agent),
        original_input=filtered_data.input_history,
        pre_step_items=list(filtered_data.pre_handoff_items),
        new_step_items=list(filtered_data.new_items),
        ...
    )
```
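The factory above builds default names of the form `transfer_to_<agent name>`. Function-tool names must be plain identifiers, so a name like "Spanish agent" has to be normalized. The helper below is a hypothetical approximation that lowercases the name and replaces non-identifier characters with underscores. The SDK's exact rules may differ.

```python
import re


def default_handoff_tool_name(agent_name: str) -> str:
    """Hypothetical approximation: build transfer_to_<name> and make it identifier-safe."""
    raw = f"transfer_to_{agent_name}"
    # Replace anything that is not a letter, digit, or underscore, then lowercase.
    return re.sub(r"[^a-zA-Z0-9_]", "_", raw).lower()
```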
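What makes handoffs special:
| Dimension | Regular Tool | Handoff Tool |
|---|---|---|
| Return value | Tool result (string/dict) | A new Agent object |
| Behavior after execution | Adds a ToolResultItem and continues the loop | Switches the Agent, starts a new agent span, continues the loop |
| History handling | Full history is kept | An optional input_filter can trim the history |
| Calls per turn | Several can run in parallel | Only the first is processed |
| NextStep | NextStepRunAgain | NextStepHandoff |
Input filter example: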
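```python
from agents import Agent, HandoffInputData, handoff


def spanish_agent_filter(data: HandoffInputData) -> HandoffInputData:
    """Pass only the last 3 history items to the Spanish agent."""
    history = data.input_history
    # input_history is either a plain string (single user message) or a tuple
    # of items; slicing a string would cut characters, not messages
    trimmed = history if isinstance(history, str) else history[-3:]
    return HandoffInputData(
        input_history=trimmed,
        pre_handoff_items=data.pre_handoff_items,
        new_items=data.new_items,
        run_context=data.run_context,
    )


spanish_agent = Agent(name="Spanish agent", instructions="Only speak Spanish.")
triage_agent = Agent(
    name="Triage agent",
    handoffs=[handoff(spanish_agent, input_filter=spanish_agent_filter)],
)
```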
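Filters compose well when each one returns a modified copy instead of mutating its input. The following standalone sketch uses a frozen dataclass and `dataclasses.replace`. `ToyHandoffInput`, `keep_last`, `drop_tool_items`, and `compose` are all hypothetical. `drop_tool_items` is similar in spirit to the SDK's `handoff_filters.remove_all_tools`.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Union

Item = dict[str, Any]
History = Union[str, tuple[Item, ...]]


@dataclass(frozen=True)
class ToyHandoffInput:
    """Hypothetical mirror of the three history fields a handoff filter receives."""
    input_history: History
    pre_handoff_items: tuple[Item, ...]
    new_items: tuple[Item, ...]


Filter = Callable[[ToyHandoffInput], ToyHandoffInput]
TOOL_ITEM_TYPES = {"function_call", "function_call_output"}


def keep_last(n: int) -> Filter:
    def _filter(data: ToyHandoffInput) -> ToyHandoffInput:
        if isinstance(data.input_history, str):
            return data  # a bare string is one message; slicing it would cut characters
        return replace(data, input_history=data.input_history[-n:])
    return _filter


def drop_tool_items(data: ToyHandoffInput) -> ToyHandoffInput:
    def keep(items: tuple[Item, ...]) -> tuple[Item, ...]:
        return tuple(i for i in items if i.get("type") not in TOOL_ITEM_TYPES)
    return replace(data, pre_handoff_items=keep(data.pre_handoff_items), new_items=keep(data.new_items))


def compose(*filters: Filter) -> Filter:
    # Apply the filters left to right; each receives the previous one's output.
    def _combined(data: ToyHandoffInput) -> ToyHandoffInput:
        for f in filters:
            data = f(data)
        return data
    return _combined
```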
3.4 Session Storage and Retrieval
Session Protocol (src/agents/memory/session.py:14-55):
```python
@runtime_checkable
class Session(Protocol):
    """Protocol that every session backend must implement"""
    session_id: str
    session_settings: SessionSettings | None

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        """Retrieve the conversation history.
        Args:
            limit: maximum number of items to return (None = all); when set,
                the most recent items are returned
        Returns:
            List of history items in chronological order
        """
        ...

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        """Append new items to the session"""
        ...

    async def pop_item(self) -> TResponseInputItem | None:
        """Remove and return the most recent item"""
        ...

    async def clear_session(self) -> None:
        """Clear the session"""
        ...
```
SQLite implementation (simplified from src/agents/memory/sqlite_session.py; the actual class differs in details such as table layout and connection handling):
```python
import asyncio
import json
import sqlite3

# SessionABC, SessionSettings, TResponseInputItem and resolve_session_limit
# come from the SDK; their imports are omitted in this excerpt.


class SQLiteSession(SessionABC):
    def __init__(
        self,
        session_id: str,
        db_path: str = "conversations.db",
        *,
        session_settings: SessionSettings | None = None,
    ):
        self.session_id = session_id
        self.db_path = db_path
        self._lock = asyncio.Lock()
        self.session_settings = session_settings
        # Create the table and index if they do not exist yet
        self._create_tables()

    def _create_tables(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS sessions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    item_json TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_session_id
                ON sessions(session_id)
            """)

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        limit = resolve_session_limit(limit, self.session_settings)
        async with self._lock:
            # Note: these sqlite3 calls are synchronous and block the event loop
            with sqlite3.connect(self.db_path) as conn:
                if limit is None or limit <= 0:
                    query = """
                        SELECT item_json FROM sessions
                        WHERE session_id = ?
                        ORDER BY id ASC
                    """
                    rows = conn.execute(query, (self.session_id,)).fetchall()
                else:
                    query = """
                        SELECT item_json FROM sessions
                        WHERE session_id = ?
                        ORDER BY id DESC
                        LIMIT ?
                    """
                    rows = conn.execute(query, (self.session_id, limit)).fetchall()
                    rows = list(reversed(rows))  # restore chronological order
        return [json.loads(row[0]) for row in rows]

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        async with self._lock:
            with sqlite3.connect(self.db_path) as conn:
                for item in items:
                    conn.execute(
                        """
                        INSERT INTO sessions (session_id, item_json)
                        VALUES (?, ?)
                        """,
                        (self.session_id, json.dumps(item))
                    )
                conn.commit()

    async def pop_item(self) -> TResponseInputItem | None:
        async with self._lock:
            with sqlite3.connect(self.db_path) as conn:
                row = conn.execute(
                    """
                    SELECT id, item_json FROM sessions
                    WHERE session_id = ?
                    ORDER BY id DESC
                    LIMIT 1
                    """,
                    (self.session_id,)
                ).fetchone()
                if row:
                    conn.execute("DELETE FROM sessions WHERE id = ?", (row[0],))
                    conn.commit()
                    return json.loads(row[1])
                return None
```
Session integration flow (src/agents/run_internal/session_persistence.py):
```python
# ═══════════════════════════════════════════════════════════
# Phase 1: prepare the input (merge the history)
# ═══════════════════════════════════════════════════════════
async def prepare_input_with_session(
    raw_input,
    session: Session | None,
    callback: Callable | None,
    settings: SessionSettings | None,
    ...
):
    if not session:
        return raw_input, None
    # 1. Fetch the history
    history = await session.get_items()
    # 2. Model input = history + new input
    new_items = normalize_input(raw_input)
    merged_input = history + new_items
    # 3. Return only the new items for saving (the history is already stored)
    return merged_input, new_items

# ═══════════════════════════════════════════════════════════
# Phase 2: save the result
# ═══════════════════════════════════════════════════════════
async def save_result_to_session(
    session: Session | None,
    new_input_items: list[TResponseInputItem],
    output_items: list[TResponseInputItem],
    run_state: RunState | None,
    ...
):
    if not session:
        return
    # Save this run's new input items plus the generated outputs; saving the
    # merged (history + input) list would store the history a second time
    await session.add_items(new_input_items + output_items)
```
Usage example:
```python
import asyncio

from agents import Agent, Runner, SQLiteSession


async def main() -> None:
    agent = Agent(name="Assistant", instructions="Reply very concisely.")
    # Create a session backed by a SQLite file
    session = SQLiteSession("user_123", "conversations.db")
    # First turn
    result1 = await Runner.run(
        agent,
        "What city is the Golden Gate Bridge in?",
        session=session,
    )
    print(result1.final_output)  # e.g. "San Francisco"
    # Second turn: the history is loaded from the session automatically
    result2 = await Runner.run(
        agent,
        "What state is it in?",
        session=session,
    )
    print(result2.final_output)  # e.g. "California"


asyncio.run(main())

# Session contents after both turns (simplified; assistant turns are actually
# stored as Responses API message items):
# [
#     {"role": "user", "content": "What city is the Golden Gate Bridge in?"},
#     {"role": "assistant", "content": "San Francisco"},
#     {"role": "user", "content": "What state is it in?"},
#     {"role": "assistant", "content": "California"},
# ]
```
3.5 Tracing Implementation in Detail
Architecture overview (field lists abbreviated):
```
Trace (workflow level: one run() call)
├─ metadata: {workflow_name, trace_id, group_id, ...}
├─ Span (operation level)
│  ├─ AgentSpan (agent execution)
│  │  └─ span_data: AgentSpanData(agent_name, model, ...)
│  ├─ GenerationSpan (LLM call)
│  │  └─ span_data: GenerationSpanData(model, prompt_tokens, ...)
│  ├─ FunctionSpan (tool call)
│  │  └─ span_data: FunctionSpanData(tool_name, arguments, result, ...)
│  ├─ HandoffSpan (handoff)
│  │  └─ span_data: HandoffSpanData(from_agent, to_agent, ...)
│  └─ GuardrailSpan (guardrail check)
│     └─ span_data: GuardrailSpanData(guardrail_name, triggered, ...)
└─ processors: [TracingProcessor, ...]
```
TracingProcessor interface, an abstract base class (src/agents/tracing/processor_interface.py:9-130):
```python
import abc
from abc import abstractmethod
from typing import Any

# Trace and Span are defined in the SDK's tracing package.


class TracingProcessor(abc.ABC):
    """Receives tracing events"""

    @abstractmethod
    def on_trace_start(self, trace: Trace) -> None:
        """Called when a trace starts"""
        ...

    @abstractmethod
    def on_trace_end(self, trace: Trace) -> None:
        """Called when a trace ends (a good place to export data)"""
        ...

    @abstractmethod
    def on_span_start(self, span: Span[Any]) -> None:
        """Called when a span starts"""
        ...

    @abstractmethod
    def on_span_end(self, span: Span[Any]) -> None:
        """Called when a span ends"""
        ...

    @abstractmethod
    def shutdown(self) -> None:
        """Release resources"""
        ...

    @abstractmethod
    def force_flush(self) -> None:
        """Flush any buffered data immediately"""
        ...
```
Trace lifecycle (src/agents/run.py:545-554):
```python
# Inside AgentRunner.run() (argument values are examples)
with TraceCtxManager(
    workflow_name="customer_support",
    trace_id="trace_abc123",
    group_id="session_xyz",
    metadata={"user_id": "user_456"},
    tracing=trace_config,  # per-run tracing options
    disabled=False,
):
    # ───────────────────────────────────────────────────────
    # on_trace_start() is called here
    # ───────────────────────────────────────────────────────
    while True:
        # Create the agent span
        current_span = agent_span(
            name=current_agent.name,
            ...
        )
        current_span.start(mark_as_current=True)
        # ───────────────────────────────────────────────────
        # on_span_start(current_span) is called
        # ───────────────────────────────────────────────────
        # ... agent execution ...
        # Inside run_single_turn():
        with generation_span(model=..., input=...) as gen_span:
            # on_span_start(gen_span) is called
            response = await model.get_response(...)
        # on_span_end(gen_span) is called when the with-block exits
        # Tool execution
        for tool_run in processed_response.run_tools:
            with function_span(name=tool_run.tool.name, ...) as func_span:
                # on_span_start(func_span) is called
                result = await execute_tool(tool_run)
            # on_span_end(func_span) is called when the with-block exits
        current_span.finish(reset_current=True)
        # ───────────────────────────────────────────────────
        # on_span_end(current_span) is called
        # ───────────────────────────────────────────────────
        # ... loop again or exit ...
# ───────────────────────────────────────────────────────
# on_trace_end() is called (processors can export the finished trace)
# ───────────────────────────────────────────────────────
```
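This lifecycle combines two patterns from section 2.3. Spans are context managers, and processors are observers notified when a span starts and ends. The standalone sketch below tracks the current span with `contextvars`, which is what makes nesting (`parent_id`) work across `await` points. `ToySpan`, `RecordingProcessor`, `PROCESSORS`, and `span` are hypothetical names, not the SDK's tracing API.

```python
import contextvars
import itertools
from contextlib import contextmanager

_current_span: contextvars.ContextVar["ToySpan | None"] = contextvars.ContextVar("current_span", default=None)
_ids = itertools.count(1)


class ToySpan:
    def __init__(self, name: str, parent: "ToySpan | None") -> None:
        self.span_id = next(_ids)
        self.name = name
        self.parent_id = parent.span_id if parent else None


class RecordingProcessor:
    """Observer: receives start/end callbacks, like on_span_start/on_span_end."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def on_span_start(self, span: ToySpan) -> None:
        self.events.append(f"start {span.name}")

    def on_span_end(self, span: ToySpan) -> None:
        self.events.append(f"end {span.name}")


PROCESSORS: list[RecordingProcessor] = []


@contextmanager
def span(name: str):
    parent = _current_span.get()
    s = ToySpan(name, parent)
    token = _current_span.set(s)  # analogous to mark_as_current=True
    for p in PROCESSORS:
        p.on_span_start(s)
    try:
        yield s
    finally:
        for p in PROCESSORS:
            p.on_span_end(s)
        _current_span.reset(token)  # analogous to reset_current=True
```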
Span data structures (src/agents/tracing/span_data.py). This listing is a simplified view; check span_data.py for the exact field names and types in your version:
```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class AgentSpanData:
    agent_name: str
    agent_model: str
    agent_instructions: str | None
    tools: list[str]
    handoffs: list[str]
    output_type: str | None


@dataclass
class GenerationSpanData:
    model: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    latency_ms: float
    system_prompt: str | None
    messages: list[dict]
    response: dict


@dataclass
class FunctionSpanData:
    tool_name: str
    arguments: dict
    result: str | None
    error: str | None
    latency_ms: float


@dataclass
class HandoffSpanData:
    from_agent: str
    to_agent: str
    input_filter_applied: bool


@dataclass
class GuardrailSpanData:
    guardrail_name: str
    guardrail_type: Literal["input", "output", "tool_input", "tool_output"]
    tripwire_triggered: bool
    reason: str | None
```
Custom processor example:
```python
import asyncio
import json
from typing import Any

from agents import Agent, Runner, Span, Trace, TracingProcessor, set_trace_processors


class JSONFileTracingProcessor(TracingProcessor):
    def __init__(self, output_file: str):
        self.output_file = output_file
        self.traces: list[dict[str, Any]] = []
        self.spans: list[dict[str, Any]] = []

    def on_trace_start(self, trace: Trace) -> None:
        print(f"[TRACE START] {trace.name} ({trace.trace_id})")

    def on_trace_end(self, trace: Trace) -> None:
        # trace.export() holds trace-level metadata; spans are exported separately
        data = trace.export()
        if data:
            self.traces.append(data)
        self._write()
        print(f"[TRACE END] Exported to {self.output_file}")

    def on_span_start(self, span: Span[Any]) -> None:
        print(f"  [SPAN START] {span.span_data.type}")

    def on_span_end(self, span: Span[Any]) -> None:
        data = span.export()
        if data:
            self.spans.append(data)
        print(f"  [SPAN END] {span.span_id}")

    def shutdown(self) -> None:
        self._write()
        print(f"[SHUTDOWN] Total traces: {len(self.traces)}")

    def force_flush(self) -> None:
        self._write()

    def _write(self) -> None:
        with open(self.output_file, "w") as f:
            json.dump({"traces": self.traces, "spans": self.spans}, f, indent=2)


async def main() -> None:
    # set_trace_processors() replaces the default processors (including the
    # OpenAI exporter); add_trace_processor() keeps them and adds this one
    set_trace_processors([JSONFileTracingProcessor("traces.json")])
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = await Runner.run(agent, "Hello")  # traced automatically
    print(result.final_output)


asyncio.run(main())
```
4. Comparison with Competitors
4.1 OpenAI Agents SDK vs. Claude Agent SDK
| Dimension | OpenAI Agents SDK | Claude Agent SDK |
|---|---|---|
| Provider lock-in | ✅ Provider-agnostic (100+ LLMs via LiteLLM) | ❌ Claude-only |
| Core abstractions | Agent, Handoff, Session, Guardrail, Tracing | Query, ClaudeSDKClient, Hooks, SDK MCP Server |
| Session management | ✅ Built in (SQLite/Redis/custom) | ❌ None (manage it yourself) |
| Tracing | ✅ Built in (multiple backends) | ❌ No built-in tracing |
| Handoff mechanism | ✅ First-class (Handoff class + input_filter) | ✅ Supported through subagents (documented) |
| HITL support | ✅ RunState + interruptions + approval | ✅ Hooks + permission_mode |
| Tool definition | Python functions (@function_tool) | Python functions (@tool + SDK MCP Server) |
| MCP integration | ✅ Built in (MCPServerStdio / MCPServerSse / MCPServerStreamableHttp, HostedMCPTool) | ✅ Native (in-process SDK MCP Server) |
| CLI dependency | ❌ None (pure Python SDK) | ✅ Depends on the Claude Code CLI (bundled) |
| License | MIT (fully open source) | MIT + Anthropic commercial terms |
| Architectural transparency | ✅ Fully open source (the run loop is auditable) | ⚠️ The SDK is open source but relies on the closed-source CLI |
| Stars | 19.2k | 5k |
| Production maturity | ✅ High (actively maintained, 219 contributors) | ✅ Medium (officially maintained, 47 contributors) |
| Best fit | Multi-agent orchestration across LLMs | Deep Claude integration |
Code comparison - basic agent:
```python
# ═══════════════════════════════════════════════════════════
# OpenAI Agents SDK
# ═══════════════════════════════════════════════════════════
import asyncio

from agents import Agent, Runner


async def main() -> None:
    agent = Agent(
        name="Assistant",
        instructions="You are helpful",
        # Non-OpenAI models go through the LiteLLM extension, e.g.
        # "litellm/anthropic/claude-3-5-sonnet-20240620"
        model="gpt-4o",
    )
    result = await Runner.run(agent, "Hello")
    print(result.final_output)


asyncio.run(main())

# ═══════════════════════════════════════════════════════════
# Claude Agent SDK (a separate program)
# ═══════════════════════════════════════════════════════════
import asyncio

from claude_agent_sdk import AssistantMessage, ClaudeAgentOptions, TextBlock, query


async def main() -> None:
    options = ClaudeAgentOptions(
        system_prompt="You are helpful",
        max_turns=1,
    )
    async for message in query(prompt="Hello", options=options):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if isinstance(block, TextBlock):
                    print(block.text)


asyncio.run(main())
```
Code comparison - handoffs:
```python
# ═══════════════════════════════════════════════════════════
# OpenAI Agents SDK
# ═══════════════════════════════════════════════════════════
from agents import Agent, Runner

spanish_agent = Agent(name="Spanish", instructions="Only speak Spanish")
english_agent = Agent(name="English", instructions="Only speak English")
triage_agent = Agent(
    name="Triage",
    instructions="Route to appropriate language agent",
    handoffs=[spanish_agent, english_agent],  # handoff tools are generated automatically
)
result = Runner.run_sync(triage_agent, "Hola")
# The triage agent is expected to hand off to spanish_agent
print(result.last_agent.name)

# ═══════════════════════════════════════════════════════════
# Claude Agent SDK
# ═══════════════════════════════════════════════════════════
# Implemented through hooks (subagents have to be managed manually)
# See https://docs.anthropic.com/en/docs/claude-code/hooks
```
Code comparison - sessions:
```python
# ═══════════════════════════════════════════════════════════
# OpenAI Agents SDK
# ═══════════════════════════════════════════════════════════
from agents import Agent, Runner, SQLiteSession

agent = Agent(name="Assistant")
session = SQLiteSession("user_123")  # no db_path: the real SQLiteSession defaults to an in-memory database
result1 = Runner.run_sync(agent, "What's the capital of France?", session=session)
result2 = Runner.run_sync(agent, "What's its population?", session=session)
# The session manages the history automatically

# ═══════════════════════════════════════════════════════════
# Claude Agent SDK
# ═══════════════════════════════════════════════════════════
from claude_agent_sdk import query

# History has to be handled manually (inside an async function):
messages = []
async for msg in query(prompt="What's the capital of France?"):
    messages.append(msg)
# A second call would need that history passed back in (if context is needed);
# there is no built-in session abstraction
```
4.2 OpenAI Agents SDK vs. LangChain
| Dimension | OpenAI Agents SDK | LangChain |
|---|---|---|
| Design philosophy | Lightweight, core primitives | Full-featured, full-stack framework |
| Learning curve | ⭐⭐ Gentle | ⭐⭐⭐⭐ Steep |
| Abstraction layers | 2 (Agent → Runner) | 5+ (Chain → Agent → Executor → Tool → Memory → ...) |
| Lines of code | ~15,000 (core) | ~500,000+ (framework) |
| Scope | Multi-agent workflows | LLM apps (RAG, chains, agents, etc.) |
| RAG support | ❌ None (implement it yourself) | ✅ Built in (VectorStore, Retriever, etc.) |
| Memory abstraction | Session Protocol (simple) | ConversationBufferMemory, VectorStoreMemory, etc. (complex) |
| Tool ecosystem | Python functions + MCP servers | LangChain Tools (1000+ integrations) |
| Provider support | 100+ LLMs (via LiteLLM) | 50+ LLMs (built in) |
| Streaming | ✅ First-class | ✅ Supported, but more complex |
| Observability | Tracing processors (extensible) | LangSmith (hosted service) + Callbacks |
| Dependencies | Light (~10 deps) | Heavy (50+ deps) |
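When to choose the OpenAI Agents SDK:
- ✅ Your focus is multi-agent orchestration
- ✅ You need a lightweight, controllable solution
- ✅ You want to stay provider-agnostic
- ✅ You do not need RAG/vector stores

When to choose LangChain:
- ✅ You need RAG, vector stores, and document loaders
- ✅ You need rich tool integrations (1000+ tools)
- ✅ You are building complex LLM applications (not just agents)
- ✅ Your team already knows the LangChain ecosystem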
4.3 Feature Matrix (Full Comparison)
| Feature | OpenAI Agents SDK | Claude Agent SDK | LangChain | Swarm (deprecated) |
|---|---|---|---|---|
| Multi-agent | ✅ ✅ ✅ | ✅ ✅ | ✅ ✅ | ✅ |
| Handoffs | ✅ ✅ ✅ (first-class) | ✅ ✅ (via hooks) | ✅ (via tools) | ✅ |
| Sessions | ✅ ✅ ✅ | ❌ | ✅ ✅ | ❌ |
| Tracing | ✅ ✅ ✅ | ❌ | ✅ (LangSmith) | ❌ |
| HITL | ✅ ✅ ✅ (RunState) | ✅ ✅ (Hooks) | ✅ (Callbacks) | ❌ |
| Streaming | ✅ ✅ ✅ | ✅ ✅ | ✅ ✅ | ✅ |
| Guardrails | ✅ ✅ ✅ | ✅ (Hooks) | ⚠️ (custom) | ❌ |
| MCP Support | ✅ ✅ (mcp_servers: stdio/SSE/Streamable HTTP; HostedMCPTool) | ✅ ✅ ✅ (native, in-process) | ❌ | ❌ |
| RAG | ❌ | ❌ | ✅ ✅ ✅ | ❌ |
| Vector Stores | ❌ | ❌ | ✅ ✅ ✅ | ❌ |
| Provider-agnostic | ✅ ✅ ✅ | ❌ | ✅ ✅ | ⚠️ (basic) |
| Production-ready | ✅ ✅ ✅ | ✅ ✅ | ✅ ✅ ✅ | ❌ (educational) |
| License | MIT | MIT + ToS | MIT | MIT |
| Stars | 19.2k | 5k | 100k+ | 21k |
5. Extension Development Guide
5.1 Custom Tool Development
5.1.1 Basic Function Tools
import json
from typing import TypedDict

from agents import Agent, RunContextWrapper, Runner, function_tool

# ═══════════════════════════════════════════════════════════
# Simple tool (no context)
# ═══════════════════════════════════════════════════════════
@function_tool
def get_weather(city: str) -> str:
    """Get current weather for a city.

    Args:
        city: The city name
    """
    # Call a weather API here (stubbed)
    return f"Weather in {city}: 72°F, sunny"

# ═══════════════════════════════════════════════════════════
# Tool with context
# ═══════════════════════════════════════════════════════════
class MyContext(TypedDict):
    user_id: str
    api_key: str

@function_tool
def get_user_profile(
    context: RunContextWrapper[MyContext],
    field: str,
) -> str:
    """Get user profile field.

    Args:
        field: The field to retrieve (e.g., 'name', 'email')
    """
    # The context wrapper is injected by the SDK and is not part of the tool's JSON schema
    user_id = context.context["user_id"]
    api_key = context.context["api_key"]
    # Call the user API (fetch_user_profile is a placeholder for your own client)
    profile = fetch_user_profile(user_id, api_key)
    return profile.get(field, "Not found")

# ═══════════════════════════════════════════════════════════
# Async tool
# ═══════════════════════════════════════════════════════════
@function_tool
async def search_database(query: str) -> str:
    """Search the database.

    Args:
        query: SQL query to execute
    """
    # async_db_connection is a placeholder. Running model-generated SQL is an
    # injection risk: use read-only credentials or parameterized queries
    async with async_db_connection() as conn:
        results = await conn.execute(query)
    return json.dumps(results)

# ═══════════════════════════════════════════════════════════
# Usage
# ═══════════════════════════════════════════════════════════
agent = Agent(
    name="Assistant",
    tools=[get_weather, get_user_profile, search_database],
)
context = MyContext(user_id="user_123", api_key="sk-...")
result = await Runner.run(agent, "What's the weather in SF?", context=context)
5.1.2 Tools That Require Approval
from typing import Any

from agents import Agent, RunContextWrapper, Runner, function_tool

# ═══════════════════════════════════════════════════════════
# Static approval requirement
# ═══════════════════════════════════════════════════════════
@function_tool(needs_approval=True)
async def delete_user(user_id: str) -> str:
    """Delete a user account.

    Args:
        user_id: The user ID to delete
    """
    # Perform the deletion (db is a placeholder for your data layer)
    await db.users.delete(user_id)
    return f"Deleted user {user_id}"

# ═══════════════════════════════════════════════════════════
# Dynamic approval logic
# ═══════════════════════════════════════════════════════════
async def payment_needs_approval(
    context: RunContextWrapper[Any],
    params: dict[str, Any],
    call_id: str,
) -> bool:
    """Decide per call whether human approval is required (True = ask a human)."""
    amount = params.get("amount", 0)
    # Small payments (< $100) run automatically; larger ones need approval
    return amount >= 100

@function_tool(needs_approval=payment_needs_approval)
async def process_payment(amount: float, recipient: str) -> str:
    """Process a payment.

    Args:
        amount: Payment amount in USD
        recipient: Recipient user ID
    """
    # Process the payment (payment_api is a placeholder)
    tx_id = await payment_api.transfer(amount, recipient)
    return f"Paid ${amount} to {recipient}. Transaction: {tx_id}"

# ═══════════════════════════════════════════════════════════
# HITL workflow
# ═══════════════════════════════════════════════════════════
agent = Agent(
    name="Payment Bot",
    tools=[process_payment],
)
# First run: pauses before executing a tool call that needs approval
result = await Runner.run(agent, "Pay $500 to user_456")
if result.interruptions:
    print(f"Approval needed: {result.interruptions}")
    # [ToolApprovalInterruption(tool_name='process_payment', arguments={'amount': 500, ...})]
    # The user approves
    run_state = result.to_state()
    run_state.approve(result.interruptions[0])
    # Resume from the saved state
    final_result = await Runner.run(agent, run_state)
    print(final_result.final_output)
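A paused run is captured as a state object, so approval does not have to happen in the same process or request. The stdlib sketch below illustrates this with a hypothetical `PausedRun` record that is round-tripped through JSON, for example stored in a database while a human reviews it. The class and field names are assumptions. They do not reflect `RunState`'s real serialization format.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field


@dataclass
class PendingApproval:
    """Hypothetical record for one tool call waiting for a human decision."""
    call_id: str
    tool_name: str
    arguments: dict
    decision: str | None = None  # None = pending, "approved" or "rejected"


@dataclass
class PausedRun:
    """Hypothetical snapshot of a paused run that can be stored and resumed later."""
    run_id: str
    pending: list[PendingApproval] = field(default_factory=list)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> PausedRun:
        data = json.loads(raw)
        return cls(run_id=data["run_id"],
                   pending=[PendingApproval(**p) for p in data["pending"]])

    def decide(self, call_id: str, approved: bool) -> None:
        for item in self.pending:
            if item.call_id == call_id:
                item.decision = "approved" if approved else "rejected"
                return
        raise KeyError(f"unknown call_id: {call_id}")

    def ready_to_resume(self) -> bool:
        # Resume only once every pending call has a decision
        return all(item.decision is not None for item in self.pending)


# Process A: the run pauses and its state is persisted (e.g. to a database)
paused = PausedRun("run_1", [PendingApproval(
    "call_1", "process_payment", {"amount": 500, "recipient": "user_456"})])
stored = paused.to_json()

# Process B, later: a reviewer loads the state and approves
restored = PausedRun.from_json(stored)
restored.decide("call_1", approved=True)
print(restored.ready_to_resume())  # True
```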
5.1.3 Advanced FunctionTool
import asyncio
import json
from typing import Any

from agents import Agent, FunctionTool, ToolContext

# ═══════════════════════════════════════════════════════════
# Build a FunctionTool by hand (full control)
# ═══════════════════════════════════════════════════════════
async def my_tool_impl(context: ToolContext[Any], args_json: str) -> str:
    # The model's arguments arrive as a raw JSON string
    args = json.loads(args_json)
    # Inspect the context
    print(f"Tool name: {context.tool_name}")
    print(f"Current usage: {context.usage}")
    # Do the work
    await asyncio.sleep(1)  # simulate a slow operation
    return f"Processed: {args}"

tool = FunctionTool(
    name="my_advanced_tool",
    description="Does advanced processing",
    params_json_schema={
        "type": "object",
        "properties": {
            "input": {"type": "string", "description": "Input data"},
            "mode": {"type": "string", "enum": ["fast", "thorough"]},
        },
        # Strict mode (Structured Outputs) requires every property to be listed
        "required": ["input", "mode"],
        "additionalProperties": False,
    },
    on_invoke_tool=my_tool_impl,
    strict_json_schema=True,  # OpenAI Structured Outputs
    timeout_seconds=30.0,
    # Optional: tool_input_guardrails=[...], tool_output_guardrails=[...]
)
agent = Agent(name="Assistant", tools=[tool])
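`on_invoke_tool` receives the raw `args_json` string, so a hand-built tool may still want to re-check its arguments before acting on them. This holds even though `strict_json_schema=True` constrains what the model generates. The stdlib sketch below is a hypothetical minimal validator for a flat schema like the one above. It checks required keys, `additionalProperties: false`, `type: string` and `enum`. It is not a full JSON Schema implementation; a library such as `jsonschema` covers the general case.

```python
import json

SCHEMA = {
    "type": "object",
    "properties": {
        "input": {"type": "string", "description": "Input data"},
        "mode": {"type": "string", "enum": ["fast", "thorough"]},
    },
    "required": ["input", "mode"],
    "additionalProperties": False,
}


def validate_args(args_json: str, schema: dict = SCHEMA) -> list[str]:
    """Hypothetical helper: return a list of validation errors (empty means valid)."""
    try:
        args = json.loads(args_json)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc.msg}"]
    if not isinstance(args, dict):
        return ["arguments must be a JSON object"]
    props = schema["properties"]
    errors = [f"missing required field: {k}" for k in schema["required"] if k not in args]
    if schema.get("additionalProperties") is False:
        errors += [f"unexpected field: {k}" for k in args if k not in props]
    for key, value in args.items():
        spec = props.get(key)
        if spec is None:
            continue  # already reported as unexpected
        if spec.get("type") == "string" and not isinstance(value, str):
            errors.append(f"{key} must be a string")
        elif "enum" in spec and value not in spec["enum"]:
            errors.append(f"{key} must be one of {spec['enum']}")
    return errors


print(validate_args('{"input": "x", "mode": "fast"}'))  # []
print(validate_args('{"input": "x", "mode": "slow", "debug": true}'))
```

Inside `on_invoke_tool`, a non-empty error list can be returned to the model as the tool result so that it can retry with corrected arguments.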
5.2 Custom Session Backend
5.2.1 Basic Session Implementation
import asyncio

from agents import SessionABC, TResponseInputItem

class MemorySession(SessionABC):
    """In-memory session (for tests)"""

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.session_settings = None
        self._items: list[TResponseInputItem] = []
        self._lock = asyncio.Lock()

    async def get_items(
        self,
        limit: int | None = None,
    ) -> list[TResponseInputItem]:
        async with self._lock:
            if limit is None or limit <= 0:
                return self._items.copy()
            # The most recent `limit` items, still in chronological order
            return self._items[-limit:]

    async def add_items(
        self,
        items: list[TResponseInputItem],
    ) -> None:
        async with self._lock:
            self._items.extend(items)

    async def pop_item(self) -> TResponseInputItem | None:
        async with self._lock:
            if self._items:
                return self._items.pop()
            return None

    async def clear_session(self) -> None:
        async with self._lock:
            self._items.clear()
5.2.2 Redis Session (Production-grade)
import asyncio
import json

from redis.asyncio import Redis

from agents import Runner, SessionABC, SessionSettings, TResponseInputItem
from agents.memory.session import resolve_session_limit

class RedisSession(SessionABC):
    """Redis-backed session"""

    def __init__(
        self,
        session_id: str,
        *,
        redis_client: Redis,
        ttl_seconds: int | None = None,
        session_settings: SessionSettings | None = None,
    ):
        self.session_id = session_id
        self._redis = redis_client
        self._ttl = ttl_seconds
        self.session_settings = session_settings
        # Serializes access within this process only; it does not coordinate
        # multiple processes sharing the same Redis keys
        self._lock = asyncio.Lock()
        # Redis keys
        self._messages_key = f"agents:session:{session_id}:messages"
        self._counter_key = f"agents:session:{session_id}:counter"

    async def get_items(
        self,
        limit: int | None = None,
    ) -> list[TResponseInputItem]:
        limit = resolve_session_limit(limit, self.session_settings)
        async with self._lock:
            if limit is None or limit <= 0:
                # Fetch everything
                raw_items = await self._redis.lrange(self._messages_key, 0, -1)
            else:
                # Fetch the last N items (negative indexes count from the tail)
                raw_items = await self._redis.lrange(self._messages_key, -limit, -1)
        return [json.loads(item.decode("utf-8")) for item in raw_items]

    async def add_items(
        self,
        items: list[TResponseInputItem],
    ) -> None:
        if not items:
            return
        async with self._lock:
            pipe = self._redis.pipeline()
            for item in items:
                pipe.rpush(self._messages_key, json.dumps(item).encode("utf-8"))
            # Refresh the TTL on every write
            if self._ttl:
                pipe.expire(self._messages_key, self._ttl)
            await pipe.execute()

    async def pop_item(self) -> TResponseInputItem | None:
        async with self._lock:
            raw_item = await self._redis.rpop(self._messages_key)
        if raw_item:
            return json.loads(raw_item.decode("utf-8"))
        return None

    async def clear_session(self) -> None:
        async with self._lock:
            await self._redis.delete(self._messages_key, self._counter_key)

    @classmethod
    async def from_url(
        cls,
        session_id: str,
        url: str = "redis://localhost:6379/0",
        **kwargs,
    ) -> "RedisSession":
        """Create a session from a Redis URL"""
        # Redis.from_url is synchronous; connections are opened lazily on first use
        redis_client = Redis.from_url(url, decode_responses=False)
        return cls(session_id, redis_client=redis_client, **kwargs)

# ═══════════════════════════════════════════════════════════
# Usage
# ═══════════════════════════════════════════════════════════
session = await RedisSession.from_url(
    "user_123",
    url="redis://localhost:6379/0",
    ttl_seconds=86400,  # 24 hours
)
result = await Runner.run(agent, "Hello", session=session)
5.2.3 Encrypted Session
import json

from cryptography.fernet import Fernet

from agents import Runner, SessionABC, SQLiteSession, TResponseInputItem

class EncryptedSession(SessionABC):
    """Session that stores items encrypted at rest"""

    def __init__(
        self,
        session_id: str,
        encryption_key: bytes,
        backend_session: SessionABC,
    ):
        self.session_id = session_id
        self.session_settings = backend_session.session_settings
        self._cipher = Fernet(encryption_key)
        self._backend = backend_session

    def _encrypt_item(self, item: TResponseInputItem) -> TResponseInputItem:
        """Encrypt a single item"""
        # Only the "content" field is encrypted; other fields, such as tool-call
        # "arguments" or "output", are still stored in plaintext
        encrypted_item = item.copy()
        if "content" in encrypted_item:
            original_content = json.dumps(encrypted_item["content"])
            encrypted_content = self._cipher.encrypt(original_content.encode())
            encrypted_item["content"] = encrypted_content.decode("utf-8")
        return encrypted_item

    def _decrypt_item(self, item: TResponseInputItem) -> TResponseInputItem:
        """Decrypt a single item"""
        decrypted_item = item.copy()
        if "content" in decrypted_item:
            encrypted_content = decrypted_item["content"]
            decrypted_content = self._cipher.decrypt(encrypted_content.encode())
            decrypted_item["content"] = json.loads(decrypted_content.decode())
        return decrypted_item

    async def get_items(
        self,
        limit: int | None = None,
    ) -> list[TResponseInputItem]:
        encrypted_items = await self._backend.get_items(limit)
        return [self._decrypt_item(item) for item in encrypted_items]

    async def add_items(
        self,
        items: list[TResponseInputItem],
    ) -> None:
        encrypted_items = [self._encrypt_item(item) for item in items]
        await self._backend.add_items(encrypted_items)

    async def pop_item(self) -> TResponseInputItem | None:
        encrypted_item = await self._backend.pop_item()
        if encrypted_item:
            return self._decrypt_item(encrypted_item)
        return None

    async def clear_session(self) -> None:
        await self._backend.clear_session()

# ═══════════════════════════════════════════════════════════
# Usage
# ═══════════════════════════════════════════════════════════
# Generate a key (in production, load a persistent key from a secrets manager;
# data encrypted with a lost key cannot be recovered)
encryption_key = Fernet.generate_key()
# Wrap a SQLite session
base_session = SQLiteSession("user_123")
encrypted_session = EncryptedSession("user_123", encryption_key, base_session)
result = await Runner.run(agent, "Sensitive data", session=encrypted_session)
5.3 Custom Tracing Processor
5.3.1 Basic Processor
from collections import defaultdict
from datetime import datetime

from agents import Runner, Span, Trace, TracingProcessor, set_trace_processors
from agents.tracing.span_data import AgentSpanData, FunctionSpanData, GenerationSpanData

def span_latency_ms(span: Span) -> float | None:
    """Latency derived from the span's ISO-8601 started_at/ended_at timestamps."""
    if not (span.started_at and span.ended_at):
        return None
    delta = datetime.fromisoformat(span.ended_at) - datetime.fromisoformat(span.started_at)
    return delta.total_seconds() * 1000

class ConsoleTracingProcessor(TracingProcessor):
    """Tracing processor that prints to the console"""

    def __init__(self, verbose: bool = True):
        self.verbose = verbose
        self.indent_level = 0
        # trace.export() does not include spans, so token totals are
        # accumulated per trace as generation spans finish
        self._tokens: defaultdict[str, int] = defaultdict(int)

    def _print(self, msg: str) -> None:
        print("  " * self.indent_level + msg)

    def on_trace_start(self, trace: Trace) -> None:
        self._print(f"🚀 TRACE START: {trace.name} [{trace.trace_id}]")
        metadata = (trace.export() or {}).get("metadata")
        if metadata:
            self._print(f"   Metadata: {metadata}")
        self.indent_level += 1

    def on_trace_end(self, trace: Trace) -> None:
        self.indent_level -= 1
        self._print(f"✅ TRACE END: {trace.name}")
        self._print(f"   Total tokens: {self._tokens.pop(trace.trace_id, 0)}")

    def on_span_start(self, span: Span) -> None:
        data = span.span_data
        if isinstance(data, AgentSpanData):
            self._print(f"🤖 AGENT: {data.name}")
        elif isinstance(data, GenerationSpanData):
            # With the default OpenAI Responses model, LLM calls are recorded as
            # ResponseSpanData instead; add a branch for it if you use that model
            self._print(f"💭 LLM CALL: {data.model}")
        elif isinstance(data, FunctionSpanData):
            self._print(f"🔧 TOOL: {data.name}")
            if self.verbose:
                self._print(f"   Args: {data.input}")
        self.indent_level += 1

    def on_span_end(self, span: Span) -> None:
        self.indent_level -= 1
        data = span.span_data
        if isinstance(data, GenerationSpanData):
            usage = data.usage or {}
            tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
            self._tokens[span.trace_id] += tokens
            latency = span_latency_ms(span)
            latency_str = f"{latency:.2f}ms" if latency is not None else "n/a"
            self._print(f"   ⏱️ Latency: {latency_str}, Tokens: {tokens}")
        elif isinstance(data, FunctionSpanData):
            if span.error:
                self._print(f"   ❌ Error: {span.error}")
            elif self.verbose:
                self._print(f"   ✓ Result: {str(data.output)[:100]}")

    def shutdown(self) -> None:
        self._print("🛑 SHUTDOWN")

    def force_flush(self) -> None:
        pass  # nothing is buffered for console output

# ═══════════════════════════════════════════════════════════
# Usage
# ═══════════════════════════════════════════════════════════
# set_trace_processors() replaces the default processors (including export to
# the OpenAI dashboard); use add_trace_processor() to keep them
processor = ConsoleTracingProcessor(verbose=True)
set_trace_processors([processor])
result = await Runner.run(agent, "What's 2+2?")
# Illustrative output (values depend on the run and the model type):
# 🚀 TRACE START: workflow_name [trace_abc123]
#   🤖 AGENT: Assistant
#     💭 LLM CALL: gpt-4o
#        ⏱️ Latency: 345.67ms, Tokens: 25
# ✅ TRACE END: workflow_name
#    Total tokens: 25
5.3.2 Database Tracing Processor
import asyncio
from datetime import datetime, timezone

from sqlalchemy import JSON, Column, DateTime, Float, Integer, String, select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

from agents import Runner, Span, Trace, TracingProcessor, set_trace_processors

Base = declarative_base()

def utcnow() -> datetime:
    return datetime.now(timezone.utc)

def span_latency_ms(span: Span) -> float | None:
    """Latency derived from the span's ISO-8601 started_at/ended_at timestamps."""
    if not (span.started_at and span.ended_at):
        return None
    delta = datetime.fromisoformat(span.ended_at) - datetime.fromisoformat(span.started_at)
    return delta.total_seconds() * 1000

class TraceRecord(Base):
    __tablename__ = "traces"
    id = Column(Integer, primary_key=True)
    trace_id = Column(String, unique=True, index=True)
    workflow_name = Column(String)
    # "metadata" is reserved by SQLAlchemy's declarative API, so the attribute
    # is named trace_metadata while the column itself is still called "metadata"
    trace_metadata = Column("metadata", JSON)
    data = Column(JSON)
    created_at = Column(DateTime, default=utcnow)

class SpanRecord(Base):
    __tablename__ = "spans"
    id = Column(Integer, primary_key=True)
    span_id = Column(String, unique=True, index=True)
    trace_id = Column(String, index=True)
    span_type = Column(String)
    data = Column(JSON)
    latency_ms = Column(Float)
    created_at = Column(DateTime, default=utcnow)

class DatabaseTracingProcessor(TracingProcessor):
    """Store traces and spans in a database.

    Processor callbacks are synchronous, so each write is scheduled as a task on
    the running event loop; await flush() before the loop exits so pending
    writes are not lost.
    """

    def __init__(self, db_url: str):
        self.engine = create_async_engine(db_url)
        self.SessionLocal = async_sessionmaker(self.engine, expire_on_commit=False)
        self._tables_created = False
        self._table_lock = asyncio.Lock()
        # Strong references keep fire-and-forget tasks from being garbage-collected
        self._pending: set[asyncio.Task] = set()

    def _spawn(self, coro) -> None:
        task = asyncio.get_running_loop().create_task(coro)
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def _ensure_tables(self) -> None:
        # Create the tables lazily, once (the constructor may run outside a loop)
        async with self._table_lock:
            if not self._tables_created:
                async with self.engine.begin() as conn:
                    await conn.run_sync(Base.metadata.create_all)
                self._tables_created = True

    def on_trace_start(self, trace: Trace) -> None:
        pass  # written once, in on_trace_end

    def on_trace_end(self, trace: Trace) -> None:
        self._spawn(self._save_trace(trace))

    async def _save_trace(self, trace: Trace) -> None:
        data = trace.export()
        if not data:
            return
        await self._ensure_tables()
        async with self.SessionLocal() as session:
            session.add(TraceRecord(
                trace_id=trace.trace_id,
                workflow_name=trace.name,
                trace_metadata=data.get("metadata") or {},
                data=data,
            ))
            await session.commit()

    def on_span_start(self, span: Span) -> None:
        pass  # written once, in on_span_end

    def on_span_end(self, span: Span) -> None:
        self._spawn(self._save_span(span))

    async def _save_span(self, span: Span) -> None:
        data = span.export()
        if not data:
            return
        await self._ensure_tables()
        async with self.SessionLocal() as session:
            session.add(SpanRecord(
                span_id=span.span_id,
                trace_id=span.trace_id,
                span_type=type(span.span_data).__name__,
                data=data,
                latency_ms=span_latency_ms(span),
            ))
            await session.commit()

    async def flush(self) -> None:
        """Wait until all scheduled writes have finished."""
        while self._pending:
            await asyncio.gather(*list(self._pending), return_exceptions=True)

    def shutdown(self) -> None:
        pass  # synchronous hook: await flush() and engine.dispose() from async code

    def force_flush(self) -> None:
        pass  # cannot block on async writes here; use `await processor.flush()`

# ═══════════════════════════════════════════════════════════
# Query API
# ═══════════════════════════════════════════════════════════
class TraceQuery:
    def __init__(self, db_url: str):
        self.engine = create_async_engine(db_url)
        self.SessionLocal = async_sessionmaker(self.engine, expire_on_commit=False)

    async def get_trace(self, trace_id: str) -> dict | None:
        async with self.SessionLocal() as session:
            result = await session.execute(
                select(TraceRecord).where(TraceRecord.trace_id == trace_id)
            )
            record = result.scalar_one_or_none()
        if record is None:
            return None
        return {
            "trace_id": record.trace_id,
            "workflow_name": record.workflow_name,
            "metadata": record.trace_metadata,
            "data": record.data,
            "created_at": record.created_at.isoformat(),
        }

    async def get_recent_traces(self, limit: int = 10) -> list[dict]:
        async with self.SessionLocal() as session:
            result = await session.execute(
                select(TraceRecord)
                .order_by(TraceRecord.created_at.desc())
                .limit(limit)
            )
            records = result.scalars().all()
        return [
            {
                "trace_id": r.trace_id,
                "workflow_name": r.workflow_name,
                "created_at": r.created_at.isoformat(),
            }
            for r in records
        ]

# ═══════════════════════════════════════════════════════════
# Usage
# ═══════════════════════════════════════════════════════════
db_url = "sqlite+aiosqlite:///traces.db"  # requires the aiosqlite driver
# Register the processor
processor = DatabaseTracingProcessor(db_url)
set_trace_processors([processor])
# Run the agent
result = await Runner.run(agent, "Hello")
await processor.flush()  # make sure pending writes have landed
# Query traces
query = TraceQuery(db_url)
recent = await query.get_recent_traces(limit=5)
print(recent)
5.3.3 Integrating External Services (Logfire Example)
from typing import Any

import logfire

from agents import Runner, Span, Trace, TracingProcessor, set_trace_processors
from agents.tracing.span_data import FunctionSpanData, GenerationSpanData

class LogfireTracingProcessor(TracingProcessor):
    """Tracing processor that forwards traces and spans to Logfire.

    Logfire also ships an official integration, logfire.instrument_openai_agents(),
    which is usually simpler than a hand-written processor.
    """

    def __init__(self, service_name: str):
        logfire.configure(service_name=service_name)
        # One open Logfire span per trace id, so concurrent runs do not overwrite each other
        self._trace_spans: dict[str, Any] = {}

    def on_trace_start(self, trace: Trace) -> None:
        # Open a Logfire span that covers the whole agent trace
        span = logfire.span(
            "agent trace {workflow_name}",
            workflow_name=trace.name,
            agents_trace_id=trace.trace_id,
        )
        span.__enter__()
        self._trace_spans[trace.trace_id] = span

    def on_trace_end(self, trace: Trace) -> None:
        span = self._trace_spans.pop(trace.trace_id, None)
        if span is not None:
            span.__exit__(None, None, None)

    def on_span_start(self, span: Span) -> None:
        data = span.span_data
        attributes: dict[str, Any] = {
            "span_type": type(data).__name__,
            "agents_trace_id": span.trace_id,
        }
        if isinstance(data, GenerationSpanData):
            attributes["model"] = data.model
        elif isinstance(data, FunctionSpanData):
            attributes["tool_name"] = data.name
            attributes["arguments"] = str(data.input)
        logfire.info("Span started: {span_type}", **attributes)

    def on_span_end(self, span: Span) -> None:
        data = span.span_data
        attributes: dict[str, Any] = {
            "span_type": type(data).__name__,
            "agents_trace_id": span.trace_id,
        }
        # Token usage is only known once the generation has finished
        if isinstance(data, GenerationSpanData) and data.usage:
            attributes["usage"] = data.usage
        if span.error:
            attributes["error"] = str(span.error)
        logfire.info("Span ended: {span_type}", **attributes)

    def shutdown(self) -> None:
        pass  # Logfire handles shutdown

    def force_flush(self) -> None:
        pass  # Logfire handles flushing

# ═══════════════════════════════════════════════════════════
# Usage
# ═══════════════════════════════════════════════════════════
processor = LogfireTracingProcessor(service_name="my-agents")
set_trace_processors([processor])
# Traces are now sent to Logfire automatically
result = await Runner.run(agent, "Hello")
5.4 Custom Model Provider
from collections.abc import AsyncIterator
from typing import Any

from agents import Agent, Model, ModelProvider, ModelResponse, Runner

# ═══════════════════════════════════════════════════════════
# Custom Model
# ═══════════════════════════════════════════════════════════
class MyCustomModel(Model):
    def __init__(self, model_name: str, api_key: str):
        self.model_name = model_name
        self.api_key = api_key

    async def get_response(
        self,
        system_instructions: str | None,
        input: str | list[Any],
        model_settings: Any,
        tools: list[Any],
        output_schema: Any,
        handoffs: list[Any],
        tracing: Any,
        **kwargs: Any,  # previous_response_id etc.; the exact set varies by SDK version
    ) -> ModelResponse:
        # Call your LLM API
        response = await self._call_api(
            system=system_instructions,
            messages=input,
            tools=tools,
            output_schema=output_schema,
        )
        # output: Responses-API-style output items; usage: an agents.usage.Usage
        return ModelResponse(
            output=response["output"],
            usage=response["usage"],
            response_id=response["id"],
        )

    async def stream_response(
        self,
        system_instructions: str | None,
        input: str | list[Any],
        model_settings: Any,
        tools: list[Any],
        output_schema: Any,
        handoffs: list[Any],
        tracing: Any,
        **kwargs: Any,
    ) -> AsyncIterator[Any]:
        # Streaming call; must yield Responses-API stream events
        async for event in self._call_api_streaming(
            system=system_instructions,
            messages=input,
            tools=tools,
        ):
            yield event

    async def _call_api(self, **kwargs: Any) -> dict:
        # Implement your API call here
        raise NotImplementedError

    async def _call_api_streaming(self, **kwargs: Any) -> AsyncIterator[Any]:
        # Implement your streaming API call here
        raise NotImplementedError
        yield  # unreachable; makes this function an async generator

# ═══════════════════════════════════════════════════════════
# Custom ModelProvider
# ═══════════════════════════════════════════════════════════
class MyCustomModelProvider(ModelProvider):
    def __init__(self, api_key: str):
        self.api_key = api_key

    def get_model(self, model_name: str | None) -> Model:
        model_name = model_name or "default-model"
        return MyCustomModel(model_name, self.api_key)

# ═══════════════════════════════════════════════════════════
# Usage
# ═══════════════════════════════════════════════════════════
provider = MyCustomModelProvider(api_key="sk-...")
agent = Agent(
    name="Assistant",
    model=provider.get_model("my-model-v1"),
)
result = await Runner.run(agent, "Hello")
# Or apply the provider to every agent in a run:
# result = await Runner.run(agent, "Hello", run_config=RunConfig(model_provider=provider))
6. Production Deployment Considerations
6.1 Performance Optimization
Concurrency control:
import asyncio

from redis.asyncio import ConnectionPool, Redis

from agents import Agent, Runner, function_tool

# ═══════════════════════════════════════════════════════════
# 1. Limit the number of concurrent agent runs
# ═══════════════════════════════════════════════════════════
semaphore = asyncio.Semaphore(10)  # at most 10 concurrent runs

async def run_with_semaphore(agent: Agent, user_input: str, **kwargs):
    async with semaphore:
        return await Runner.run(agent, user_input, **kwargs)

# Batch run (inputs is your list of prompts)
tasks = [
    run_with_semaphore(agent, user_input)
    for user_input in inputs
]
results = await asyncio.gather(*tasks)

# ═══════════════════════════════════════════════════════════
# 2. Session connection pool (Redis example)
# ═══════════════════════════════════════════════════════════
pool = ConnectionPool.from_url(
    "redis://localhost:6379/0",
    max_connections=50,
    decode_responses=False,
)

async def get_session(session_id: str):
    # All sessions share one pool (RedisSession is the class from 5.2.2)
    redis = Redis(connection_pool=pool)
    return RedisSession(session_id, redis_client=redis)

# ═══════════════════════════════════════════════════════════
# 3. Tool timeout control
# ═══════════════════════════════════════════════════════════
@function_tool
async def slow_tool(query: str) -> str:
    try:
        # asyncio.timeout needs Python 3.11+; use asyncio.wait_for on older versions
        async with asyncio.timeout(5.0):  # 5-second timeout
            return await expensive_operation(query)  # placeholder for your own call
    except TimeoutError:
        return "Operation timed out"
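The semaphore pattern above can be checked in isolation. The stdlib sketch below wraps it in a hypothetical `bounded_gather` helper. It takes zero-argument coroutine factories, so coroutines are only created when a slot is free, and it measures the peak number of jobs in flight. `asyncio.sleep` stands in for `Runner.run`.

```python
import asyncio


async def bounded_gather(factories, limit: int) -> list:
    """Run coroutine factories with at most `limit` in flight; results keep input order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(guarded(f) for f in factories))


async def demo(n_jobs: int, limit: int) -> tuple[list[int], int]:
    in_flight = 0
    peak = 0

    def make_job(i: int):
        async def job() -> int:
            nonlocal in_flight, peak
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for Runner.run(...)
            in_flight -= 1
            return i * i
        return job

    results = await bounded_gather([make_job(i) for i in range(n_jobs)], limit)
    return results, peak


results, peak = asyncio.run(demo(n_jobs=20, limit=3))
print(peak)  # 3
```

Because `asyncio.gather` preserves input order, results can be zipped back to their prompts even though jobs finish out of order.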
Caching:
import hashlib

from agents import SessionABC, TResponseInputItem, function_tool

# ═══════════════════════════════════════════════════════════
# Tool result cache
# ═══════════════════════════════════════════════════════════
# Unbounded module-level dict: fine for a demo, but bound its size/TTL in production
cache: dict[str, str] = {}

@function_tool
async def cached_search(query: str) -> str:
    """Search tool with caching"""
    cache_key = hashlib.md5(query.encode()).hexdigest()  # md5 used only as a cache key
    if cache_key in cache:
        return cache[cache_key]
    result = await perform_search(query)  # placeholder for your search call
    cache[cache_key] = result
    return result

# ═══════════════════════════════════════════════════════════
# Session cache layer (write-through)
# ═══════════════════════════════════════════════════════════
class CachedSession(SessionABC):
    """Read cache in front of a backend session; writes go straight through.

    Only consistent if this process is the sole writer for the session.
    """

    def __init__(self, backend: SessionABC):
        self.session_id = backend.session_id
        self.session_settings = backend.session_settings
        self._backend = backend
        self._cache: list[TResponseInputItem] | None = None  # None = not loaded yet

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        if self._cache is None:
            self._cache = await self._backend.get_items()
        if limit is None or limit <= 0:
            return self._cache.copy()
        return self._cache[-limit:]

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        # Persist first so nothing is lost if the process dies
        await self._backend.add_items(items)
        if self._cache is not None:
            self._cache.extend(items)

    async def pop_item(self) -> TResponseInputItem | None:
        item = await self._backend.pop_item()
        if item is not None and self._cache:
            self._cache.pop()
        return item

    async def clear_session(self) -> None:
        await self._backend.clear_session()
        self._cache = []
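The module-level `cache` dict in the tool-cache example grows without bound and never expires. A bounded alternative is sketched below with the standard library only. It combines LRU eviction (via `OrderedDict`) with a per-entry TTL. `TTLCache` here is a hypothetical class; third-party packages such as `cachetools` provide similar ready-made caches. The clock is injectable so that expiry can be tested deterministically.

```python
from __future__ import annotations

import time
from collections import OrderedDict
from typing import Callable, Generic, Hashable, TypeVar

V = TypeVar("V")


class TTLCache(Generic[V]):
    """Hypothetical bounded cache: LRU eviction plus a per-entry time-to-live."""

    def __init__(self, maxsize: int, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.maxsize = maxsize
        self.ttl = ttl_seconds
        self._clock = clock
        # key -> (expiry timestamp, value); order = least to most recently used
        self._data: OrderedDict[Hashable, tuple[float, V]] = OrderedDict()

    def get(self, key: Hashable) -> V | None:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: drop it
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: Hashable, value: V) -> None:
        self._data[key] = (self._clock() + self.ttl, value)
        self._data.move_to_end(key)
        while len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict the least recently used entry


# Usage with a fake clock so expiry is deterministic
now = [0.0]
tool_cache: TTLCache[str] = TTLCache(maxsize=2, ttl_seconds=60, clock=lambda: now[0])
tool_cache.set("q1", "result 1")
tool_cache.set("q2", "result 2")
tool_cache.get("q1")              # touch q1, so q2 becomes least recently used
tool_cache.set("q3", "result 3")  # over capacity: evicts q2
print(tool_cache.get("q2"))       # None
now[0] = 61.0
print(tool_cache.get("q1"))       # None (expired)
```

In `cached_search`, `tool_cache.get(cache_key)` / `tool_cache.set(cache_key, result)` would replace the plain dict lookups. A process-local cache like this is not shared across workers; a Redis-backed cache is the usual next step.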
6.2 Error Handling and Retries
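import asyncio
import logging

from pybreaker import CircuitBreaker, CircuitBreakerError
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from agents import Model, ModelResponse, function_tool

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════════════════════
# 1. Retry model calls
# ═══════════════════════════════════════════════════════════
class RetryableModel(Model):
    """Wraps another Model and retries transient failures of get_response."""

    def __init__(self, base_model: Model):
        self.base_model = base_model

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        # Builtin exception types only; the OpenAI client raises its own types
        # (e.g. openai.APIConnectionError, openai.APITimeoutError), add those if needed
        retry=retry_if_exception_type((TimeoutError, ConnectionError)),
    )
    async def get_response(self, *args, **kwargs) -> ModelResponse:
        # The Runner may pass arguments positionally, so forward *args as well
        return await self.base_model.get_response(*args, **kwargs)

    def stream_response(self, *args, **kwargs):
        # stream_response is abstract and must be implemented; a partially
        # consumed stream cannot be replayed safely, so it is passed through
        return self.base_model.stream_response(*args, **kwargs)

# ═══════════════════════════════════════════════════════════
# 2. Graceful degradation
# ═══════════════════════════════════════════════════════════
@function_tool
async def search_with_fallback(query: str) -> str:
    """Search tool with a fallback"""
    try:
        # Primary search engine (primary_search/fallback_search are placeholders)
        return await primary_search(query)
    except Exception as e:
        logger.warning(f"Primary search failed: {e}")
        try:
            # Fallback search engine
            return await fallback_search(query)
        except Exception as e2:
            logger.error(f"Fallback search failed: {e2}")
            return "Search unavailable. Please try again later."

# ═══════════════════════════════════════════════════════════
# 3. Circuit Breaker
# ═══════════════════════════════════════════════════════════
# reset_timeout: seconds the breaker stays open before allowing a trial call
breaker = CircuitBreaker(fail_max=5, reset_timeout=60)

def _fetch(endpoint: str) -> str:
    # Synchronous HTTP call; http_get is a placeholder (e.g. a requests/httpx call)
    return http_get(endpoint)

@function_tool
async def external_api_call(endpoint: str) -> str:
    """API call guarded by a circuit breaker"""
    try:
        # pybreaker is built around synchronous callables, so the blocking call
        # goes through breaker.call in a worker thread
        return await asyncio.to_thread(breaker.call, _fetch, endpoint)
    except CircuitBreakerError:
        return "Service temporarily unavailable"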
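To avoid depending on pybreaker for coroutine functions, you can write a small asyncio-native breaker instead. The sketch below is a hypothetical `AsyncCircuitBreaker`; its names and its closed/open/half-open policy are illustrative and loosely follow the classic pattern. After `fail_max` consecutive failures it rejects calls for `reset_timeout` seconds. It then lets a trial call through: one success closes the circuit, and one failure re-opens it. The sketch does not limit how many trial calls run concurrently in the half-open state.

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class AsyncCircuitBreaker:
    """Hypothetical asyncio-native circuit breaker (closed -> open -> half-open)."""

    def __init__(self, fail_max: int, reset_timeout: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: float | None = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half-open"
        return "open"

    async def call(self, func: Callable[[], Awaitable[T]]) -> T:
        if self.state == "open":
            raise CircuitOpenError("circuit open; call rejected")
        try:
            result = await func()
        except Exception:
            self._failures += 1
            # A failed trial call (half-open) re-opens immediately
            if self.state == "half-open" or self._failures >= self.fail_max:
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and resets the failure count
        self._failures = 0
        self._opened_at = None
        return result


async def demo() -> list[str]:
    now = [0.0]
    breaker = AsyncCircuitBreaker(fail_max=2, reset_timeout=30, clock=lambda: now[0])

    async def failing() -> str:
        raise ConnectionError("down")

    async def ok() -> str:
        return "ok"

    for _ in range(2):
        try:
            await breaker.call(failing)
        except ConnectionError:
            pass
    states = [breaker.state]  # "open"
    try:
        await breaker.call(ok)
    except CircuitOpenError:
        states.append("rejected")
    now[0] = 31.0
    states.append(breaker.state)  # "half-open"
    states.append(await breaker.call(ok))
    states.append(breaker.state)  # "closed"
    return states


print(asyncio.run(demo()))  # ['open', 'rejected', 'half-open', 'ok', 'closed']
```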
6.3 Monitoring and Logging
import time

import structlog
from prometheus_client import Counter, Histogram

from agents import (
    Agent,
    Runner,
    Span,
    Trace,
    TracingProcessor,
    add_trace_processor,
    function_tool,
)
from agents.tracing.span_data import GenerationSpanData

# ═══════════════════════════════════════════════════════════
# 1. Structured Logging
# ═══════════════════════════════════════════════════════════
logger = structlog.get_logger()

@function_tool
async def monitored_tool(text: str) -> str:
    logger.info(
        "tool_called",
        tool_name="monitored_tool",
        input_length=len(text),
    )
    try:
        result = await process(text)  # placeholder for your own logic
        logger.info(
            "tool_completed",
            tool_name="monitored_tool",
            result_length=len(result),
        )
        return result
    except Exception as e:
        logger.error(
            "tool_failed",
            tool_name="monitored_tool",
            error=str(e),
        )
        raise

# ═══════════════════════════════════════════════════════════
# 2. Prometheus Metrics
# ═══════════════════════════════════════════════════════════
agent_runs = Counter("agent_runs_total", "Total agent runs", ["agent_name", "status"])
agent_latency = Histogram("agent_latency_seconds", "Agent latency", ["agent_name"])

async def run_with_metrics(agent: Agent, user_input: str, **kwargs):
    start = time.perf_counter()  # monotonic clock, unaffected by wall-clock changes
    try:
        result = await Runner.run(agent, user_input, **kwargs)
        agent_runs.labels(agent_name=agent.name, status="success").inc()
        return result
    except Exception:
        agent_runs.labels(agent_name=agent.name, status="error").inc()
        raise
    finally:
        duration = time.perf_counter() - start
        agent_latency.labels(agent_name=agent.name).observe(duration)

# ═══════════════════════════════════════════════════════════
# 3. Custom Tracing Metrics Processor
# ═══════════════════════════════════════════════════════════
token_usage = Counter(
    "llm_tokens_total",
    "LLM token usage",
    ["model", "type"],  # type: input/output
)

class MetricsTracingProcessor(TracingProcessor):
    """Exports token counts from generation spans as Prometheus counters.

    GenerationSpanData comes from Chat Completions-style models; calls through
    the default OpenAI Responses model are recorded as ResponseSpanData instead.
    """

    def on_span_end(self, span: Span) -> None:
        data = span.span_data
        if isinstance(data, GenerationSpanData) and data.usage:
            model = data.model or "unknown"
            token_usage.labels(model=model, type="input").inc(data.usage.get("input_tokens", 0))
            token_usage.labels(model=model, type="output").inc(data.usage.get("output_tokens", 0))

    def on_trace_start(self, trace: Trace) -> None:
        pass

    def on_trace_end(self, trace: Trace) -> None:
        pass

    def on_span_start(self, span: Span) -> None:
        pass

    def shutdown(self) -> None:
        pass

    def force_flush(self) -> None:
        pass

# add_trace_processor keeps the default exporter; set_trace_processors would replace it
add_trace_processor(MetricsTracingProcessor())
6.4 Security Best Practices
import os
import re
import time
from collections import defaultdict, deque
from typing import Any, TypedDict

from agents import (
    Agent,
    GuardrailFunctionOutput,
    RunContextWrapper,
    Runner,
    TResponseInputItem,
    function_tool,
    input_guardrail,
    output_guardrail,
)

# ═══════════════════════════════════════════════════════════
# 1. Input Sanitization Guardrail
# ═══════════════════════════════════════════════════════════
# A denylist is only a first line of defense, not a complete injection filter
DANGEROUS_PATTERNS = ["<script>", "drop table", "eval("]

@input_guardrail
async def sanitize_input(
    ctx: RunContextWrapper[Any],
    agent: Agent,
    input: str | list[TResponseInputItem],
) -> GuardrailFunctionOutput:
    """Check the input for malicious patterns."""
    # The input is either a plain string or a list of input items
    if isinstance(input, str):
        texts = [input]
    else:
        texts = [str(item.get("content", "")) for item in input if item.get("role") == "user"]
    for text in texts:
        lowered = text.lower()
        for pattern in DANGEROUS_PATTERNS:
            if pattern in lowered:
                # Tripping the wire makes Runner.run raise InputGuardrailTripwireTriggered
                return GuardrailFunctionOutput(
                    output_info={"reason": f"Dangerous pattern detected: {pattern}"},
                    tripwire_triggered=True,
                )
    return GuardrailFunctionOutput(output_info=None, tripwire_triggered=False)

# ═══════════════════════════════════════════════════════════
# 2. Output Filtering Guardrail
# ═══════════════════════════════════════════════════════════
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
CARD_PATTERN = re.compile(r"\b\d{4}-\d{4}-\d{4}-\d{4}\b")

@output_guardrail
async def filter_pii(
    ctx: RunContextWrapper[Any],
    agent: Agent,
    output: Any,
) -> GuardrailFunctionOutput:
    """Block output that contains PII (personally identifiable information).

    Output guardrails can only trip (Runner.run then raises
    OutputGuardrailTripwireTriggered); they cannot rewrite the output, so
    redaction has to be applied to result.final_output after the run.
    """
    text = str(output)
    found = [
        name
        for name, pattern in (("ssn", SSN_PATTERN), ("card", CARD_PATTERN))
        if pattern.search(text)
    ]
    return GuardrailFunctionOutput(output_info={"pii_types": found}, tripwire_triggered=bool(found))

# ═══════════════════════════════════════════════════════════
# 3. Rate Limiting
# ═══════════════════════════════════════════════════════════
class RateLimiter:
    """Sliding-window limiter; state is per process (use Redis for multiple instances)."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: defaultdict[str, deque[float]] = defaultdict(deque)

    async def check(self, user_id: str) -> bool:
        now = time.monotonic()
        timestamps = self.requests[user_id]
        # Drop requests that have left the window
        while timestamps and timestamps[0] <= now - self.window:
            timestamps.popleft()
        # Enforce the limit
        if len(timestamps) >= self.max_requests:
            return False
        timestamps.append(now)
        return True

class RateLimitExceeded(Exception):
    pass

rate_limiter = RateLimiter(max_requests=10, window_seconds=60)

async def run_with_rate_limit(agent: Agent, user_input: str, user_id: str, **kwargs):
    if not await rate_limiter.check(user_id):
        raise RateLimitExceeded(f"Rate limit exceeded for {user_id}")
    return await Runner.run(agent, user_input, **kwargs)

# ═══════════════════════════════════════════════════════════
# 4. Secrets Management
# ═══════════════════════════════════════════════════════════
class SecureContext(TypedDict):
    user_id: str
    # Do not store plaintext secrets in the context;
    # use environment variables or a secrets manager

@function_tool
async def call_external_api(
    context: RunContextWrapper[SecureContext],
    endpoint: str,
) -> str:
    # Read the API key from an environment variable (fails fast if it is missing)
    api_key = os.environ["EXTERNAL_API_KEY"]
    # ...or from a secrets manager
    # api_key = await secrets_manager.get_secret("external_api_key")
    # http_client is a placeholder for an async HTTP client such as httpx.AsyncClient
    response = await http_client.get(
        endpoint,
        headers={"Authorization": f"Bearer {api_key}"},
    )
    return response.text
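In the SDK, a guardrail returns a `GuardrailFunctionOutput`, which carries only `output_info` and `tripwire_triggered`. A guardrail can therefore block an output but cannot rewrite it, so masking PII has to be applied to the final output after the run (or in the UI layer). The stdlib sketch below shows such a post-processing step. `redact_pii` is a hypothetical helper. Its two patterns only cover dash-separated US SSNs and 16-digit card numbers, so treat it as a starting point rather than a complete PII filter.

```python
import re

PII_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN REDACTED]"),
    (re.compile(r"\b\d{4}-\d{4}-\d{4}-\d{4}\b"), "[CARD REDACTED]"),
]


def redact_pii(text: str) -> tuple[str, int]:
    """Return the redacted text and the number of substitutions made."""
    total = 0
    for pattern, replacement in PII_PATTERNS:
        text, n = pattern.subn(replacement, text)
        total += n
    return text, total


redacted, count = redact_pii("SSN 123-45-6789, card 4111-1111-1111-1111, order 12-34")
print(redacted)  # SSN [SSN REDACTED], card [CARD REDACTED], order 12-34
print(count)     # 2
```

Returning the substitution count lets callers log or alert when redaction fires, which is useful evidence that the upstream prompt or tools are leaking sensitive data.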
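7. Assessment and Recommendations
Core Strengths
Architecture:
- ✅ Clear separation of concerns: a three-layer architecture (Runner → AgentRunner → run_internal/*) with well-defined responsibilities
- ✅ Protocol-based extension: Session/Model/Tracing use Protocols rather than ABCs, which reduces coupling
- ✅ First-class abstractions: Handoff, Session, and Tracing are core to the framework, not bolted on afterward
- ✅ Complete state management: RunState supports HITL, interruption/resume, and serialization
- ✅ Production-ready: built-in tracing, sessions, guardrails, and error handling
Developer experience:
- ✅ Easy to start: Runner.run(agent, input) is all you need to get going
- ✅ Progressive complexity: grows from simple to advanced (basics → handoffs → sessions → tracing → HITL)
- ✅ Type safety: extensive type hints and good IDE support
- ✅ Rich examples: 12,775 lines of example code covering a wide range of scenarios
Ecosystem:
- ✅ Provider-agnostic: 100+ LLMs (not tied to OpenAI)
- ✅ Active community: 19.2k stars, 219 contributors
- ✅ Official maintenance: actively maintained by the OpenAI team (1,135 commits)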
Limitations
Functional limitations:
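- ❌ No provider-agnostic RAG: vector stores, embeddings, and retrieval must be implemented yourself (the hosted FileSearchTool works only with OpenAI vector stores)
- ⚠️ Client-side MCP only: the SDK connects to MCP servers (stdio, SSE, Streamable HTTP) via the Agent's mcp_servers and supports hosted MCP tools, but it offers nothing like the Claude SDK's in-process MCP servers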
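- ⚠️ Limited tool ecosystem: relies on developer-defined tools (vs. LangChain's 1000+ tools)
- ⚠️ Basic session features: append-only history only (no summarization, compaction, or other advanced features)
Design trade-offs:
- ⚠️ Lightweight vs. feature-rich: focuses on agent orchestration at the expense of LangChain features such as RAG and chains
- ⚠️ Sequential handoffs: only one handoff is processed at a time (no parallel handoffs)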
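- ⚠️ Tracing overhead: built-in tracing adds a small performance cost (it can be disabled, e.g. with RunConfig(tracing_disabled=True) or the OPENAI_AGENTS_DISABLE_TRACING=1 environment variable)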
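Production considerations:
- ⚠️ No built-in rate limiting: you must implement it yourself
- ⚠️ No built-in retries: wrap calls manually (e.g. with tenacity)
- ⚠️ Session scalability: SQLite is not suited to high concurrency (use Redis/PostgreSQL in production)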
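Because retries are left to the caller, here is a minimal stdlib-only sketch of an async retry wrapper with capped exponential backoff and full jitter. The helper name retry_async and its parameters are hypothetical; tenacity provides the same idea with more options. For an agent run, one would pass a factory such as lambda: Runner.run(agent, input) so that each attempt creates a fresh coroutine.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    make_call: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Await make_call() up to `attempts` times, backing off exponentially between failures."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            # A new coroutine per attempt: a coroutine object can only be awaited once
            return await make_call()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Backoff of base, 2*base, 4*base, ... capped at max_delay, with full jitter
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")
```

Narrowing retry_on to transient errors (network failures, HTTP 429/5xx) is usually preferable to the broad default, so that logic errors such as exceeding max_turns fail immediately.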
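Recommended Use Cases
- ✅ Building multi-agent orchestration systems (customer support, workflow automation, etc.)
- ✅ Working across LLM providers (no lock-in to a specific vendor)
- ✅ Needing a lightweight, controllable agent framework
- ✅ Needing built-in HITL (Human-in-the-Loop)
- ✅ Embedding agent capabilities in Python applications
- ✅ Needing full tracing/observability
Not Recommended
- ❌ Needing RAG/vector stores (choose LangChain or integrate your own)
- ❌ Needing a rich set of prebuilt tools (choose LangChain)
- ❌ Needing only simple LLM calls (use the OpenAI SDK directly)
- ❌ Needing TypeScript/JavaScript (use the JS/TS version of the Agents SDK)
Evaluate Case by Case
- ⚠️ Large-scale production deployments (rate limiting, circuit breakers, etc. must be built yourself)
- ⚠️ Complex memory requirements (Session features are fairly basic)
- ⚠️ Needing a UI (you must build it yourself; the SDK has no UI)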
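The circuit breaker mentioned above can be sketched as a small consecutive-failure breaker. This is a stdlib-only illustration under assumed semantics: the class name CircuitBreaker, the thresholds, and the closed/open/half-open states are hypothetical choices, not an SDK API. The clock is injectable so the behavior can be tested without sleeping.

```python
import time
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the breaker is open."""


class CircuitBreaker:
    """Consecutive-failure breaker: closed -> open -> half-open -> closed."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half-open"  # cool-down elapsed: let a trial call through
        return "open"

    async def call(self, make_call: Callable[[], Awaitable[T]]) -> T:
        if self.state == "open":
            raise CircuitOpenError("circuit open; failing fast")
        try:
            result = await make_call()
        except Exception:
            self._failures += 1
            # A failed trial call (half-open) or too many failures (closed) (re)opens the circuit
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and resets the failure count
        self._failures = 0
        self._opened_at = None
        return result
```

As a simplification, this sketch lets every concurrent call through while half-open; a stricter version would admit a single probe at a time.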
7.4 Best Practices
Development:
1. Start with a simple agent and add handoffs incrementally
2. Print traces to the console early on for debugging (e.g. with the tracing module's ConsoleSpanExporter)
3. Add guardrails for critical operations
4. Set needs_approval=True on tools that require approval
Testing:
1. Use MemorySession for unit tests
2. Test HITL flows (interruptions + resume)
3. Test handoff chains (make sure input_filter is correct)
4. Test the max_turns limit
Production:
1. Use Redis/PostgreSQL as the Session backend
2. Send traces to an external service (Logfire, AgentOps, etc.)
3. Implement rate limiting and a circuit breaker
4. Add PII-filtering guardrails
5. Monitor token usage (via tracing metrics)
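For the PII-filtering step, the detection logic itself can be plain Python. The following is a minimal regex-based sketch; the patterns (email addresses, simple phone numbers) are illustrative assumptions and cover only a sliver of real PII. In the SDK, a function like find_pii would be called from an input or output guardrail that returns GuardrailFunctionOutput with tripwire_triggered set when matches are found.

```python
import re
from typing import Dict, List

# Illustrative patterns only; production PII detection needs far broader coverage
PII_PATTERNS: Dict[str, re.Pattern] = {
    "email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "phone": re.compile(r"\+?\d[\d\s-]{7,}\d"),
}


def find_pii(text: str) -> Dict[str, List[str]]:
    """Return the matches per PII category, omitting categories with no match."""
    hits = {name: pat.findall(text) for name, pat in PII_PATTERNS.items()}
    return {name: found for name, found in hits.items() if found}


def redact_pii(text: str) -> str:
    """Replace every match with a [REDACTED:<category>] placeholder."""
    for name, pat in PII_PATTERNS.items():
        text = pat.sub(f"[REDACTED:{name}]", text)
    return text
```

Detection (to trip a guardrail) and redaction (to sanitize logs or traces) are kept separate because the two are often applied at different points in the pipeline.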
Performance:
1. Enable parallel guardrails (run alongside the model call)
2. Use connection pools (Redis/DB)
3. Set timeouts on slow tools
4. Consider caching tool results
5. Limit the number of concurrent agents (semaphore)
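The timeout and semaphore suggestions can be combined in a small asyncio helper. This is a stdlib-only sketch; run_bounded and its parameters are hypothetical, and in real code each factory would wrap something like Runner.run(agent, input).

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, TypeVar

T = TypeVar("T")


async def run_bounded(
    factories: Sequence[Callable[[], Awaitable[T]]],
    *,
    max_concurrency: int = 4,
    timeout: float = 30.0,
) -> List[object]:
    """Run coroutine factories with at most `max_concurrency` in flight, each capped at `timeout` seconds.

    Results keep input order; a failed or timed-out call yields its exception instead of a value.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(factory: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:  # waits here while max_concurrency calls are running
            # The timeout clock starts only once a slot is acquired
            return await asyncio.wait_for(factory(), timeout=timeout)

    # return_exceptions=True so one failure does not cancel the other runs
    return await asyncio.gather(*(guarded(f) for f in factories), return_exceptions=True)
```

Passing factories instead of coroutine objects ensures that no work starts before a semaphore slot is free.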
Appendix
A. Key File Quick Reference
| File path | Lines | Key contents |
|---|---|---|
| src/agents/run.py | 1623 | AgentRunner.run() main loop (lines 396-1329) |
| src/agents/run_internal/run_loop.py | 1623+ | run_single_turn(), get_new_response() |
| src/agents/run_internal/tool_execution.py | ~1400 | execute_function_tool_calls() |
| src/agents/run_internal/turn_resolution.py | 1623 | execute_handoffs(), process_model_response() |
| src/agents/run_state.py | 2384 | RunState serialization/deserialization |
| src/agents/handoffs/__init__.py | 334 | Handoff class definition |
| src/agents/memory/session.py | 150 | Session Protocol |
| src/agents/tracing/processor_interface.py | 142 | TracingProcessor Protocol |
| src/agents/agent.py | 890 | Agent class definition |
| src/agents/tool.py | 1288 | FunctionTool, @function_tool |
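B. Glossary
| Term | Definition |
|---|---|
| Agent | A configured LLM instance (instructions + tools + handoffs + guardrails + model) |
| Handoff | A dedicated tool for transferring control between agents |
| Session | A backend that manages conversation history automatically (SQLite/Redis/custom) |
| RunState | Serializable run state (used for HITL) |
| Guardrail | An input/output validation function |
| Tracing | An observability system that records agent runs automatically |
| HITL | Human-in-the-Loop: workflows with human participation |
| NextStep | What happens after a turn ends (FinalOutput/Handoff/Interruption/RunAgain) |
| Turn | One complete cycle of an LLM call plus tool execution |
| Span | A unit of work in a trace (Agent/Generation/Function/Handoff/Guardrail) |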
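To make the Turn and NextStep terms concrete, here is a toy model of how a run loop can dispatch on the step each turn returns. It is a hypothetical, heavily simplified sketch and not the SDK's run_internal code: agents are plain strings, take_turn stands in for one LLM call plus tool execution, and the local MaxTurnsExceeded merely mirrors the role of the SDK exception of the same name.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass
class FinalOutput:
    output: str


@dataclass
class Handoff:
    new_agent: str


@dataclass
class Interruption:
    pending_approvals: List[str]


@dataclass
class RunAgain:
    pass


NextStep = Union[FinalOutput, Handoff, Interruption, RunAgain]


class MaxTurnsExceeded(Exception):
    pass


def run_loop(agent: str, take_turn: Callable[[str, int], NextStep],
             max_turns: int = 10) -> Union[str, Interruption]:
    """Run turns until a terminal step is reached or max_turns is exhausted."""
    for turn in range(1, max_turns + 1):
        step = take_turn(agent, turn)
        if isinstance(step, FinalOutput):
            return step.output       # done: return the final answer
        if isinstance(step, Interruption):
            return step              # pause for HITL; a real runner would persist RunState here
        if isinstance(step, Handoff):
            agent = step.new_agent   # control passes to another agent; the loop continues
        # RunAgain: tool results were recorded, so run another turn with the same agent
    raise MaxTurnsExceeded(f"exceeded {max_turns} turns")
```

For example, a triage agent that first runs a tool, then hands off to a billing agent that answers, produces three turns and returns the billing agent's final output.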
C. Reference Resources
Official documentation:
- GitHub: https://github.com/openai/openai-agents-python
- Docs: https://openai.github.io/openai-agents-python/
- Examples: https://github.com/openai/openai-agents-python/tree/main/examples
Community:
- Discussions: https://github.com/openai/openai-agents-python/discussions
- Issues: https://github.com/openai/openai-agents-python/issues
Related projects:
- Swarm (deprecated): https://github.com/openai/swarm
- Agents SDK JS/TS: https://github.com/openai/openai-agents-js
- LiteLLM: https://github.com/BerriAI/litellm
Document version: 1.0
Author: In-depth technical research team
Last updated: 2026-02-26