AskTable

使用 AskTable API 构建自定义应用:实战案例

AskTable 团队
AskTable 团队 2026年1月2日

使用 AskTable API 构建自定义应用:实战案例

本文将通过多个实战案例,展示如何使用 AskTable API 构建各种自定义应用。每个案例都包含完整的代码示例和详细说明。

一、案例 1:数据查询 Web 应用

1.1 应用概述

构建一个简单的 Web 应用,让用户可以用自然语言查询数据库。

技术栈

1.2 后端实现

安装依赖

pip install fastapi uvicorn requests python-dotenv

创建

main.py

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
import os
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()

# 配置 CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# AskTable 配置
ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY")
ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1"
DATASOURCE_ID = os.getenv("DATASOURCE_ID")

headers = {
    "Authorization": f"Bearer {ASKTABLE_API_KEY}",
    "Content-Type": "application/json"
}

class QueryRequest(BaseModel):
    question: str

class QueryResponse(BaseModel):
    question: str
    sql: str
    answer: str
    data: list

@app.post("/api/query", response_model=QueryResponse)
async def query_data(request: QueryRequest):
    """处理用户查询"""
    try:
        # 调用 AskTable API
        response = requests.post(
            f"{ASKTABLE_BASE_URL}/single-turn/q2a",
            headers=headers,
            json={
                "datasource_id": DATASOURCE_ID,
                "question": request.question
            }
        )
        response.raise_for_status()
        result = response.json()

        return QueryResponse(
            question=result["question"],
            sql=result["sql"],
            answer=result["answer"],
            data=result["dataframe"]["data"]
        )
    except requests.exceptions.HTTPError as e:
        raise HTTPException(
            status_code=e.response.status_code,
            detail=e.response.json().get("detail", "API 错误")
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/datasources")
async def get_datasources():
    """获取数据源列表"""
    try:
        response = requests.get(
            f"{ASKTABLE_BASE_URL}/datasources",
            headers=headers
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

创建

.env
文件

ASKTABLE_API_KEY=your_api_key_here
DATASOURCE_ID=your_datasource_id_here

运行后端

python main.py

1.3 前端实现

创建 React 应用

npx create-react-app data-query-app --template typescript
cd data-query-app
npm install axios

创建

src/App.tsx

import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

interface QueryResult {
  question: string;
  sql: string;
  answer: string;
  data: any[][];
}

function App() {
  const [question, setQuestion] = useState('');
  const [result, setResult] = useState<QueryResult | null>(null);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState('');

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    setLoading(true);
    setError('');
    setResult(null);

    try {
      const response = await axios.post('http://localhost:8000/api/query', {
        question
      });
      setResult(response.data);
    } catch (err: any) {
      setError(err.response?.data?.detail || '查询失败');
    } finally {
      setLoading(false);
    }
  };

  return (
    <div className="App">
      <header className="App-header">
        <h1>数据查询助手</h1>
        <form onSubmit={handleSubmit}>
          <input
            type="text"
            value={question}
            onChange={(e) => setQuestion(e.target.value)}
            placeholder="输入你的问题,例如:本月销售额"
            disabled={loading}
          />
          <button type="submit" disabled={loading}>
            {loading ? '查询中...' : '查询'}
          </button>
        </form>

        {error && <div className="error">{error}</div>}

        {result && (
          <div className="result">
            <h2>查询结果</h2>
            <div className="answer">
              <strong>答案:</strong>
              <p>{result.answer}</p>
            </div>

            <div className="sql">
              <strong>生成的 SQL:</strong>
              <pre>{result.sql}</pre>
            </div>

            {result.data.length > 0 && (
              <div className="data">
                <strong>数据:</strong>
                <table>
                  <tbody>
                    {result.data.map((row, i) => (
                      <tr key={i}>
                        {row.map((cell, j) => (
                          <td key={j}>{cell}</td>
                        ))}
                      </tr>
                    ))}
                  </tbody>
                </table>
              </div>
            )}
          </div>
        )}
      </header>
    </div>
  );
}

export default App;

运行前端

npm start

二、案例 2:Slack 数据查询机器人

2.1 应用概述

创建一个 Slack 机器人,让团队成员可以在 Slack 中用自然语言查询数据。

功能

2.2 实现代码

安装依赖

pip install slack-bolt requests python-dotenv

创建

slack_bot.py

import os
import requests
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from dotenv import load_dotenv

load_dotenv()

# Slack 配置
app = App(token=os.environ.get("SLACK_BOT_TOKEN"))

# AskTable 配置
ASKTABLE_API_KEY = os.environ.get("ASKTABLE_API_KEY")
ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1"
DATASOURCE_ID = os.environ.get("DATASOURCE_ID")

headers = {
    "Authorization": f"Bearer {ASKTABLE_API_KEY}",
    "Content-Type": "application/json"
}

def query_asktable(question: str) -> dict:
    """调用 AskTable API 查询数据"""
    try:
        response = requests.post(
            f"{ASKTABLE_BASE_URL}/single-turn/q2a",
            headers=headers,
            json={
                "datasource_id": DATASOURCE_ID,
                "question": question
            },
            timeout=30
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        return {"error": str(e)}

@app.event("app_mention")
def handle_mention(event, say):
    """处理 @机器人 的消息"""
    # 提取问题(去掉 @机器人 部分)
    text = event["text"]
    question = text.split(">", 1)[1].strip() if ">" in text else text

    # 发送"正在查询"消息
    say(f"正在查询:_{question}_")

    # 调用 AskTable API
    result = query_asktable(question)

    if "error" in result:
        say(f"❌ 查询失败:{result['error']}")
        return

    # 格式化响应
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*问题:*\n{result['question']}"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*答案:*\n{result['answer']}"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*SQL:*\n```{result['sql']}```"
            }
        }
    ]

    # 添加数据表格
    if result.get("dataframe") and result["dataframe"].get("data"):
        data = result["dataframe"]["data"]
        columns = result["dataframe"]["columns"]

        # 构建表格文本
        table_text = " | ".join(columns) + "\n"
        table_text += "-" * (len(table_text) - 1) + "\n"
        for row in data[:10]:  # 最多显示 10 行
            table_text += " | ".join(str(cell) for cell in row) + "\n"

        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*数据:*\n```{table_text}```"
            }
        })

    say(blocks=blocks)

@app.command("/query")
def handle_query_command(ack, command, say):
    """处理 /query 命令"""
    ack()

    question = command["text"]
    if not question:
        say("请提供查询问题,例如:/query 本月销售额")
        return

    say(f"正在查询:_{question}_")

    result = query_asktable(question)

    if "error" in result:
        say(f"❌ 查询失败:{result['error']}")
        return

    say(f"*答案:*\n{result['answer']}\n\n*SQL:*\n```{result['sql']}```")

if __name__ == "__main__":
    handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    handler.start()

配置

.env

SLACK_BOT_TOKEN=xoxb-your-bot-token
SLACK_APP_TOKEN=xapp-your-app-token
ASKTABLE_API_KEY=your_api_key_here
DATASOURCE_ID=your_datasource_id_here

运行机器人

python slack_bot.py

2.3 Slack 配置

  1. 创建 Slack App:https://api.slack.com/apps
  2. 启用 Socket Mode
  3. 添加 Bot Token Scopes:
    • app_mentions:read
    • chat:write
    • commands
  4. 安装 App 到工作区
  5. 复制 Bot Token 和 App Token

三、案例 3:定时数据报告生成器

3.1 应用概述

创建一个定时任务,每天自动生成数据报告并发送邮件。

功能

3.2 实现代码

安装依赖

pip install requests schedule jinja2 python-dotenv

创建

report_generator.py

import os
import requests
import schedule
import time
from datetime import datetime
from jinja2 import Template
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
from dotenv import load_dotenv

load_dotenv()

# AskTable 配置
ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY")
ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1"
DATASOURCE_ID = os.getenv("DATASOURCE_ID")

# 邮件配置
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 587))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
REPORT_RECIPIENTS = os.getenv("REPORT_RECIPIENTS").split(",")

headers = {
    "Authorization": f"Bearer {ASKTABLE_API_KEY}",
    "Content-Type": "application/json"
}

def query_asktable(question: str) -> dict:
    """调用 AskTable API 查询数据"""
    try:
        response = requests.post(
            f"{ASKTABLE_BASE_URL}/single-turn/q2a",
            headers=headers,
            json={
                "datasource_id": DATASOURCE_ID,
                "question": question
            },
            timeout=30
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        return {"error": str(e), "question": question}

def generate_report():
    """生成数据报告"""
    print(f"[{datetime.now()}] 开始生成报告...")

    # 定义要查询的问题
    questions = [
        "昨日销售额",
        "昨日订单数",
        "昨日新增用户数",
        "昨日销量前10的产品",
        "昨日各地区销售额"
    ]

    # 执行查询
    results = []
    for question in questions:
        print(f"  查询: {question}")
        result = query_asktable(question)
        results.append(result)

    # 生成 HTML 报告
    html_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <style>
            body {
                font-family: Arial, sans-serif;
                max-width: 800px;
                margin: 0 auto;
                padding: 20px;
            }
            h1 {
                color: #333;
                border-bottom: 2px solid #4CAF50;
                padding-bottom: 10px;
            }
            .metric {
                background: #f5f5f5;
                padding: 15px;
                margin: 10px 0;
                border-radius: 5px;
            }
            .metric h3 {
                margin-top: 0;
                color: #4CAF50;
            }
            .answer {
                font-size: 18px;
                font-weight: bold;
                color: #333;
            }
            .sql {
                background: #f0f0f0;
                padding: 10px;
                border-radius: 3px;
                font-family: monospace;
                font-size: 12px;
                overflow-x: auto;
            }
            table {
                width: 100%;
                border-collapse: collapse;
                margin-top: 10px;
            }
            th, td {
                border: 1px solid #ddd;
                padding: 8px;
                text-align: left;
            }
            th {
                background-color: #4CAF50;
                color: white;
            }
            .error {
                color: red;
            }
        </style>
    </head>
    <body>
        <h1>每日数据报告</h1>
        <p>生成时间:{{ report_time }}</p>

        {% for result in results %}
        <div class="metric">
            <h3>{{ result.question }}</h3>

            {% if result.error %}
            <p class="error">查询失败:{{ result.error }}</p>
            {% else %}
            <p class="answer">{{ result.answer }}</p>

            {% if result.dataframe and result.dataframe.data %}
            <table>
                <thead>
                    <tr>
                        {% for col in result.dataframe.columns %}
                        <th>{{ col }}</th>
                        {% endfor %}
                    </tr>
                </thead>
                <tbody>
                    {% for row in result.dataframe.data[:10] %}
                    <tr>
                        {% for cell in row %}
                        <td>{{ cell }}</td>
                        {% endfor %}
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
            {% endif %}

            <details>
                <summary>查看 SQL</summary>
                <div class="sql">{{ result.sql }}</div>
            </details>
            {% endif %}
        </div>
        {% endfor %}
    </body>
    </html>
    """

    template = Template(html_template)
    html_content = template.render(
        report_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        results=results
    )

    # 发送邮件
    send_email(html_content)

    print(f"[{datetime.now()}] 报告生成完成")

def send_email(html_content: str):
    """发送邮件"""
    try:
        msg = MIMEMultipart('alternative')
        msg['Subject'] = f"每日数据报告 - {datetime.now().strftime('%Y-%m-%d')}"
        msg['From'] = SMTP_USER
        msg['To'] = ", ".join(REPORT_RECIPIENTS)

        html_part = MIMEText(html_content, 'html')
        msg.attach(html_part)

        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.send_message(msg)

        print(f"  邮件已发送到: {', '.join(REPORT_RECIPIENTS)}")
    except Exception as e:
        print(f"  邮件发送失败: {e}")

# 配置定时任务
schedule.every().day.at("09:00").do(generate_report)

if __name__ == "__main__":
    print("数据报告生成器已启动")
    print(f"将在每天 09:00 生成报告并发送到: {', '.join(REPORT_RECIPIENTS)}")

    # 立即执行一次(测试用)
    # generate_report()

    # 运行定时任务
    while True:
        schedule.run_pending()
        time.sleep(60)

配置

.env

ASKTABLE_API_KEY=your_api_key_here
DATASOURCE_ID=your_datasource_id_here
SMTP_SERVER=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=your_email@gmail.com
SMTP_PASSWORD=your_app_password
REPORT_RECIPIENTS=recipient1@example.com,recipient2@example.com

运行报告生成器

python report_generator.py

四、案例 4:数据看板应用

4.1 应用概述

创建一个实时数据看板,展示关键业务指标。

技术栈

4.2 后端实现

创建

dashboard_api.py

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
import requests
import asyncio
import os
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# AskTable 配置
ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY")
ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1"
DATASOURCE_ID = os.getenv("DATASOURCE_ID")

headers = {
    "Authorization": f"Bearer {ASKTABLE_API_KEY}",
    "Content-Type": "application/json"
}

def query_metric(question: str) -> dict:
    """查询单个指标"""
    try:
        response = requests.post(
            f"{ASKTABLE_BASE_URL}/single-turn/q2a",
            headers=headers,
            json={
                "datasource_id": DATASOURCE_ID,
                "question": question
            },
            timeout=30
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        return {"error": str(e)}

@app.get("/api/metrics")
async def get_metrics():
    """获取所有指标"""
    metrics = [
        "今日销售额",
        "今日订单数",
        "今日新增用户数",
        "本月销售额",
        "本月订单数"
    ]

    results = {}
    for metric in metrics:
        result = query_metric(metric)
        results[metric] = result

    return results

@app.websocket("/ws/metrics")
async def websocket_metrics(websocket: WebSocket):
    """WebSocket 实时推送指标"""
    await websocket.accept()

    try:
        while True:
            # 查询指标
            metrics = await get_metrics()

            # 发送给客户端
            await websocket.send_json(metrics)

            # 每 30 秒更新一次
            await asyncio.sleep(30)
    except Exception as e:
        print(f"WebSocket 错误: {e}")
    finally:
        await websocket.close()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

4.3 前端实现

创建

Dashboard.tsx

import React, { useEffect, useState } from 'react';
import { Chart as ChartJS, ArcElement, Tooltip, Legend } from 'chart.js';
import { Doughnut } from 'react-chartjs-2';

ChartJS.register(ArcElement, Tooltip, Legend);

interface Metrics {
  [key: string]: {
    answer: string;
    dataframe?: {
      data: any[][];
    };
  };
}

function Dashboard() {
  const [metrics, setMetrics] = useState<Metrics>({});
  const [lastUpdate, setLastUpdate] = useState<Date>(new Date());

  useEffect(() => {
    // 建立 WebSocket 连接
    const ws = new WebSocket('ws://localhost:8000/ws/metrics');

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setMetrics(data);
      setLastUpdate(new Date());
    };

    ws.onerror = (error) => {
      console.error('WebSocket 错误:', error);
    };

    return () => {
      ws.close();
    };
  }, []);

  return (
    <div className="dashboard">
      <h1>实时数据看板</h1>
      <p>最后更新: {lastUpdate.toLocaleTimeString()}</p>

      <div className="metrics-grid">
        {Object.entries(metrics).map(([question, result]) => (
          <div key={question} className="metric-card">
            <h3>{question}</h3>
            {result.error ? (
              <p className="error">{result.error}</p>
            ) : (
              <p className="value">{result.answer}</p>
            )}
          </div>
        ))}
      </div>
    </div>
  );
}

export default Dashboard;

五、最佳实践总结

5.1 性能优化

  1. 缓存查询结果:对于不经常变化的数据,使用缓存减少 API 调用
  2. 批量查询:使用异步并发执行多个查询
  3. 限流控制:避免短时间内大量 API 调用
  4. 超时设置:设置合理的请求超时时间

5.2 错误处理

  1. 重试机制:对于临时性错误,实现自动重试
  2. 降级策略:API 失败时提供备用方案
  3. 日志记录:记录所有 API 调用和错误信息
  4. 用户友好提示:将技术错误转换为用户可理解的提示

5.3 安全性

  1. API Key 保护:不要在前端代码中暴露 API Key
  2. 权限控制:使用角色和策略限制数据访问
  3. 输入验证:验证用户输入,防止注入攻击
  4. HTTPS:始终使用 HTTPS 通信

5.4 可维护性

  1. 配置管理:使用环境变量管理配置
  2. 代码复用:封装通用的 API 调用逻辑
  3. 文档完善:为自定义应用编写使用文档
  4. 监控告警:监控 API 调用状态和错误率

六、总结

通过这些实战案例,我们展示了如何使用 AskTable API 构建各种自定义应用:

  1. Web 应用:提供用户友好的查询界面
  2. Slack 机器人:在团队协作工具中集成数据查询
  3. 定时报告:自动化数据报告生成和分发
  4. 实时看板:展示关键业务指标

AskTable API 的灵活性让你可以将 AI 数据分析能力集成到任何应用场景中,为用户提供自然语言查询数据的能力。

相关资源