
WeChat

Feishu
Choose your way to join us

Scan the QR code to add a consultant
AskTable exposes a full REST API so you can embed AI analytics in your own apps. This guide walks through the main workflows.
Base URL:
https://api.asktable.com/api/v1
Authentication:
Authorization: Bearer YOUR_API_KEY

Responses:
application/json

| Area | Purpose | Main routes |
|---|---|---|
| Auth | API key lifecycle | /auth/* |
| Datasources | CRUD + test connection | /datasources/* |
| Metadata | Tables and fields | /datasources/{id}/meta/* |
| Indexes | Vector and value indexes | /datasources/{id}/indexes/* |
| Chats | Multi-turn conversations | /chats/* |
| Single-turn | Q → SQL / answer / chart | /single-turn/* |
| Canvas | Boards and nodes | /canvas/* |
| Roles & policies | RBAC + row filters | /roles/*, /policies/* |
| Training | Question/SQL pairs | /training/* |
| Bots | Chatbot configuration | /bots/* |
Scopes:
- priv_admin — manage datasources, roles, policies
- priv_asker — run queries and create chats
- priv_visitor — access only allowed chats

Python:
import requests

API_KEY = "your_api_key_here"
BASE_URL = "https://api.asktable.com/api/v1"

# Shared headers for every request: bearer-token auth plus a JSON content type.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}",
}

# Smoke test: list the datasources visible to this API key.
resp = requests.get(f"{BASE_URL}/datasources", headers=headers)
print(resp.json())
cURL:
curl -X GET "https://api.asktable.com/api/v1/datasources" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json"
Standard HTTP codes:
| Code | Meaning |
|---|---|
| 200 | OK |
| 201 | Created |
| 204 | Deleted (no body) |
| 400 | Bad request |
| 401 | Unauthorized (invalid key) |
| 403 | Forbidden |
| 404 | Not found |
| 500 | Server error |
Error body:
{
"detail": "Human-readable error message"
}
Endpoint: POST /datasources
Body:
{
"name": "My MySQL database",
"engine": "mysql",
"access_config": {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "password",
"database": "mydb"
}
}
Supported engines:
- Relational: mysql, postgresql, sqlite, sqlserver, oracle
- Analytical: clickhouse, snowflake, bigquery, redshift
- NoSQL: mongodb, elasticsearch
- Files: excel, csv, parquet

Python:
def create_datasource(name, engine, access_config):
    """Create a datasource and return the parsed API response.

    Args:
        name: Human-readable datasource name.
        engine: One of the supported engine identifiers (e.g. "mysql").
        access_config: Connection settings dict (host, port, user, password,
            database) as required by the chosen engine.

    Returns:
        The created datasource object as a dict (includes its "id").

    Raises:
        requests.HTTPError: if the API rejects the request (4xx/5xx).
    """
    response = requests.post(
        f"{BASE_URL}/datasources",
        headers=headers,
        json={
            "name": name,
            "engine": engine,
            "access_config": access_config,
        },
    )
    # Fail fast on HTTP errors so callers never index into an error body
    # (the example below reads ds["id"], which would KeyError otherwise).
    response.raise_for_status()
    return response.json()

# Create MySQL datasource
ds = create_datasource(
    name="Production DB",
    engine="mysql",
    access_config={
        "host": "db.example.com",
        "port": 3306,
        "user": "readonly",
        "password": "secret",
        "database": "sales",
    },
)
print(f"Datasource id: {ds['id']}")
Endpoint:POST /datasources/test-connection
Body:
{
"engine": "mysql",
"access_config": {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "password",
"database": "mydb"
}
}
Response:
{
"success": true,
"message": "Connected"
}
Endpoint:GET /datasources
Query params:
- name — filter by name (optional)
- page — page number (default 1)
- size — page size (default 50)

Response:
{
"items": [
{
"id": "ds_abc123",
"name": "Production DB",
"engine": "mysql",
"meta_status": "success",
"schema_count": 1,
"table_count": 15,
"field_count": 120,
"created_at": "2024-01-01T00:00:00Z"
}
],
"total": 1,
"page": 1,
"size": 50
}
Endpoint:GET /datasources/{datasource_id}
Response:
{
"id": "ds_abc123",
"name": "Production DB",
"engine": "mysql",
"access_config": {
"host": "db.example.com",
"port": 3306,
"user": "readonly",
"database": "sales"
},
"meta_status": "success",
"schema_count": 1,
"table_count": 15,
"field_count": 120,
"sample_questions": [
"Revenue this month",
"Top 10 products by sales"
],
"created_at": "2024-01-01T00:00:00Z",
"modified_at": "2024-01-01T00:00:00Z"
}
Endpoint:PATCH /datasources/{datasource_id}
Body:
{
"name": "New name",
"desc": "Datasource description",
"sample_questions": [
"Revenue this month",
"Top 10 products by sales"
]
}
Endpoint:DELETE /datasources/{datasource_id}
Response:204 No Content
Endpoint:POST /datasources/{datasource_id}/meta/sync
Notes:Sync table schema from the database into AskTable
Response:
{
"job_id": "job_xyz789",
"status": "running"
}
Endpoint:GET /datasources/{datasource_id}/meta
Response:
{
"schemas": {
"public": {
"name": "public",
"tables": {
"orders": {
"name": "orders",
"description": "Orders table",
"fields": {
"id": {
"name": "id",
"data_type": "INTEGER",
"description": "Order ID",
"visibility": true
},
"amount": {
"name": "amount",
"data_type": "DECIMAL",
"description": "Order amount",
"visibility": true
}
}
}
}
}
}
}
Endpoint:PATCH /datasources/{datasource_id}/meta/tables/{table_name}
Body:
{
"description": "Customer orders - basics, amounts, status"
}
Endpoint:PATCH /datasources/{datasource_id}/meta/tables/{table_name}/fields/{field_name}
Body:
{
"description": "Order status: pending, paid, shipped, completed, cancelled",
"visibility": true
}
Endpoint:PUT /datasources/{datasource_id}/meta
Body:
{
"schemas": {
"public": {
"tables": {
"orders": {
"description": "Orders table",
"fields": {
"status": {
"description": "Order status",
"visibility": true
}
}
}
}
}
}
}
Endpoint:POST /single-turn/q2s
Body:
{
"datasource_id": "ds_abc123",
"question": "Revenue last week in East China",
"role_id": "role_xyz",
"role_variables": {
"user_department": "Sales"
},
"parameterize": false
}
Response:
{
"id": "q2s_123",
"question": "Revenue last week in East China",
"sql": "SELECT SUM(amount) as total_sales\nFROM sales\nWHERE region = 'East China'\n AND order_date >= DATE_SUB(CURDATE(), INTERVAL 1 WEEK)\n AND order_date < CURDATE()",
"status": "success",
"timing": {
"total_duration": 2.5,
"llm_duration": 1.8
}
}
Python:
def question_to_sql(datasource_id, question, role_id=None, role_variables=None):
    """Convert a natural-language question to SQL via the single-turn q2s endpoint.

    Args:
        datasource_id: ID of the datasource to query against.
        question: Natural-language question.
        role_id: Optional RBAC role whose policies add row-level filters.
        role_variables: Optional values for the role's declared variables,
            e.g. {"user_department": "Sales"}. Needed when the role defines
            required variables (see the roles/policies section).

    Returns:
        Parsed JSON response containing the generated SQL and timing info.
    """
    payload = {
        "datasource_id": datasource_id,
        "question": question,
        "role_id": role_id,
    }
    # Only include role_variables when supplied so the server default applies.
    if role_variables is not None:
        payload["role_variables"] = role_variables
    response = requests.post(
        f"{BASE_URL}/single-turn/q2s",
        headers=headers,
        json=payload,
    )
    return response.json()

# Example
result = question_to_sql(
    datasource_id="ds_abc123",
    question="Revenue this month"
)
print(f"Generated SQL: {result['sql']}")
Endpoint:POST /single-turn/q2a
Body:
{
"datasource_id": "ds_abc123",
"question": "Revenue last week in East China"
}
Response:
{
"id": "q2a_123",
"question": "Revenue last week in East China",
"sql": "SELECT SUM(amount) as total_sales FROM ...",
"answer": "Revenue last week in East China was 1,234,567 CNY",
"dataframe": {
"columns": ["total_sales"],
"data": [[1234567]]
},
"status": "success"
}
Endpoint:POST /single-turn/q2w
Body:
{
"datasource_id": "ds_abc123",
"question": "Revenue by region"
}
Response:
{
"id": "q2w_123",
"question": "Revenue by region",
"sql": "SELECT region, SUM(amount) as total FROM sales GROUP BY region",
"chart_code": "const data = load_dataframe('df_abc');\n<BarChart data={data} x=\"region\" y=\"total\" />",
"dataframe": {
"columns": ["region", "total"],
"data": [
["East China", 1000000],
["North China", 800000],
["South China", 900000]
]
},
"status": "success"
}
Endpoint:POST /chats
Body:
{
"bot_id": "bot_abc123"
}
Response:
{
"id": "chat_xyz789",
"bot_id": "bot_abc123",
"status": "active",
"created_at": "2024-01-01T00:00:00Z"
}
Endpoint:POST /chats/{chat_id}/messages
Body:
{
"question": "Revenue this month"
}
Response:
{
"id": "msg_123",
"chat_id": "chat_xyz789",
"role": "assistant",
"content": {
"text": "Revenue this month is 2,345,678 CNY",
"sql": "SELECT SUM(amount) FROM orders WHERE ...",
"dataframe": {
"columns": ["total"],
"data": [[2345678]]
}
},
"created_at": "2024-01-01T00:00:00Z"
}
Endpoint:GET /chats/{chat_id}
Response:
{
"id": "chat_xyz789",
"bot_id": "bot_abc123",
"status": "active",
"messages": [
{
"id": "msg_1",
"role": "human",
"content": {"text": "Revenue this month"},
"created_at": "2024-01-01T00:00:00Z"
},
{
"id": "msg_2",
"role": "assistant",
"content": {
"text": "Revenue this month is 2,345,678 CNY",
"sql": "SELECT SUM(amount) FROM orders WHERE ..."
},
"created_at": "2024-01-01T00:00:01Z"
}
]
}
Endpoint:DELETE /chats/{chat_id}
Response:204 No Content
Endpoint:POST /canvas
Body:
{
"name": "Sales analysis",
"datasource_id": "ds_abc123"
}
Response:
{
"id": "canvas_123",
"name": "Sales analysis",
"datasource_id": "ds_abc123",
"created_at": "2024-01-01T00:00:00Z"
}
Endpoint:POST /canvas/{canvas_id}/nodes
Body:
{
"type": "data",
"question": "Revenue this month",
"parent_ids": []
}
Response:
{
"id": "node_123",
"type": "data",
"question": "Revenue this month",
"status": "pending",
"created_at": "2024-01-01T00:00:00Z"
}
Endpoint:POST /canvas/{canvas_id}/nodes/{node_id}/run
Response: Server-Sent Events (SSE) stream
data: {"type": "text", "text": "Searching related tables..."}
data: {"type": "text", "text": "Found table: sales"}
data: {"type": "tool_use", "tool": "execute_sql", "args": {...}}
data: {"type": "tool_result", "result": {...}}
data: {"type": "complete", "data": {...}}
Python (streaming):
import json  # needed for json.loads below; missing in the original snippet
import sseclient  # third-party: pip install sseclient-py

def run_node_stream(canvas_id, node_id):
    """Run a canvas node and consume its Server-Sent Events stream.

    Prints each event's type as it arrives and returns the payload of the
    final "complete" event (or None if the stream ends without one).
    """
    response = requests.post(
        f"{BASE_URL}/canvas/{canvas_id}/nodes/{node_id}/run",
        headers=headers,
        stream=True,  # keep the connection open for SSE
    )
    client = sseclient.SSEClient(response)
    for event in client.events():
        data = json.loads(event.data)
        print(f"Event type: {data['type']}")
        if data["type"] == "complete":
            return data["data"]

# Example
result = run_node_stream("canvas_123", "node_456")
print(f"Node result: {result}")
Endpoint:GET /canvas/{canvas_id}/nodes/{node_id}
Response:
{
"id": "node_123",
"type": "data",
"question": "Revenue this month",
"status": "completed",
"data": {
"sql": "SELECT SUM(amount) FROM orders WHERE ...",
"description": "Revenue this month",
"dataframe": {
"columns": ["total"],
"data": [[2345678]]
}
},
"created_at": "2024-01-01T00:00:00Z",
"completed_at": "2024-01-01T00:00:05Z"
}
Endpoint:POST /canvas/{canvas_id}/nodes/batch-refresh
Body:
{
"node_ids": ["node_1", "node_2", "node_3"]
}
Notes: Refresh nodes in topological order
Endpoint:POST /roles
Body:
{
"name": "Sales role",
"variables": [
{
"name": "user_department",
"type": "string",
"required": true
}
]
}
Endpoint:POST /policies
Body:
{
"name": "Sales data access policy",
"role_id": "role_123",
"datasource_id": "ds_abc123",
"rules": [
{
"table": "sales",
"row_filter": "department = '{{user_department}}'"
}
]
}
Pass role_id and role_variables:
result = question_to_sql(
datasource_id="ds_abc123",
question="Revenue this month",
role_id="role_123",
role_variables={
"user_department": "Sales"
}
)
Generated SQL includes row-level filters:
SELECT SUM(amount)
FROM sales
WHERE department = 'Sales' -- added automatically
AND order_date >= '2024-01-01'
Endpoint:POST /training
Body:
{
"datasource_id": "ds_abc123",
"question": "GMV this month",
"sql": "SELECT SUM(amount) as gmv FROM orders WHERE order_date >= DATE_TRUNC('month', CURRENT_DATE)"
}
Endpoint:GET /training?datasource_id=ds_abc123
Response:
{
"items": [
{
"id": "train_123",
"question": "GMV this month",
"sql": "SELECT SUM(amount) as gmv FROM ...",
"created_at": "2024-01-01T00:00:00Z"
}
]
}
Endpoint:DELETE /training/{training_id}
def safe_api_call(func, *args, **kwargs):
    """Invoke an HTTP helper and return the parsed JSON body.

    Known failure codes (401/403/404) are translated into console messages,
    other HTTP errors print the server's error body, network failures print
    the exception — in every failure case the original exception is re-raised.
    """
    known_errors = {
        401: "Invalid or expired API key",
        403: "Forbidden",
        404: "Not found",
    }
    try:
        resp = func(*args, **kwargs)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.HTTPError as exc:
        message = known_errors.get(exc.response.status_code)
        if message is not None:
            print(message)
        else:
            print(f"API error: {exc.response.json()}")
        raise
    except requests.exceptions.RequestException as exc:
        print(f"Network error: {exc}")
        raise
# Example
# NOTE(review): this runs a live POST when the snippet executes; it relies on
# BASE_URL and headers from the authentication snippet above.
result = safe_api_call(
    requests.post,
    f"{BASE_URL}/single-turn/q2s",
    headers=headers,
    json={"datasource_id": "ds_123", "question": "Revenue this month"}
)
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
)
def api_call_with_retry(url, **kwargs):
    """POST to *url* with the shared auth headers, retrying up to three
    times with exponential backoff (2s..10s) when an exception is raised."""
    resp = requests.post(url, headers=headers, **kwargs)
    resp.raise_for_status()  # turn 4xx/5xx into HTTPError so tenacity retries
    return resp.json()

# Example
result = api_call_with_retry(
    f"{BASE_URL}/single-turn/q2s",
    json={"datasource_id": "ds_123", "question": "Revenue this month"},
)
import asyncio
import aiohttp

async def batch_query(questions):
    """Fire one q2s request per question concurrently and return the
    parsed JSON responses in the same order as *questions*."""
    async with aiohttp.ClientSession() as session:
        pending = [
            session.post(
                f"{BASE_URL}/single-turn/q2s",
                headers=headers,
                json={
                    "datasource_id": "ds_123",
                    "question": q,
                },
            )
            for q in questions
        ]
        # gather preserves input order, so results line up with questions.
        responses = await asyncio.gather(*pending)
        return [await r.json() for r in responses]

# Example
questions = [
    "Revenue this month",
    "Orders this month",
    "New users this month",
]
results = asyncio.run(batch_query(questions))
for result in results:
    print(f"{result['question']}: {result['sql']}")
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_query(datasource_id, question):
    """Question-to-SQL with in-process memoization.

    Repeated calls with the same (datasource_id, question) pair return the
    cached response instead of hitting the API again.

    NOTE: lru_cache also memoizes error bodies — a failed response for a
    given question will keep being returned until the entry is evicted.
    """
    response = requests.post(
        f"{BASE_URL}/single-turn/q2s",
        headers=headers,
        json={
            "datasource_id": datasource_id,
            "question": question,
        },
    )
    return response.json()

# Example
result1 = cached_query("ds_123", "Revenue this month")  # API call
result2 = cached_query("ds_123", "Revenue this month")  # cache hit
from asktable import AskTable

# Initialise the SDK client with your API key.
client = AskTable(api_key="YOUR_API_KEY")

# Register a MySQL datasource.
datasource = client.datasources.create(
    name="My Database",
    engine="mysql",
    access_config={
        "host": "localhost",
        "port": 3306,
        "user": "root",
        "password": "password",
        "database": "mydb",
    },
)

# Natural-language question -> generated SQL only.
sql_result = client.query.q2s(
    datasource_id=datasource.id,
    question="Revenue this month",
)
print(sql_result.sql)

# Natural-language question -> executed answer.
answer_result = client.query.q2a(
    datasource_id=datasource.id,
    question="Revenue this month",
)
print(answer_result.answer)
import { AskTable } from '@asktable/sdk';

// Initialise the SDK client with your API key.
const client = new AskTable({ apiKey: 'YOUR_API_KEY' });

// Register a MySQL datasource.
const datasource = await client.datasources.create({
  name: 'My Database',
  engine: 'mysql',
  accessConfig: {
    host: 'localhost',
    port: 3306,
    user: 'root',
    password: 'password',
    database: 'mydb',
  },
});

// Natural-language question -> generated SQL.
const q2sResult = await client.query.q2s({
  datasourceId: datasource.id,
  question: 'Revenue this month',
});
console.log(q2sResult.sql);
The AskTable API lets you:
Expose natural-language analytics in your product through the same API the console uses.
No programming needed
Start your free trial