AskTable

AskTable API 完全指南:从入门到精通

AskTable 团队
AskTable 团队 2026年1月1日

AskTable API 完全指南:从入门到精通

AskTable 提供了完整的 RESTful API,让你可以将 AI 数据分析能力集成到自己的应用中。本文将系统介绍 AskTable API 的使用方法,帮助你快速上手。

一、API 概览

1.1 API 基础信息

Base URL

https://api.asktable.com/api/v1

认证方式

响应格式

1.2 API 分类

AskTable API 按功能分为以下几类:

分类说明主要端点
认证API Key 管理
/auth/*
数据源数据源 CRUD、连接测试
/datasources/*
元数据表结构、字段管理
/datasources/{id}/meta/*
索引向量索引、值索引
/datasources/{id}/indexes/*
聊天多轮对话
/chats/*
单轮查询问题转 SQL/答案/图表
/single-turn/*
Canvas画卷和节点管理
/canvas/*
角色权限角色和策略管理
/roles/*
,
/policies/*
训练数据训练样本管理
/training/*
机器人聊天机器人配置
/bots/*

二、认证与授权

2.1 获取 API Key

  1. 登录 AskTable 控制台
  2. 进入"设置" → "API Keys"
  3. 点击"创建 API Key"
  4. 选择权限范围(Scopes)
  5. 复制生成的 API Key

API Key 权限范围

2.2 使用 API Key

Python 示例

import requests

API_KEY = "your_api_key_here"
BASE_URL = "https://api.asktable.com/api/v1"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

response = requests.get(f"{BASE_URL}/datasources", headers=headers)
print(response.json())

cURL 示例

curl -X GET "https://api.asktable.com/api/v1/datasources" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json"

2.3 错误处理

API 使用标准 HTTP 状态码:

状态码说明
200成功
201创建成功
204删除成功(无内容)
400请求参数错误
401未认证(API Key 无效)
403权限不足
404资源不存在
500服务器错误

错误响应格式

{
  "detail": "错误描述信息"
}

三、数据源管理

3.1 创建数据源

端点

POST /datasources

请求体

{
  "name": "我的 MySQL 数据库",
  "engine": "mysql",
  "access_config": {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "password",
    "database": "mydb"
  }
}

支持的数据库类型

Python 示例

def create_datasource(name, engine, access_config):
    response = requests.post(
        f"{BASE_URL}/datasources",
        headers=headers,
        json={
            "name": name,
            "engine": engine,
            "access_config": access_config
        }
    )
    return response.json()

# 创建 MySQL 数据源
ds = create_datasource(
    name="Production DB",
    engine="mysql",
    access_config={
        "host": "db.example.com",
        "port": 3306,
        "user": "readonly",
        "password": "secret",
        "database": "sales"
    }
)
print(f"数据源 ID: {ds['id']}")

3.2 测试连接

端点

POST /datasources/test-connection

请求体

{
  "engine": "mysql",
  "access_config": {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "password",
    "database": "mydb"
  }
}

响应

{
  "success": true,
  "message": "连接成功"
}

3.3 获取数据源列表

端点

GET /datasources

查询参数

响应

{
  "items": [
    {
      "id": "ds_abc123",
      "name": "Production DB",
      "engine": "mysql",
      "meta_status": "success",
      "schema_count": 1,
      "table_count": 15,
      "field_count": 120,
      "created_at": "2024-01-01T00:00:00Z"
    }
  ],
  "total": 1,
  "page": 1,
  "size": 50
}

3.4 获取数据源详情

端点

GET /datasources/{datasource_id}

响应

{
  "id": "ds_abc123",
  "name": "Production DB",
  "engine": "mysql",
  "access_config": {
    "host": "db.example.com",
    "port": 3306,
    "user": "readonly",
    "database": "sales"
  },
  "meta_status": "success",
  "schema_count": 1,
  "table_count": 15,
  "field_count": 120,
  "sample_questions": [
    "本月销售额",
    "销量前10的产品"
  ],
  "created_at": "2024-01-01T00:00:00Z",
  "modified_at": "2024-01-01T00:00:00Z"
}

3.5 更新数据源

端点

PATCH /datasources/{datasource_id}

请求体

{
  "name": "新名称",
  "desc": "数据源描述",
  "sample_questions": [
    "本月销售额",
    "销量前10的产品"
  ]
}

3.6 删除数据源

端点

DELETE /datasources/{datasource_id}

响应:204 No Content

四、元数据管理

4.1 同步元数据

端点

POST /datasources/{datasource_id}/meta/sync

说明:从数据库同步表结构到 AskTable

响应

{
  "job_id": "job_xyz789",
  "status": "running"
}

4.2 获取元数据

端点

GET /datasources/{datasource_id}/meta

响应

{
  "schemas": {
    "public": {
      "name": "public",
      "tables": {
        "orders": {
          "name": "orders",
          "description": "订单表",
          "fields": {
            "id": {
              "name": "id",
              "data_type": "INTEGER",
              "description": "订单ID",
              "visibility": true
            },
            "amount": {
              "name": "amount",
              "data_type": "DECIMAL",
              "description": "订单金额",
              "visibility": true
            }
          }
        }
      }
    }
  }
}

4.3 更新表描述

端点

PATCH /datasources/{datasource_id}/meta/tables/{table_name}

请求体

{
  "description": "客户订单记录,包含订单基本信息、金额、状态等"
}

4.4 更新字段描述

端点

PATCH /datasources/{datasource_id}/meta/tables/{table_name}/fields/{field_name}

请求体

{
  "description": "订单状态:pending(待处理)、paid(已支付)、shipped(已发货)、completed(已完成)、cancelled(已取消)",
  "visibility": true
}

4.5 批量更新元数据

端点

PUT /datasources/{datasource_id}/meta

请求体

{
  "schemas": {
    "public": {
      "tables": {
        "orders": {
          "description": "订单表",
          "fields": {
            "status": {
              "description": "订单状态",
              "visibility": true
            }
          }
        }
      }
    }
  }
}

五、单轮查询 API

5.1 问题转 SQL (Q2S)

端点

POST /single-turn/q2s

请求体

{
  "datasource_id": "ds_abc123",
  "question": "上周华东地区的销售额",
  "role_id": "role_xyz",
  "role_variables": {
    "user_department": "销售部"
  },
  "parameterize": false
}

响应

{
  "id": "q2s_123",
  "question": "上周华东地区的销售额",
  "sql": "SELECT SUM(amount) as total_sales\nFROM sales\nWHERE region = '华东'\n  AND order_date >= DATE_SUB(CURDATE(), INTERVAL 1 WEEK)\n  AND order_date < CURDATE()",
  "status": "success",
  "timing": {
    "total_duration": 2.5,
    "llm_duration": 1.8
  }
}

Python 示例

def question_to_sql(datasource_id, question, role_id=None):
    response = requests.post(
        f"{BASE_URL}/single-turn/q2s",
        headers=headers,
        json={
            "datasource_id": datasource_id,
            "question": question,
            "role_id": role_id
        }
    )
    return response.json()

# 使用示例
result = question_to_sql(
    datasource_id="ds_abc123",
    question="本月销售额"
)
print(f"生成的 SQL: {result['sql']}")

5.2 问题转答案 (Q2A)

端点

POST /single-turn/q2a

请求体

{
  "datasource_id": "ds_abc123",
  "question": "上周华东地区的销售额"
}

响应

{
  "id": "q2a_123",
  "question": "上周华东地区的销售额",
  "sql": "SELECT SUM(amount) as total_sales FROM ...",
  "answer": "上周华东地区的销售额为 1,234,567 元",
  "dataframe": {
    "columns": ["total_sales"],
    "data": [[1234567]]
  },
  "status": "success"
}

5.3 问题转图表 (Q2W)

端点

POST /single-turn/q2w

请求体

{
  "datasource_id": "ds_abc123",
  "question": "各地区销售额对比"
}

响应

{
  "id": "q2w_123",
  "question": "各地区销售额对比",
  "sql": "SELECT region, SUM(amount) as total FROM sales GROUP BY region",
  "chart_code": "const data = load_dataframe('df_abc');\n<BarChart data={data} x=\"region\" y=\"total\" />",
  "dataframe": {
    "columns": ["region", "total"],
    "data": [
      ["华东", 1000000],
      ["华北", 800000],
      ["华南", 900000]
    ]
  },
  "status": "success"
}

六、聊天对话 API

6.1 创建聊天

端点

POST /chats

请求体

{
  "bot_id": "bot_abc123"
}

响应

{
  "id": "chat_xyz789",
  "bot_id": "bot_abc123",
  "status": "active",
  "created_at": "2024-01-01T00:00:00Z"
}

6.2 发送消息

端点

POST /chats/{chat_id}/messages

请求体

{
  "question": "本月销售额"
}

响应

{
  "id": "msg_123",
  "chat_id": "chat_xyz789",
  "role": "assistant",
  "content": {
    "text": "本月销售额为 2,345,678 元",
    "sql": "SELECT SUM(amount) FROM orders WHERE ...",
    "dataframe": {
      "columns": ["total"],
      "data": [[2345678]]
    }
  },
  "created_at": "2024-01-01T00:00:00Z"
}

6.3 获取聊天历史

端点

GET /chats/{chat_id}

响应

{
  "id": "chat_xyz789",
  "bot_id": "bot_abc123",
  "status": "active",
  "messages": [
    {
      "id": "msg_1",
      "role": "human",
      "content": {"text": "本月销售额"},
      "created_at": "2024-01-01T00:00:00Z"
    },
    {
      "id": "msg_2",
      "role": "assistant",
      "content": {
        "text": "本月销售额为 2,345,678 元",
        "sql": "SELECT SUM(amount) FROM orders WHERE ..."
      },
      "created_at": "2024-01-01T00:00:01Z"
    }
  ]
}

6.4 删除聊天

端点

DELETE /chats/{chat_id}

响应:204 No Content

七、Canvas API

7.1 创建 Canvas

端点

POST /canvas

请求体

{
  "name": "销售分析",
  "datasource_id": "ds_abc123"
}

响应

{
  "id": "canvas_123",
  "name": "销售分析",
  "datasource_id": "ds_abc123",
  "created_at": "2024-01-01T00:00:00Z"
}

7.2 创建节点

端点

POST /canvas/{canvas_id}/nodes

请求体

{
  "type": "data",
  "question": "本月销售额",
  "parent_ids": []
}

响应

{
  "id": "node_123",
  "type": "data",
  "question": "本月销售额",
  "status": "pending",
  "created_at": "2024-01-01T00:00:00Z"
}

7.3 执行节点(流式)

端点

POST /canvas/{canvas_id}/nodes/{node_id}/run

响应:Server-Sent Events (SSE) 流

data: {"type": "text", "text": "正在搜索相关表..."}

data: {"type": "text", "text": "找到表: sales"}

data: {"type": "tool_use", "tool": "execute_sql", "args": {...}}

data: {"type": "tool_result", "result": {...}}

data: {"type": "complete", "data": {...}}

Python 示例(流式接收)

import sseclient

def run_node_stream(canvas_id, node_id):
    response = requests.post(
        f"{BASE_URL}/canvas/{canvas_id}/nodes/{node_id}/run",
        headers=headers,
        stream=True
    )

    client = sseclient.SSEClient(response)
    for event in client.events():
        data = json.loads(event.data)
        print(f"事件类型: {data['type']}")
        if data['type'] == 'complete':
            return data['data']

# 使用示例
result = run_node_stream("canvas_123", "node_456")
print(f"节点结果: {result}")

7.4 获取节点结果

端点

GET /canvas/{canvas_id}/nodes/{node_id}

响应

{
  "id": "node_123",
  "type": "data",
  "question": "本月销售额",
  "status": "completed",
  "data": {
    "sql": "SELECT SUM(amount) FROM orders WHERE ...",
    "description": "本月销售额",
    "dataframe": {
      "columns": ["total"],
      "data": [[2345678]]
    }
  },
  "created_at": "2024-01-01T00:00:00Z",
  "completed_at": "2024-01-01T00:00:05Z"
}

7.5 批量刷新节点

端点

POST /canvas/{canvas_id}/nodes/batch-refresh

请求体

{
  "node_ids": ["node_1", "node_2", "node_3"]
}

说明:按拓扑顺序刷新多个节点

八、角色和权限 API

8.1 创建角色

端点

POST /roles

请求体

{
  "name": "销售部角色",
  "variables": [
    {
      "name": "user_department",
      "type": "string",
      "required": true
    }
  ]
}

8.2 创建策略

端点

POST /policies

请求体

{
  "name": "销售部数据访问策略",
  "role_id": "role_123",
  "datasource_id": "ds_abc123",
  "rules": [
    {
      "table": "sales",
      "row_filter": "department = '{{user_department}}'"
    }
  ]
}

8.3 使用角色查询

在查询时传入

role_id
role_variables

result = question_to_sql(
    datasource_id="ds_abc123",
    question="本月销售额",
    role_id="role_123",
    role_variables={
        "user_department": "销售部"
    }
)

生成的 SQL 会自动添加行级过滤:

SELECT SUM(amount)
FROM sales
WHERE department = '销售部'  -- 自动添加
  AND order_date >= '2024-01-01'

九、训练数据 API

9.1 添加训练样本

端点

POST /training

请求体

{
  "datasource_id": "ds_abc123",
  "question": "本月 GMV",
  "sql": "SELECT SUM(amount) as gmv FROM orders WHERE order_date >= DATE_TRUNC('month', CURRENT_DATE)"
}

9.2 获取训练样本列表

端点

GET /training?datasource_id=ds_abc123

响应

{
  "items": [
    {
      "id": "train_123",
      "question": "本月 GMV",
      "sql": "SELECT SUM(amount) as gmv FROM ...",
      "created_at": "2024-01-01T00:00:00Z"
    }
  ]
}

9.3 删除训练样本

端点

DELETE /training/{training_id}

十、最佳实践

10.1 错误处理

def safe_api_call(func, *args, **kwargs):
    try:
        response = func(*args, **kwargs)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            print("API Key 无效或已过期")
        elif e.response.status_code == 403:
            print("权限不足")
        elif e.response.status_code == 404:
            print("资源不存在")
        else:
            print(f"API 错误: {e.response.json()}")
        raise
    except requests.exceptions.RequestException as e:
        print(f"网络错误: {e}")
        raise

# 使用示例
result = safe_api_call(
    requests.post,
    f"{BASE_URL}/single-turn/q2s",
    headers=headers,
    json={"datasource_id": "ds_123", "question": "本月销售额"}
)

10.2 重试机制

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def api_call_with_retry(url, **kwargs):
    response = requests.post(url, headers=headers, **kwargs)
    response.raise_for_status()
    return response.json()

# 使用示例
result = api_call_with_retry(
    f"{BASE_URL}/single-turn/q2s",
    json={"datasource_id": "ds_123", "question": "本月销售额"}
)

10.3 批量操作

import asyncio
import aiohttp

async def batch_query(questions):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for question in questions:
            task = session.post(
                f"{BASE_URL}/single-turn/q2s",
                headers=headers,
                json={
                    "datasource_id": "ds_123",
                    "question": question
                }
            )
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        return [await r.json() for r in responses]

# 使用示例
questions = [
    "本月销售额",
    "本月订单数",
    "本月新增用户数"
]
results = asyncio.run(batch_query(questions))
for result in results:
    print(f"{result['question']}: {result['sql']}")

10.4 缓存结果

from functools import lru_cache
import hashlib

@lru_cache(maxsize=100)
def cached_query(datasource_id, question):
    response = requests.post(
        f"{BASE_URL}/single-turn/q2s",
        headers=headers,
        json={
            "datasource_id": datasource_id,
            "question": question
        }
    )
    return response.json()

# 使用示例
result1 = cached_query("ds_123", "本月销售额")  # API 调用
result2 = cached_query("ds_123", "本月销售额")  # 从缓存返回

十一、SDK 和工具

11.1 Python SDK

from asktable import AskTable

# 初始化客户端
client = AskTable(api_key="YOUR_API_KEY")

# 创建数据源
ds = client.datasources.create(
    name="My Database",
    engine="mysql",
    access_config={
        "host": "localhost",
        "port": 3306,
        "user": "root",
        "password": "password",
        "database": "mydb"
    }
)

# 问题转 SQL
result = client.query.q2s(
    datasource_id=ds.id,
    question="本月销售额"
)
print(result.sql)

# 问题转答案
result = client.query.q2a(
    datasource_id=ds.id,
    question="本月销售额"
)
print(result.answer)

11.2 JavaScript/TypeScript SDK

import { AskTable } from '@asktable/sdk';

// 初始化客户端
const client = new AskTable({
  apiKey: 'YOUR_API_KEY'
});

// 创建数据源
const ds = await client.datasources.create({
  name: 'My Database',
  engine: 'mysql',
  accessConfig: {
    host: 'localhost',
    port: 3306,
    user: 'root',
    password: 'password',
    database: 'mydb'
  }
});

// 问题转 SQL
const result = await client.query.q2s({
  datasourceId: ds.id,
  question: '本月销售额'
});
console.log(result.sql);

十二、总结

AskTable API 提供了完整的功能,让你可以:

  1. 数据源管理:创建、配置、测试数据源连接
  2. 元数据管理:同步、更新表和字段描述
  3. 单轮查询:问题转 SQL/答案/图表
  4. 多轮对话:创建聊天机器人,支持上下文对话
  5. Canvas 操作:创建数据分析工作流
  6. 权限控制:配置角色和策略,实现行级权限
  7. 训练优化:添加训练样本,提升查询准确性

通过 API,你可以将 AskTable 的 AI 数据分析能力无缝集成到自己的应用中,为用户提供自然语言查询数据的能力。

相关资源