
sidebar.wechat

sidebar.feishu
sidebar.chooseYourWayToJoin

sidebar.scanToAddConsultant
AskTable 的核心能力来自于其强大的 AI Agent 系统。本文将深入解析 AskTable AI Agent 的架构设计和工作原理,帮助你理解如何构建一个生产级的 AI Agent 系统。
AskTable 的 AI Agent 系统由以下核心组件构成:
┌─────────────────────────────────────────────────────────┐
│ Agent Base Class │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Message │ │ Tool │ │ Output │ │
│ │ Builder │ │ Registry │ │ Parser │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
▲
│
┌─────────────────┼─────────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│ DB │ │ Data │ │ Chart │
│ Agent │ │ Node │ │ Node │
│ │ │ Agent │ │ Agent │
└─────────┘ └─────────┘ └─────────┘
AskTable Agent 系统遵循以下设计原则:
Agent 基类位于 app/atserver/ai/agent_rev.py,提供了完整的 Agent 运行框架:
class Agent:
def __init__(
self,
system_prompt: Any,
max_tool_calls: int | None = None,
max_completions: int | None = None,
model: str | None = None,
):
self.llm_client, default_model = get_current_llm("report")
self.model = model or default_model
# 核心状态
self.system_prompt = system_prompt
self.message_builder = ChatMessageBuilder(system_prompt)
self.tools: list[ChatCompletionToolParam] = []
self.actions: dict[str, Callable[..., Any]] = {}
self.output_parser: Callable[..., Any] | None = None
# 限制设置
self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)
关键组件说明:
message_builder:管理对话历史,支持 OpenAI 和 Anthropic 格式转换tools:注册的工具列表,符合 OpenAI Function Calling 规范actions:工具名称到实际执行函数的映射output_parser:可选的输出解析器,用于提取结构化结果Agent 通过 add_tool() 方法动态注册工具:
def add_tool(self, tool: Callable[..., Any]) -> None:
tool_name = getattr(tool, "__name__", tool.__class__.__name__)
self.tools.append({
"type": "function",
"function": {
"name": tool_name,
"description": tool.__doc__ or "",
"parameters": TypeAdapter(tool).json_schema(),
},
})
self.actions[tool_name] = tool
工作原理:
TypeAdapter 自动生成 JSON Schematools 列表actions 字典这种设计使得添加新工具非常简单,只需定义一个带类型注解的函数即可。
Agent 的 run() 方法实现了完整的执行循环:
async def run(self, user_input: str | None = None) -> AsyncGenerator[StreamEvent, None]:
# 第一步:处理用户输入
async for chunk in self.step(user_input):
yield chunk
# 持续执行直到完成
while True:
if self.completion_count >= self.max_completions:
raise CannotHandle("超过最大完成次数")
last_message = self.message_builder.dump_anthropic()[-1]
# 检查是否完成
if last_message["role"] == "assistant":
last_block = last_message["content"][-1]
if last_block["type"] == "text":
if self.output_parser:
try:
self.output_parser(last_block["text"])
break
except Exception as e:
# 解析失败,将错误反馈给 Agent
async for chunk in self.step(str(e)):
yield chunk
else:
break
# 继续执行工具调用
elif last_message["role"] == "user":
if self.tool_called_count >= self.max_tool_calls:
raise CannotHandle("超过最大工具调用次数")
async for chunk in self.step():
yield chunk
执行流程:
DBAgent 是最基础的数据库交互 Agent,提供三个核心工具:
class DBAgent:
def __init__(
self,
prompt_name: str,
datasource: DataSourceAdmin,
meta: MetaAdmin | None = None,
assumed_role: RoleAdmin | None = None,
model: str | None = None,
):
self.datasource = datasource
self.assumed_role = assumed_role
self.meta = meta or get_accessible_meta(datasource, assumed_role)
# 构建数据库元数据
self.db_meta = {
"schemas": [
{
"name": schema.name,
"tables": [{"name": table.name}]
}
for schema in self.meta.schemas.values()
for table in schema.tables.values()
]
}
# 初始化 Agent
system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
self.agent = Agent(system_prompt=system_prompt, model=model)
# 注册工具
self.add_tool = self.agent.add_tool
self.set_output_parser = self.agent.set_output_parser
# 数据工作区
self.data_workspace: dict[str, dict[str, Any]] = {}
核心工具:
def show_table(self, table_names: list[str]) -> str:
"""显示指定表的详细结构"""
meta = self.meta.filter_tables_by_names(
[(f.split(".")[0], f.split(".")[1]) for f in table_names]
)
tables = [
table
for schema in meta.schemas.values()
for table in schema.tables.values()
]
return json.dumps([
{
"name": table.name,
"description": table.curr_desc,
"fields": [
{
"name": field.name,
"desc": field.curr_desc,
"data_type": field.data_type,
}
for field in table.fields.values()
],
}
for table in tables
])
async def search_metadata(
self,
queries: list[str], # 语义搜索查询
keywords: list[str], # 全文搜索关键词
) -> str:
"""通过语义搜索和关键词搜索查找相关表"""
meta = await retrieve_entities(
meta=self.meta,
datasource=self.datasource,
subqueries=queries,
keywords=keywords,
)
tables = [
table
for schema in meta.schemas.values()
for table in schema.tables.values()
]
return json.dumps([
{
"table_full_name": table.full_name,
"table_description": table.curr_desc,
}
for table in tables
])
async def execute_sql(self, sql: str) -> str:
"""执行 SQL 并将结果存储到数据工作区"""
# 提取 SQL 注释作为描述
description = sql.strip().split("\n")[0][3:].strip()
# 异步执行查询
def _connect_and_query():
with self.datasource.accessor.connect():
return self.datasource.accessor.query(sql)
dataframe = await asyncio.to_thread(_connect_and_query)
# 生成 DataFrame ID 并存储
df_id = gen_id("df")
self.data_workspace[df_id] = {
"df": dataframe,
"description": description,
"sql": sql,
}
# 返回 DataFrame 摘要
return json.dumps({
"id": df_id,
"dataframe": {
"columns": dataframe.columns.tolist(),
"shape": dataframe.shape,
"content_head": dataframe_to_dicts(
dataframe, self.dataframe_serialize_max_rows
),
},
})
数据工作区设计:
df_idDataNodeAgentV2 是 Canvas 中数据节点的专用 Agent,支持更高级的功能:
class DataNodeAgentV2:
def __init__(
self,
datasource: DataSourceAdmin,
meta: MetaAdmin,
assumed_role: RoleAdmin | None = None,
reference_context: str | None = None,
model: str | None = None,
reasoning_effort: Literal["low", "medium", "high"] = "medium",
history_messages: list[dict] | None = None,
):
self.datasource = datasource
self.meta = meta
self.model = model or get_current_model_name("canvas")
self.reasoning_effort = reasoning_effort
# 虚拟文件系统(用于元数据探索)
self.vfs = VirtualFS(self.meta)
self._shell_interpreter = ShellInterpreter(self.vfs)
# 注册工具
self._register_tools()
# 构建系统提示
self._system_prompt = self._build_system_prompt(reference_context)
# 数据工作区
self.data_workspace: dict[str, dict[str, Any]] = {}
self.submitted_df_id: str | None = None
self.submitted_description: str | None = None
核心特性:
def shell(self, command: str) -> str:
"""执行 shell 命令探索数据库元数据(ls/cat/find/grep)"""
result = self._shell_interpreter.execute(command)
return result
DataNodeAgentV2 提供了一个虚拟文件系统,将数据库元数据映射为文件系统结构:
/
├── schema1/
│ ├── table1.table
│ ├── table2.table
│ └── ...
├── schema2/
│ └── ...
Agent 可以使用 ls、cat、find、grep 等命令探索元数据,就像操作真实文件系统一样。
response = await self._client.chat.completions.create(
model=self.model,
messages=messages,
tools=self._tools,
stream=True,
max_tokens=MAX_TOKENS,
extra_body={"reasoning": {"effort": self.reasoning_effort}},
)
通过 reasoning_effort 参数控制 AI 的思考深度:
low:快速响应,适合简单查询medium:平衡性能和质量(默认)high:深度思考,适合复杂分析def submit_node(
self,
description: str,
df_id: str | None = None,
) -> str:
"""
提交最终节点结果
如果无法回答问题(没有相关表/字段),设置 df_id 为 None
并在 description 中说明原因
"""
if df_id is not None and df_id not in self.data_workspace:
raise ValueError(f"DataFrame {df_id} 不在工作区中")
self.submitted_df_id = df_id
self.submitted_description = description
status_msg = "error" if df_id is None else "success"
return json.dumps({
"df_id": df_id,
"description": description,
"status": status_msg
})
ChartNodeAgent 负责根据数据节点生成可视化图表:
class ChartNodeAgent:
def __init__(
self,
parent_nodes_context: list[dict],
history_messages: list[dict] | None = None,
):
# 格式化父节点信息
parent_info_parts = []
for i, node in enumerate(parent_nodes_context, 1):
parent_info_parts.append(f"Node {i} (ID: {node['id']}):")
if node.get("description"):
parent_info_parts.append(f" Description: {node['description']}")
if node.get("sql"):
parent_info_parts.append(f" SQL: {node['sql']}")
if node.get("dataframe"):
df = node["dataframe"]
parent_info_parts.append(
f" Columns: {', '.join(df.get('columns', []))}"
)
formatted_parent_info = "\n".join(parent_info_parts)
# 构建系统提示
system_prompt = get_prompt("agent/canvas/create_chart_node").compile(
formatted_parent_info=formatted_parent_info
)
self.agent = Agent(
system_prompt=system_prompt,
model=get_current_model_name("canvas")
)
self.agent.add_tool(self.submit_chart_code)
核心工具:
def submit_chart_code(
self,
description: str,
code: str | None = None,
) -> str:
"""
提交最终图表代码
如果无法生成图表,设置 code 为 None 并在 description 中说明原因
"""
if code is not None:
try:
# 编译 JSX 代码
self.compiled_code = compile_jsx(code)
self.source_code = code
except Exception as e:
raise ValueError(f"代码编译失败: {str(e)}")
else:
self.compiled_code = None
self.source_code = None
self.submitted_description = description
status_msg = "error" if code is None else "success"
return json.dumps({
"description": description,
"status": status_msg,
"has_code": code is not None,
})
ReportAgent 用于生成数据报告,支持 Python 代码执行:
class ReportAgent(DBAgent):
def __init__(
self,
datasource: DataSourceAdmin,
assumed_role: RoleAdmin | None = None,
):
super().__init__(
prompt_name="agent/report_generator",
datasource=datasource,
assumed_role=assumed_role,
)
self.add_tool(self.show_table)
self.add_tool(self.search_metadata)
self.add_tool(self.execute_sql)
self.set_output_parser(self.output_parser)
输出解析器:
def output_parser(self, output: str) -> None:
# 提取 <code>...</code> 标签中的代码
pattern = r"<code>(.*?)</code>"
match = re.search(pattern, output, re.DOTALL)
if not match:
raise ValueError("无效的输出格式,期望: <code>...</code>")
code = match.group(1).strip()
# 提取所有 load_dataframe('...') 的 df_id
load_df_pattern = r"load_dataframe\(\s*['\"]([^'\"]+)['\"]"
referenced_dataframes = re.findall(load_df_pattern, code)
if not referenced_dataframes:
raise ValueError("代码中未找到 load_dataframe('df_id') 模式")
# 编译 JSX 代码
self.compiled_code = compile_jsx(code)
self.source_code = code
# 检查引用的 dataframes 是否都在 workspace 中
missing_ids = set(referenced_dataframes) - set(self.data_workspace.keys())
if missing_ids:
raise ValueError(f"引用的 dataframes {missing_ids} 不在数据工作区中")
self.referenced_dataframes = referenced_dataframes
AskTable 使用 Langfuse 管理 Prompt,支持版本控制和动态更新:
class PromptProxy:
def __init__(self, from_local: bool):
self._from_local = from_local
if not self._from_local:
# 运行时从 Langfuse 获取
self.prompt_cache = None
else:
# 从本地 assets/prompts.json 加载
with open("assets/prompts.json") as f:
prompts = json.load(f)
self.prompt_cache = {
prompt.name: TextPromptClient(prompt)
for prompt in prompts
}
def get_prompt(self, prompt_name: str) -> PromptClient:
if not self._from_local:
return langfuse.get_prompt(
prompt_name,
label=config.observer_prompt_label
)
else:
return self.prompt_cache[prompt_name]
Prompt 支持 Jinja2 模板语法,可以动态注入上下文:
# 获取 Prompt
system_prompt = get_prompt("agent/canvas/data_agent").compile(
meta=db_meta,
history=reference_context
)
Prompt 命名规范:
agent/outline_generator:大纲生成 Agentagent/report_generator:报告生成 Agentagent/canvas/data_agent:Canvas 数据节点 Agentagent/canvas/create_chart_node:Canvas 图表节点 AgentAskTable 定义了多种流式事件类型:
@dataclass
class TextDelta:
type: Literal["text"]
text: str
@dataclass
class AssistantStreamEvent:
role: Literal["assistant"]
content: TextDelta
@dataclass
class ToolUseStreamEvent:
role: Literal["assistant"]
tool_use: dict
@dataclass
class ToolResultStreamEvent:
role: Literal["user"]
tool_result: dict
async def run(self, question: str) -> AsyncGenerator[StreamEvent, None]:
# 发送用户消息
self._message_builder.append_openai_message({
"role": "user",
"content": question
})
for _ in range(MAX_ITERATIONS):
# 调用 LLM
response = await self._client.chat.completions.create(
model=self.model,
messages=messages,
tools=self._tools,
stream=True,
max_tokens=MAX_TOKENS,
)
# 流式接收响应
async for chunk in response:
event = self._message_builder.append_openai_delta(chunk)
if event:
yield event # 实时返回给前端
# 执行工具调用
tool_use_blocks = self._message_builder.get_unresolved_tool_use_blocks()
if not tool_use_blocks:
return # 完成
for tool_use in tool_use_blocks:
result = await self._execute_tool_use(tool_use)
yield self._message_builder.append_tool_result(
tool_use["id"],
result
)
流式响应的优势:
async def _execute_tool_use(self, tool_use: dict) -> str:
func_name = tool_use["name"]
func_args_str = tool_use["input"]
func = self._actions.get(func_name)
if not func:
return json.dumps({"error": f"未知工具: {func_name}"})
try:
args = json.loads(func_args_str)
if iscoroutinefunction(func):
result = await func(**args)
else:
result = func(**args)
return result
except Exception as e:
return json.dumps({"error": str(e)})
错误处理策略:
if last_block["type"] == "text":
if self.output_parser:
try:
self.output_parser(last_block["text"])
break
except Exception as e:
# 将解析错误反馈给 LLM
async for chunk in self.step(str(e)):
yield chunk
自动修正机制:
# 最大工具调用次数
if self.tool_called_count >= self.max_tool_calls:
raise CannotHandle("超过最大工具调用次数")
# 最大完成次数
if self.completion_count >= self.max_completions:
raise CannotHandle("超过最大完成次数")
保护机制:
好的工具设计:
def execute_sql(
self,
sql: str = Field(..., description="要执行的 SQL 查询")
) -> str:
"""
执行 SQL 查询
## 代码风格
SQL 开头用注释描述查询目的:
```sql
-- 上周华东地区所有门店的销量
SELECT ...
```
## DataFrame 管理
查询结果会自动存储到数据工作区
"""
...
设计要点:
Prompt 结构:
你是一个数据分析 Agent,负责根据用户问题生成 SQL 查询。
## 可用工具
- show_table: 查看表结构
- search_metadata: 搜索相关表
- execute_sql: 执行 SQL 查询
## 工作流程
1. 使用 search_metadata 找到相关表
2. 使用 show_table 查看表结构
3. 编写并执行 SQL 查询
4. 调用 submit_node 提交结果
## 数据库元数据
{{ meta }}
## 注意事项
- SQL 必须以注释开头说明查询目的
- 优先使用已有的表和字段
- 如果找不到相关数据,设置 df_id 为 None
Prompt 要点:
历史消息持久化:
# 导出对话历史
messages = agent.get_messages()
# 保存到数据库
save_to_database(messages)
# 恢复对话
agent = DataNodeAgentV2.create(
datasource=datasource,
history_messages=messages
)
历史消息的作用:
Prompt Caching:
messages = self._message_builder.dump_openai(cache_control=True)
启用 Prompt Caching 可以:
并发执行:
# 并发执行多个独立的工具调用
results = await asyncio.gather(
self.search_metadata(["销售"], ["华东"]),
self.search_metadata(["库存"], ["仓库"]),
)
数据采样:
# 只返回前 N 行数据给 LLM
"content_head": dataframe_to_dicts(
dataframe,
DATAFRAME_SERIALIZE_MAX_ROWS
)
AskTable 的 AI Agent 系统展示了如何构建一个生产级的 Agent 架构:
通过理解这些设计原则和实现细节,你可以构建自己的 AI Agent 系统,或者更好地使用 AskTable 的 Agent 能力。
sidebar.noProgrammingNeeded
sidebar.startFreeTrial