
When building enterprise-grade Text-to-SQL systems, a single LLM call often falls short of handling complex data analysis scenarios. This article takes a deep dive into how AskTable uses a multi-agent architecture to decompose complex tasks into manageable sub-tasks, achieving high-quality natural language to SQL conversion.
Traditional Text-to-SQL systems typically follow a "one-shot generation" approach: the user asks a question → the LLM generates SQL → the query is executed. This approach has clear limitations in complex scenarios: the model gets no chance to explore database metadata first, no room for multi-step reasoning across several queries, and no way to recover when a generated query fails.
AskTable solves these problems through a multi-agent architecture, where each Agent focuses on a specific task, achieving complex data analysis workflows through tool calling and streaming responses.
The diagram above shows the core Agent execution flow: after receiving a question, the Agent determines whether it needs to call a tool (such as querying metadata or executing SQL). If so, it calls the tool and processes the result, then continues to evaluate whether more tools are needed until the final answer is generated.
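This decide → call tool → observe loop can be sketched in a few lines. The sketch below is illustrative only: `decide` stands in for the LLM call, and the function names are not AskTable's actual API.

```python
from typing import Any, Callable

def agent_loop(
    question: str,
    decide: Callable[[list[dict]], dict],
    tools: dict[str, Callable[..., Any]],
    max_steps: int = 10,
) -> str:
    """Minimal sketch of the decide -> call tool -> observe loop."""
    messages = [{"role": "user", "content": question}]
    for _ in range(max_steps):
        action = decide(messages)  # stand-in for an LLM completion
        if action["type"] == "answer":
            return action["content"]
        # The model asked for a tool: run it and feed the result back
        result = tools[action["name"]](**action["args"])
        messages.append({"role": "tool", "content": str(result)})
    raise RuntimeError("max steps exceeded without a final answer")

# A scripted stand-in for the LLM: request one tool call, then answer
def scripted_decide(messages: list[dict]) -> dict:
    if any(m["role"] == "tool" for m in messages):
        return {"type": "answer", "content": "final answer"}
    return {"type": "tool", "name": "execute_sql", "args": {"sql": "SELECT 1"}}

answer = agent_loop("total sales?", scripted_decide, {"execute_sql": lambda sql: [[1]]})
```

The real implementation adds streaming, safety limits, and output validation on top of this skeleton, as the following sections show.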
AskTable's Agent base class provides complete tool calling and streaming response capabilities:
```python
class Agent:
    def __init__(
        self,
        system_prompt: Any,
        max_tool_calls: int | None = None,
        max_completions: int | None = None,
        model: str | None = None,
    ):
        self.llm_client, default_model = get_current_llm("report")
        self.model = model or default_model

        # Core state
        self.message_builder = ChatMessageBuilder(system_prompt)
        self.tools: list[ChatCompletionToolParam] = []
        self.actions: dict[str, Callable[..., Any]] = {}
        self.output_parser: Callable[..., Any] | None = None

        # Safety limits
        self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
        self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)
```
Design Highlights:

- Configurable safety limits: `max_tool_calls` and `max_completions` cap runaway loops
- Pluggable tools: tools are registered at runtime via `add_tool()`
- Optional output parser: subclasses can enforce a structured output format

`DBAgent` is a specialized agent for database-related tasks, encapsulating core capabilities such as metadata retrieval and SQL execution:
```python
class DBAgent:
    def __init__(
        self,
        prompt_name: str,
        datasource: DataSourceAdmin,
        meta: MetaAdmin | None = None,
        assumed_role: RoleAdmin | None = None,
        model: str | None = None,
    ):
        self.datasource = datasource
        self.assumed_role = assumed_role
        self.meta = meta or get_accessible_meta(datasource, assumed_role)

        # Build database metadata summary: schema and table names only
        self.db_meta = {
            "schemas": [
                {
                    "name": schema.name,
                    "tables": [
                        {"name": table.name} for table in schema.tables.values()
                    ],
                }
                for schema in self.meta.schemas.values()
            ]
        }

        # Initialize Agent and register tools
        system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
        self.agent = Agent(system_prompt=system_prompt, model=model)
```
Core Tool Functions:
```python
def show_table(
    self,
    table_names: list[str] = Field(
        ...,
        description="table full names, e.g. `schema_name.table_name`",
    ),
) -> str:
    meta = self.meta.filter_tables_by_names(
        [(f.split(".")[0], f.split(".")[1]) for f in table_names]
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "name": table.name,
            "description": table.curr_desc,
            "fields": [
                {
                    "name": field.name,
                    "desc": field.curr_desc,
                    "data_type": field.data_type,
                }
                for field in table.fields.values()
            ],
        }
        for table in tables
    ])
```
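The name parsing at the top of `show_table` can be checked in isolation. The helper below is illustrative, not part of AskTable; it uses `maxsplit=1` so a dotted table name survives intact (the production code indexes `[0]` and `[1]`, which is equivalent for the usual `schema.table` form):

```python
def parse_full_names(table_names: list[str]) -> list[tuple[str, str]]:
    # Split each full name on the FIRST dot into (schema_name, table_name)
    return [tuple(name.split(".", 1)) for name in table_names]

pairs = parse_full_names(["public.orders", "sales.fact.daily"])
```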
```python
async def search_metadata(
    self,
    queries: list[str] = Field(
        ...,
        description="subqueries to perform semantic search on meta data",
    ),
    keywords: list[str] = Field(
        ...,
        description="keywords to perform full-text search on database content",
    ),
) -> str:
    meta = await retrieve_entities(
        meta=self.meta,
        datasource=self.datasource,
        subqueries=queries,
        keywords=keywords,
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "table_full_name": table.full_name,
            "table_description": table.curr_desc,
        }
        for table in tables
    ])
```
```python
async def execute_sql(self, sql: str) -> str:
    """
    Execute sql

    ## DataFrame Management
    the data will be stored in the data_workspace automatically
    """
    # The first SQL line is expected to be a `-- description` comment;
    # strip the comment marker to recover the description
    description = sql.strip().split("\n")[0][3:].strip()

    def _connect_and_query():
        with self.datasource.accessor.connect():
            data = self.datasource.accessor.query(sql)
            return data

    # Run the blocking database call in a worker thread
    self.last_dataframe = await asyncio.to_thread(_connect_and_query)

    df_id = gen_id("df")
    dataframe_json = json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": self.last_dataframe.columns.tolist(),
            "shape": self.last_dataframe.shape,
            "content_head": dataframe_to_dicts(
                self.last_dataframe, self.dataframe_serialize_max_rows
            ),
        },
    })
    self.data_workspace[df_id] = {
        "df": self.last_dataframe,
        "description": description,
        "sql": sql,
    }
    return dataframe_json
```
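The workspace bookkeeping can be exercised standalone. In the sketch below, `gen_id` is a simplified stand-in and query results are plain lists of rows rather than DataFrames:

```python
import itertools
import json

_counter = itertools.count(1)

def gen_id(prefix: str) -> str:
    # Stand-in for AskTable's gen_id: prefix plus a running number
    return f"{prefix}_{next(_counter)}"

def store_result(workspace: dict, sql: str, rows: list[dict]) -> str:
    # Mirrors execute_sql's bookkeeping: the first SQL line is assumed
    # to be a `-- description` comment
    description = sql.strip().split("\n")[0][3:].strip()
    df_id = gen_id("df")
    workspace[df_id] = {"rows": rows, "description": description, "sql": sql}
    return json.dumps({"id": df_id, "row_count": len(rows)})

workspace: dict = {}
out = store_result(workspace, "-- monthly sales\nSELECT * FROM sales", [{"month": 1}])
```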
Design Highlights:

- Lazy loading: the system prompt carries only table-name summaries; detailed field information is fetched via `show_table` when needed
- Non-blocking execution: SQL runs inside `asyncio.to_thread` to avoid blocking the event loop
- Automatic caching: every query result is stored in `data_workspace` for later reuse

`PlanAgent` inherits from `DBAgent` and specializes in generating data analysis plans:
```python
class PlanAgent(DBAgent):
    plan: str | None = None

    def __init__(
        self,
        datasource: DataSourceAdmin,
        type: ReportType,
        assumed_role: RoleAdmin | None = None,
    ):
        if type == ReportType.summary:
            prompt_name = "agent/outline_generator"
        elif type == ReportType.analysis:
            prompt_name = "agent/analysis_outline_generator"
        else:
            raise ValueError(f"Unsupported report type: {type}")

        super().__init__(
            prompt_name=prompt_name,
            datasource=datasource,
            assumed_role=assumed_role,
        )
        self.add_tool(self.show_table)
        self.add_tool(self.search_metadata)
        self.set_output_parser(self.output_parser)

    def output_parser(self, output: str) -> None:
        """Extract content from <plan> </plan> tags in the output"""
        import re

        match = re.search(r"<plan>(.*?)</plan>", output, re.DOTALL)
        if match:
            self.plan = match.group(1).strip()
        else:
            raise errors.ParameterError("No <plan> tag found")
```
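The `<plan>` extraction logic can be verified on its own. The function below reuses the same regex as `output_parser`, with a plain `ValueError` standing in for AskTable's `errors.ParameterError`:

```python
import re

def extract_plan(output: str) -> str:
    # Same regex as PlanAgent.output_parser: grab everything between
    # <plan> and </plan>, across newlines
    match = re.search(r"<plan>(.*?)</plan>", output, re.DOTALL)
    if not match:
        raise ValueError("No <plan> tag found")
    return match.group(1).strip()

plan = extract_plan("Here is my plan:\n<plan>\n1. Find tables\n2. Query\n</plan>")
```

If the tag is missing, the raised error is fed back to the LLM by `run`, prompting a regeneration in the correct format.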
Design Highlights:

- Prompt selection by report type: summary and analysis reports use different outline-generation prompts
- Validated output: the `<plan>` parser rejects responses lacking the expected tag, triggering a regeneration

AskTable's tool calling mechanism is the core of the multi-agent architecture, enabling seamless integration between the LLM and external tools:
```python
def add_tool(self, tool: Callable[..., Any]) -> None:
    tool_name = getattr(tool, "__name__", tool.__class__.__name__)
    self.tools.append({
        "type": "function",
        "function": {
            "name": tool_name,
            "description": tool.__doc__ or "",
            "parameters": TypeAdapter(tool).json_schema(),
        },
    })
    self.actions[tool_name] = tool
```
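Pydantic's `TypeAdapter` handles the schema generation in AskTable. A dependency-free approximation with `inspect` shows the idea; note the type mapping below is a deliberate simplification, not Pydantic's actual output:

```python
import inspect

_TYPE_MAP = {int: "integer", str: "string", float: "number", bool: "boolean"}

def tool_schema(func) -> dict:
    # Build a minimal OpenAI-style tool spec from a function signature
    props = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        props[name] = {"type": _TYPE_MAP.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": func.__doc__ or "",
            "parameters": {
                "type": "object",
                "properties": props,
                "required": required,
            },
        },
    }

def execute_sql(sql: str) -> str:
    """Execute sql"""
    return sql

spec = tool_schema(execute_sql)
```

The advantage of deriving the schema from the signature is that a tool's declaration can never drift from its implementation.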
Technical Details:

- The tool's docstring becomes its description for the LLM
- Pydantic's `TypeAdapter` automatically generates a JSON Schema for the tool's parameters
- Registered callables are kept in `actions` for dispatch by name

The core execution step streams the LLM response, then dispatches any pending tool calls:

```python
async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    self.completion_count += 1
    completion = await self.llm_client.chat.completions.create(
        messages=self.message_builder.dump_openai(),
        model=self.model,
        tools=self.tools,
        stream=True,
    )

    # Stream LLM response token by token
    async for chunk in completion:
        delta = self.message_builder.append_openai_delta(chunk)
        if delta:
            if isinstance(delta, list):
                for event in delta:
                    yield event
            else:
                yield delta

    # Execute tool calls
    for tool_use in self.message_builder.get_unresolved_tool_use_blocks():
        func = self.actions[tool_use["name"]]
        params = json.loads(tool_use["input"])
        try:
            func_handler = func(**params)
            if iscoroutine(func_handler):
                result = await func_handler
            else:
                result = func_handler
        except Exception as e:
            result = json.dumps({"error": str(e)})
        self.tool_called_count += 1
        yield self.message_builder.append_tool_result(tool_use["id"], result)
```
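The `iscoroutine` branch lets one dispatch path serve both sync and async tools. A minimal, self-contained reproduction of that pattern:

```python
import asyncio
from inspect import iscoroutine

async def dispatch(func, **params):
    # Call the tool; if it returned a coroutine, await it --
    # otherwise the result is already final. Same pattern as _step.
    handler = func(**params)
    if iscoroutine(handler):
        return await handler
    return handler

def sync_tool(x: int) -> int:
    return x + 1

async def async_tool(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2

async def main():
    return await dispatch(sync_tool, x=1), await dispatch(async_tool, x=3)

results = asyncio.run(main())
```

This keeps the tool-calling loop agnostic: tool authors choose `def` or `async def` per tool without touching the dispatcher.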
Design Highlights:

- Full streaming: every token and tool result is yielded as a `StreamEvent` as soon as it is available
- Sync and async tools: `iscoroutine` lets the same dispatch path handle both kinds of callables
- Graceful tool failures: exceptions are serialized into the tool result so the LLM can recover instead of crashing the run

The `run` loop drives the step function until the LLM produces a final answer:

```python
async def run(
    self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
    """Keep executing until the LLM returns a final answer"""
    self.tool_called_count = 0
    self.completion_count = 0

    # Step 1: Process user input
    async for chunk in self.step(user_input):
        yield chunk

    counter = 0
    # Continue until a final answer is reached
    while True:
        counter += 1
        if counter > 50:
            break
        if self.completion_count >= self.max_completions:
            raise CannotHandle(
                f"Completion count exceeds max completions: {self.max_completions}"
            )

        last_message = self.message_builder.dump_anthropic()[-1]
        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                if not self.output_parser:
                    break
                # Validate output format
                try:
                    self.output_parser(last_block["text"])
                    break
                except Exception as e:
                    # Format error, ask LLM to regenerate
                    async for chunk in self.step(str(e)):
                        yield chunk
        elif last_message["role"] == "user":
            last_block = last_message["content"][-1]
            if last_block["type"] == "tool_result":
                if self.tool_called_count >= self.max_tool_calls:
                    raise CannotHandle(
                        f"Tool called count exceeds max tool calls: {self.max_tool_calls}"
                    )
                # Continue to next step
                async for chunk in self.step():
                    yield chunk
```
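Callers consume `run` as an async generator. The sketch below shows that consumption pattern; `FakeAgent` is an illustrative stub that mimics the `AsyncGenerator[StreamEvent, None]` interface, not the real `Agent`:

```python
import asyncio
from typing import AsyncGenerator

class FakeAgent:
    # Stand-in for Agent: yields a few stream events, then stops
    async def run(self, user_input: str) -> AsyncGenerator[dict, None]:
        for token in ["SELECT", " 1"]:
            yield {"type": "text", "content": token}
        yield {"type": "done", "content": None}

async def collect(agent, question: str) -> str:
    # Typical consumer: forward text events to the client as they
    # arrive (here we just accumulate them)
    parts = []
    async for event in agent.run(question):
        if event["type"] == "text":
            parts.append(event["content"])
    return "".join(parts)

answer = asyncio.run(collect(FakeAgent(), "smallest query?"))
```

Because `run` yields events rather than returning a final string, a web layer can relay each event over SSE or WebSocket without buffering the whole answer.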
Design Highlights:

- Dual safety limits: both completion count and tool-call count are capped, with a hard ceiling of 50 iterations as a final guard
- Self-correcting output: when the output parser raises, the error message is fed back so the LLM can regenerate in the right format
- Clear termination: the loop ends only when the assistant's last block is plain text that passes validation
User question: "Analyze last month's sales by region, identify the top 3 fastest-growing regions, and provide cause analysis."
Agent execution flow (illustrative):

1. `search_metadata` locates candidate sales-related tables via semantic search
2. `show_table` pulls detailed field information for the selected tables
3. `execute_sql` runs the aggregation queries and caches results in `data_workspace`
4. The Agent synthesizes the growth ranking and cause analysis from the returned data
User question: "I want to understand the structure of the customers table."
Agent execution flow (illustrative):

1. The question names the `customers` table directly, so no semantic search is needed
2. `show_table` returns the table's fields, data types, and descriptions
3. The Agent summarizes the structure in natural language
```python
# Initially, only table name summaries are provided
self.db_meta = {
    "schemas": [
        {
            "name": schema.name,
            "tables": [{"name": table.name} for table in schema.tables.values()],
        }
        for schema in self.meta.schemas.values()
    ]
}

# Detailed field info is loaded only when needed
def show_table(self, table_names: list[str]) -> str:
    meta = self.meta.filter_tables_by_names(...)
    # Return detailed field information
```
Advantages:

- The system prompt stays small even for databases with hundreds of tables
- Token cost is paid only for tables the question actually touches
- The LLM's attention is not diluted by irrelevant schema details
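This progressive disclosure can be demonstrated with plain dicts standing in for the metadata objects (the shapes below are illustrative, not AskTable's `MetaAdmin` API):

```python
# Full metadata, as the real metadata store would hold it
FULL_META = {
    "public": {
        "orders": {"id": "integer", "amount": "numeric"},
        "customers": {"id": "integer", "name": "text"},
    }
}

def summary(meta: dict) -> dict:
    # What goes into the system prompt: names only
    return {
        "schemas": [
            {"name": schema, "tables": [{"name": t} for t in tables]}
            for schema, tables in meta.items()
        ]
    }

def show_table(meta: dict, schema: str, table: str) -> dict:
    # Loaded on demand, only for tables the LLM asks about
    return {"name": table, "fields": meta[schema][table]}

brief = summary(FULL_META)
detail = show_table(FULL_META, "public", "orders")
```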
Although the current implementation executes tool calls serially, the architecture supports parallel execution:
```python
# Future support for parallel tool calls
async def execute_tools_parallel(self, tool_uses):
    tasks = [self.execute_tool(tool_use) for tool_use in tool_uses]
    results = await asyncio.gather(*tasks)
    return results
```
```python
# DataFrame workspace automatically caches query results
self.data_workspace[df_id] = {
    "df": self.last_dataframe,
    "description": description,
    "sql": sql,
}
```
Advantages:

- Query results stay available for later analysis steps instead of being re-fetched from the database
- Each cached entry keeps its originating SQL and description, making results traceable by `df_id`
AskTable's multi-agent architecture demonstrates how to decompose complex Text-to-SQL tasks into manageable sub-tasks:

- A generic `Agent` base class handles tool calling, streaming, and safety limits
- `DBAgent` layers on database capabilities: metadata retrieval and SQL execution
- `PlanAgent` specializes further, producing validated data analysis plans
This architecture is not only applicable to Text-to-SQL but can also be extended to other LLM application scenarios requiring multi-step reasoning and tool calling, such as code generation, data analysis, and automated operations.
Through sound architectural design and engineering practices, we can build AI application systems that are both powerful and controllable.