OpenManus - 深度技术调研¶
项目概览
调研日期: 2026-02-26
仓库: https://github.com/FoundationAgents/OpenManus
许可证: MIT
主要语言: Python
定位: 快速复现通用 Manus 风格 Agent,强调可用性和社区共建
1. 项目概述¶
1.1 核心定位¶
OpenManus 是一个 Python 主体的可运行框架,旨在快速复现 Manus 风格的通用 Agent,特点: - ✅ 基于 ReAct 模式的 Agent 实现 - ✅ MCP 客户端与服务器双角色 - ✅ 多 Agent 流程系统(实验性) - ✅ 丰富的工具集成 - ✅ 浏览器自动化支持
1.2 三种运行模式¶
- main.py: 基础 Manus agent
- run_mcp.py: MCP-enabled agent(支持 stdio/SSE)
- run_flow.py: Multi-agent flow 系统(标注为不稳定)
2. 代码架构深度分析¶
2.1 目录结构¶
OpenManus/
├── main.py # 基础 agent 启动器
├── run_mcp.py # MCP agent 启动器 (116 lines)
├── run_flow.py # Multi-agent flow 启动器 (52 lines)
├── run_mcp_server.py # MCP server 启动器
├── app/
│ ├── agent/ # Agent 实现
│ │ ├── base.py # BaseAgent 抽象类 (196 lines)
│ │ ├── react.py # ReAct agent (38 lines)
│ │ ├── toolcall.py # Tool-calling agent (250 lines)
│ │ ├── manus.py # 主 Manus agent (165 lines)
│ │ ├── mcp.py # MCP-specific agent (185 lines)
│ │ ├── browser.py # Browser context helper
│ │ ├── data_analysis.py # Data analysis agent
│ │ └── swe.py # Software engineering agent
│ ├── flow/ # Multi-agent flow 系统
│ │ ├── base.py # BaseFlow 抽象 (57 lines)
│ │ ├── flow_factory.py # Flow creation factory (30 lines)
│ │ └── planning.py # Planning flow (442 lines)
│ ├── tool/ # 工具实现
│ │ ├── base.py # BaseTool 抽象类 (181 lines)
│ │ ├── tool_collection.py # 工具集合管理器 (71 lines)
│ │ ├── mcp.py # MCP client tools (194 lines)
│ │ ├── python_execute.py # Python 执行
│ │ ├── str_replace_editor.py # 文件编辑 (432 lines)
│ │ ├── planning.py # 规划工具 (363 lines)
│ │ ├── bash.py # Bash 执行
│ │ ├── browser_use_tool.py # 浏览器自动化
│ │ └── search/ # Web 搜索工具
│ ├── mcp/ # MCP server 实现
│ │ └── server.py # FastMCP server (180 lines)
│ ├── prompt/ # Agent prompts
│ │ ├── manus.py
│ │ ├── mcp.py
│ │ ├── planning.py
│ │ └── toolcall.py
│ ├── sandbox/ # Sandbox 执行
│ ├── llm.py # LLM 接口 (766 lines)
│ ├── config.py # 配置管理 (372 lines)
│ ├── schema.py # 数据模型 (187 lines)
│ └── logger.py # 日志工具
├── config/
│ ├── config.example.toml # 主配置 (113 lines)
│ └── mcp.example.json # MCP 服务器配置 (8 lines)
└── protocol/a2a/ # Agent-to-Agent 协议
2.2 Agent 继承链¶
BaseAgent (app/agent/base.py:13-196):
- 核心属性: name, description, llm, memory, state
- 状态管理: IDLE → RUNNING → FINISHED → ERROR
- 主循环: run() 方法 (Lines 116-154)
- 抽象方法: step() 必须由子类实现
关键代码 (app/agent/base.py:116-154):
async def run(self, request: Optional[str] = None) -> str:
"""Execute the agent's main loop asynchronously."""
if self.state != AgentState.IDLE:
raise RuntimeError(f"Cannot run agent from state: {self.state}")
if request:
self.update_memory("user", request)
results: List[str] = []
async with self.state_context(AgentState.RUNNING):
while (
self.current_step < self.max_steps and self.state != AgentState.FINISHED
):
self.current_step += 1
logger.info(f"Executing step {self.current_step}/{self.max_steps}")
step_result = await self.step()
# Check for stuck state
if self.is_stuck():
self.handle_stuck_state()
results.append(f"Step {self.current_step}: {step_result}")
if self.current_step >= self.max_steps:
self.current_step = 0
self.state = AgentState.IDLE
results.append(f"Terminated: Reached max steps ({self.max_steps})")
await SANDBOX_CLIENT.cleanup()
return "\n".join(results) if results else "No steps executed"
ReActAgent (app/agent/react.py):
class ReActAgent(BaseAgent, ABC):
@abstractmethod
async def think(self) -> bool:
"""Process current state and decide next action"""
@abstractmethod
async def act(self) -> str:
"""Execute decided actions"""
async def step(self) -> str:
"""Execute a single step: think and act."""
should_act = await self.think()
if not should_act:
return "Thinking complete - no action needed"
return await self.act()
ToolCallAgent (app/agent/toolcall.py:18-250):
- 工具执行引擎
- 关键方法:
- think(): 使用 LLM 决定调用哪些工具 (Lines 39-129)
- act(): 执行工具调用并处理结果 (Lines 131-164)
- execute_tool(): 处理单个工具执行 (Lines 166-208)
关键代码 - 工具执行 (app/agent/toolcall.py):
async def think(self) -> bool:
"""Process current state and decide next actions using tools"""
response = await self.llm.ask_tool(
messages=self.messages,
tools=self.available_tools.to_params(),
tool_choice=self.tool_choices,
)
self.tool_calls = response.tool_calls if response else []
return bool(self.tool_calls)
async def act(self) -> str:
"""Execute tool calls and handle their results"""
results = []
for command in self.tool_calls:
result = await self.execute_tool(command)
tool_msg = Message.tool_message(
content=result,
tool_call_id=command.id,
name=command.function.name,
)
self.memory.add_message(tool_msg)
results.append(result)
return "\n\n".join(results)
3. 底层实现原理¶
3.1 Manus Agent - 主实现¶
文件: app/agent/manus.py:18-165
架构:
class Manus(ToolCallAgent):
# 内置工具集合
available_tools: ToolCollection = Field(
default_factory=lambda: ToolCollection(
PythonExecute(),
BrowserUseTool(),
StrReplaceEditor(),
AskHuman(),
Terminate(), # Lines 34-42
)
)
# MCP 客户端用于远程工具
mcp_clients: MCPClients = Field(default_factory=MCPClients) # Line 31
# 浏览器上下文助手
browser_context_helper: Optional[BrowserContextHelper] = None
MCP Server 初始化 (Lines 67-89):
async def initialize_mcp_servers(self) -> None:
for server_id, server_config in config.mcp_config.servers.items():
if server_config.type == "sse":
await self.connect_mcp_server(server_config.url, server_id)
elif server_config.type == "stdio":
await self.connect_mcp_server(
server_config.command,
server_id,
use_stdio=True,
stdio_args=server_config.args, # Lines 78-84
)
动态工具管理 (Lines 91-112):
async def connect_mcp_server(self, server_url: str, server_id: str):
if use_stdio:
await self.mcp_clients.connect_stdio(
server_url, stdio_args or [], server_id # Lines 100-102
)
else:
await self.mcp_clients.connect_sse(server_url, server_id)
# 添加新 MCP 工具到可用工具
new_tools = [
tool for tool in self.mcp_clients.tools
if tool.server_id == server_id
]
self.available_tools.add_tools(*new_tools) # Lines 109-112
3.2 MCP 实现 - Client & Server¶
MCP Client (app/tool/mcp.py)¶
MCPClients 类架构:
class MCPClients(ToolCollection):
sessions: Dict[str, ClientSession] = {}
async def connect_sse(self, server_url: str, server_id: str = "") -> None:
exit_stack = AsyncExitStack()
streams = await exit_stack.enter_async_context(sse_client(url=server_url))
session = await exit_stack.enter_async_context(ClientSession(*streams))
self.sessions[server_id] = session
async def connect_stdio(self, command: str, args: List[str], server_id: str = "") -> None:
server_params = StdioServerParameters(command=command, args=args)
transport = await exit_stack.enter_async_context(stdio_client(server_params))
session = await exit_stack.enter_async_context(ClientSession(*transport))
self.sessions[server_id] = session
MCPClientTool - 远程工具代理:
class MCPClientTool(BaseTool):
session: Optional[ClientSession] = None
server_id: str = ""
original_name: str = ""
async def execute(self, **kwargs) -> ToolResult:
result = await self.session.call_tool(self.original_name, kwargs)
content_str = ", ".join(
item.text for item in result.content
if isinstance(item, TextContent)
)
return ToolResult(output=content_str or "No output returned.")
MCP Server (app/mcp/server.py)¶
服务器架构:
class MCPServer:
def __init__(self, name: str = "openmanus"):
self.server = FastMCP(name)
self.tools["bash"] = Bash()
self.tools["browser"] = BrowserUseTool()
self.tools["editor"] = StrReplaceEditor()
self.tools["terminate"] = Terminate()
工具注册:
def register_tool(self, tool: BaseTool, method_name: Optional[str] = None):
tool_name = method_name or tool.name
tool_param = tool.to_param()
tool_function = tool_param["function"]
# 创建异步包装器
async def tool_method(**kwargs):
result = await tool.execute(**kwargs)
if hasattr(result, "model_dump"):
return json.dumps(result.model_dump())
elif isinstance(result, dict):
return json.dumps(result)
return result
# 从工具参数构建函数签名
tool_method.__name__ = tool_name
tool_method.__doc__ = self._build_docstring(tool_function)
tool_method.__signature__ = self._build_signature(tool_function)
self.server.tool()(tool_method) # 注册到 FastMCP
3.3 Flow 系统 - Multi-Agent 编排¶
BaseFlow (app/flow/base.py:9-57)¶
class BaseFlow(BaseModel, ABC):
agents: Dict[str, BaseAgent]
tools: Optional[List] = None
primary_agent_key: Optional[str] = None
@property
def primary_agent(self) -> Optional[BaseAgent]:
return self.agents.get(self.primary_agent_key)
@abstractmethod
async def execute(self, input_text: str) -> str:
"""Execute the flow with given input"""
PlanningFlow (app/flow/planning.py)¶
核心架构:
class PlanningFlow(BaseFlow):
llm: LLM = Field(default_factory=lambda: LLM())
planning_tool: PlanningTool = Field(default_factory=PlanningTool)
executor_keys: List[str] = Field(default_factory=list)
active_plan_id: str = Field(default_factory=lambda: f"plan_{int(time.time())}")
current_step_index: Optional[int] = None
执行流程:
async def execute(self, input_text: str) -> str:
if input_text:
await self._create_initial_plan(input_text)
while True:
self.current_step_index, step_info = await self._get_current_step_info()
if self.current_step_index is None:
result += await self._finalize_plan()
break
step_type = step_info.get("type") if step_info else None
executor = self.get_executor(step_type)
step_result = await self._execute_step(executor, step_info)
result += step_result + "\n"
Agent 选择 (Lines 77-92):
def get_executor(self, step_type: Optional[str] = None) -> BaseAgent:
# 如果步骤类型匹配 agent key,使用该 agent
if step_type and step_type in self.agents:
return self.agents[step_type]
# 否则使用第一个可用的 executor
for key in self.executor_keys:
if key in self.agents:
return self.agents[key]
# 回退到 primary agent
return self.primary_agent
使用 LLM 创建计划 (Lines 136-211):
async def _create_initial_plan(self, request: str):
# 使用 agent 描述构建系统消息
agents_description = [
{"name": key.upper(), "description": self.agents[key].description}
for key in self.executor_keys if key in self.agents
]
system_message_content = (
"You are a planning assistant. Create a concise, actionable plan..."
)
if len(agents_description) > 1:
system_message_content += (
f"\nNow we have {agents_description} agents. "
"When creating steps, specify agent names using '[agent_name]'."
)
# 使用 PlanningTool 调用 LLM
response = await self.llm.ask_tool(
messages=[user_message],
system_msgs=[system_message],
tools=[self.planning_tool.to_param()],
tool_choice=ToolChoice.AUTO, # Lines 170-176
)
3.4 工具系统架构¶
BaseTool (app/tool/base.py:78-175)¶
class BaseTool(ABC, BaseModel):
name: str
description: str
parameters: Optional[dict] = None
@abstractmethod
async def execute(self, **kwargs) -> Any:
"""使用给定参数执行工具"""
def to_param(self) -> Dict:
"""转换为 OpenAI function calling 格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
}
ToolResult 模型 (Lines 38-76):
class ToolResult(BaseModel):
output: Any = Field(default=None)
error: Optional[str] = Field(default=None)
base64_image: Optional[str] = Field(default=None)
system: Optional[str] = Field(default=None)
def __str__(self):
return f"Error: {self.error}" if self.error else self.output
ToolCollection (app/tool/tool_collection.py:9-71)¶
class ToolCollection:
def __init__(self, *tools: BaseTool):
self.tools = tools
self.tool_map = {tool.name: tool for tool in tools}
def to_params(self) -> List[Dict[str, Any]]:
return [tool.to_param() for tool in self.tools]
async def execute(self, *, name: str, tool_input: Dict[str, Any] = None):
tool = self.tool_map.get(name)
if not tool:
return ToolFailure(error=f"Tool {name} is invalid")
result = await tool(**tool_input)
return result
def add_tools(self, *tools: BaseTool):
for tool in tools:
if tool.name in self.tool_map:
logger.warning(f"Tool {tool.name} already exists, skipping")
continue
self.tools += (tool,)
self.tool_map[tool.name] = tool
3.5 LLM 模块¶
文件: app/llm.py:174-766
Singleton 模式:
class LLM:
_instances: Dict[str, "LLM"] = {}
def __new__(cls, config_name: str = "default", llm_config: Optional[LLMSettings] = None):
if config_name not in cls._instances:
instance = super().__new__(cls)
instance.__init__(config_name, llm_config)
cls._instances[config_name] = instance
return cls._instances[config_name]
Token 管理 (Lines 238-264):
def update_token_count(self, input_tokens: int, completion_tokens: int = 0):
self.total_input_tokens += input_tokens
self.total_completion_tokens += completion_tokens
logger.info(
f"Token usage: Input={input_tokens}, Completion={completion_tokens}, "
f"Cumulative Total={self.total_input_tokens + self.total_completion_tokens}"
)
def check_token_limit(self, input_tokens: int) -> bool:
if self.max_input_tokens is not None:
return (self.total_input_tokens + input_tokens) <= self.max_input_tokens
return True
工具调用 (Lines 644-766):
async def ask_tool(
self,
messages: List[Union[dict, Message]],
system_msgs: Optional[List[Union[dict, Message]]] = None,
tools: Optional[List[dict]] = None,
tool_choice: TOOL_CHOICE_TYPE = ToolChoice.AUTO,
**kwargs,
) -> ChatCompletionMessage | None:
# 格式化消息(支持图片)
# 计算 tokens
# 检查 token 限制
# 设置参数
# 调用 OpenAI API
response: ChatCompletion = await self.client.chat.completions.create(**params)
return response.choices[0].message
4. 扩展开发指南¶
4.1 添加自定义工具¶
方法 1: 创建 BaseTool 子类
# app/tool/custom_tool.py
from app.tool.base import BaseTool, ToolResult
class CustomTool(BaseTool):
name: str = "custom_tool"
description: str = "Tool description"
parameters: dict = {
"type": "object",
"properties": {
"param1": {
"type": "string",
"description": "Parameter description",
},
},
"required": ["param1"],
}
async def execute(self, param1: str, **kwargs) -> ToolResult:
result = f"Processed: {param1}"
return ToolResult(output=result)
# 添加到 Manus agent
from app.tool.custom_tool import CustomTool
available_tools: ToolCollection = Field(
default_factory=lambda: ToolCollection(
PythonExecute(),
CustomTool(), # 在这里添加工具
Terminate(),
)
)
4.2 配置 MCP Servers¶
编辑 config/mcp.json:
{
"mcpServers": {
"filesystem": {
"type": "stdio",
"command": "python",
"args": ["-m", "mcp_server_filesystem"]
},
"web_server": {
"type": "sse",
"url": "http://localhost:8000/sse"
}
}
}
4.3 定义 Multi-Agent Flows¶
创建自定义 Flow:
# app/flow/custom_flow.py
from app.flow.base import BaseFlow
class CustomFlow(BaseFlow):
async def execute(self, input_text: str) -> str:
# 步骤 1: 使用 primary agent 进行初始分析
analysis = await self.primary_agent.run(
f"Analyze this request: {input_text}"
)
# 步骤 2: 路由到专门的 agent
if "data" in input_text.lower():
specialist = self.get_agent("data_analysis")
result = await specialist.run(analysis)
else:
specialist = self.get_agent("manus")
result = await specialist.run(analysis)
return result
# 在 FlowFactory 中注册
class FlowType(str, Enum):
PLANNING = "planning"
CUSTOM = "custom"
flows = {
FlowType.PLANNING: PlanningFlow,
FlowType.CUSTOM: CustomFlow,
}
5. 评估与建议¶
核心优势
- ✅ 简单易用: Python + 清晰的继承链
- ✅ MCP 双角色: 既是客户端也是服务器
- ✅ 多 Agent 支持: Flow 系统(虽然不稳定)
- ✅ 工具丰富: 内置多种工具 + MCP 扩展
局限性
- ⚠️ Flow 系统不稳定:
run_flow.py标注为不稳定 - ❌ 无内置 Session 管理: 需手动管理历史
- ❌ 无 Tracing: 需自行实现
- ⚠️ 文档不完整: 部分功能缺少文档
推荐场景
- ✅ 快速搭建通用任务 Agent
- ✅ MCP 工具实验
- ✅ 多 Agent 研究(接受不稳定)
不推荐场景
- ❌ 生产级应用(Flow 系统不稳定)
- ❌ 需要企业级特性(session、tracing)
6. 关键文件清单¶
Core Implementation Files¶
| 文件路径 | 描述 |
|---|---|
app/agent/base.py |
核心 Agent 循环,BaseAgent 抽象类 |
app/agent/react.py |
ReAct 模式实现 |
app/agent/toolcall.py |
工具调用 Agent 实现 |
app/mcp/server.py |
MCP 服务器实现 |
app/tool/mcp.py |
MCP 客户端实现 |
app/flow/planning.py |
Flow 编排系统 |
app/tool/tool_collection.py |
工具集合管理 |
Agent 继承链¶
BaseAgent (app/agent/base.py)
↓
ReActAgent (app/agent/react.py)
↓
ToolCallAgent (app/agent/toolcall.py)
↓
Manus (app/agent/manus.py)
文档版本: 1.1
最后更新: 2026-02-27