
AskTable's core capability comes from its powerful AI Agent system. This article takes an in-depth look at the architectural design and working principles of the AskTable AI Agent, helping you understand how a production-grade AI Agent system is built.
AskTable's AI Agent system consists of the following core components:
┌─────────────────────────────────────────────────────────┐
│                    Agent Base Class                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │   Message    │  │     Tool     │  │    Output    │   │
│  │   Builder    │  │   Registry   │  │    Parser    │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└─────────────────────────────────────────────────────────┘
                            ▲
                            │
          ┌─────────────────┼─────────────────┐
          │                 │                 │
     ┌────▼────┐       ┌────▼────┐       ┌────▼────┐
     │   DB    │       │  Data   │       │  Chart  │
     │  Agent  │       │  Node   │       │  Node   │
     │         │       │  Agent  │       │  Agent  │
     └─────────┘       └─────────┘       └─────────┘
The AskTable Agent system is built around a few consistent design principles, embodied in the components described below.
The Agent base class is located in app/atserver/ai/agent_rev.py and provides a complete Agent runtime framework:
class Agent:
    def __init__(
        self,
        system_prompt: Any,
        max_tool_calls: int | None = None,
        max_completions: int | None = None,
        model: str | None = None,
    ):
        self.llm_client, default_model = get_current_llm("report")
        self.model = model or default_model

        # Core state
        self.system_prompt = system_prompt
        self.message_builder = ChatMessageBuilder(system_prompt)
        self.tools: list[ChatCompletionToolParam] = []
        self.actions: dict[str, Callable[..., Any]] = {}
        self.output_parser: Callable[..., Any] | None = None

        # Limit settings
        self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
        self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)
Key Component Descriptions:
- message_builder: Manages conversation history; supports conversion between OpenAI and Anthropic message formats
- tools: The registered tool list, conforming to the OpenAI Function Calling specification
- actions: Mapping from tool names to the actual execution functions
- output_parser: Optional output parser for extracting structured results

Agents dynamically register tools through the add_tool() method:
def add_tool(self, tool: Callable[..., Any]) -> None:
    tool_name = getattr(tool, "__name__", tool.__class__.__name__)
    self.tools.append({
        "type": "function",
        "function": {
            "name": tool_name,
            "description": tool.__doc__ or "",
            "parameters": TypeAdapter(tool).json_schema(),
        },
    })
    self.actions[tool_name] = tool
How It Works:
- Uses Pydantic's TypeAdapter to automatically generate a JSON Schema from the function's type annotations
- Appends the tool definition to the tools list
- Registers the function in the actions dictionary

This design makes adding a new tool very simple: just define a function with type annotations.
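To see the registry pattern end to end, here is a minimal, self-contained sketch. The hand-rolled type map stands in for Pydantic's TypeAdapter, and MiniAgent and lookup_order are illustrative names, not AskTable code:

```python
import inspect
import json
from typing import Any, Callable

# Simplified mapping from Python annotations to JSON Schema types;
# the real implementation uses TypeAdapter(tool).json_schema()
_TYPE_MAP = {str: "string", int: "integer", float: "number", bool: "boolean"}

def _json_schema(func: Callable[..., Any]) -> dict:
    """Build a JSON Schema for the function's parameters from its annotations."""
    props = {}
    for name, param in inspect.signature(func).parameters.items():
        props[name] = {"type": _TYPE_MAP.get(param.annotation, "array")}
    return {"type": "object", "properties": props, "required": list(props)}

class MiniAgent:
    """Illustrative stand-in for the Agent base class's tool registry."""
    def __init__(self) -> None:
        self.tools: list[dict] = []
        self.actions: dict[str, Callable[..., Any]] = {}

    def add_tool(self, tool: Callable[..., Any]) -> None:
        name = getattr(tool, "__name__", tool.__class__.__name__)
        self.tools.append({
            "type": "function",
            "function": {
                "name": name,
                "description": tool.__doc__ or "",
                "parameters": _json_schema(tool),
            },
        })
        self.actions[name] = tool

def lookup_order(order_id: str) -> str:
    """Look up one order by ID (hypothetical example tool)."""
    return json.dumps({"order_id": order_id, "status": "shipped"})

agent = MiniAgent()
agent.add_tool(lookup_order)
```

Calling `agent.actions["lookup_order"](order_id="A1")` dispatches exactly the way the real execution loop does after the LLM emits a tool call.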
The Agent's run() method implements the complete execution loop:
async def run(self, user_input: str | None = None) -> AsyncGenerator[StreamEvent, None]:
    # Step 1: Process user input
    async for chunk in self.step(user_input):
        yield chunk

    # Continue executing until completion
    while True:
        if self.completion_count >= self.max_completions:
            raise CannotHandle("Exceeded maximum completion count")

        last_message = self.message_builder.dump_anthropic()[-1]

        # Check if complete
        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                if self.output_parser:
                    try:
                        self.output_parser(last_block["text"])
                        break
                    except Exception as e:
                        # Parsing failed; feed the error back to the Agent
                        async for chunk in self.step(str(e)):
                            yield chunk
                else:
                    break
        # Continue executing tool calls
        elif last_message["role"] == "user":
            if self.tool_called_count >= self.max_tool_calls:
                raise CannotHandle("Exceeded maximum tool call count")
            async for chunk in self.step():
                yield chunk
Execution Flow: the Agent alternates between LLM completions and tool executions until the assistant produces a final text answer; if an output parser is set, that answer must also parse successfully before the loop ends.
DBAgent is the most basic database interaction Agent, providing three core tools:
class DBAgent:
    def __init__(
        self,
        prompt_name: str,
        datasource: DataSourceAdmin,
        meta: MetaAdmin | None = None,
        assumed_role: RoleAdmin | None = None,
        model: str | None = None,
    ):
        self.datasource = datasource
        self.assumed_role = assumed_role
        self.meta = meta or get_accessible_meta(datasource, assumed_role)

        # Build database metadata
        self.db_meta = {
            "schemas": [
                {
                    "name": schema.name,
                    "tables": [
                        {"name": table.name}
                        for table in schema.tables.values()
                    ],
                }
                for schema in self.meta.schemas.values()
            ]
        }

        # Initialize Agent
        system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
        self.agent = Agent(system_prompt=system_prompt, model=model)

        # Register tools
        self.add_tool = self.agent.add_tool
        self.set_output_parser = self.agent.set_output_parser

        # Data workspace
        self.data_workspace: dict[str, dict[str, Any]] = {}
Core Tools:
def show_table(self, table_names: list[str]) -> str:
    """Display the detailed structure of the specified tables"""
    meta = self.meta.filter_tables_by_names(
        [(f.split(".")[0], f.split(".")[1]) for f in table_names]
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "name": table.name,
            "description": table.curr_desc,
            "fields": [
                {
                    "name": field.name,
                    "desc": field.curr_desc,
                    "data_type": field.data_type,
                }
                for field in table.fields.values()
            ],
        }
        for table in tables
    ])
async def search_metadata(
    self,
    queries: list[str],   # Semantic search queries
    keywords: list[str],  # Full-text search keywords
) -> str:
    """Find related tables through semantic and keyword search"""
    meta = await retrieve_entities(
        meta=self.meta,
        datasource=self.datasource,
        subqueries=queries,
        keywords=keywords,
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "table_full_name": table.full_name,
            "table_description": table.curr_desc,
        }
        for table in tables
    ])
async def execute_sql(self, sql: str) -> str:
    """Execute SQL and store the results in the data workspace"""
    # Extract the leading SQL comment as the description
    description = sql.strip().split("\n")[0][3:].strip()

    # Run the blocking query in a worker thread
    def _connect_and_query():
        with self.datasource.accessor.connect():
            return self.datasource.accessor.query(sql)

    dataframe = await asyncio.to_thread(_connect_and_query)

    # Generate a DataFrame ID and store the result
    df_id = gen_id("df")
    self.data_workspace[df_id] = {
        "df": dataframe,
        "description": description,
        "sql": sql,
    }

    # Return a DataFrame summary
    return json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": dataframe.columns.tolist(),
            "shape": dataframe.shape,
            "content_head": dataframe_to_dicts(
                dataframe, self.dataframe_serialize_max_rows
            ),
        },
    })
Data Workspace Design:
- Each query result is stored under a generated df_id
- Downstream steps (report code, chart code) reference results by df_id instead of re-running the SQL

DataNodeAgentV2 is a specialized Agent for data nodes in Canvas, supporting more advanced features:
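The workspace contract can be sketched as follows. Plain lists of dicts stand in for pandas DataFrames, and gen_id is simplified to a counter; DataWorkspace is an illustrative name, not AskTable code:

```python
import itertools
from typing import Any

_counter = itertools.count(1)

def gen_id(prefix: str) -> str:
    """Simplified ID generator standing in for the real gen_id."""
    return f"{prefix}_{next(_counter)}"

class DataWorkspace:
    """Illustrative df_id -> result store, mirroring execute_sql's behavior."""
    def __init__(self) -> None:
        self._store: dict[str, dict[str, Any]] = {}

    def put(self, rows: list[dict], description: str, sql: str) -> str:
        # Store the result and hand back an opaque ID for later reference
        df_id = gen_id("df")
        self._store[df_id] = {"df": rows, "description": description, "sql": sql}
        return df_id

    def load_dataframe(self, df_id: str) -> list[dict]:
        if df_id not in self._store:
            raise ValueError(f"DataFrame {df_id} not in workspace")
        return self._store[df_id]["df"]

ws = DataWorkspace()
df_id = ws.put(
    rows=[{"store": "Shanghai", "sales": 1200}],
    description="Sales of all stores in East China last week",
    sql="-- Sales of all stores in East China last week\nSELECT ...",
)
```

Report and chart code later call `load_dataframe(df_id)` rather than embedding data inline, which keeps LLM context small.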
class DataNodeAgentV2:
    def __init__(
        self,
        datasource: DataSourceAdmin,
        meta: MetaAdmin,
        assumed_role: RoleAdmin | None = None,
        reference_context: str | None = None,
        model: str | None = None,
        reasoning_effort: Literal["low", "medium", "high"] = "medium",
        history_messages: list[dict] | None = None,
    ):
        self.datasource = datasource
        self.meta = meta
        self.model = model or get_current_model_name("canvas")
        self.reasoning_effort = reasoning_effort

        # Virtual file system (for metadata exploration)
        self.vfs = VirtualFS(self.meta)
        self._shell_interpreter = ShellInterpreter(self.vfs)

        # Register tools
        self._register_tools()

        # Build system prompt
        self._system_prompt = self._build_system_prompt(reference_context)

        # Data workspace
        self.data_workspace: dict[str, dict[str, Any]] = {}
        self.submitted_df_id: str | None = None
        self.submitted_description: str | None = None
Core Features:
def shell(self, command: str) -> str:
    """Execute shell commands to explore database metadata (ls/cat/find/grep)"""
    result = self._shell_interpreter.execute(command)
    return result
DataNodeAgentV2 provides a virtual file system that maps database metadata as a file system structure:
/
├── schema1/
│   ├── table1.table
│   ├── table2.table
│   └── ...
├── schema2/
│   └── ...
Agents can use commands like ls, cat, find, grep to explore metadata, just like operating a real file system.
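A toy version of the idea, assuming a flat {schema: {table: description}} mapping (the real VirtualFS and ShellInterpreter support more commands and richer metadata; TinyMetaFS is an illustrative name):

```python
class TinyMetaFS:
    """Maps {schema: {table: description}} onto ls/cat-style commands."""
    def __init__(self, meta: dict[str, dict[str, str]]) -> None:
        self.meta = meta

    def execute(self, command: str) -> str:
        parts = command.split()
        if parts[0] == "ls" and len(parts) == 1:
            # List schemas at the root
            return "\n".join(f"{s}/" for s in self.meta)
        if parts[0] == "ls":
            # List tables within a schema
            schema = parts[1].strip("/")
            return "\n".join(f"{t}.table" for t in self.meta.get(schema, {}))
        if parts[0] == "cat":
            # Show one table's description
            schema, table = parts[1].strip("/").split("/")
            return self.meta[schema][table.removesuffix(".table")]
        return f"unknown command: {parts[0]}"

fs = TinyMetaFS({"sales": {"orders": "One row per customer order"}})
```

The appeal of this design is that LLMs already know shell idioms well, so a single `shell` tool replaces several bespoke metadata tools.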
response = await self._client.chat.completions.create(
    model=self.model,
    messages=messages,
    tools=self._tools,
    stream=True,
    max_tokens=MAX_TOKENS,
    extra_body={"reasoning": {"effort": self.reasoning_effort}},
)
The reasoning_effort parameter controls the model's thinking depth:

- low: Fast responses, suitable for simple queries
- medium: Balanced performance and quality (default)
- high: Deep thinking, suitable for complex analysis

When it has an answer (or knows it cannot produce one), the Agent submits via submit_node:

def submit_node(
    self,
    description: str,
    df_id: str | None = None,
) -> str:
    """
    Submit the final node result.

    If the question cannot be answered (no related tables/fields), set df_id
    to None and explain the reason in description.
    """
    if df_id is not None and df_id not in self.data_workspace:
        raise ValueError(f"DataFrame {df_id} not in workspace")
    self.submitted_df_id = df_id
    self.submitted_description = description
    status_msg = "error" if df_id is None else "success"
    return json.dumps({
        "df_id": df_id,
        "description": description,
        "status": status_msg,
    })
ChartNodeAgent is responsible for generating visualization charts based on data nodes:
class ChartNodeAgent:
    def __init__(
        self,
        parent_nodes_context: list[dict],
        history_messages: list[dict] | None = None,
    ):
        # Format parent node information
        parent_info_parts = []
        for i, node in enumerate(parent_nodes_context, 1):
            parent_info_parts.append(f"Node {i} (ID: {node['id']}):")
            if node.get("description"):
                parent_info_parts.append(f"  Description: {node['description']}")
            if node.get("sql"):
                parent_info_parts.append(f"  SQL: {node['sql']}")
            if node.get("dataframe"):
                df = node["dataframe"]
                parent_info_parts.append(
                    f"  Columns: {', '.join(df.get('columns', []))}"
                )
        formatted_parent_info = "\n".join(parent_info_parts)

        # Build system prompt
        system_prompt = get_prompt("agent/canvas/create_chart_node").compile(
            formatted_parent_info=formatted_parent_info
        )
        self.agent = Agent(
            system_prompt=system_prompt,
            model=get_current_model_name("canvas"),
        )
        self.agent.add_tool(self.submit_chart_code)
Core Tool:
def submit_chart_code(
    self,
    description: str,
    code: str | None = None,
) -> str:
    """
    Submit the final chart code.

    If a chart cannot be generated, set code to None and explain the reason
    in description.
    """
    if code is not None:
        try:
            # Compile the JSX code
            self.compiled_code = compile_jsx(code)
            self.source_code = code
        except Exception as e:
            raise ValueError(f"Code compilation failed: {e}")
    else:
        self.compiled_code = None
        self.source_code = None

    self.submitted_description = description
    status_msg = "error" if code is None else "success"
    return json.dumps({
        "description": description,
        "status": status_msg,
        "has_code": code is not None,
    })
ReportAgent is used for generating data reports, supporting Python code execution:
class ReportAgent(DBAgent):
    def __init__(
        self,
        datasource: DataSourceAdmin,
        assumed_role: RoleAdmin | None = None,
    ):
        super().__init__(
            prompt_name="agent/report_generator",
            datasource=datasource,
            assumed_role=assumed_role,
        )
        self.add_tool(self.show_table)
        self.add_tool(self.search_metadata)
        self.add_tool(self.execute_sql)
        self.set_output_parser(self.output_parser)
Output Parser:
def output_parser(self, output: str) -> None:
    # Extract code from <code>...</code> tags
    pattern = r"<code>(.*?)</code>"
    match = re.search(pattern, output, re.DOTALL)
    if not match:
        raise ValueError("Invalid output format, expected: <code>...</code>")
    code = match.group(1).strip()

    # Extract all df_ids from load_dataframe('...')
    load_df_pattern = r"load_dataframe\(\s*['\"]([^'\"]+)['\"]"
    referenced_dataframes = re.findall(load_df_pattern, code)
    if not referenced_dataframes:
        raise ValueError("No load_dataframe('df_id') pattern found in code")

    # Compile the JSX code
    self.compiled_code = compile_jsx(code)
    self.source_code = code

    # Check that all referenced dataframes are in the workspace
    missing_ids = set(referenced_dataframes) - set(self.data_workspace.keys())
    if missing_ids:
        raise ValueError(f"Referenced dataframes {missing_ids} not in workspace")
    self.referenced_dataframes = referenced_dataframes
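The two regexes can be exercised on a sample model response (the response text below is illustrative):

```python
import re

sample_output = """Here is the report component:
<code>
const Report = () => {
  const sales = load_dataframe('df_123');
  const stock = load_dataframe("df_456");
  return <Chart data={sales} />;
};
</code>"""

# Extract the code between <code>...</code>
match = re.search(r"<code>(.*?)</code>", sample_output, re.DOTALL)
assert match is not None
code = match.group(1).strip()

# Collect every df_id passed to load_dataframe(...)
referenced = re.findall(r"load_dataframe\(\s*['\"]([^'\"]+)['\"]", code)
```

If either pattern fails to match, the parser raises, and the run() loop feeds the error string back to the model for another attempt.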
AskTable uses Langfuse to manage Prompts, supporting version control and dynamic updates:
class PromptProxy:
    def __init__(self, from_local: bool):
        self._from_local = from_local
        if not self._from_local:
            # Fetch from Langfuse at runtime
            self.prompt_cache = None
        else:
            # Load from local assets/prompts.json
            with open("assets/prompts.json") as f:
                prompts = json.load(f)
            self.prompt_cache = {
                prompt.name: TextPromptClient(prompt)
                for prompt in prompts
            }

    def get_prompt(self, prompt_name: str) -> PromptClient:
        if not self._from_local:
            return langfuse.get_prompt(
                prompt_name,
                label=config.observer_prompt_label,
            )
        else:
            return self.prompt_cache[prompt_name]
Prompts support Jinja2 template syntax for dynamic context injection:
# Get the Prompt and compile it with context
system_prompt = get_prompt("agent/canvas/data_agent").compile(
    meta=db_meta,
    history=reference_context,
)
Prompt Naming Conventions:
- agent/outline_generator: Outline generation Agent
- agent/report_generator: Report generation Agent
- agent/canvas/data_agent: Canvas data node Agent
- agent/canvas/create_chart_node: Canvas chart node Agent

AskTable defines multiple streaming event types:
@dataclass
class TextDelta:
    type: Literal["text"]
    text: str

@dataclass
class AssistantStreamEvent:
    role: Literal["assistant"]
    content: TextDelta

@dataclass
class ToolUseStreamEvent:
    role: Literal["assistant"]
    tool_use: dict

@dataclass
class ToolResultStreamEvent:
    role: Literal["user"]
    tool_result: dict
async def run(self, question: str) -> AsyncGenerator[StreamEvent, None]:
    # Send user message
    self._message_builder.append_openai_message({
        "role": "user",
        "content": question,
    })

    for _ in range(MAX_ITERATIONS):
        # Build the message list for this turn
        messages = self._message_builder.dump_openai()

        # Call LLM
        response = await self._client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=self._tools,
            stream=True,
            max_tokens=MAX_TOKENS,
        )

        # Stream in the response
        async for chunk in response:
            event = self._message_builder.append_openai_delta(chunk)
            if event:
                yield event  # Return to the frontend in real time

        # Execute tool calls
        tool_use_blocks = self._message_builder.get_unresolved_tool_use_blocks()
        if not tool_use_blocks:
            return  # Complete

        for tool_use in tool_use_blocks:
            result = await self._execute_tool_use(tool_use)
            yield self._message_builder.append_tool_result(
                tool_use["id"],
                result,
            )
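On the consumer side, the frontend (or any caller) simply iterates the async generator. Here is a minimal sketch with a stand-in event stream (fake_run is illustrative, not AskTable code):

```python
import asyncio
from typing import AsyncGenerator

async def fake_run() -> AsyncGenerator[dict, None]:
    """Hypothetical event stream standing in for Agent.run()."""
    for text in ["Analyzing", " sales", " data..."]:
        yield {"role": "assistant", "content": {"type": "text", "text": text}}

async def consume() -> str:
    parts = []
    async for event in fake_run():
        # A real frontend would render each delta as it arrives
        parts.append(event["content"]["text"])
    return "".join(parts)

result = asyncio.run(consume())  # "Analyzing sales data..."
```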
Advantages of Streaming Response: events are yielded as they arrive, so the frontend can render text and tool progress immediately instead of waiting for the full answer, which greatly reduces perceived latency for long-running analyses.
async def _execute_tool_use(self, tool_use: dict) -> str:
    func_name = tool_use["name"]
    func_args_str = tool_use["input"]

    func = self._actions.get(func_name)
    if not func:
        return json.dumps({"error": f"Unknown tool: {func_name}"})

    try:
        args = json.loads(func_args_str)
        if iscoroutinefunction(func):
            result = await func(**args)
        else:
            result = func(**args)
        return result
    except Exception as e:
        return json.dumps({"error": str(e)})
Error Handling Strategy: tool errors are returned to the LLM as JSON rather than raised, so the model can see what went wrong and adjust its next call.
if last_block["type"] == "text":
    if self.output_parser:
        try:
            self.output_parser(last_block["text"])
            break
        except Exception as e:
            # Feed the parsing error back to the LLM
            async for chunk in self.step(str(e)):
                yield chunk
Automatic Correction Mechanism: when the output parser rejects a response, the error message is sent back to the model as the next turn, giving it a chance to correct its output.
# Maximum tool call count
if self.tool_called_count >= self.max_tool_calls:
    raise CannotHandle("Exceeded maximum tool call count")

# Maximum completion count
if self.completion_count >= self.max_completions:
    raise CannotHandle("Exceeded maximum completion count")
Protection Mechanisms: these hard limits stop runaway loops and cap LLM cost when the model fails to converge.
Good Tool Design:
def execute_sql(
    self,
    sql: str = Field(..., description="SQL query to execute"),
) -> str:
    """
    Execute a SQL query.

    ## Code Style
    Describe the query purpose in a comment at the beginning of the SQL:

    ```sql
    -- Sales of all stores in East China last week
    SELECT ...
    ```

    ## DataFrame Management
    Query results are automatically stored in the data workspace.
    """
    ...
Design Points: the docstring doubles as the tool description the model reads, so usage conventions (like the leading SQL comment) can be enforced directly in the tool's documentation.
Prompt Structure:
You are a data analysis Agent, responsible for generating SQL queries based on user questions.
## Available Tools
- show_table: View table structure
- search_metadata: Search related tables
- execute_sql: Execute SQL query
## Workflow
1. Use search_metadata to find related tables
2. Use show_table to view table structure
3. Write and execute SQL query
4. Call submit_node to submit results
## Database Metadata
{{ meta }}
## Precautions
- SQL must start with comment explaining query purpose
- Prefer using existing tables and fields
- If related data not found, set df_id to None
Prompt Points: list the available tools, spell out the workflow step by step, inject metadata via templates, and tell the model explicitly what to do when it cannot answer.
History Message Persistence:
# Export conversation history
messages = agent.get_messages()

# Save to database
save_to_database(messages)

# Restore the conversation
agent = DataNodeAgentV2.create(
    datasource=datasource,
    history_messages=messages,
)
History Message Purpose: persisted messages let a session be restored later and give follow-up questions the full context of earlier turns.
Prompt Caching:
messages = self._message_builder.dump_openai(cache_control=True)
Enabling Prompt Caching can markedly reduce latency and token cost, since the large system prompt and database metadata are reused across turns.
Concurrent Execution:
# Concurrently execute multiple independent tool calls
results = await asyncio.gather(
self.search_metadata(["sales"], ["East China"]),
self.search_metadata(["inventory"], ["warehouse"]),
)
Data Sampling:
# Only return first N rows of data to LLM
"content_head": dataframe_to_dicts(
dataframe,
DATAFRAME_SERIALIZE_MAX_ROWS
)
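A plausible shape for the truncation helper, using a list of row dicts in place of a pandas DataFrame (the real dataframe_to_dicts operates on a DataFrame; rows_head is an illustrative name):

```python
def rows_head(rows: list[dict], max_rows: int) -> list[dict]:
    """Return at most max_rows rows for inclusion in the tool result."""
    return rows[:max_rows]

# 1000 rows in the workspace, but only a small sample reaches the model
rows = [{"day": d, "sales": d * 100} for d in range(1, 1001)]
head = rows_head(rows, 20)
```

Together with the columns and shape summary, this sample is usually enough for the model to reason about the result without paying for the full payload.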
AskTable's AI Agent system demonstrates how to build a production-level Agent architecture:
By understanding these design principles and implementation details, you can build your own AI Agent system or better utilize AskTable's Agent capabilities.