
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
本文将通过多个实战案例,展示如何使用 AskTable API 构建各种自定义应用。每个案例都包含完整的代码示例和详细说明。
构建一个简单的 Web 应用,让用户可以用自然语言查询数据库。
技术栈:
安装依赖:
pip install fastapi uvicorn requests python-dotenv
创建
:main.py
from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import requests import os from dotenv import load_dotenv load_dotenv() app = FastAPI() # 配置 CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # AskTable 配置 ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY") ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1" DATASOURCE_ID = os.getenv("DATASOURCE_ID") headers = { "Authorization": f"Bearer {ASKTABLE_API_KEY}", "Content-Type": "application/json" } class QueryRequest(BaseModel): question: str class QueryResponse(BaseModel): question: str sql: str answer: str data: list @app.post("/api/query", response_model=QueryResponse) async def query_data(request: QueryRequest): """处理用户查询""" try: # 调用 AskTable API response = requests.post( f"{ASKTABLE_BASE_URL}/single-turn/q2a", headers=headers, json={ "datasource_id": DATASOURCE_ID, "question": request.question } ) response.raise_for_status() result = response.json() return QueryResponse( question=result["question"], sql=result["sql"], answer=result["answer"], data=result["dataframe"]["data"] ) except requests.exceptions.HTTPError as e: raise HTTPException( status_code=e.response.status_code, detail=e.response.json().get("detail", "API 错误") ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/datasources") async def get_datasources(): """获取数据源列表""" try: response = requests.get( f"{ASKTABLE_BASE_URL}/datasources", headers=headers ) response.raise_for_status() return response.json() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
创建
文件:.env
ASKTABLE_API_KEY=your_api_key_here DATASOURCE_ID=your_datasource_id_here
运行后端:
python main.py
创建 React 应用:
npx create-react-app data-query-app --template typescript cd data-query-app npm install axios
创建
:src/App.tsx
import React, { useState } from 'react'; import axios from 'axios'; import './App.css'; interface QueryResult { question: string; sql: string; answer: string; data: any[][]; } function App() { const [question, setQuestion] = useState(''); const [result, setResult] = useState<QueryResult | null>(null); const [loading, setLoading] = useState(false); const [error, setError] = useState(''); const handleSubmit = async (e: React.FormEvent) => { e.preventDefault(); setLoading(true); setError(''); setResult(null); try { const response = await axios.post('http://localhost:8000/api/query', { question }); setResult(response.data); } catch (err: any) { setError(err.response?.data?.detail || '查询失败'); } finally { setLoading(false); } }; return ( <div className="App"> <header className="App-header"> <h1>数据查询助手</h1> <form onSubmit={handleSubmit}> <input type="text" value={question} onChange={(e) => setQuestion(e.target.value)} placeholder="输入你的问题,例如:本月销售额" disabled={loading} /> <button type="submit" disabled={loading}> {loading ? '查询中...' : '查询'} </button> </form> {error && <div className="error">{error}</div>} {result && ( <div className="result"> <h2>查询结果</h2> <div className="answer"> <strong>答案:</strong> <p>{result.answer}</p> </div> <div className="sql"> <strong>生成的 SQL:</strong> <pre>{result.sql}</pre> </div> {result.data.length > 0 && ( <div className="data"> <strong>数据:</strong> <table> <tbody> {result.data.map((row, i) => ( <tr key={i}> {row.map((cell, j) => ( <td key={j}>{cell}</td> ))} </tr> ))} </tbody> </table> </div> )} </div> )} </header> </div> ); } export default App;
运行前端:
npm start
创建一个 Slack 机器人,让团队成员可以在 Slack 中用自然语言查询数据。
功能:
安装依赖:
pip install slack-bolt requests python-dotenv
创建
:slack_bot.py
import os import requests from slack_bolt import App from slack_bolt.adapter.socket_mode import SocketModeHandler from dotenv import load_dotenv load_dotenv() # Slack 配置 app = App(token=os.environ.get("SLACK_BOT_TOKEN")) # AskTable 配置 ASKTABLE_API_KEY = os.environ.get("ASKTABLE_API_KEY") ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1" DATASOURCE_ID = os.environ.get("DATASOURCE_ID") headers = { "Authorization": f"Bearer {ASKTABLE_API_KEY}", "Content-Type": "application/json" } def query_asktable(question: str) -> dict: """调用 AskTable API 查询数据""" try: response = requests.post( f"{ASKTABLE_BASE_URL}/single-turn/q2a", headers=headers, json={ "datasource_id": DATASOURCE_ID, "question": question }, timeout=30 ) response.raise_for_status() return response.json() except Exception as e: return {"error": str(e)} @app.event("app_mention") def handle_mention(event, say): """处理 @机器人 的消息""" # 提取问题(去掉 @机器人 部分) text = event["text"] question = text.split(">", 1)[1].strip() if ">" in text else text # 发送"正在查询"消息 say(f"正在查询:_{question}_") # 调用 AskTable API result = query_asktable(question) if "error" in result: say(f"❌ 查询失败:{result['error']}") return # 格式化响应 blocks = [ { "type": "section", "text": { "type": "mrkdwn", "text": f"*问题:*\n{result['question']}" } }, { "type": "section", "text": { "type": "mrkdwn", "text": f"*答案:*\n{result['answer']}" } }, { "type": "section", "text": { "type": "mrkdwn", "text": f"*SQL:*\n```{result['sql']}```" } } ] # 添加数据表格 if result.get("dataframe") and result["dataframe"].get("data"): data = result["dataframe"]["data"] columns = result["dataframe"]["columns"] # 构建表格文本 table_text = " | ".join(columns) + "\n" table_text += "-" * (len(table_text) - 1) + "\n" for row in data[:10]: # 最多显示 10 行 table_text += " | ".join(str(cell) for cell in row) + "\n" blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*数据:*\n```{table_text}```" } }) say(blocks=blocks) @app.command("/query") def handle_query_command(ack, command, say): """处理 /query 命令""" ack() question = command["text"] if not question: say("请提供查询问题,例如:/query 本月销售额") return say(f"正在查询:_{question}_") result = query_asktable(question) if "error" in result: say(f"❌ 查询失败:{result['error']}") return say(f"*答案:*\n{result['answer']}\n\n*SQL:*\n```{result['sql']}```") if __name__ == "__main__": handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"]) handler.start()
配置
:.env
SLACK_BOT_TOKEN=xoxb-your-bot-token SLACK_APP_TOKEN=xapp-your-app-token ASKTABLE_API_KEY=your_api_key_here DATASOURCE_ID=your_datasource_id_here
运行机器人:
python slack_bot.py
app_mentions:readchat:writecommands创建一个定时任务,每天自动生成数据报告并发送邮件。
功能:
安装依赖:
pip install requests schedule jinja2 python-dotenv
创建
:report_generator.py
import os import requests import schedule import time from datetime import datetime from jinja2 import Template from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import smtplib from dotenv import load_dotenv load_dotenv() # AskTable 配置 ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY") ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1" DATASOURCE_ID = os.getenv("DATASOURCE_ID") # 邮件配置 SMTP_SERVER = os.getenv("SMTP_SERVER") SMTP_PORT = int(os.getenv("SMTP_PORT", 587)) SMTP_USER = os.getenv("SMTP_USER") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD") REPORT_RECIPIENTS = os.getenv("REPORT_RECIPIENTS").split(",") headers = { "Authorization": f"Bearer {ASKTABLE_API_KEY}", "Content-Type": "application/json" } def query_asktable(question: str) -> dict: """调用 AskTable API 查询数据""" try: response = requests.post( f"{ASKTABLE_BASE_URL}/single-turn/q2a", headers=headers, json={ "datasource_id": DATASOURCE_ID, "question": question }, timeout=30 ) response.raise_for_status() return response.json() except Exception as e: return {"error": str(e), "question": question} def generate_report(): """生成数据报告""" print(f"[{datetime.now()}] 开始生成报告...") # 定义要查询的问题 questions = [ "昨日销售额", "昨日订单数", "昨日新增用户数", "昨日销量前10的产品", "昨日各地区销售额" ] # 执行查询 results = [] for question in questions: print(f" 查询: {question}") result = query_asktable(question) results.append(result) # 生成 HTML 报告 html_template = """ <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <style> body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; } h1 { color: #333; border-bottom: 2px solid #4CAF50; padding-bottom: 10px; } .metric { background: #f5f5f5; padding: 15px; margin: 10px 0; border-radius: 5px; } .metric h3 { margin-top: 0; color: #4CAF50; } .answer { font-size: 18px; font-weight: bold; color: #333; } .sql { background: #f0f0f0; padding: 10px; border-radius: 3px; font-family: monospace; font-size: 12px; overflow-x: auto; } table { width: 100%; border-collapse: collapse; margin-top: 10px; } th, td { border: 1px solid #ddd; padding: 8px; text-align: left; } th { background-color: #4CAF50; color: white; } .error { color: red; } </style> </head> <body> <h1>每日数据报告</h1> <p>生成时间:{{ report_time }}</p> {% for result in results %} <div class="metric"> <h3>{{ result.question }}</h3> {% if result.error %} <p class="error">查询失败:{{ result.error }}</p> {% else %} <p class="answer">{{ result.answer }}</p> {% if result.dataframe and result.dataframe.data %} <table> <thead> <tr> {% for col in result.dataframe.columns %} <th>{{ col }}</th> {% endfor %} </tr> </thead> <tbody> {% for row in result.dataframe.data[:10] %} <tr> {% for cell in row %} <td>{{ cell }}</td> {% endfor %} </tr> {% endfor %} </tbody> </table> {% endif %} <details> <summary>查看 SQL</summary> <div class="sql">{{ result.sql }}</div> </details> {% endif %} </div> {% endfor %} </body> </html> """ template = Template(html_template) html_content = template.render( report_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), results=results ) # 发送邮件 send_email(html_content) print(f"[{datetime.now()}] 报告生成完成") def send_email(html_content: str): """发送邮件""" try: msg = MIMEMultipart('alternative') msg['Subject'] = f"每日数据报告 - {datetime.now().strftime('%Y-%m-%d')}" msg['From'] = SMTP_USER msg['To'] = ", ".join(REPORT_RECIPIENTS) html_part = MIMEText(html_content, 'html') msg.attach(html_part) with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server: server.starttls() server.login(SMTP_USER, SMTP_PASSWORD) server.send_message(msg) print(f" 邮件已发送到: {', '.join(REPORT_RECIPIENTS)}") except Exception as e: print(f" 邮件发送失败: {e}") # 配置定时任务 schedule.every().day.at("09:00").do(generate_report) if __name__ == "__main__": print("数据报告生成器已启动") print(f"将在每天 09:00 生成报告并发送到: {', '.join(REPORT_RECIPIENTS)}") # 立即执行一次(测试用) # generate_report() # 运行定时任务 while True: schedule.run_pending() time.sleep(60)
配置
:.env
ASKTABLE_API_KEY=your_api_key_here DATASOURCE_ID=your_datasource_id_here SMTP_SERVER=smtp.gmail.com SMTP_PORT=587 SMTP_USER=your_email@gmail.com SMTP_PASSWORD=your_app_password REPORT_RECIPIENTS=recipient1@example.com,recipient2@example.com
运行报告生成器:
python report_generator.py
创建一个实时数据看板,展示关键业务指标。
技术栈:
创建
:dashboard_api.py
from fastapi import FastAPI, WebSocket from fastapi.middleware.cors import CORSMiddleware import requests import asyncio import os from dotenv import load_dotenv load_dotenv() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # AskTable 配置 ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY") ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1" DATASOURCE_ID = os.getenv("DATASOURCE_ID") headers = { "Authorization": f"Bearer {ASKTABLE_API_KEY}", "Content-Type": "application/json" } def query_metric(question: str) -> dict: """查询单个指标""" try: response = requests.post( f"{ASKTABLE_BASE_URL}/single-turn/q2a", headers=headers, json={ "datasource_id": DATASOURCE_ID, "question": question }, timeout=30 ) response.raise_for_status() return response.json() except Exception as e: return {"error": str(e)} @app.get("/api/metrics") async def get_metrics(): """获取所有指标""" metrics = [ "今日销售额", "今日订单数", "今日新增用户数", "本月销售额", "本月订单数" ] results = {} for metric in metrics: result = query_metric(metric) results[metric] = result return results @app.websocket("/ws/metrics") async def websocket_metrics(websocket: WebSocket): """WebSocket 实时推送指标""" await websocket.accept() try: while True: # 查询指标 metrics = await get_metrics() # 发送给客户端 await websocket.send_json(metrics) # 每 30 秒更新一次 await asyncio.sleep(30) except Exception as e: print(f"WebSocket 错误: {e}") finally: await websocket.close() if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
创建
:Dashboard.tsx
import React, { useEffect, useState } from 'react'; import { Chart as ChartJS, ArcElement, Tooltip, Legend } from 'chart.js'; import { Doughnut } from 'react-chartjs-2'; ChartJS.register(ArcElement, Tooltip, Legend); interface Metrics { [key: string]: { answer: string; dataframe?: { data: any[][]; }; }; } function Dashboard() { const [metrics, setMetrics] = useState<Metrics>({}); const [lastUpdate, setLastUpdate] = useState<Date>(new Date()); useEffect(() => { // 建立 WebSocket 连接 const ws = new WebSocket('ws://localhost:8000/ws/metrics'); ws.onmessage = (event) => { const data = JSON.parse(event.data); setMetrics(data); setLastUpdate(new Date()); }; ws.onerror = (error) => { console.error('WebSocket 错误:', error); }; return () => { ws.close(); }; }, []); return ( <div className="dashboard"> <h1>实时数据看板</h1> <p>最后更新: {lastUpdate.toLocaleTimeString()}</p> <div className="metrics-grid"> {Object.entries(metrics).map(([question, result]) => ( <div key={question} className="metric-card"> <h3>{question}</h3> {result.error ? ( <p className="error">{result.error}</p> ) : ( <p className="value">{result.answer}</p> )} </div> ))} </div> </div> ); } export default Dashboard;
通过这些实战案例,我们展示了如何使用 AskTable API 构建各种自定义应用:
AskTable API 的灵活性让你可以将 AI 数据分析能力集成到任何应用场景中,为用户提供自然语言查询数据的能力。