AskTable

多智能体架构:Text-to-SQL 系统的工程化实践

AskTable 团队
AskTable 团队 2026年3月4日

在构建企业级 Text-to-SQL 系统时,单一 LLM 调用往往难以处理复杂的数据分析场景。本文将深入解析 AskTable 如何通过多智能体架构,将复杂任务分解为可管理的子任务,实现高质量的自然语言到 SQL 的转换。

为什么需要多智能体架构?

传统的 Text-to-SQL 系统通常采用"一次性生成"的方式:用户提问 → LLM 生成 SQL → 执行查询。这种方式在面对复杂场景时存在明显局限:

  1. 上下文过载:将所有数据库元数据一次性塞给 LLM,容易超出 token 限制
  2. 缺乏推理链:无法处理需要多步推理的复杂查询
  3. 错误难以恢复:生成错误的 SQL 后无法自我修正
  4. 工具能力受限:无法动态调用外部工具获取信息

AskTable 通过多智能体架构解决了这些问题,让每个 Agent 专注于特定任务,通过工具调用和流式响应实现复杂的数据分析流程。

Agent 执行流程

加载图表中...

上图展示了 Agent 的核心执行流程:接收问题后,Agent 会判断是否需要调用工具(如查询元数据、执行 SQL),如果需要则调用工具并处理结果,然后继续判断是否需要更多工具,直到生成最终答案。

核心架构设计

1. 基础 Agent 类

AskTable 的 Agent 基类提供了完整的工具调用和流式响应能力:

class Agent:
    def __init__(
        self,
        system_prompt: Any,
        max_tool_calls: int | None = None,
        max_completions: int | None = None,
        model: str | None = None,
    ):
        self.llm_client, default_model = get_current_llm("report")
        self.model = model or default_model

        # 核心状态
        self.message_builder = ChatMessageBuilder(system_prompt)
        self.tools: list[ChatCompletionToolParam] = []
        self.actions: dict[str, Callable[..., Any]] = {}
        self.output_parser: Callable[..., Any] | None = None

        # 安全限制
        self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
        self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)

设计亮点

2. DBAgent:数据库交互专家

DBAgent 是专门处理数据库相关任务的智能体,它封装了元数据检索、SQL 执行等核心能力:

class DBAgent:
    def __init__(
        self,
        prompt_name: str,
        datasource: DataSourceAdmin,
        meta: MetaAdmin | None = None,
        assumed_role: RoleAdmin | None = None,
        model: str | None = None,
    ):
        self.datasource = datasource
        self.assumed_role = assumed_role
        self.meta = meta or get_accessible_meta(datasource, assumed_role)

        # 构建数据库元数据摘要
        self.db_meta = {
            "schemas": [
                {
                    "name": schema.name,
                    "tables": [{"name": table.name}]
                }
                for schema in self.meta.schemas.values()
                for table in schema.tables.values()
            ]
        }

        # 初始化 Agent 并注册工具
        system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
        self.agent = Agent(system_prompt=system_prompt, model=model)

核心工具函数

show_table:获取表详细信息

def show_table(
    self,
    table_names: list[str] = Field(
        ...,
        description="table full names, e.g. `schema_name.table_name`",
    ),
) -> str:
    meta = self.meta.filter_tables_by_names(
        [(f.split(".")[0], f.split(".")[1]) for f in table_names]
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "name": table.name,
            "description": table.curr_desc,
            "fields": [
                {
                    "name": field.name,
                    "desc": field.curr_desc,
                    "data_type": field.data_type,
                }
                for field in table.fields.values()
            ],
        }
        for table in tables
    ])

search_metadata:语义搜索元数据

async def search_metadata(
    self,
    queries: list[str] = Field(
        ...,
        description="subqueries to perform semantic search on meta data",
    ),
    keywords: list[str] = Field(
        ...,
        description="keywords to perform full-text search on database content",
    ),
) -> str:
    meta = await retrieve_entities(
        meta=self.meta,
        datasource=self.datasource,
        subqueries=queries,
        keywords=keywords,
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "table_full_name": table.full_name,
            "table_description": table.curr_desc,
        }
        for table in tables
    ])

execute_sql:执行 SQL 并管理结果

async def execute_sql(self, sql: str) -> str:
    """
    Execute sql

    ## DataFrame Management
    the data will be stored in the data_workspace automatically
    """
    description = sql.strip().split("\n")[0][3:].strip()

    def _connect_and_query():
        with self.datasource.accessor.connect():
            data = self.datasource.accessor.query(sql)
        return data

    self.last_dataframe = await asyncio.to_thread(_connect_and_query)
    df_id = gen_id("df")
    dataframe_json = json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": self.last_dataframe.columns.tolist(),
            "shape": self.last_dataframe.shape,
            "content_head": dataframe_to_dicts(
                self.last_dataframe, self.dataframe_serialize_max_rows
            ),
        },
    })
    self.data_workspace[df_id] = {
        "df": self.last_dataframe,
        "description": description,
        "sql": sql,
    }
    return dataframe_json

设计亮点

3. PlanAgent:分析规划专家

PlanAgent 继承自 DBAgent,专门负责生成数据分析计划:

class PlanAgent(DBAgent):
    plan: str | None = None

    def __init__(
        self,
        datasource: DataSourceAdmin,
        type: ReportType,
        assumed_role: RoleAdmin | None = None,
    ):
        if type == ReportType.summary:
            prompt_name = "agent/outline_generator"
        elif type == ReportType.analysis:
            prompt_name = "agent/analysis_outline_generator"

        super().__init__(
            prompt_name=prompt_name,
            datasource=datasource,
            assumed_role=assumed_role
        )

        self.add_tool(self.show_table)
        self.add_tool(self.search_metadata)
        self.set_output_parser(self.output_parser)

    def output_parser(self, output: str) -> None:
        """从输出文本中抽取 <plan> </plan> 标签中的内容"""
        import re
        match = re.search(r"<plan>(.*?)</plan>", output, re.DOTALL)
        if match:
            self.plan = match.group(1).strip()
        else:
            raise errors.ParameterError("未找到 <plan> 标签")

设计亮点

工具调用机制

AskTable 的工具调用机制是多智能体架构的核心,它实现了 LLM 与外部工具的无缝集成:

1. 工具注册

def add_tool(self, tool: Callable[..., Any]) -> None:
    tool_name = getattr(tool, "__name__", tool.__class__.__name__)
    self.tools.append({
        "type": "function",
        "function": {
            "name": tool_name,
            "description": tool.__doc__ or "",
            "parameters": TypeAdapter(tool).json_schema(),
        },
    })
    self.actions[tool_name] = tool

技术细节

2. 流式执行

async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    self.completion_count += 1
    completion = await self.llm_client.chat.completions.create(
        messages=self.message_builder.dump_openai(),
        model=self.model,
        tools=self.tools,
        stream=True,
    )

    # 流式接收 LLM 响应
    async for chunk in completion:
        delta = self.message_builder.append_openai_delta(chunk)
        if delta:
            if isinstance(delta, list):
                for event in delta:
                    yield event
            else:
                yield delta

    # 执行工具调用
    for tool_use in self.message_builder.get_unresolved_tool_use_blocks():
        func = self.actions[tool_use["name"]]
        params = json.loads(tool_use["input"])
        try:
            func_handler = func(**params)
            if iscoroutine(func_handler):
                result = await func_handler
            else:
                result = func_handler
        except Exception as e:
            result = json.dumps({"error": str(e)})

        self.tool_called_count += 1
        yield self.message_builder.append_tool_result(tool_use["id"], result)

设计亮点

3. 自动循环执行

async def run(
    self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
    """持续执行直到 LLM 返回最终答案"""
    self.tool_called_count = 0
    self.completion_count = 0

    # 第一步:处理用户输入
    async for chunk in self.step(user_input):
        yield chunk

    counter = 0
    # 持续执行直到得到最终答案
    while True:
        counter += 1
        if counter > 50:
            break
        if self.completion_count >= self.max_completions:
            raise CannotHandle(
                f"Completion count exceeds max completions: {self.max_completions}"
            )

        last_message = self.message_builder.dump_anthropic()[-1]
        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                if not self.output_parser:
                    break
                # 验证输出格式
                try:
                    self.output_parser(last_block["text"])
                    break
                except Exception as e:
                    # 格式错误,要求 LLM 重新生成
                    async for chunk in self.step(str(e)):
                        yield chunk
        elif last_message["role"] == "user":
            last_block = last_message["content"][-1]
            if last_block["type"] == "tool_result":
                if self.tool_called_count >= self.max_tool_calls:
                    raise CannotHandle(
                        f"Tool called count exceeds max tool calls: {self.max_tool_calls}"
                    )
                # 继续执行下一步
                async for chunk in self.step():
                    yield chunk

设计亮点

实际应用场景

场景 1:复杂数据分析

用户问题:"分析上个月各地区销售额,找出增长最快的前 3 个地区,并给出原因分析"

Agent 执行流程:

  1. search_metadata:搜索"销售额"、"地区"相关的表和字段
  2. show_table:获取销售表和地区表的详细结构
  3. execute_sql:查询上个月各地区销售额
  4. execute_sql:查询上上个月各地区销售额用于对比
  5. execute_sql:查询增长最快地区的详细订单数据
  6. 生成分析报告:基于查询结果生成自然语言分析

场景 2:数据探索

用户问题:"我想了解客户表的结构"

Agent 执行流程:

  1. search_metadata:搜索"客户"相关的表
  2. show_table:获取客户表的详细字段信息
  3. execute_sql:查询表的行数和示例数据
  4. 生成描述:用自然语言描述表结构和数据特征

性能优化实践

1. 元数据分层加载

# 初始化时只提供表名摘要
self.db_meta = {
    "schemas": [
        {"name": schema.name, "tables": [{"name": table.name}]}
        for schema in self.meta.schemas.values()
        for table in schema.tables.values()
    ]
}

# 需要时才加载详细字段信息
def show_table(self, table_names: list[str]) -> str:
    meta = self.meta.filter_tables_by_names(...)
    # 返回详细的字段信息

优势

2. 并行工具调用

虽然当前实现是串行执行工具调用,但架构支持并行执行:

# 未来可以支持并行工具调用
async def execute_tools_parallel(self, tool_uses):
    tasks = [self.execute_tool(tool_use) for tool_use in tool_uses]
    results = await asyncio.gather(*tasks)
    return results

3. 结果缓存

# DataFrame 工作空间自动缓存查询结果
self.data_workspace[df_id] = {
    "df": self.last_dataframe,
    "description": description,
    "sql": sql,
}

优势

最佳实践建议

1. Prompt 工程

2. 错误处理

3. 安全控制

4. 可观测性

总结

AskTable 的多智能体架构展示了如何将复杂的 Text-to-SQL 任务分解为可管理的子任务:

  1. 专业化分工:每个 Agent 专注于特定领域(数据库交互、分析规划等)
  2. 工具调用机制:LLM 可以动态调用外部工具获取信息和执行操作
  3. 流式响应:提供实时反馈,提升用户体验
  4. 自动循环执行:无需手动管理对话状态,自动完成多步推理
  5. 安全可控:通过限制和验证机制防止滥用

这种架构不仅适用于 Text-to-SQL,也可以推广到其他需要多步推理和工具调用的 LLM 应用场景,如代码生成、数据分析、自动化运维等。

通过合理的架构设计和工程实践,我们可以构建出既强大又可控的 AI 应用系统。