
Multi-Agent Architecture: Engineering a Text-to-SQL System

AskTable Team · 2026-03-04

When building enterprise-grade Text-to-SQL systems, a single LLM call often falls short of handling complex data analysis scenarios. This article takes a deep dive into how AskTable uses a multi-agent architecture to decompose complex tasks into manageable sub-tasks, achieving high-quality natural language to SQL conversion.

Why a Multi-Agent Architecture?

Traditional Text-to-SQL systems typically follow a "one-shot generation" approach: user asks a question → LLM generates SQL → execute the query. This approach has clear limitations when facing complex scenarios:

  1. Context overload: Stuffing all database metadata into the LLM at once easily exceeds token limits
  2. No reasoning chain: Cannot handle complex queries requiring multi-step reasoning
  3. Hard error recovery: Cannot self-correct after generating incorrect SQL
  4. Limited tool capabilities: Cannot dynamically call external tools to gather information

AskTable solves these problems through a multi-agent architecture, where each Agent focuses on a specific task, achieving complex data analysis workflows through tool calling and streaming responses.

Agent Execution Flow

[Diagram: Agent execution flow]

The diagram above shows the core Agent execution flow: after receiving a question, the Agent determines whether it needs to call a tool (such as querying metadata or executing SQL). If so, it calls the tool and processes the result, then continues to evaluate whether more tools are needed until the final answer is generated.
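The flow described above can be sketched as a minimal control loop. This is an illustrative simplification, not AskTable's implementation: `llm_respond` and `run_tool` are hypothetical stand-ins for the LLM call and tool dispatch.

```python
# Minimal sketch of the agent control loop: answer directly, or call a
# tool, feed the result back, and repeat until a final answer is produced.
def agent_loop(question, llm_respond, run_tool, max_steps=10):
    messages = [{"role": "user", "content": question}]
    for _ in range(max_steps):
        reply = llm_respond(messages)       # may request a tool or answer directly
        if reply.get("tool") is None:
            return reply["content"]         # final answer, stop looping
        result = run_tool(reply["tool"], reply.get("args", {}))
        messages.append({"role": "tool", "content": result})
    raise RuntimeError("step limit reached")
```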

Core Architecture Design

1. Base Agent Class

AskTable's Agent base class provides complete tool calling and streaming response capabilities:

class Agent:
    def __init__(
        self,
        system_prompt: Any,
        max_tool_calls: int | None = None,
        max_completions: int | None = None,
        model: str | None = None,
    ):
        self.llm_client, default_model = get_current_llm("report")
        self.model = model or default_model

        # Core state
        self.message_builder = ChatMessageBuilder(system_prompt)
        self.tools: list[ChatCompletionToolParam] = []
        self.actions: dict[str, Callable[..., Any]] = {}
        self.output_parser: Callable[..., Any] | None = None

        # Safety limits
        self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
        self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)

Design Highlights:

  • Tool registration: Dynamically register tool functions via add_tool()
  • Output parser: Supports custom output format validation and parsing
  • Safety limits: Prevents infinite tool call and LLM request loops
  • Streaming: Supports real-time streaming responses for better user experience

2. DBAgent: Database Interaction Specialist

DBAgent is a specialized agent for database-related tasks, encapsulating core capabilities such as metadata retrieval and SQL execution:

class DBAgent:
    def __init__(
        self,
        prompt_name: str,
        datasource: DataSourceAdmin,
        meta: MetaAdmin | None = None,
        assumed_role: RoleAdmin | None = None,
        model: str | None = None,
    ):
        self.datasource = datasource
        self.assumed_role = assumed_role
        self.meta = meta or get_accessible_meta(datasource, assumed_role)

        # Build database metadata summary
        self.db_meta = {
            "schemas": [
                {
                    "name": schema.name,
                    "tables": [{"name": table.name}]
                }
                for schema in self.meta.schemas.values()
                for table in schema.tables.values()
            ]
        }

        # Initialize Agent and register tools
        system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
        self.agent = Agent(system_prompt=system_prompt, model=model)

Core Tool Functions:

show_table: Get Detailed Table Information

def show_table(
    self,
    table_names: list[str] = Field(
        ...,
        description="table full names, e.g. `schema_name.table_name`",
    ),
) -> str:
    meta = self.meta.filter_tables_by_names(
        [(f.split(".")[0], f.split(".")[1]) for f in table_names]
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "name": table.name,
            "description": table.curr_desc,
            "fields": [
                {
                    "name": field.name,
                    "desc": field.curr_desc,
                    "data_type": field.data_type,
                }
                for field in table.fields.values()
            ],
        }
        for table in tables
    ])

search_metadata: Find Relevant Tables via Semantic and Full-Text Search

async def search_metadata(
    self,
    queries: list[str] = Field(
        ...,
        description="subqueries to perform semantic search on meta data",
    ),
    keywords: list[str] = Field(
        ...,
        description="keywords to perform full-text search on database content",
    ),
) -> str:
    meta = await retrieve_entities(
        meta=self.meta,
        datasource=self.datasource,
        subqueries=queries,
        keywords=keywords,
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "table_full_name": table.full_name,
            "table_description": table.curr_desc,
        }
        for table in tables
    ])

execute_sql: Execute SQL and Manage Results

async def execute_sql(self, sql: str) -> str:
    """
    Execute sql

    ## DataFrame Management
    the data will be stored in the data_workspace automatically
    """
    # The first line is expected to be a `-- comment`; strip the marker to use it as a description
    description = sql.strip().split("\n")[0][3:].strip()

    def _connect_and_query():
        with self.datasource.accessor.connect():
            data = self.datasource.accessor.query(sql)
        return data

    self.last_dataframe = await asyncio.to_thread(_connect_and_query)
    df_id = gen_id("df")
    dataframe_json = json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": self.last_dataframe.columns.tolist(),
            "shape": self.last_dataframe.shape,
            "content_head": dataframe_to_dicts(
                self.last_dataframe, self.dataframe_serialize_max_rows
            ),
        },
    })
    self.data_workspace[df_id] = {
        "df": self.last_dataframe,
        "description": description,
        "sql": sql,
    }
    return dataframe_json

Design Highlights:

  • On-demand metadata loading: Initially only table names are provided; detailed info is fetched via show_table when needed
  • Semantic search integration: Finds relevant tables and fields through vector retrieval
  • DataFrame workspace: Automatically manages query results, supporting multi-step analysis
  • Async execution: Uses asyncio.to_thread to avoid blocking the main thread
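The async-execution point above can be demonstrated in isolation: `asyncio.to_thread` offloads a blocking call to a worker thread so the event loop keeps serving other coroutines. `blocking_query` below is a stand-in for a blocking database accessor, not AskTable's API.

```python
import asyncio
import time

def blocking_query(sql: str) -> list[dict]:
    # Stand-in for a blocking database call (e.g. a synchronous driver).
    time.sleep(0.01)
    return [{"sql": sql, "rows": 3}]

async def run_query(sql: str) -> list[dict]:
    # Offload the blocking call so the event loop stays responsive.
    return await asyncio.to_thread(blocking_query, sql)

result = asyncio.run(run_query("SELECT 1"))
```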

3. PlanAgent: Analysis Planning Specialist

PlanAgent inherits from DBAgent and specializes in generating data analysis plans:

class PlanAgent(DBAgent):
    plan: str | None = None

    def __init__(
        self,
        datasource: DataSourceAdmin,
        type: ReportType,
        assumed_role: RoleAdmin | None = None,
    ):
        if type == ReportType.summary:
            prompt_name = "agent/outline_generator"
        elif type == ReportType.analysis:
            prompt_name = "agent/analysis_outline_generator"
        else:
            # Fail fast instead of leaving prompt_name unbound
            raise ValueError(f"Unsupported report type: {type}")

        super().__init__(
            prompt_name=prompt_name,
            datasource=datasource,
            assumed_role=assumed_role
        )

        self.add_tool(self.show_table)
        self.add_tool(self.search_metadata)
        self.set_output_parser(self.output_parser)

    def output_parser(self, output: str) -> None:
        """Extract content from <plan> </plan> tags in the output"""
        import re
        match = re.search(r"<plan>(.*?)</plan>", output, re.DOTALL)
        if match:
            self.plan = match.group(1).strip()
        else:
            raise errors.ParameterError("No <plan> tag found")

Design Highlights:

  • Specialized prompts: Selects different prompt templates based on report type
  • Structured output: Forces the LLM to output structured plans via XML tags
  • Output validation: Automatically validates output format; requests regeneration on format errors
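The `<plan>` extraction shown in `output_parser` can be verified standalone. The sketch below mirrors the same regex (with a plain `ValueError` standing in for AskTable's `errors.ParameterError`):

```python
import re

def extract_plan(output: str) -> str:
    # DOTALL lets `.` match newlines, so multi-line plans are captured.
    match = re.search(r"<plan>(.*?)</plan>", output, re.DOTALL)
    if not match:
        raise ValueError("No <plan> tag found")
    return match.group(1).strip()

text = "Some reasoning...\n<plan>\n1. Inspect tables\n2. Query sales\n</plan>"
plan = extract_plan(text)
```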

Tool Calling Mechanism

AskTable's tool calling mechanism is the core of the multi-agent architecture, enabling seamless integration between the LLM and external tools:

1. Tool Registration

def add_tool(self, tool: Callable[..., Any]) -> None:
    tool_name = getattr(tool, "__name__", tool.__class__.__name__)
    self.tools.append({
        "type": "function",
        "function": {
            "name": tool_name,
            "description": tool.__doc__ or "",
            "parameters": TypeAdapter(tool).json_schema(),
        },
    })
    self.actions[tool_name] = tool

Technical Details:

  • Uses Pydantic's TypeAdapter to automatically generate JSON Schema
  • Automatically extracts parameter definitions from function signatures and type annotations
  • Supports both synchronous and asynchronous tool functions
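A stripped-down version of this registration step can be built with only the standard library. This is a simplified stand-in for the Pydantic `TypeAdapter` approach above (it reads type annotations directly and ignores `Field` metadata), just to show where the name, docstring, and parameter spec come from.

```python
import inspect
from typing import Any, Callable, get_type_hints

def build_tool_spec(tool: Callable[..., Any]) -> dict:
    # Derive a JSON-Schema-like spec from the signature and annotations.
    hints = get_type_hints(tool)
    params = {}
    for name in inspect.signature(tool).parameters:
        hint = hints.get(name)
        params[name] = {"type": "any" if hint is None else getattr(hint, "__name__", str(hint))}
    return {
        "type": "function",
        "function": {
            "name": tool.__name__,
            "description": (tool.__doc__ or "").strip(),
            "parameters": params,
        },
    }

def show_table(table_names: list) -> str:
    """Get detailed table information."""
    return "..."

spec = build_tool_spec(show_table)
```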

2. Streaming Execution

async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    self.completion_count += 1
    completion = await self.llm_client.chat.completions.create(
        messages=self.message_builder.dump_openai(),
        model=self.model,
        tools=self.tools,
        stream=True,
    )

    # Stream LLM response token by token
    async for chunk in completion:
        delta = self.message_builder.append_openai_delta(chunk)
        if delta:
            if isinstance(delta, list):
                for event in delta:
                    yield event
            else:
                yield delta

    # Execute tool calls
    for tool_use in self.message_builder.get_unresolved_tool_use_blocks():
        func = self.actions[tool_use["name"]]
        params = json.loads(tool_use["input"])
        try:
            func_handler = func(**params)
            if iscoroutine(func_handler):
                result = await func_handler
            else:
                result = func_handler
        except Exception as e:
            result = json.dumps({"error": str(e)})

        self.tool_called_count += 1
        yield self.message_builder.append_tool_result(tool_use["id"], result)

Design Highlights:

  • True streaming: Returns token by token instead of waiting for a complete response
  • Automatic tool execution: Automatically executes and returns results after the LLM requests a tool call
  • Error handling: Returns error information to the LLM when tool execution fails
  • Sync/async compatibility: Automatically detects and properly handles both sync and async tools
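The sync/async dispatch above hinges on one detail: call the tool first, then `await` only if a coroutine came back. A minimal standalone version (with hypothetical example tools):

```python
import asyncio
import json
from inspect import iscoroutine

def sync_tool(x: int) -> str:
    return json.dumps({"doubled": x * 2})

async def async_tool(x: int) -> str:
    return json.dumps({"squared": x * x})

async def call_tool(func, **params):
    # Mirrors the dispatch pattern: invoke, then await only coroutines.
    handler = func(**params)
    if iscoroutine(handler):
        return await handler
    return handler

async def main():
    return await call_tool(sync_tool, x=3), await call_tool(async_tool, x=3)

r1, r2 = asyncio.run(main())
```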

3. Automatic Loop Execution

async def run(
    self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
    """Keep executing until the LLM returns a final answer"""
    self.tool_called_count = 0
    self.completion_count = 0

    # Step 1: Process user input
    async for chunk in self.step(user_input):
        yield chunk

    counter = 0
    # Continue until a final answer is reached
    while True:
        counter += 1
        if counter > 50:
            break
        if self.completion_count >= self.max_completions:
            raise CannotHandle(
                f"Completion count exceeds max completions: {self.max_completions}"
            )

        last_message = self.message_builder.dump_anthropic()[-1]
        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                if not self.output_parser:
                    break
                # Validate output format
                try:
                    self.output_parser(last_block["text"])
                    break
                except Exception as e:
                    # Format error, ask LLM to regenerate
                    async for chunk in self.step(str(e)):
                        yield chunk
        elif last_message["role"] == "user":
            last_block = last_message["content"][-1]
            if last_block["type"] == "tool_result":
                if self.tool_called_count >= self.max_tool_calls:
                    raise CannotHandle(
                        f"Tool called count exceeds max tool calls: {self.max_tool_calls}"
                    )
                # Continue to next step
                async for chunk in self.step():
                    yield chunk

Design Highlights:

  • Automatic multi-turn conversation: No manual conversation state management needed
  • Output validation loop: Automatically requests LLM correction on format errors
  • Safety limits: Prevents infinite loops from consuming resources
  • Complete reasoning chain: Retains all intermediate steps for debugging and auditing
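The output-validation loop in `run` can be isolated into a small sketch: when the parser rejects the output, the error message is fed back as the next prompt. `generate` and `parser` below are hypothetical callables, not AskTable APIs.

```python
def run_with_validation(generate, parser, max_attempts=3):
    # On parser failure, feed the error text back so the model can regenerate.
    feedback = None
    for _ in range(max_attempts):
        output = generate(feedback)
        try:
            return parser(output)
        except Exception as e:
            feedback = str(e)
    raise RuntimeError("validation failed after retries")
```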

Practical Application Scenarios

Scenario 1: Complex Data Analysis

User question: "Analyze last month's sales by region, identify the top 3 fastest-growing regions, and provide cause analysis."

Agent execution flow:

  1. search_metadata: Search for tables and fields related to "sales" and "region"
  2. show_table: Get detailed structure of sales and region tables
  3. execute_sql: Query last month's sales by region
  4. execute_sql: Query the previous month's sales by region for comparison
  5. execute_sql: Query detailed order data for the fastest-growing regions
  6. Generate analysis report: Produce natural language analysis based on query results

Scenario 2: Data Exploration

User question: "I want to understand the structure of the customers table."

Agent execution flow:

  1. search_metadata: Search for tables related to "customer"
  2. show_table: Get detailed field information for the customers table
  3. execute_sql: Query row count and sample data
  4. Generate description: Describe table structure and data characteristics in natural language

Performance Optimization Practices

1. Layered Metadata Loading

# Initially, only table name summaries are provided
self.db_meta = {
    "schemas": [
        {"name": schema.name, "tables": [{"name": table.name}]}
        for schema in self.meta.schemas.values()
        for table in schema.tables.values()
    ]
}

# Detailed field info is loaded only when needed
def show_table(self, table_names: list[str]) -> str:
    meta = self.meta.filter_tables_by_names(...)
    # Return detailed field information

Advantages:

  • Reduces initial token consumption
  • Avoids context overload
  • On-demand loading improves response speed

2. Parallel Tool Calling

Although the current implementation executes tool calls serially, the architecture supports parallel execution:

# Future support for parallel tool calls (sketch)
async def execute_tools_parallel(self, tool_uses):
    tasks = [self.execute_tool(tool_use) for tool_use in tool_uses]
    # return_exceptions=True keeps one failing tool from cancelling the rest
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

3. Result Caching

# DataFrame workspace automatically caches query results
self.data_workspace[df_id] = {
    "df": self.last_dataframe,
    "description": description,
    "sql": sql,
}

Advantages:

  • Avoids redundant queries
  • Supports multi-step analysis
  • Reduces database load
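A cache-first lookup against the workspace would realize the "avoids redundant queries" point. This is a hypothetical helper keyed on the SQL text, not AskTable's actual lookup logic:

```python
data_workspace = {}

def cached_query(sql, run_sql):
    # Reuse a stored result when the identical SQL was already executed.
    for entry in data_workspace.values():
        if entry["sql"] == sql:
            return entry["df"]
    df = run_sql(sql)
    data_workspace[f"df_{len(data_workspace)}"] = {"df": df, "sql": sql}
    return df
```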

Best Practice Recommendations

1. Prompt Engineering

  • Clarify tool purposes: Clearly describe usage scenarios in tool docstrings
  • Provide examples: Include tool calling examples in the system prompt
  • Set constraints: Clearly inform the LLM about tool calling limitations and caveats

2. Error Handling

  • Graceful degradation: Return meaningful error messages when tool execution fails
  • Retry mechanism: Automatically retry for transient errors (e.g., network timeouts)
  • Error reporting: Return error information to the LLM so it can try alternative approaches
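The retry recommendation can be sketched as a small wrapper with exponential backoff: transient errors are retried, everything else propagates immediately. A minimal sketch, with the retriable exception types left configurable:

```python
import time

def with_retry(func, retries=3, base_delay=0.01, retriable=(TimeoutError,)):
    # Retry only the listed transient errors, doubling the delay each attempt;
    # non-retriable exceptions propagate to the caller unchanged.
    for attempt in range(retries):
        try:
            return func()
        except retriable:
            if attempt == retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```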

3. Safety Control

  • Limit call counts: Prevent infinite loops
  • Timeout control: Avoid long blocking
  • Permission checks: Validate user permissions before tool execution

4. Observability

  • Complete logging: Record all tool calls and LLM responses
  • Performance monitoring: Track the time taken for each step
  • Error tracking: Record failure reasons and context

Summary

AskTable's multi-agent architecture demonstrates how to decompose complex Text-to-SQL tasks into manageable sub-tasks:

  1. Specialized division of labor: Each Agent focuses on a specific domain (database interaction, analysis planning, etc.)
  2. Tool calling mechanism: The LLM can dynamically call external tools to gather information and perform operations
  3. Streaming responses: Provides real-time feedback for a better user experience
  4. Automatic loop execution: No manual conversation state management, automatic multi-step reasoning
  5. Safety and control: Prevents abuse through limits and validation mechanisms

This architecture is not only applicable to Text-to-SQL but can also be extended to other LLM application scenarios requiring multi-step reasoning and tool calling, such as code generation, data analysis, and automated operations.

Through sound architectural design and engineering practices, we can build AI application systems that are both powerful and controllable.
