
How AskTable AI Agent Works: From Architecture to Implementation

AskTable Team
2025-12-30


AskTable's core capability comes from its AI Agent system. This article takes an in-depth look at the architectural design and working principles of the AskTable AI Agent, showing how to build a production-grade AI Agent system.

1. Agent Architecture Overview

1.1 Core Components

AskTable's AI Agent system consists of the following core components:

┌─────────────────────────────────────────────────────────┐
│                    Agent Base Class                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │ Message      │  │ Tool         │  │ Output       │  │
│  │ Builder      │  │ Registry     │  │ Parser       │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  │
└─────────────────────────────────────────────────────────┘
                          ▲
                          │
        ┌─────────────────┼─────────────────┐
        │                 │                 │
   ┌────▼────┐      ┌────▼────┐      ┌────▼────┐
   │ DB      │      │ Data    │      │ Chart   │
   │ Agent   │      │ Node    │      │ Node    │
   │         │      │ Agent   │      │ Agent   │
   └─────────┘      └─────────┘      └─────────┘

1.2 Design Principles

The AskTable Agent system follows these design principles:

  • Modularization: Base Agent class provides common capabilities; specialized Agents inherit or compose to implement specific functions
  • Tool-driven: Extend Agent capabilities through dynamic tool registration
  • Streaming response: Return AI thinking process and execution results in real-time
  • State management: Maintain conversation history and data workspace
  • Error handling: Gracefully handle tool execution failures and output parsing errors

2. Agent Base Framework

2.1 Agent Class Core Implementation

The Agent base class is located in app/atserver/ai/agent_rev.py and provides a complete Agent runtime framework:

class Agent:
    def __init__(
        self,
        system_prompt: Any,
        max_tool_calls: int | None = None,
        max_completions: int | None = None,
        model: str | None = None,
    ):
        self.llm_client, default_model = get_current_llm("report")
        self.model = model or default_model

        # Core state
        self.system_prompt = system_prompt
        self.message_builder = ChatMessageBuilder(system_prompt)
        self.tools: list[ChatCompletionToolParam] = []
        self.actions: dict[str, Callable[..., Any]] = {}
        self.output_parser: Callable[..., Any] | None = None

        # Limit settings
        self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
        self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)

Key Component Descriptions:

  • message_builder: Manages conversation history, supports OpenAI and Anthropic format conversion
  • tools: Registered tool list, conforming to OpenAI Function Calling specifications
  • actions: Mapping from tool names to actual execution functions
  • output_parser: Optional output parser for extracting structured results

2.2 Tool Registration Mechanism

Agents dynamically register tools through the add_tool() method:

def add_tool(self, tool: Callable[..., Any]) -> None:
    tool_name = getattr(tool, "__name__", tool.__class__.__name__)
    self.tools.append({
        "type": "function",
        "function": {
            "name": tool_name,
            "description": tool.__doc__ or "",
            "parameters": TypeAdapter(tool).json_schema(),
        },
    })
    self.actions[tool_name] = tool

How It Works:

  1. Extract name and docstring from the function
  2. Use Pydantic's TypeAdapter to automatically generate JSON Schema
  3. Add tool definition to the tools list
  4. Map execution function to the actions dictionary

This design makes adding new tools very simple: just define a function with type annotations.
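For illustration, here is a stdlib-only sketch of the same registration idea. The real implementation delegates schema generation to Pydantic's TypeAdapter; the type mapping below is a simplified assumption, and word_count is an invented demo tool:

```python
import inspect
import json
from typing import Any, Callable

# Simplified Python-type -> JSON-Schema-type mapping (the real code
# delegates this to Pydantic's TypeAdapter)
PY_TO_JSON = {str: "string", int: "integer", float: "number", bool: "boolean"}

class ToolRegistry:
    def __init__(self) -> None:
        self.tools: list[dict[str, Any]] = []
        self.actions: dict[str, Callable[..., Any]] = {}

    def add_tool(self, tool: Callable[..., Any]) -> None:
        name = tool.__name__
        params = inspect.signature(tool).parameters
        properties = {
            p.name: {"type": PY_TO_JSON.get(p.annotation, "string")}
            for p in params.values()
        }
        self.tools.append({
            "type": "function",
            "function": {
                "name": name,
                "description": tool.__doc__ or "",
                "parameters": {"type": "object", "properties": properties},
            },
        })
        self.actions[name] = tool

def word_count(text: str) -> str:
    """Count the words in a text."""
    return json.dumps({"words": len(text.split())})

registry = ToolRegistry()
registry.add_tool(word_count)
# The schema goes to the LLM; the action is invoked when the LLM calls the tool
print(registry.tools[0]["function"]["name"])              # word_count
print(registry.actions["word_count"]("hello agent world"))
```

The key property is that the tool definition (for the LLM) and the executable function (for the runtime) are derived from a single source of truth: the annotated Python function.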

2.3 Streaming Execution Loop

The Agent's run() method implements the complete execution loop:

async def run(self, user_input: str | None = None) -> AsyncGenerator[StreamEvent, None]:
    # Step 1: Process user input
    async for chunk in self.step(user_input):
        yield chunk

    # Continue executing until completion
    while True:
        if self.completion_count >= self.max_completions:
            raise CannotHandle("Exceeded maximum completion count")

        last_message = self.message_builder.dump_anthropic()[-1]

        # Check if complete
        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                if self.output_parser:
                    try:
                        self.output_parser(last_block["text"])
                        break
                    except Exception as e:
                        # Parsing failed; feed the error back to the agent
                        async for chunk in self.step(str(e)):
                            yield chunk
                else:
                    break

        # Continue executing tool calls
        elif last_message["role"] == "user":
            if self.tool_called_count >= self.max_tool_calls:
                raise CannotHandle("Exceeded maximum tool call count")
            async for chunk in self.step():
                yield chunk

Execution Flow:

  1. Send user input to the LLM
  2. Receive the LLM's response as a stream
  3. If the LLM calls a tool, execute the tool and capture its result
  4. Send the tool result back to the LLM
  5. Repeat steps 2-4 until the LLM returns final text
  6. If output_parser is set, validate the output format
  7. If parsing fails, feed the error back to the LLM for a retry
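Calling code consumes this loop as an async generator. The sketch below uses a FakeAgent stand-in (an invented name, not AskTable's API) to show the consumption pattern; the real Agent's run() yields events the same way:

```python
import asyncio
from typing import AsyncGenerator

class FakeAgent:
    """Stand-in whose run() yields events the way the real Agent's loop does."""
    async def run(self, user_input: str) -> AsyncGenerator[dict, None]:
        # A typical sequence: tool call -> tool result -> final text
        yield {"role": "assistant", "tool_use": {"name": "execute_sql"}}
        yield {"role": "user", "tool_result": {"rows": 3}}
        yield {"role": "assistant", "content": {"type": "text", "text": "Done."}}

async def main() -> list[dict]:
    events = []
    async for event in FakeAgent().run("How many orders last week?"):
        events.append(event)  # a real caller would forward each event to the frontend
    return events

events = asyncio.run(main())
print(len(events))  # 3
```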

3. Specialized Agent Implementation

3.1 DBAgent: Database Query Agent

DBAgent is the most basic database interaction Agent, providing three core tools:

class DBAgent:
    def __init__(
        self,
        prompt_name: str,
        datasource: DataSourceAdmin,
        meta: MetaAdmin | None = None,
        assumed_role: RoleAdmin | None = None,
        model: str | None = None,
    ):
        self.datasource = datasource
        self.assumed_role = assumed_role
        self.meta = meta or get_accessible_meta(datasource, assumed_role)

        # Build database metadata
        self.db_meta = {
            "schemas": [
                {
                    "name": schema.name,
                    "tables": [
                        {"name": table.name}
                        for table in schema.tables.values()
                    ],
                }
                for schema in self.meta.schemas.values()
            ]
        }

        # Initialize Agent
        system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
        self.agent = Agent(system_prompt=system_prompt, model=model)

        # Register tools
        self.add_tool = self.agent.add_tool
        self.set_output_parser = self.agent.set_output_parser

        # Data workspace
        self.data_workspace: dict[str, dict[str, Any]] = {}

Core Tools:

1. show_table: Display Table Structure

def show_table(self, table_names: list[str]) -> str:
    """Display detailed structure of specified tables"""
    meta = self.meta.filter_tables_by_names(
        [(f.split(".")[0], f.split(".")[1]) for f in table_names]
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "name": table.name,
            "description": table.curr_desc,
            "fields": [
                {
                    "name": field.name,
                    "desc": field.curr_desc,
                    "data_type": field.data_type,
                }
                for field in table.fields.values()
            ],
        }
        for table in tables
    ])

2. search_metadata: Semantic Search Metadata

async def search_metadata(
    self,
    queries: list[str],  # Semantic search queries
    keywords: list[str],  # Full-text search keywords
) -> str:
    """Find related tables through semantic and keyword search"""
    meta = await retrieve_entities(
        meta=self.meta,
        datasource=self.datasource,
        subqueries=queries,
        keywords=keywords,
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "table_full_name": table.full_name,
            "table_description": table.curr_desc,
        }
        for table in tables
    ])

3. execute_sql: Execute SQL Query

async def execute_sql(self, sql: str) -> str:
    """Execute SQL and store results in data workspace"""
    # Extract the leading "-- " SQL comment as the description ([3:] strips the prefix)
    description = sql.strip().split("\n")[0][3:].strip()

    # Run the blocking query in a worker thread so the event loop is not blocked
    def _connect_and_query():
        with self.datasource.accessor.connect():
            return self.datasource.accessor.query(sql)

    dataframe = await asyncio.to_thread(_connect_and_query)

    # Generate DataFrame ID and store
    df_id = gen_id("df")
    self.data_workspace[df_id] = {
        "df": dataframe,
        "description": description,
        "sql": sql,
    }

    # Return DataFrame summary
    return json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": dataframe.columns.tolist(),
            "shape": dataframe.shape,
            "content_head": dataframe_to_dicts(
                dataframe, self.dataframe_serialize_max_rows
            ),
        },
    })

Data Workspace Design:

  • Each SQL execution generates a unique df_id
  • Complete DataFrame is stored in memory
  • Only the first N rows are returned to the LLM to avoid wasting tokens
  • Supports management and referencing of multiple DataFrames
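A minimal sketch of this idea, using plain lists of dicts in place of pandas DataFrames (store_result and the workspace layout here are illustrative, not AskTable's exact API):

```python
import json

DATAFRAME_SERIALIZE_MAX_ROWS = 2  # illustrative limit

# Stand-in for the data workspace: the full result set stays in memory,
# but only a truncated head is serialized back to the LLM.
workspace: dict[str, dict] = {}

def store_result(df_id: str, rows: list[dict], sql: str, description: str) -> str:
    workspace[df_id] = {"rows": rows, "sql": sql, "description": description}
    return json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": list(rows[0].keys()) if rows else [],
            "shape": (len(rows), len(rows[0]) if rows else 0),
            # Only the first N rows ever reach the LLM
            "content_head": rows[:DATAFRAME_SERIALIZE_MAX_ROWS],
        },
    })

summary = store_result(
    "df_1",
    [{"store": "A", "sales": 10}, {"store": "B", "sales": 7}, {"store": "C", "sales": 5}],
    sql="-- Sales by store\nSELECT ...",
    description="Sales by store",
)
print(json.loads(summary)["dataframe"]["shape"])  # [3, 2]
```

The LLM later refers to the result by its df_id (for example in submit_node or load_dataframe) without the full data ever passing through the context window.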

3.2 DataNodeAgentV2: Canvas Data Node Agent

DataNodeAgentV2 is a specialized Agent for data nodes in Canvas, supporting more advanced features:

class DataNodeAgentV2:
    def __init__(
        self,
        datasource: DataSourceAdmin,
        meta: MetaAdmin,
        assumed_role: RoleAdmin | None = None,
        reference_context: str | None = None,
        model: str | None = None,
        reasoning_effort: Literal["low", "medium", "high"] = "medium",
        history_messages: list[dict] | None = None,
    ):
        self.datasource = datasource
        self.meta = meta
        self.model = model or get_current_model_name("canvas")
        self.reasoning_effort = reasoning_effort

        # Virtual file system (for metadata exploration)
        self.vfs = VirtualFS(self.meta)
        self._shell_interpreter = ShellInterpreter(self.vfs)

        # Register tools
        self._register_tools()

        # Build system prompt
        self._system_prompt = self._build_system_prompt(reference_context)

        # Data workspace
        self.data_workspace: dict[str, dict[str, Any]] = {}
        self.submitted_df_id: str | None = None
        self.submitted_description: str | None = None

Core Features:

1. Shell Tool: Metadata Exploration

def shell(self, command: str) -> str:
    """Execute shell commands to explore database metadata (ls/cat/find/grep)"""
    result = self._shell_interpreter.execute(command)
    return result

DataNodeAgentV2 provides a virtual file system that maps database metadata as a file system structure:

/
├── schema1/
│   ├── table1.table
│   ├── table2.table
│   └── ...
├── schema2/
│   └── ...

Agents can use commands like ls, cat, find, grep to explore metadata, just like operating a real file system.
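A minimal sketch of the idea, supporting only ls and cat over a hard-coded metadata dict (the real VirtualFS and ShellInterpreter support more commands such as find and grep, and operate on real metadata objects):

```python
# Toy metadata: schema -> table -> column list
META = {
    "sales": {"orders": "order_id, store, amount", "stores": "store_id, region"},
    "hr": {"employees": "emp_id, name"},
}

def shell(command: str) -> str:
    parts = command.split()
    if parts[0] == "ls":
        if len(parts) == 1:  # "ls" at the root lists schemas
            return "\n".join(f"{schema}/" for schema in META)
        schema = parts[1].strip("/")
        return "\n".join(f"{table}.table" for table in META[schema])
    if parts[0] == "cat":  # "cat schema/table.table" prints the columns
        schema, table = parts[1].split("/")
        return META[schema][table.removesuffix(".table")]
    return f"unknown command: {parts[0]}"

print(shell("ls"))                        # schemas at the root
print(shell("ls sales"))                  # tables in a schema
print(shell("cat sales/orders.table"))    # order_id, store, amount
```

The appeal of this interface is that LLMs already know shell idioms from training data, so metadata exploration needs no bespoke tool vocabulary.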

2. Reasoning Support

response = await self._client.chat.completions.create(
    model=self.model,
    messages=messages,
    tools=self._tools,
    stream=True,
    max_tokens=MAX_TOKENS,
    extra_body={"reasoning": {"effort": self.reasoning_effort}},
)

The reasoning_effort parameter controls how deeply the model reasons:

  • low: Fast response, suitable for simple queries
  • medium: Balanced performance and quality (default)
  • high: Deep thinking, suitable for complex analysis

3. submit_node: Submit Node Result

def submit_node(
    self,
    description: str,
    df_id: str | None = None,
) -> str:
    """
    Submit final node result

    If unable to answer the question (no related tables/fields), set df_id to None
    and explain the reason in description
    """
    if df_id is not None and df_id not in self.data_workspace:
        raise ValueError(f"DataFrame {df_id} not in workspace")

    self.submitted_df_id = df_id
    self.submitted_description = description

    status_msg = "error" if df_id is None else "success"
    return json.dumps({
        "df_id": df_id,
        "description": description,
        "status": status_msg
    })

3.3 ChartNodeAgent: Chart Generation Agent

ChartNodeAgent is responsible for generating visualization charts based on data nodes:

class ChartNodeAgent:
    def __init__(
        self,
        parent_nodes_context: list[dict],
        history_messages: list[dict] | None = None,
    ):
        # Format parent node information
        parent_info_parts = []
        for i, node in enumerate(parent_nodes_context, 1):
            parent_info_parts.append(f"Node {i} (ID: {node['id']}):")
            if node.get("description"):
                parent_info_parts.append(f"  Description: {node['description']}")
            if node.get("sql"):
                parent_info_parts.append(f"  SQL: {node['sql']}")
            if node.get("dataframe"):
                df = node["dataframe"]
                parent_info_parts.append(
                    f"  Columns: {', '.join(df.get('columns', []))}"
                )

        formatted_parent_info = "\n".join(parent_info_parts)

        # Build system prompt
        system_prompt = get_prompt("agent/canvas/create_chart_node").compile(
            formatted_parent_info=formatted_parent_info
        )

        self.agent = Agent(
            system_prompt=system_prompt,
            model=get_current_model_name("canvas")
        )
        self.agent.add_tool(self.submit_chart_code)

Core Tool:

def submit_chart_code(
    self,
    description: str,
    code: str | None = None,
) -> str:
    """
    Submit final chart code

    If unable to generate chart, set code to None and explain reason in description
    """
    if code is not None:
        try:
            # Compile JSX code
            self.compiled_code = compile_jsx(code)
            self.source_code = code
        except Exception as e:
            raise ValueError(f"Code compilation failed: {str(e)}")
    else:
        self.compiled_code = None
        self.source_code = None

    self.submitted_description = description

    status_msg = "error" if code is None else "success"
    return json.dumps({
        "description": description,
        "status": status_msg,
        "has_code": code is not None,
    })

3.4 ReportAgent: Report Generation Agent

ReportAgent is used for generating data reports, supporting Python code execution:

class ReportAgent(DBAgent):
    def __init__(
        self,
        datasource: DataSourceAdmin,
        assumed_role: RoleAdmin | None = None,
    ):
        super().__init__(
            prompt_name="agent/report_generator",
            datasource=datasource,
            assumed_role=assumed_role,
        )
        self.add_tool(self.show_table)
        self.add_tool(self.search_metadata)
        self.add_tool(self.execute_sql)
        self.set_output_parser(self.output_parser)

Output Parser:

def output_parser(self, output: str) -> None:
    # Extract code from <code>...</code> tags
    pattern = r"<code>(.*?)</code>"
    match = re.search(pattern, output, re.DOTALL)

    if not match:
        raise ValueError("Invalid output format, expected: <code>...</code>")

    code = match.group(1).strip()

    # Extract all df_ids from load_dataframe('...')
    load_df_pattern = r"load_dataframe\(\s*['\"]([^'\"]+)['\"]"
    referenced_dataframes = re.findall(load_df_pattern, code)

    if not referenced_dataframes:
        raise ValueError("No load_dataframe('df_id') pattern found in code")

    # Compile JSX code
    self.compiled_code = compile_jsx(code)
    self.source_code = code

    # Check if referenced dataframes are all in workspace
    missing_ids = set(referenced_dataframes) - set(self.data_workspace.keys())
    if missing_ids:
        raise ValueError(f"Referenced dataframes {missing_ids} not in workspace")

    self.referenced_dataframes = referenced_dataframes
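For example, an output shaped like the following satisfies the parser. The snippet exercises the same two regular expressions (the df_abc123 id and the JSX body are invented for illustration):

```python
import re

llm_output = """Here is the report component.
<code>
const rows = load_dataframe('df_abc123');
export default function Report() { return <Table data={rows} />; }
</code>"""

# Same patterns as output_parser above
code = re.search(r"<code>(.*?)</code>", llm_output, re.DOTALL).group(1).strip()
referenced = re.findall(r"load_dataframe\(\s*['\"]([^'\"]+)['\"]", code)
print(referenced)  # ['df_abc123']
```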

4. Prompt Management System

4.1 Prompt Architecture

AskTable uses Langfuse to manage Prompts, supporting version control and dynamic updates:

class PromptProxy:
    def __init__(self, from_local: bool):
        self._from_local = from_local
        if not self._from_local:
            # Get from Langfuse at runtime
            self.prompt_cache = None
        else:
            # Load from local assets/prompts.json
            with open("assets/prompts.json") as f:
                prompts = json.load(f)
            self.prompt_cache = {
                prompt.name: TextPromptClient(prompt)
                for prompt in prompts
            }

    def get_prompt(self, prompt_name: str) -> PromptClient:
        if not self._from_local:
            return langfuse.get_prompt(
                prompt_name,
                label=config.observer_prompt_label
            )
        else:
            return self.prompt_cache[prompt_name]

4.2 Prompt Template Compilation

Prompts support Jinja2 template syntax for dynamic context injection:

# Get Prompt
system_prompt = get_prompt("agent/canvas/data_agent").compile(
    meta=db_meta,
    history=reference_context
)
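Under the hood, compile() is essentially Jinja2 variable substitution. A stdlib-only approximation of the simple {{ var }} case is sketched below (real Jinja2 also supports loops, filters, and escaping, so this is only a conceptual sketch):

```python
import re

def compile_prompt(template: str, **variables) -> str:
    # Replace {{ name }} placeholders; unknown names are left untouched
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(variables.get(m.group(1), m.group(0))),
        template,
    )

template = "## Database Metadata\n{{ meta }}\n\n## History\n{{ history }}"
print(compile_prompt(template, meta='{"schemas": []}', history="(none)"))
```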

Prompt Naming Conventions:

  • agent/outline_generator: Outline generation Agent
  • agent/report_generator: Report generation Agent
  • agent/canvas/data_agent: Canvas data node Agent
  • agent/canvas/create_chart_node: Canvas chart node Agent

5. Streaming Response Mechanism

5.1 StreamEvent Types

AskTable defines multiple streaming event types:

@dataclass
class TextDelta:
    type: Literal["text"]
    text: str

@dataclass
class AssistantStreamEvent:
    role: Literal["assistant"]
    content: TextDelta

@dataclass
class ToolUseStreamEvent:
    role: Literal["assistant"]
    tool_use: dict

@dataclass
class ToolResultStreamEvent:
    role: Literal["user"]
    tool_result: dict
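These dataclasses can be serialized into server-sent events for the frontend roughly as follows. The SSE framing is an assumption for illustration, not AskTable's documented wire format; the two event types are redeclared to keep the snippet self-contained:

```python
import json
from dataclasses import asdict, dataclass
from typing import Literal

@dataclass
class TextDelta:
    type: Literal["text"]
    text: str

@dataclass
class AssistantStreamEvent:
    role: Literal["assistant"]
    content: TextDelta

def to_sse(event) -> str:
    # One SSE frame per event: "data: <json>\n\n"
    return f"data: {json.dumps(asdict(event))}\n\n"

frame = to_sse(AssistantStreamEvent(role="assistant", content=TextDelta(type="text", text="Hel")))
print(frame, end="")
```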

5.2 Streaming Processing Flow

async def run(self, question: str) -> AsyncGenerator[StreamEvent, None]:
    # Send user message
    self._message_builder.append_openai_message({
        "role": "user",
        "content": question
    })

    for _ in range(MAX_ITERATIONS):
        # Call LLM
        response = await self._client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=self._tools,
            stream=True,
            max_tokens=MAX_TOKENS,
        )

        # Receive the response as a stream
        async for chunk in response:
            event = self._message_builder.append_openai_delta(chunk)
            if event:
                yield event  # Return to frontend in real-time

        # Execute tool calls
        tool_use_blocks = self._message_builder.get_unresolved_tool_use_blocks()
        if not tool_use_blocks:
            return  # Complete

        for tool_use in tool_use_blocks:
            result = await self._execute_tool_use(tool_use)
            yield self._message_builder.append_tool_result(
                tool_use["id"],
                result
            )

Advantages of Streaming Response:

  1. Real-time feedback: Users can see AI's thinking process
  2. Reduced perceived latency: Even if the total time is unchanged, streaming output makes the response feel faster
  3. Interruptible: Users can stop unwanted queries at any time
  4. Transparency: Shows tool calls and execution results, enhancing credibility

6. Error Handling and Retry

6.1 Tool Execution Error Handling

async def _execute_tool_use(self, tool_use: dict) -> str:
    func_name = tool_use["name"]
    func_args_str = tool_use["input"]

    func = self._actions.get(func_name)
    if not func:
        return json.dumps({"error": f"Unknown tool: {func_name}"})

    try:
        args = json.loads(func_args_str)
        if iscoroutinefunction(func):
            result = await func(**args)
        else:
            result = func(**args)
        return result
    except Exception as e:
        return json.dumps({"error": str(e)})

Error Handling Strategy:

  • When a tool fails, return the error message to the LLM instead of raising
  • The LLM can adjust its strategy and retry based on the error information
  • This avoids aborting the entire run with an unhandled exception

6.2 Output Parsing Error Handling

if last_block["type"] == "text":
    if self.output_parser:
        try:
            self.output_parser(last_block["text"])
            break
        except Exception as e:
            # Feed the parsing error back to the LLM
            async for chunk in self.step(str(e)):
                yield chunk

Automatic Correction Mechanism:

  1. The LLM's output does not match the expected format
  2. The output parser raises an exception
  3. The error message is sent to the LLM as a new user message
  4. The LLM corrects its output based on the error
  5. Repeat until the output parses or the maximum completion count is reached
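The mechanism can be sketched with a scripted stand-in for the LLM that produces malformed output on the first attempt and a corrected one on the second (the JSON-with-"sql"-key format is an invented example, not AskTable's actual parser contract):

```python
import json

def output_parser(output: str) -> dict:
    # Expect strict JSON with a "sql" key
    parsed = json.loads(output)
    if "sql" not in parsed:
        raise ValueError('missing "sql" key')
    return parsed

# Scripted stand-in for the LLM: first reply malformed, second corrected
replies = iter(["SELECT 1  -- not JSON", '{"sql": "SELECT 1"}'])
def fake_llm(feedback=None) -> str:
    return next(replies)

MAX_COMPLETIONS = 5
feedback = None
for _ in range(MAX_COMPLETIONS):
    output = fake_llm(feedback)
    try:
        result = output_parser(output)
        break  # parsed successfully
    except Exception as e:
        feedback = str(e)  # would be sent back to the LLM as a user message

print(result)  # {'sql': 'SELECT 1'}
```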

6.3 Limits and Protection

# Maximum tool call count
if self.tool_called_count >= self.max_tool_calls:
    raise CannotHandle("Exceeded maximum tool call count")

# Maximum completion count
if self.completion_count >= self.max_completions:
    raise CannotHandle("Exceeded maximum completion count")

Protection Mechanisms:

  • Prevent infinite loops
  • Control costs (LLM API call count)
  • Fail promptly to avoid long waiting times

7. Best Practices

7.1 Designing Agent Tools

Good Tool Design:

def execute_sql(
    self,
    sql: str = Field(..., description="SQL query to execute")
) -> str:
    """
    Execute SQL query

    ## Code Style
    Describe query purpose at the beginning of SQL with comments:
    ```sql
    -- Sales of all stores in East China last week
    SELECT ...
    ```

    ## DataFrame Management
    Query results are automatically stored in data workspace
    """
    ...

Design Points:

  1. Clear function signatures: Use Pydantic Field to add parameter descriptions
  2. Detailed docstrings: Explain tool purpose, usage methods, precautions
  3. Structured returns: Return JSON format results for easy LLM parsing
  4. Error messages: Return meaningful error information to help LLM understand problems

7.2 Writing System Prompts

Prompt Structure:

You are a data analysis Agent, responsible for generating SQL queries based on user questions.

## Available Tools
- show_table: View table structure
- search_metadata: Search related tables
- execute_sql: Execute SQL query

## Workflow
1. Use search_metadata to find related tables
2. Use show_table to view table structure
3. Write and execute SQL query
4. Call submit_node to submit results

## Database Metadata
{{ meta }}

## Precautions
- SQL must start with comment explaining query purpose
- Prefer using existing tables and fields
- If related data not found, set df_id to None

Prompt Points:

  1. Clear role: Tell Agent who it is and what it should do
  2. Tool instructions: List available tools and their purposes
  3. Workflow: Provide recommended execution steps
  4. Context injection: Use template variables to inject dynamic information
  5. Constraints: Explain output format, precautions, etc.

7.3 Managing Conversation History

History Message Persistence:

# Export conversation history
messages = agent.get_messages()

# Save to database
save_to_database(messages)

# Restore conversation
agent = DataNodeAgentV2.create(
    datasource=datasource,
    history_messages=messages
)

History Message Purpose:

  • Support multi-round conversations
  • Preserve context information
  • Users can follow up and correct
  • Improve query accuracy

7.4 Performance Optimization

Prompt Caching:

messages = self._message_builder.dump_openai(cache_control=True)

Enabling Prompt Caching can:

  • Reduce processing time for repeated content
  • Lower API costs
  • Especially suitable for System Prompts containing large amounts of metadata
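For Anthropic-style models, caching works by marking the large static prefix (the system prompt carrying the schema metadata) as cacheable. The structure below follows Anthropic's documented cache_control format; whether dump_openai(cache_control=True) emits exactly this shape is an assumption:

```python
# The large, static system prompt is marked cacheable so that repeated
# requests reuse the cached prefix instead of reprocessing it.
system_blocks = [
    {
        "type": "text",
        "text": "You are a data analysis Agent...\n## Database Metadata\n{...}",
        "cache_control": {"type": "ephemeral"},  # cache boundary after this block
    }
]
# Only the short, changing suffix (the user turn) is processed fresh each call
messages = [{"role": "user", "content": "Sales of all stores last week?"}]
```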

Concurrent Execution:

# Concurrently execute multiple independent tool calls
results = await asyncio.gather(
    self.search_metadata(["sales"], ["East China"]),
    self.search_metadata(["inventory"], ["warehouse"]),
)

Data Sampling:

# Only return first N rows of data to LLM
"content_head": dataframe_to_dicts(
    dataframe,
    DATAFRAME_SERIALIZE_MAX_ROWS
)

8. Summary

AskTable's AI Agent system demonstrates how to build a production-level Agent architecture:

  1. Modular design: Base Agent class + specialized Agent implementations
  2. Tool-driven: Extend capabilities through dynamic tool registration
  3. Streaming response: Real-time feedback improves user experience
  4. Error handling: Gracefully handle failures and automatically retry
  5. State management: Maintain conversation history and data workspace
  6. Performance optimization: Prompt Caching, data sampling, concurrent execution

By understanding these design principles and implementation details, you can build your own AI Agent system or better utilize AskTable's Agent capabilities.
