OpenManus 为Manus项目的开源复刻,实现比较简单。
Manus 爆火后,MetaGPT团队3个小时完成第一版OpenManus(单Agent)。
项目地址: https://github.com/FoundationAgents/OpenManus
理解ReAct
下图源自 Hugging Face Agents Course,展示了 ReAct 的基本循环:
Query -> 模型 Think -> Action -> 执行 Action -> Observation(执行结果)-> 模型再次 Think …… 如此循环,直到得出最终回答
user: 北京天气如何
assistant:
think: 需要获取北京的天气
action: {"name": "weather", "action_args": {"location": "北京"}}
observation: 今天是晴天
assistant: 今天是晴天
OpenManus 架构
单Agent
例如
help me write a brief introduction about Andrej Karpathy
帮我写一段关于 Andrej Karpathy 的简短介绍
OpenManus 采用分层架构设计,通过逐层继承构建智能体框架:
BaseAgent <-- ReActAgent <-- ToolCallAgent <-- Manus
BaseAgent: Agent 基础框架,包括状态管理、记忆(Memory,即对话消息历史)管理和执行循环控制。
ReActAgent: ReAct(Reasoning + Acting)模式的抽象定义,将每一步拆分为 think 和 act。
ToolCallAgent: 基于工具调用(tool calling)的 ReAct 实现。
Manus: 与用户交互的Agent,集成多个工具。
BaseAgent
所有Agent的基类,定义了Agent 状态管理和通过while执行循环的核心逻辑。
这里使用了模板方法设计模式:BaseAgent 定义 Agent 的执行框架(run 中的循环),具体的单步执行方法(step)由子类实现。
class BaseAgent(BaseModel, ABC):
    """Base class for all agents: state handling plus the main step loop.

    Template-method pattern: ``run`` drives the loop and subclasses supply
    ``step``. (Excerpt: fields and helpers such as ``update_memory``,
    ``state_context``, ``is_stuck`` and ``handle_stuck_state`` are omitted.)
    """

    async def run(self, request: Optional[str] = None) -> str:
        """Execute the agent's main loop asynchronously.

        Args:
            request: Optional initial user request to process.

        Returns:
            A string summarizing the execution results.

        Raises:
            RuntimeError: If the agent is not in IDLE state at start.
        """
        if self.state != AgentState.IDLE:
            raise RuntimeError(f"Cannot run agent from state: {self.state}")

        if request:
            self.update_memory("user", request)

        results: List[str] = []
        try:
            async with self.state_context(AgentState.RUNNING):
                while (
                    self.current_step < self.max_steps
                    and self.state != AgentState.FINISHED
                ):
                    self.current_step += 1
                    logger.info(f"Executing step {self.current_step}/{self.max_steps}")
                    step_result = await self.step()

                    # Check for stuck state
                    if self.is_stuck():
                        self.handle_stuck_state()

                    results.append(f"Step {self.current_step}: {step_result}")

                if self.current_step >= self.max_steps:
                    # Step budget exhausted: reset so the agent can run again.
                    self.current_step = 0
                    self.state = AgentState.IDLE
                    results.append(f"Terminated: Reached max steps ({self.max_steps})")
        finally:
            # Release sandbox resources even when a step raised; previously an
            # exception propagating out of state_context skipped this call.
            await SANDBOX_CLIENT.cleanup()
        return "\n".join(results) if results else "No steps executed"
ReActAgent
ReActAgent 实现了ReAct模式,将Agent 执行过程分为 think 和 act 。
同样使用模板方法设计模式,体现了 ReAct 的核心思想——Think -> Action -> Observation 的循环过程,具体的 think 和 act 交由子类实现。
class ReActAgent(BaseAgent, ABC):
    """Agent that splits every step into a *think* phase and an *act* phase.

    Subclasses implement ``think`` (decide what to do) and ``act`` (do it);
    ``step`` wires the two together.
    """

    name: str
    description: Optional[str] = None

    system_prompt: Optional[str] = None
    next_step_prompt: Optional[str] = None

    llm: Optional[LLM] = Field(default_factory=LLM)
    memory: Memory = Field(default_factory=Memory)
    state: AgentState = AgentState.IDLE

    max_steps: int = 10
    current_step: int = 0

    @abstractmethod
    async def think(self) -> bool:
        """Inspect the current state and decide whether an action is needed.

        Returns:
            True when ``act`` should run for this step, False otherwise.
        """

    @abstractmethod
    async def act(self) -> str:
        """Carry out the action(s) chosen by ``think`` and describe the outcome."""

    async def step(self) -> str:
        """Run one think/act cycle and return a description of its outcome."""
        if await self.think():
            return await self.act()
        return "Thinking complete - no action needed"
ToolCallAgent
在 ReActAgent 的基础上增加了工具调用功能。
Think: 调用大模型思考使用什么工具
Action: 根据模型的输出执行工具
Observation: 将工具执行结果作为 tool 消息写入记忆,供大模型在下一轮 think 时参考
class ToolCallAgent(ReActAgent):
    """ReAct agent whose actions are LLM tool (function) calls.

    think: ask the LLM which tools to call.
    act:   execute those tool calls.
    The tool outputs are written to memory as tool messages, which the LLM
    sees on the next think (the "observation").
    """

    system_prompt: str = SYSTEM_PROMPT
    next_step_prompt: str = NEXT_STEP_PROMPT

    # Built per instance via default_factory (the same way Manus declares its
    # tools) instead of one class-level ToolCollection object used as the
    # default; ToolCollection.add_tool() mutates the collection in place.
    available_tools: ToolCollection = Field(
        default_factory=lambda: ToolCollection(CreateChatCompletion(), Terminate())
    )
    tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO  # type: ignore
    special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])

    # Tool calls chosen by the most recent think(), consumed by act().
    tool_calls: List[ToolCall] = Field(default_factory=list)

    async def think(self) -> bool:
        """Ask the LLM for the next move and record its tool calls.

        Appends ``next_step_prompt`` (if set) as a user message, calls the LLM
        with the available tool schemas, and stores the assistant reply in
        memory.

        Returns:
            True if ``act`` should run: always in REQUIRED mode (act raises
            when no call was made); in AUTO mode when there are tool calls or
            non-empty content; otherwise only when there are tool calls.
        """
        if self.next_step_prompt:
            user_msg = Message.user_message(self.next_step_prompt)
            self.messages += [user_msg]

        # Get response with tool options
        response = await self.llm.ask_tool(
            messages=self.messages,
            system_msgs=(
                [Message.system_message(self.system_prompt)]
                if self.system_prompt
                else None
            ),
            tools=self.available_tools.to_params(),
            tool_choice=self.tool_choices,
        )

        # Process tool calls (an absent response is treated as "no calls").
        self.tool_calls = (
            response.tool_calls if response and response.tool_calls else []
        )
        content = response.content if response and response.content else ""

        # Create and add assistant message
        assistant_msg = (
            Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
            if self.tool_calls
            else Message.assistant_message(content)
        )
        self.memory.add_message(assistant_msg)

        if self.tool_choices == ToolChoice.REQUIRED and not self.tool_calls:
            return True  # Will be handled in act()

        # For 'auto' mode, continue with content if no commands but content exists
        if self.tool_choices == ToolChoice.AUTO and not self.tool_calls:
            return bool(content)

        return bool(self.tool_calls)

    async def act(self) -> str:
        """Execute the pending tool calls and record their results.

        Each result is sliced to ``max_observe`` (when set) and stored in
        memory as a tool message linked to the originating call id.

        Returns:
            The tool results joined by blank lines; when there are no tool
            calls, the last message's content or a placeholder string.

        Raises:
            ValueError: If tool calls were required but the LLM made none.
        """
        if not self.tool_calls:
            if self.tool_choices == ToolChoice.REQUIRED:
                raise ValueError(TOOL_CALL_REQUIRED)

            # Return last message content if no tool calls
            return self.messages[-1].content or "No content or commands to execute"

        results = []
        for command in self.tool_calls:
            # Reset base64_image for each tool call
            self._current_base64_image = None

            result = await self.execute_tool(command)

            if self.max_observe:
                result = result[: self.max_observe]

            # Add tool response to memory
            tool_msg = Message.tool_message(
                content=result,
                tool_call_id=command.id,
                name=command.function.name,
                base64_image=self._current_base64_image,
            )
            self.memory.add_message(tool_msg)
            results.append(result)

        return "\n\n".join(results)
Manus
通用 Agent 实例,集成了多种本地工具(Python 执行、浏览器、文件编辑、询问用户、终止)以及 MCP 工具
class Manus(ToolCallAgent):
    """A versatile general-purpose agent with support for both local and MCP tools."""

    name: str = "Manus"
    description: str = "A versatile agent that can solve various tasks using multiple tools including MCP-based tools"

    # MCP clients for remote tool access
    mcp_clients: MCPClients = Field(default_factory=MCPClients)

    # Add general-purpose tools to the tool collection
    available_tools: ToolCollection = Field(
        default_factory=lambda: ToolCollection(
            PythonExecute(),
            BrowserUseTool(),
            StrReplaceEditor(),
            AskHuman(),
            Terminate(),
        )
    )

    special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name])

    @classmethod
    async def create(cls, **kwargs) -> "Manus":
        """Factory method to create and properly initialize a Manus instance.

        Connects the MCP servers before returning, so the instance is ready
        for use without the lazy initialization in ``think``.
        """
        instance = cls(**kwargs)
        await instance.initialize_mcp_servers()
        instance._initialized = True
        return instance

    async def cleanup(self):
        """Clean up Manus agent resources.

        Closes the browser through the context helper (if one exists) and
        disconnects MCP servers when they were initialized.
        """
        if self.browser_context_helper:
            await self.browser_context_helper.cleanup_browser()
        # Disconnect from all MCP servers only if we were initialized
        if self._initialized:
            await self.disconnect_mcp_server()
            self._initialized = False

    async def think(self) -> bool:
        """Process current state and decide next actions with appropriate context.

        Connects MCP servers on first use. If the browser tool was called in
        any of the last three messages, ``next_step_prompt`` is temporarily
        replaced by the browser-context prompt for this think only.

        Returns:
            Whatever ``ToolCallAgent.think`` returns.
        """
        if not self._initialized:
            await self.initialize_mcp_servers()
            self._initialized = True

        original_prompt = self.next_step_prompt
        recent_messages = self.memory.messages[-3:] if self.memory.messages else []
        browser_in_use = any(
            tc.function.name == BrowserUseTool().name
            for msg in recent_messages
            if msg.tool_calls
            for tc in msg.tool_calls
        )

        try:
            # Add special context for the browser tool
            if browser_in_use:
                self.next_step_prompt = (
                    await self.browser_context_helper.format_next_step_prompt()
                )
            return await super().think()
        finally:
            # Restore original prompt even if think() raised; otherwise the
            # browser prompt would leak into every later step.
            self.next_step_prompt = original_prompt
工具系统
BaseTool
通过 BaseTool 实现了一套统一的工具接口。
class BaseTool(ABC, BaseModel):
    """Common interface implemented by every tool.

    A tool carries a name, a description shown to the LLM, and an optional
    ``parameters`` schema dict. Calling the instance runs ``execute``.
    """

    name: str
    description: str
    parameters: Optional[dict] = None

    class Config:
        arbitrary_types_allowed = True

    async def __call__(self, **kwargs) -> Any:
        """Run the tool by forwarding all keyword arguments to ``execute``."""
        return await self.execute(**kwargs)

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """Perform the tool's work; implemented by each concrete tool."""

    def to_param(self) -> Dict:
        """Describe this tool in the function-calling format sent to the LLM."""
        function_spec = {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters,
        }
        return {"type": "function", "function": function_spec}
终止工具 Terminate
Terminate 是一个特殊工具,允许Agent通过大模型自主决定何时结束任务,避免无限循环或过早结束。
_TERMINATE_DESCRIPTION = """Terminate the interaction when the request is met OR if the assistant cannot proceed further with the task.
When you have finished all the tasks, call this tool to end the work."""


class Terminate(BaseTool):
    """Tool the LLM calls to end the interaction with a success/failure status."""

    name: str = "terminate"
    description: str = _TERMINATE_DESCRIPTION
    # Single required argument: "status", restricted to "success" or "failure".
    parameters: dict = {
        "type": "object",
        "properties": {
            "status": {
                "type": "string",
                "description": "The finish status of the interaction.",
                "enum": ["success", "failure"],
            }
        },
        "required": ["status"],
    }

    async def execute(self, status: str) -> str:
        """Report that the interaction finished with the given status."""
        return "The interaction has been completed with status: {}".format(status)
询问工具 AskHuman
允许 Agent 在遇到无法解决的问题时向人类寻求帮助(通过终端的 input() 等待用户输入回答),让我们可以更好地干预 Agent 完成任务的过程。
class AskHuman(BaseTool):
    """Add a tool to ask human for help."""

    name: str = "ask_human"
    description: str = "Use this tool to ask human for help."
    # The schema is a dict, so annotate it as one: the previous ``str``
    # annotation contradicted both the value and BaseTool's Optional[dict].
    parameters: dict = {
        "type": "object",
        "properties": {
            "inquire": {
                "type": "string",
                "description": "The question you want to ask human.",
            }
        },
        "required": ["inquire"],
    }

    async def execute(self, inquire: str) -> str:
        """Show the question on the terminal and wait for the user's reply.

        Args:
            inquire: The question to present to the user.

        Returns:
            The user's reply with surrounding whitespace stripped.
        """
        # NOTE(review): input() blocks the event loop while waiting; consider
        # asyncio.to_thread(input, ...) if other tasks must keep running.
        return input(f"""Bot: {inquire}\n\nYou: """).strip()
ToolCollection
可以灵活管理多个工具实例,提供统一的工具注册和执行接口,使工具可以即插即用。
class ToolCollection:
    """A collection of defined tools."""

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, *tools: BaseTool):
        self.tools = tools
        # Name -> tool lookup used by execute() and get_tool().
        self.tool_map = {tool.name: tool for tool in tools}

    def __iter__(self):
        return iter(self.tools)

    def to_params(self) -> List[Dict[str, Any]]:
        """Return each tool's function-call schema, in registration order."""
        return [tool.to_param() for tool in self.tools]

    async def execute(
        self, *, name: str, tool_input: Dict[str, Any] = None
    ) -> ToolResult:
        """Run the tool registered under ``name`` with ``tool_input`` as kwargs.

        Args:
            name: Registered tool name.
            tool_input: Keyword arguments for the tool; ``None`` means none.

        Returns:
            The tool's result, or a ToolFailure if the name is unknown or the
            tool raised ToolError.
        """
        tool = self.tool_map.get(name)
        if tool is None:
            return ToolFailure(error=f"Tool {name} is invalid")
        try:
            # ``**None`` raises TypeError, so a missing argument dict (the
            # default) is treated as "call with no arguments".
            result = await tool(**(tool_input or {}))
            return result
        except ToolError as e:
            return ToolFailure(error=e.message)

    async def execute_all(self) -> List[ToolResult]:
        """Execute all tools in the collection sequentially."""
        results = []
        for tool in self.tools:
            try:
                result = await tool()
                results.append(result)
            except ToolError as e:
                results.append(ToolFailure(error=e.message))
        return results

    def get_tool(self, name: str) -> BaseTool:
        """Return the tool registered under ``name``, or None if absent."""
        return self.tool_map.get(name)

    def add_tool(self, tool: BaseTool):
        """Add a single tool to the collection.

        If a tool with the same name already exists, it will be skipped and a warning will be logged.
        Returns self so calls can be chained.
        """
        if tool.name in self.tool_map:
            logger.warning(f"Tool {tool.name} already exists in collection, skipping")
            return self

        self.tools += (tool,)
        self.tool_map[tool.name] = tool
        return self

    def add_tools(self, *tools: BaseTool):
        """Add multiple tools to the collection.

        If any tool has a name conflict with an existing tool, it will be skipped and a warning will be logged.
        Returns self so calls can be chained.
        """
        for tool in tools:
            self.add_tool(tool)
        return self
执行流程
基于 ReAct 模式实现了智能体的执行流程,通过 think 与 act 的不断迭代来解决复杂问题
class ReActAgent(BaseAgent, ABC):
    """Agent that iterates think -> act until it finishes or runs out of steps.

    ``think`` decides whether to act; ``act`` performs the action. ``step``
    runs one such cycle.
    """

    name: str
    description: Optional[str] = None

    system_prompt: Optional[str] = None
    next_step_prompt: Optional[str] = None

    llm: Optional[LLM] = Field(default_factory=LLM)
    memory: Memory = Field(default_factory=Memory)
    state: AgentState = AgentState.IDLE

    max_steps: int = 10
    current_step: int = 0

    @abstractmethod
    async def think(self) -> bool:
        """Decide the next action; return True if ``act`` should run."""

    @abstractmethod
    async def act(self) -> str:
        """Perform the decided action(s) and return a textual result."""

    async def step(self) -> str:
        """Execute one think/act cycle and report what happened."""
        if await self.think():
            return await self.act()
        return "Thinking complete - no action needed"
提示词工程
通过系统提示词和下一步提示词,引导LLM生成响应和工具调用。
async def think(self) -> bool:
    """Ask the LLM for its next move, offering the available tools.

    Excerpt of ToolCallAgent.think: the system prompt (if any) is sent as a
    system message, tool schemas come from ``available_tools``, and any tool
    calls in the reply are stored on ``self.tool_calls``.
    """
    system_msgs = (
        [Message.system_message(self.system_prompt)] if self.system_prompt else None
    )
    # Get response with tool options
    response = await self.llm.ask_tool(
        messages=self.messages,
        system_msgs=system_msgs,
        tools=self.available_tools.to_params(),
        tool_choice=self.tool_choices,
    )
    has_calls = bool(response and response.tool_calls)
    # The local alias is kept for the code elided below.
    self.tool_calls = tool_calls = response.tool_calls if has_calls else []
    # ...
async def act(self) -> str:
    """Execute tool calls and handle their results.

    Excerpt of ToolCallAgent.act: each tool call is executed and its output is
    stored in memory as a tool message tied to the originating call id, so the
    LLM can observe it on the next think.
    """
    for command in self.tool_calls:
        result = await self.execute_tool(command)
        ...  # elided in this excerpt (e.g. result truncation)

        # Add tool response to memory
        tool_msg = Message.tool_message(
            content=result,
            tool_call_id=command.id,
            name=command.function.name,
            base64_image=self._current_base64_image,
        )
        self.memory.add_message(tool_msg)
    # Elided in this excerpt (collecting/returning results). The original
    # "...." placeholder here was a syntax error; "..." is a valid Ellipsis.
    ...