
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
AskTable 提供了完整的 RESTful API,让你可以将 AI 数据分析能力集成到自己的应用中。本文将系统介绍 AskTable API 的使用方法,帮助你快速上手。
Base URL:
https://api.asktable.com/api/v1
认证方式:
Authorization: Bearer YOUR_API_KEY响应格式:
application/jsonAskTable API 按功能分为以下几类:
| 分类 | 说明 | 主要端点 |
|---|---|---|
| 认证 | API Key 管理 | |
| 数据源 | 数据源 CRUD、连接测试 | |
| 元数据 | 表结构、字段管理 | |
| 索引 | 向量索引、值索引 | |
| 聊天 | 多轮对话 | |
| 单轮查询 | 问题转 SQL/答案/图表 | |
| Canvas | 画卷和节点管理 | |
| 角色权限 | 角色和策略管理 | , |
| 训练数据 | 训练样本管理 | |
| 机器人 | 聊天机器人配置 | |
API Key 权限范围:
priv_admin:管理员权限,可以管理数据源、角色、策略等priv_asker:查询权限,可以发起查询、创建聊天等priv_visitor:访客权限,只能访问指定的聊天Python 示例:
import requests API_KEY = "your_api_key_here" BASE_URL = "https://api.asktable.com/api/v1" headers = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } response = requests.get(f"{BASE_URL}/datasources", headers=headers) print(response.json())
cURL 示例:
curl -X GET "https://api.asktable.com/api/v1/datasources" \ -H "Authorization: Bearer YOUR_API_KEY" \ -H "Content-Type: application/json"
API 使用标准 HTTP 状态码:
| 状态码 | 说明 |
|---|---|
| 200 | 成功 |
| 201 | 创建成功 |
| 204 | 删除成功(无内容) |
| 400 | 请求参数错误 |
| 401 | 未认证(API Key 无效) |
| 403 | 权限不足 |
| 404 | 资源不存在 |
| 500 | 服务器错误 |
错误响应格式:
{ "detail": "错误描述信息" }
端点:
POST /datasources
请求体:
{ "name": "我的 MySQL 数据库", "engine": "mysql", "access_config": { "host": "localhost", "port": 3306, "user": "root", "password": "password", "database": "mydb" } }
支持的数据库类型:
mysql, postgresql, sqlite, sqlserver, oracleclickhouse, snowflake, bigquery, redshiftmongodb, elasticsearchexcel, csv, parquetPython 示例:
def create_datasource(name, engine, access_config): response = requests.post( f"{BASE_URL}/datasources", headers=headers, json={ "name": name, "engine": engine, "access_config": access_config } ) return response.json() # 创建 MySQL 数据源 ds = create_datasource( name="Production DB", engine="mysql", access_config={ "host": "db.example.com", "port": 3306, "user": "readonly", "password": "secret", "database": "sales" } ) print(f"数据源 ID: {ds['id']}")
端点:
POST /datasources/test-connection
请求体:
{ "engine": "mysql", "access_config": { "host": "localhost", "port": 3306, "user": "root", "password": "password", "database": "mydb" } }
响应:
{ "success": true, "message": "连接成功" }
端点:
GET /datasources
查询参数:
name:按名称过滤(可选)page:页码(默认 1)size:每页数量(默认 50)响应:
{ "items": [ { "id": "ds_abc123", "name": "Production DB", "engine": "mysql", "meta_status": "success", "schema_count": 1, "table_count": 15, "field_count": 120, "created_at": "2024-01-01T00:00:00Z" } ], "total": 1, "page": 1, "size": 50 }
端点:
GET /datasources/{datasource_id}
响应:
{ "id": "ds_abc123", "name": "Production DB", "engine": "mysql", "access_config": { "host": "db.example.com", "port": 3306, "user": "readonly", "database": "sales" }, "meta_status": "success", "schema_count": 1, "table_count": 15, "field_count": 120, "sample_questions": [ "本月销售额", "销量前10的产品" ], "created_at": "2024-01-01T00:00:00Z", "modified_at": "2024-01-01T00:00:00Z" }
端点:
PATCH /datasources/{datasource_id}
请求体:
{ "name": "新名称", "desc": "数据源描述", "sample_questions": [ "本月销售额", "销量前10的产品" ] }
端点:
DELETE /datasources/{datasource_id}
响应:204 No Content
端点:
POST /datasources/{datasource_id}/meta/sync
说明:从数据库同步表结构到 AskTable
响应:
{ "job_id": "job_xyz789", "status": "running" }
端点:
GET /datasources/{datasource_id}/meta
响应:
{ "schemas": { "public": { "name": "public", "tables": { "orders": { "name": "orders", "description": "订单表", "fields": { "id": { "name": "id", "data_type": "INTEGER", "description": "订单ID", "visibility": true }, "amount": { "name": "amount", "data_type": "DECIMAL", "description": "订单金额", "visibility": true } } } } } } }
端点:
PATCH /datasources/{datasource_id}/meta/tables/{table_name}
请求体:
{ "description": "客户订单记录,包含订单基本信息、金额、状态等" }
端点:
PATCH /datasources/{datasource_id}/meta/tables/{table_name}/fields/{field_name}
请求体:
{ "description": "订单状态:pending(待处理)、paid(已支付)、shipped(已发货)、completed(已完成)、cancelled(已取消)", "visibility": true }
端点:
PUT /datasources/{datasource_id}/meta
请求体:
{ "schemas": { "public": { "tables": { "orders": { "description": "订单表", "fields": { "status": { "description": "订单状态", "visibility": true } } } } } } }
端点:
POST /single-turn/q2s
请求体:
{ "datasource_id": "ds_abc123", "question": "上周华东地区的销售额", "role_id": "role_xyz", "role_variables": { "user_department": "销售部" }, "parameterize": false }
响应:
{ "id": "q2s_123", "question": "上周华东地区的销售额", "sql": "SELECT SUM(amount) as total_sales\nFROM sales\nWHERE region = '华东'\n AND order_date >= DATE_SUB(CURDATE(), INTERVAL 1 WEEK)\n AND order_date < CURDATE()", "status": "success", "timing": { "total_duration": 2.5, "llm_duration": 1.8 } }
Python 示例:
def question_to_sql(datasource_id, question, role_id=None): response = requests.post( f"{BASE_URL}/single-turn/q2s", headers=headers, json={ "datasource_id": datasource_id, "question": question, "role_id": role_id } ) return response.json() # 使用示例 result = question_to_sql( datasource_id="ds_abc123", question="本月销售额" ) print(f"生成的 SQL: {result['sql']}")
端点:
POST /single-turn/q2a
请求体:
{ "datasource_id": "ds_abc123", "question": "上周华东地区的销售额" }
响应:
{ "id": "q2a_123", "question": "上周华东地区的销售额", "sql": "SELECT SUM(amount) as total_sales FROM ...", "answer": "上周华东地区的销售额为 1,234,567 元", "dataframe": { "columns": ["total_sales"], "data": [[1234567]] }, "status": "success" }
端点:
POST /single-turn/q2w
请求体:
{ "datasource_id": "ds_abc123", "question": "各地区销售额对比" }
响应:
{ "id": "q2w_123", "question": "各地区销售额对比", "sql": "SELECT region, SUM(amount) as total FROM sales GROUP BY region", "chart_code": "const data = load_dataframe('df_abc');\n<BarChart data={data} x=\"region\" y=\"total\" />", "dataframe": { "columns": ["region", "total"], "data": [ ["华东", 1000000], ["华北", 800000], ["华南", 900000] ] }, "status": "success" }
端点:
POST /chats
请求体:
{ "bot_id": "bot_abc123" }
响应:
{ "id": "chat_xyz789", "bot_id": "bot_abc123", "status": "active", "created_at": "2024-01-01T00:00:00Z" }
端点:
POST /chats/{chat_id}/messages
请求体:
{ "question": "本月销售额" }
响应:
{ "id": "msg_123", "chat_id": "chat_xyz789", "role": "assistant", "content": { "text": "本月销售额为 2,345,678 元", "sql": "SELECT SUM(amount) FROM orders WHERE ...", "dataframe": { "columns": ["total"], "data": [[2345678]] } }, "created_at": "2024-01-01T00:00:00Z" }
端点:
GET /chats/{chat_id}
响应:
{ "id": "chat_xyz789", "bot_id": "bot_abc123", "status": "active", "messages": [ { "id": "msg_1", "role": "human", "content": {"text": "本月销售额"}, "created_at": "2024-01-01T00:00:00Z" }, { "id": "msg_2", "role": "assistant", "content": { "text": "本月销售额为 2,345,678 元", "sql": "SELECT SUM(amount) FROM orders WHERE ..." }, "created_at": "2024-01-01T00:00:01Z" } ] }
端点:
DELETE /chats/{chat_id}
响应:204 No Content
端点:
POST /canvas
请求体:
{ "name": "销售分析", "datasource_id": "ds_abc123" }
响应:
{ "id": "canvas_123", "name": "销售分析", "datasource_id": "ds_abc123", "created_at": "2024-01-01T00:00:00Z" }
端点:
POST /canvas/{canvas_id}/nodes
请求体:
{ "type": "data", "question": "本月销售额", "parent_ids": [] }
响应:
{ "id": "node_123", "type": "data", "question": "本月销售额", "status": "pending", "created_at": "2024-01-01T00:00:00Z" }
端点:
POST /canvas/{canvas_id}/nodes/{node_id}/run
响应:Server-Sent Events (SSE) 流
data: {"type": "text", "text": "正在搜索相关表..."} data: {"type": "text", "text": "找到表: sales"} data: {"type": "tool_use", "tool": "execute_sql", "args": {...}} data: {"type": "tool_result", "result": {...}} data: {"type": "complete", "data": {...}}
Python 示例(流式接收):
import sseclient def run_node_stream(canvas_id, node_id): response = requests.post( f"{BASE_URL}/canvas/{canvas_id}/nodes/{node_id}/run", headers=headers, stream=True ) client = sseclient.SSEClient(response) for event in client.events(): data = json.loads(event.data) print(f"事件类型: {data['type']}") if data['type'] == 'complete': return data['data'] # 使用示例 result = run_node_stream("canvas_123", "node_456") print(f"节点结果: {result}")
端点:
GET /canvas/{canvas_id}/nodes/{node_id}
响应:
{ "id": "node_123", "type": "data", "question": "本月销售额", "status": "completed", "data": { "sql": "SELECT SUM(amount) FROM orders WHERE ...", "description": "本月销售额", "dataframe": { "columns": ["total"], "data": [[2345678]] } }, "created_at": "2024-01-01T00:00:00Z", "completed_at": "2024-01-01T00:00:05Z" }
端点:
POST /canvas/{canvas_id}/nodes/batch-refresh
请求体:
{ "node_ids": ["node_1", "node_2", "node_3"] }
说明:按拓扑顺序刷新多个节点
端点:
POST /roles
请求体:
{ "name": "销售部角色", "variables": [ { "name": "user_department", "type": "string", "required": true } ] }
端点:
POST /policies
请求体:
{ "name": "销售部数据访问策略", "role_id": "role_123", "datasource_id": "ds_abc123", "rules": [ { "table": "sales", "row_filter": "department = '{{user_department}}'" } ] }
在查询时传入
role_id 和 role_variables:
result = question_to_sql( datasource_id="ds_abc123", question="本月销售额", role_id="role_123", role_variables={ "user_department": "销售部" } )
生成的 SQL 会自动添加行级过滤:
SELECT SUM(amount) FROM sales WHERE department = '销售部' -- 自动添加 AND order_date >= '2024-01-01'
端点:
POST /training
请求体:
{ "datasource_id": "ds_abc123", "question": "本月 GMV", "sql": "SELECT SUM(amount) as gmv FROM orders WHERE order_date >= DATE_TRUNC('month', CURRENT_DATE)" }
端点:
GET /training?datasource_id=ds_abc123
响应:
{ "items": [ { "id": "train_123", "question": "本月 GMV", "sql": "SELECT SUM(amount) as gmv FROM ...", "created_at": "2024-01-01T00:00:00Z" } ] }
端点:
DELETE /training/{training_id}
def safe_api_call(func, *args, **kwargs): try: response = func(*args, **kwargs) response.raise_for_status() return response.json() except requests.exceptions.HTTPError as e: if e.response.status_code == 401: print("API Key 无效或已过期") elif e.response.status_code == 403: print("权限不足") elif e.response.status_code == 404: print("资源不存在") else: print(f"API 错误: {e.response.json()}") raise except requests.exceptions.RequestException as e: print(f"网络错误: {e}") raise # 使用示例 result = safe_api_call( requests.post, f"{BASE_URL}/single-turn/q2s", headers=headers, json={"datasource_id": "ds_123", "question": "本月销售额"} )
from tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10) ) def api_call_with_retry(url, **kwargs): response = requests.post(url, headers=headers, **kwargs) response.raise_for_status() return response.json() # 使用示例 result = api_call_with_retry( f"{BASE_URL}/single-turn/q2s", json={"datasource_id": "ds_123", "question": "本月销售额"} )
import asyncio import aiohttp async def batch_query(questions): async with aiohttp.ClientSession() as session: tasks = [] for question in questions: task = session.post( f"{BASE_URL}/single-turn/q2s", headers=headers, json={ "datasource_id": "ds_123", "question": question } ) tasks.append(task) responses = await asyncio.gather(*tasks) return [await r.json() for r in responses] # 使用示例 questions = [ "本月销售额", "本月订单数", "本月新增用户数" ] results = asyncio.run(batch_query(questions)) for result in results: print(f"{result['question']}: {result['sql']}")
from functools import lru_cache import hashlib @lru_cache(maxsize=100) def cached_query(datasource_id, question): response = requests.post( f"{BASE_URL}/single-turn/q2s", headers=headers, json={ "datasource_id": datasource_id, "question": question } ) return response.json() # 使用示例 result1 = cached_query("ds_123", "本月销售额") # API 调用 result2 = cached_query("ds_123", "本月销售额") # 从缓存返回
from asktable import AskTable # 初始化客户端 client = AskTable(api_key="YOUR_API_KEY") # 创建数据源 ds = client.datasources.create( name="My Database", engine="mysql", access_config={ "host": "localhost", "port": 3306, "user": "root", "password": "password", "database": "mydb" } ) # 问题转 SQL result = client.query.q2s( datasource_id=ds.id, question="本月销售额" ) print(result.sql) # 问题转答案 result = client.query.q2a( datasource_id=ds.id, question="本月销售额" ) print(result.answer)
import { AskTable } from '@asktable/sdk'; // 初始化客户端 const client = new AskTable({ apiKey: 'YOUR_API_KEY' }); // 创建数据源 const ds = await client.datasources.create({ name: 'My Database', engine: 'mysql', accessConfig: { host: 'localhost', port: 3306, user: 'root', password: 'password', database: 'mydb' } }); // 问题转 SQL const result = await client.query.q2s({ datasourceId: ds.id, question: '本月销售额' }); console.log(result.sql);
AskTable API 提供了完整的功能,让你可以:
通过 API,你可以将 AskTable 的 AI 数据分析能力无缝集成到自己的应用中,为用户提供自然语言查询数据的能力。