
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在构建企业级 Text-to-SQL 系统时,单一 LLM 调用往往难以处理复杂的数据分析场景。本文将深入解析 AskTable 如何通过多智能体架构,将复杂任务分解为可管理的子任务,实现高质量的自然语言到 SQL 的转换。
传统的 Text-to-SQL 系统通常采用"一次性生成"的方式:用户提问 → LLM 生成 SQL → 执行查询。这种方式在面对复杂场景时存在明显局限:
AskTable 通过多智能体架构解决了这些问题,让每个 Agent 专注于特定任务,通过工具调用和流式响应实现复杂的数据分析流程。
加载图表中...
上图展示了 Agent 的核心执行流程:接收问题后,Agent 会判断是否需要调用工具(如查询元数据、执行 SQL),如果需要则调用工具并处理结果,然后继续判断是否需要更多工具,直到生成最终答案。
AskTable 的 Agent 基类提供了完整的工具调用和流式响应能力:
class Agent:
    """Generic LLM agent: holds conversation state, registered tools, and safety limits."""

    def __init__(
        self,
        system_prompt: Any,
        max_tool_calls: int | None = None,
        max_completions: int | None = None,
        model: str | None = None,
    ):
        self.llm_client, default_model = get_current_llm("report")
        self.model = model or default_model

        # Core state: message history, tool specs, tool implementations, output parser
        self.message_builder = ChatMessageBuilder(system_prompt)
        self.tools: list[ChatCompletionToolParam] = []
        self.actions: dict[str, Callable[..., Any]] = {}
        self.output_parser: Callable[..., Any] | None = None

        # Safety limits guarding against runaway loops
        self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
        self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)
设计亮点:
add_tool() 动态注册工具函数。

DBAgent 是专门处理数据库相关任务的智能体,它封装了元数据检索、SQL 执行等核心能力:
class DBAgent:
    """Agent specialised for database tasks (metadata retrieval, SQL execution)."""

    def __init__(
        self,
        prompt_name: str,
        datasource: DataSourceAdmin,
        meta: MetaAdmin | None = None,
        assumed_role: RoleAdmin | None = None,
        model: str | None = None,
    ):
        self.datasource = datasource
        self.assumed_role = assumed_role
        self.meta = meta or get_accessible_meta(datasource, assumed_role)

        # Lightweight metadata summary: schema/table names only; field details
        # are loaded on demand via show_table.
        # NOTE(review): this emits one "schemas" entry per table (each holding a
        # single table) — presumably the prompt template expects that flat shape;
        # verify against the template.
        self.db_meta = {
            "schemas": [
                {
                    "name": schema.name,
                    "tables": [{"name": table.name}],
                }
                for schema in self.meta.schemas.values()
                for table in schema.tables.values()
            ]
        }

        # Compile the system prompt with the metadata summary and build the agent
        system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
        self.agent = Agent(system_prompt=system_prompt, model=model)
核心工具函数:
def show_table(
    self,
    table_names: list[str] = Field(
        ...,
        description="table full names, e.g. `schema_name.table_name`",
    ),
) -> str:
    """Return name, description and field details of the given tables as JSON."""
    # Split each "schema.table" full name into a (schema, table) pair
    schema_table_pairs = [
        (full_name.split(".")[0], full_name.split(".")[1])
        for full_name in table_names
    ]
    filtered = self.meta.filter_tables_by_names(schema_table_pairs)

    payload = []
    for schema in filtered.schemas.values():
        for table in schema.tables.values():
            payload.append({
                "name": table.name,
                "description": table.curr_desc,
                "fields": [
                    {
                        "name": field.name,
                        "desc": field.curr_desc,
                        "data_type": field.data_type,
                    }
                    for field in table.fields.values()
                ],
            })
    return json.dumps(payload)
async def search_metadata(
    self,
    queries: list[str] = Field(
        ...,
        description="subqueries to perform semantic search on meta data",
    ),
    keywords: list[str] = Field(
        ...,
        description="keywords to perform full-text search on database content",
    ),
) -> str:
    """Run semantic + full-text retrieval over metadata; return matched tables as JSON."""
    matched = await retrieve_entities(
        meta=self.meta,
        datasource=self.datasource,
        subqueries=queries,
        keywords=keywords,
    )
    results = [
        {
            "table_full_name": table.full_name,
            "table_description": table.curr_desc,
        }
        for schema in matched.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps(results)
async def execute_sql(self, sql: str) -> str:
    """
    Execute sql
    ## DataFrame Management
    the data will be stored in the data_workspace automatically
    """
    # Derive a human-readable description from a leading `--` SQL comment, if any.
    # (Previously `[3:]` unconditionally chopped 3 chars off the first line,
    # mangling plain SQL that has no comment prefix.)
    first_line = sql.strip().split("\n")[0].strip()
    if first_line.startswith("--"):
        description = first_line[2:].strip()
    else:
        description = first_line

    def _connect_and_query():
        # Blocking DB round-trip; run off the event loop via to_thread below.
        with self.datasource.accessor.connect():
            data = self.datasource.accessor.query(sql)
        return data

    self.last_dataframe = await asyncio.to_thread(_connect_and_query)

    df_id = gen_id("df")
    dataframe_json = json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": self.last_dataframe.columns.tolist(),
            "shape": self.last_dataframe.shape,
            # Only the head is serialized back to the LLM, capped by config
            "content_head": dataframe_to_dicts(
                self.last_dataframe, self.dataframe_serialize_max_rows
            ),
        },
    })
    # Cache the full result so later steps can reference it by df_id.
    self.data_workspace[df_id] = {
        "df": self.last_dataframe,
        "description": description,
        "sql": sql,
    }
    return dataframe_json
设计亮点:
show_table 按需加载表的详细结构;asyncio.to_thread 避免阻塞主线程。

PlanAgent 继承自 DBAgent,专门负责生成数据分析计划:
class PlanAgent(DBAgent):
    """Agent that produces a data-analysis plan wrapped in <plan> tags."""

    # Extracted plan text; populated by output_parser
    plan: str | None = None

    def __init__(
        self,
        datasource: DataSourceAdmin,
        type: ReportType,
        assumed_role: RoleAdmin | None = None,
    ):
        if type == ReportType.summary:
            prompt_name = "agent/outline_generator"
        elif type == ReportType.analysis:
            prompt_name = "agent/analysis_outline_generator"
        else:
            # Previously fell through with `prompt_name` unbound, raising a
            # confusing NameError; fail fast with a clear error instead.
            raise errors.ParameterError(f"Unsupported report type: {type}")
        super().__init__(
            prompt_name=prompt_name,
            datasource=datasource,
            assumed_role=assumed_role,
        )
        self.add_tool(self.show_table)
        self.add_tool(self.search_metadata)
        self.set_output_parser(self.output_parser)

    def output_parser(self, output: str) -> None:
        """Extract the content between <plan> and </plan> tags from the output text.

        Raises errors.ParameterError when no <plan> tag is found, which makes
        Agent.run() ask the LLM to regenerate.
        """
        import re

        match = re.search(r"<plan>(.*?)</plan>", output, re.DOTALL)
        if match:
            self.plan = match.group(1).strip()
        else:
            raise errors.ParameterError("未找到 <plan> 标签")
设计亮点:
AskTable 的工具调用机制是多智能体架构的核心,它实现了 LLM 与外部工具的无缝集成:
def add_tool(self, tool: Callable[..., Any]) -> None:
    """Register a callable as an LLM tool; its JSON Schema is derived automatically."""
    # Fall back to the class name for callable objects without __name__
    name = getattr(tool, "__name__", tool.__class__.__name__)
    spec = {
        "type": "function",
        "function": {
            "name": name,
            # The docstring doubles as the tool description shown to the LLM
            "description": tool.__doc__ or "",
            "parameters": TypeAdapter(tool).json_schema(),
        },
    }
    self.tools.append(spec)
    self.actions[name] = tool
技术细节:
TypeAdapter 自动生成 JSON Schemaasync def _step(self) -> AsyncGenerator[StreamEvent, None]: self.completion_count += 1 completion = await self.llm_client.chat.completions.create( messages=self.message_builder.dump_openai(), model=self.model, tools=self.tools, stream=True, ) # 流式接收 LLM 响应 async for chunk in completion: delta = self.message_builder.append_openai_delta(chunk) if delta: if isinstance(delta, list): for event in delta: yield event else: yield delta # 执行工具调用 for tool_use in self.message_builder.get_unresolved_tool_use_blocks(): func = self.actions[tool_use["name"]] params = json.loads(tool_use["input"]) try: func_handler = func(**params) if iscoroutine(func_handler): result = await func_handler else: result = func_handler except Exception as e: result = json.dumps({"error": str(e)}) self.tool_called_count += 1 yield self.message_builder.append_tool_result(tool_use["id"], result)
设计亮点:
async def run(
    self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
    """Drive the agent loop until the LLM produces a final (validated) answer."""
    self.tool_called_count = 0
    self.completion_count = 0

    # First step: feed the user input to the LLM
    async for event in self.step(user_input):
        yield event

    iterations = 0
    # Keep stepping until a final answer is reached
    while True:
        iterations += 1
        if iterations > 50:  # hard safety cap on loop iterations
            break
        if self.completion_count >= self.max_completions:
            raise CannotHandle(
                f"Completion count exceeds max completions: {self.max_completions}"
            )

        last_message = self.message_builder.dump_anthropic()[-1]

        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                # Plain text from the assistant: candidate final answer
                if not self.output_parser:
                    break
                try:
                    # Validate the output format
                    self.output_parser(last_block["text"])
                    break
                except Exception as e:
                    # Bad format: feed the error back and ask the LLM to retry
                    async for event in self.step(str(e)):
                        yield event
        elif last_message["role"] == "user":
            last_block = last_message["content"][-1]
            if last_block["type"] == "tool_result":
                if self.tool_called_count >= self.max_tool_calls:
                    raise CannotHandle(
                        f"Tool called count exceeds max tool calls: {self.max_tool_calls}"
                    )
                # Tool results are pending: take another step
                async for event in self.step():
                    yield event
设计亮点:
用户问题:"分析上个月各地区销售额,找出增长最快的前 3 个地区,并给出原因分析"
Agent 执行流程:
用户问题:"我想了解客户表的结构"
Agent 执行流程:
# Initialization exposes only a summary of table names
self.db_meta = {
    "schemas": [
        {"name": schema.name, "tables": [{"name": table.name}]}
        for schema in self.meta.schemas.values()
        for table in schema.tables.values()
    ]
}

# Detailed field information is loaded only when needed
def show_table(self, table_names: list[str]) -> str:
    meta = self.meta.filter_tables_by_names(...)
    # returns detailed field information
优势:
虽然当前实现是串行执行工具调用,但架构支持并行执行:
# Future extension: run tool calls concurrently instead of serially
async def execute_tools_parallel(self, tool_uses):
    """Execute all tool calls concurrently and return their results in order."""
    pending = [self.execute_tool(use) for use in tool_uses]
    results = await asyncio.gather(*pending)
    return results
# The DataFrame workspace automatically caches query results
self.data_workspace[df_id] = {
    "df": self.last_dataframe,
    "description": description,
    "sql": sql,
}
优势:
AskTable 的多智能体架构展示了如何将复杂的 Text-to-SQL 任务分解为可管理的子任务:
这种架构不仅适用于 Text-to-SQL,也可以推广到其他需要多步推理和工具调用的 LLM 应用场景,如代码生成、数据分析、自动化运维等。
通过合理的架构设计和工程实践,我们可以构建出既强大又可控的 AI 应用系统。