
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
AskTable 的核心能力来自于其强大的 AI Agent 系统。本文将深入解析 AskTable AI Agent 的架构设计和工作原理,帮助你理解如何构建一个生产级的 AI Agent 系统。
AskTable 的 AI Agent 系统由以下核心组件构成:
┌─────────────────────────────────────────────────────────┐ │ Agent Base Class │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ Message │ │ Tool │ │ Output │ │ │ │ Builder │ │ Registry │ │ Parser │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ └─────────────────────────────────────────────────────────┘ ▲ │ ┌─────────────────┼─────────────────┐ │ │ │ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │ DB │ │ Data │ │ Chart │ │ Agent │ │ Node │ │ Node │ │ │ │ Agent │ │ Agent │ └─────────┘ └─────────┘ └─────────┘
AskTable Agent 系统遵循以下设计原则:
Agent 基类位于
app/atserver/ai/agent_rev.py,提供了完整的 Agent 运行框架:
class Agent: def __init__( self, system_prompt: Any, max_tool_calls: int | None = None, max_completions: int | None = None, model: str | None = None, ): self.llm_client, default_model = get_current_llm("report") self.model = model or default_model # 核心状态 self.system_prompt = system_prompt self.message_builder = ChatMessageBuilder(system_prompt) self.tools: list[ChatCompletionToolParam] = [] self.actions: dict[str, Callable[..., Any]] = {} self.output_parser: Callable[..., Any] | None = None # 限制设置 self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)
关键组件说明:
message_builder:管理对话历史,支持 OpenAI 和 Anthropic 格式转换tools:注册的工具列表,符合 OpenAI Function Calling 规范actions:工具名称到实际执行函数的映射output_parser:可选的输出解析器,用于提取结构化结果Agent 通过
add_tool() 方法动态注册工具:
def add_tool(self, tool: Callable[..., Any]) -> None: tool_name = getattr(tool, "__name__", tool.__class__.__name__) self.tools.append({ "type": "function", "function": { "name": tool_name, "description": tool.__doc__ or "", "parameters": TypeAdapter(tool).json_schema(), }, }) self.actions[tool_name] = tool
工作原理:
TypeAdapter 自动生成 JSON Schematools 列表actions 字典这种设计使得添加新工具非常简单,只需定义一个带类型注解的函数即可。
Agent 的
run() 方法实现了完整的执行循环:
async def run(self, user_input: str | None = None) -> AsyncGenerator[StreamEvent, None]: # 第一步:处理用户输入 async for chunk in self.step(user_input): yield chunk # 持续执行直到完成 while True: if self.completion_count >= self.max_completions: raise CannotHandle("超过最大完成次数") last_message = self.message_builder.dump_anthropic()[-1] # 检查是否完成 if last_message["role"] == "assistant": last_block = last_message["content"][-1] if last_block["type"] == "text": if self.output_parser: try: self.output_parser(last_block["text"]) break except Exception as e: # 解析失败,将错误反馈给 Agent async for chunk in self.step(str(e)): yield chunk else: break # 继续执行工具调用 elif last_message["role"] == "user": if self.tool_called_count >= self.max_tool_calls: raise CannotHandle("超过最大工具调用次数") async for chunk in self.step(): yield chunk
执行流程:
DBAgent 是最基础的数据库交互 Agent,提供三个核心工具:
class DBAgent: def __init__( self, prompt_name: str, datasource: DataSourceAdmin, meta: MetaAdmin | None = None, assumed_role: RoleAdmin | None = None, model: str | None = None, ): self.datasource = datasource self.assumed_role = assumed_role self.meta = meta or get_accessible_meta(datasource, assumed_role) # 构建数据库元数据 self.db_meta = { "schemas": [ { "name": schema.name, "tables": [{"name": table.name}] } for schema in self.meta.schemas.values() for table in schema.tables.values() ] } # 初始化 Agent system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta) self.agent = Agent(system_prompt=system_prompt, model=model) # 注册工具 self.add_tool = self.agent.add_tool self.set_output_parser = self.agent.set_output_parser # 数据工作区 self.data_workspace: dict[str, dict[str, Any]] = {}
核心工具:
def show_table(self, table_names: list[str]) -> str: """显示指定表的详细结构""" meta = self.meta.filter_tables_by_names( [(f.split(".")[0], f.split(".")[1]) for f in table_names] ) tables = [ table for schema in meta.schemas.values() for table in schema.tables.values() ] return json.dumps([ { "name": table.name, "description": table.curr_desc, "fields": [ { "name": field.name, "desc": field.curr_desc, "data_type": field.data_type, } for field in table.fields.values() ], } for table in tables ])
async def search_metadata( self, queries: list[str], # 语义搜索查询 keywords: list[str], # 全文搜索关键词 ) -> str: """通过语义搜索和关键词搜索查找相关表""" meta = await retrieve_entities( meta=self.meta, datasource=self.datasource, subqueries=queries, keywords=keywords, ) tables = [ table for schema in meta.schemas.values() for table in schema.tables.values() ] return json.dumps([ { "table_full_name": table.full_name, "table_description": table.curr_desc, } for table in tables ])
async def execute_sql(self, sql: str) -> str: """执行 SQL 并将结果存储到数据工作区""" # 提取 SQL 注释作为描述 description = sql.strip().split("\n")[0][3:].strip() # 异步执行查询 def _connect_and_query(): with self.datasource.accessor.connect(): return self.datasource.accessor.query(sql) dataframe = await asyncio.to_thread(_connect_and_query) # 生成 DataFrame ID 并存储 df_id = gen_id("df") self.data_workspace[df_id] = { "df": dataframe, "description": description, "sql": sql, } # 返回 DataFrame 摘要 return json.dumps({ "id": df_id, "dataframe": { "columns": dataframe.columns.tolist(), "shape": dataframe.shape, "content_head": dataframe_to_dicts( dataframe, self.dataframe_serialize_max_rows ), }, })
数据工作区设计:
df_idDataNodeAgentV2 是 Canvas 中数据节点的专用 Agent,支持更高级的功能:
class DataNodeAgentV2: def __init__( self, datasource: DataSourceAdmin, meta: MetaAdmin, assumed_role: RoleAdmin | None = None, reference_context: str | None = None, model: str | None = None, reasoning_effort: Literal["low", "medium", "high"] = "medium", history_messages: list[dict] | None = None, ): self.datasource = datasource self.meta = meta self.model = model or get_current_model_name("canvas") self.reasoning_effort = reasoning_effort # 虚拟文件系统(用于元数据探索) self.vfs = VirtualFS(self.meta) self._shell_interpreter = ShellInterpreter(self.vfs) # 注册工具 self._register_tools() # 构建系统提示 self._system_prompt = self._build_system_prompt(reference_context) # 数据工作区 self.data_workspace: dict[str, dict[str, Any]] = {} self.submitted_df_id: str | None = None self.submitted_description: str | None = None
核心特性:
def shell(self, command: str) -> str: """执行 shell 命令探索数据库元数据(ls/cat/find/grep)""" result = self._shell_interpreter.execute(command) return result
DataNodeAgentV2 提供了一个虚拟文件系统,将数据库元数据映射为文件系统结构:
/ ├── schema1/ │ ├── table1.table │ ├── table2.table │ └── ... ├── schema2/ │ └── ...
Agent 可以使用
ls、cat、find、grep 等命令探索元数据,就像操作真实文件系统一样。
response = await self._client.chat.completions.create( model=self.model, messages=messages, tools=self._tools, stream=True, max_tokens=MAX_TOKENS, extra_body={"reasoning": {"effort": self.reasoning_effort}}, )
通过
reasoning_effort 参数控制 AI 的思考深度:
low:快速响应,适合简单查询medium:平衡性能和质量(默认)high:深度思考,适合复杂分析def submit_node( self, description: str, df_id: str | None = None, ) -> str: """ 提交最终节点结果 如果无法回答问题(没有相关表/字段),设置 df_id 为 None 并在 description 中说明原因 """ if df_id is not None and df_id not in self.data_workspace: raise ValueError(f"DataFrame {df_id} 不在工作区中") self.submitted_df_id = df_id self.submitted_description = description status_msg = "error" if df_id is None else "success" return json.dumps({ "df_id": df_id, "description": description, "status": status_msg })
ChartNodeAgent 负责根据数据节点生成可视化图表:
class ChartNodeAgent: def __init__( self, parent_nodes_context: list[dict], history_messages: list[dict] | None = None, ): # 格式化父节点信息 parent_info_parts = [] for i, node in enumerate(parent_nodes_context, 1): parent_info_parts.append(f"Node {i} (ID: {node['id']}):") if node.get("description"): parent_info_parts.append(f" Description: {node['description']}") if node.get("sql"): parent_info_parts.append(f" SQL: {node['sql']}") if node.get("dataframe"): df = node["dataframe"] parent_info_parts.append( f" Columns: {', '.join(df.get('columns', []))}" ) formatted_parent_info = "\n".join(parent_info_parts) # 构建系统提示 system_prompt = get_prompt("agent/canvas/create_chart_node").compile( formatted_parent_info=formatted_parent_info ) self.agent = Agent( system_prompt=system_prompt, model=get_current_model_name("canvas") ) self.agent.add_tool(self.submit_chart_code)
核心工具:
def submit_chart_code( self, description: str, code: str | None = None, ) -> str: """ 提交最终图表代码 如果无法生成图表,设置 code 为 None 并在 description 中说明原因 """ if code is not None: try: # 编译 JSX 代码 self.compiled_code = compile_jsx(code) self.source_code = code except Exception as e: raise ValueError(f"代码编译失败: {str(e)}") else: self.compiled_code = None self.source_code = None self.submitted_description = description status_msg = "error" if code is None else "success" return json.dumps({ "description": description, "status": status_msg, "has_code": code is not None, })
ReportAgent 用于生成数据报告,支持 Python 代码执行:
class ReportAgent(DBAgent): def __init__( self, datasource: DataSourceAdmin, assumed_role: RoleAdmin | None = None, ): super().__init__( prompt_name="agent/report_generator", datasource=datasource, assumed_role=assumed_role, ) self.add_tool(self.show_table) self.add_tool(self.search_metadata) self.add_tool(self.execute_sql) self.set_output_parser(self.output_parser)
输出解析器:
def output_parser(self, output: str) -> None: # 提取 <code>...</code> 标签中的代码 pattern = r"<code>(.*?)</code>" match = re.search(pattern, output, re.DOTALL) if not match: raise ValueError("无效的输出格式,期望: <code>...</code>") code = match.group(1).strip() # 提取所有 load_dataframe('...') 的 df_id load_df_pattern = r"load_dataframe\(\s*['\"]([^'\"]+)['\"]" referenced_dataframes = re.findall(load_df_pattern, code) if not referenced_dataframes: raise ValueError("代码中未找到 load_dataframe('df_id') 模式") # 编译 JSX 代码 self.compiled_code = compile_jsx(code) self.source_code = code # 检查引用的 dataframes 是否都在 workspace 中 missing_ids = set(referenced_dataframes) - set(self.data_workspace.keys()) if missing_ids: raise ValueError(f"引用的 dataframes {missing_ids} 不在数据工作区中") self.referenced_dataframes = referenced_dataframes
AskTable 使用 Langfuse 管理 Prompt,支持版本控制和动态更新:
class PromptProxy: def __init__(self, from_local: bool): self._from_local = from_local if not self._from_local: # 运行时从 Langfuse 获取 self.prompt_cache = None else: # 从本地 assets/prompts.json 加载 with open("assets/prompts.json") as f: prompts = json.load(f) self.prompt_cache = { prompt.name: TextPromptClient(prompt) for prompt in prompts } def get_prompt(self, prompt_name: str) -> PromptClient: if not self._from_local: return langfuse.get_prompt( prompt_name, label=config.observer_prompt_label ) else: return self.prompt_cache[prompt_name]
Prompt 支持 Jinja2 模板语法,可以动态注入上下文:
# 获取 Prompt system_prompt = get_prompt("agent/canvas/data_agent").compile( meta=db_meta, history=reference_context )
Prompt 命名规范:
agent/outline_generator:大纲生成 Agentagent/report_generator:报告生成 Agentagent/canvas/data_agent:Canvas 数据节点 Agentagent/canvas/create_chart_node:Canvas 图表节点 AgentAskTable 定义了多种流式事件类型:
@dataclass class TextDelta: type: Literal["text"] text: str @dataclass class AssistantStreamEvent: role: Literal["assistant"] content: TextDelta @dataclass class ToolUseStreamEvent: role: Literal["assistant"] tool_use: dict @dataclass class ToolResultStreamEvent: role: Literal["user"] tool_result: dict
async def run(self, question: str) -> AsyncGenerator[StreamEvent, None]: # 发送用户消息 self._message_builder.append_openai_message({ "role": "user", "content": question }) for _ in range(MAX_ITERATIONS): # 调用 LLM response = await self._client.chat.completions.create( model=self.model, messages=messages, tools=self._tools, stream=True, max_tokens=MAX_TOKENS, ) # 流式接收响应 async for chunk in response: event = self._message_builder.append_openai_delta(chunk) if event: yield event # 实时返回给前端 # 执行工具调用 tool_use_blocks = self._message_builder.get_unresolved_tool_use_blocks() if not tool_use_blocks: return # 完成 for tool_use in tool_use_blocks: result = await self._execute_tool_use(tool_use) yield self._message_builder.append_tool_result( tool_use["id"], result )
流式响应的优势:
async def _execute_tool_use(self, tool_use: dict) -> str: func_name = tool_use["name"] func_args_str = tool_use["input"] func = self._actions.get(func_name) if not func: return json.dumps({"error": f"未知工具: {func_name}"}) try: args = json.loads(func_args_str) if iscoroutinefunction(func): result = await func(**args) else: result = func(**args) return result except Exception as e: return json.dumps({"error": str(e)})
错误处理策略:
if last_block["type"] == "text": if self.output_parser: try: self.output_parser(last_block["text"]) break except Exception as e: # 将解析错误反馈给 LLM async for chunk in self.step(str(e)): yield chunk
自动修正机制:
# 最大工具调用次数 if self.tool_called_count >= self.max_tool_calls: raise CannotHandle("超过最大工具调用次数") # 最大完成次数 if self.completion_count >= self.max_completions: raise CannotHandle("超过最大完成次数")
保护机制:
好的工具设计:
def execute_sql( self, sql: str = Field(..., description="要执行的 SQL 查询") ) -> str: """ 执行 SQL 查询 ## 代码风格 SQL 开头用注释描述查询目的: ```sql -- 上周华东地区所有门店的销量 SELECT ... ``` ## DataFrame 管理 查询结果会自动存储到数据工作区 """ ...
设计要点:
Prompt 结构:
你是一个数据分析 Agent,负责根据用户问题生成 SQL 查询。 ## 可用工具 - show_table: 查看表结构 - search_metadata: 搜索相关表 - execute_sql: 执行 SQL 查询 ## 工作流程 1. 使用 search_metadata 找到相关表 2. 使用 show_table 查看表结构 3. 编写并执行 SQL 查询 4. 调用 submit_node 提交结果 ## 数据库元数据 {{ meta }} ## 注意事项 - SQL 必须以注释开头说明查询目的 - 优先使用已有的表和字段 - 如果找不到相关数据,设置 df_id 为 None
Prompt 要点:
历史消息持久化:
# 导出对话历史 messages = agent.get_messages() # 保存到数据库 save_to_database(messages) # 恢复对话 agent = DataNodeAgentV2.create( datasource=datasource, history_messages=messages )
历史消息的作用:
Prompt Caching:
messages = self._message_builder.dump_openai(cache_control=True)
启用 Prompt Caching 可以:
并发执行:
# 并发执行多个独立的工具调用 results = await asyncio.gather( self.search_metadata(["销售"], ["华东"]), self.search_metadata(["库存"], ["仓库"]), )
数据采样:
# 只返回前 N 行数据给 LLM "content_head": dataframe_to_dicts( dataframe, DATAFRAME_SERIALIZE_MAX_ROWS )
AskTable 的 AI Agent 系统展示了如何构建一个生产级的 Agent 架构:
通过理解这些设计原则和实现细节,你可以构建自己的 AI Agent 系统,或者更好地使用 AskTable 的 Agent 能力。