
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 Text-to-SQL 系统中,用户提出问题后,系统如何生成 SQL?生成的 SQL 是否正确?耗时多久?如果出错了如何追踪?AskTable 通过 Q2S(Question-to-SQL)日志系统,记录了从问题到 SQL 的完整转换过程。
用户反馈:"我问了'上个月的销售额',但结果不对"
没有日志时的困境:
有日志时的优势:
加载图表中...
上图展示了 Q2S 日志的完整生命周期:从创建(processing)到成功(success)或失败(failed)的状态转换,每个状态都记录了关键信息,确保完整的可追溯性。
class Q2SModel(Base):
    """One row of the ``q2s_logs`` table: a single question-to-SQL (Q2S) run."""

    __tablename__ = "q2s_logs"

    # --- Basic information ---
    id: str  # log ID, shaped like q2s_xxx
    project_id: str  # project ID
    datasource_id: str  # data source ID

    # --- Input ---
    question: str  # the user's question
    role_id: str | None  # role ID
    role_variables: dict | None  # role variables

    # --- Output ---
    query: dict | None  # generated query (SQL, chart configuration, ...)
    err_msg: str | None  # error message

    # --- Performance metrics ---
    duration: int  # elapsed time in milliseconds
    trace_id: str | None  # Langfuse trace ID

    # --- Status ---
    status: str  # processing | success | failed
    STATUS_PROCESSING = "processing"
    STATUS_SUCCESS = "success"
    STATUS_FAILED = "failed"

    # --- Timestamps ---
    created_at: datetime
    updated_at: datetime
设计亮点:
# Input
question: str  # e.g. "上个月的销售额是多少?" (a question about last month's sales)
role_id: str  # e.g. "sales_manager"
role_variables: dict  # e.g. {"region": "华东"}

# Output
query: dict
# Example value:
# {
#     "sql": "SELECT SUM(amount) FROM sales WHERE ...",
#     "chart_type": "number",
#     "title": "上个月销售额"
# }
用途:
duration: int  # e.g. 2500 (milliseconds)
trace_id: str  # Langfuse trace ID
用途:
status: str  # one of: processing | success | failed
状态流转:
创建 → processing
processing → success(成功)
processing → failed(失败)
async def create_q2s_log(
    db_session: AsyncSession,
    datasource_id: str,
    question: str,
    role_id: str | None = None,
    role_variables: dict | None = None,
) -> Q2SModel:
    """Add a new Q2S log row in the ``processing`` state and return it.

    The project ID is read from ``project_id_var`` rather than passed in, and
    ``duration`` starts at 0.

    Args:
        db_session: Async session the row is added to.
        datasource_id: ID of the data source the question targets.
        question: The user's question.
        role_id: Optional role ID.
        role_variables: Optional role variables.

    Returns:
        The new ``Q2SModel`` instance, after it has been flushed and refreshed.
    """
    initial_fields = {
        "id": gen_id("q2s"),  # unique ID
        "project_id": project_id_var.get(),  # taken from the context variable
        "datasource_id": datasource_id,
        "question": question,
        "role_id": role_id,
        "role_variables": role_variables,
        "duration": 0,
        "status": Q2SModel.STATUS_PROCESSING,  # initial state
    }
    record = Q2SModel(**initial_fields)

    db_session.add(record)
    # Flush right away so the row is written to the database.
    await db_session.flush()
    # Refresh to pick up the server-side timestamps.
    await db_session.refresh(record)
    return record
设计亮点:
await db_session.flush()  # write to the database immediately
为什么要立即写入?
project_id=project_id_var.get()  # read from the context variable
使用 contextvars:
from contextvars import ContextVar

project_id_var: ContextVar[str] = ContextVar("project_id")

# Set the value once, at the start of the request.
project_id_var.set(current_project_id)

# Read it back wherever it is needed.
project_id = project_id_var.get()
优势:
async def update_q2s_log(
    db_session: AsyncSession,
    q2s_id: str,
    query: dict | None = None,
    err_msg: str | None = None,
    duration: int = 0,
    trace_id: str | None = None,
) -> Q2SModel:
    """Record the outcome of a Q2S run on an existing log row.

    A truthy ``err_msg`` marks the row ``failed`` and stores the first 128
    characters of the message. Otherwise the row is marked ``success`` and
    ``query`` is stored. ``duration`` and ``trace_id`` are written either way.

    Args:
        db_session: Async session used to load and update the row.
        q2s_id: ID of the log row to update.
        query: Generated query to store on success.
        err_msg: Error message; when truthy the run is recorded as failed.
        duration: Elapsed time to store.
        trace_id: Trace ID to store.

    Returns:
        The updated ``Q2SModel`` instance, after flush and refresh.

    Raises:
        Whatever ``scalar_one()`` raises when ``q2s_id`` does not match
        exactly one row.
    """
    # Load the log row.
    lookup = select(Q2SModel).filter_by(id=q2s_id)
    result = await db_session.execute(lookup)
    entry = result.scalar_one()

    # Status: success unless an error message was supplied.
    if not err_msg:
        entry.query = query
        entry.status = Q2SModel.STATUS_SUCCESS
    else:
        entry.status = Q2SModel.STATUS_FAILED
        entry.err_msg = err_msg[:128]  # cap the stored length

    # Performance metrics.
    entry.duration = duration
    entry.trace_id = trace_id

    await db_session.flush()
    await db_session.refresh(entry)  # refresh the timestamps
    return entry
设计亮点:
q2s_log.err_msg = err_msg[:128]  # keep at most the first 128 characters
为什么要截断?
# A truthy error message means failure; anything else is recorded as success.
if err_msg:
    q2s_log.status = Q2SModel.STATUS_FAILED
    q2s_log.err_msg = err_msg[:128]  # same 128-character cap as update_q2s_log
else:
    q2s_log.query = query
    q2s_log.status = Q2SModel.STATUS_SUCCESS
清晰的状态管理:
async def get_q2s_logs(
    db_session: AsyncSession, datasource_id=None
) -> Page[Q2SModel]:
    """Return a page of the current project's Q2S logs, newest first.

    Args:
        db_session: Async session the query runs on.
        datasource_id: When truthy, only logs of this data source are
            returned; otherwise all logs of the project are included.

    Returns:
        The result of ``paginate`` for the filtered, ordered statement.
    """
    # The project always comes from the context variable.
    criteria = {"project_id": project_id_var.get()}
    if datasource_id:
        criteria["datasource_id"] = datasource_id

    # Most recently created logs first.
    stmt = (
        select(Q2SModel)
        .filter_by(**criteria)
        .order_by(Q2SModel.created_at.desc())
    )
    return await paginate(db_session, stmt)
设计亮点:
from fastapi_pagination import Page
from fastapi_pagination.ext.sqlalchemy import paginate

# paginate() takes care of the pagination parameters (page, size), e.g.:
#     return await paginate(db_session, stmt)
返回格式:
{ "items": [...], "total": 1000, "page": 1, "size": 20, "pages": 50 }
# Filter by project
filter_by(project_id=project_id)

# Filter by data source
filter_by(datasource_id=datasource_id)

# Order by creation time, newest first
order_by(Q2SModel.created_at.desc())
用户反馈:"查询结果不对"
诊断流程:
# 1. 查询用户的 Q2S 日志 logs = await get_q2s_logs(db_session, datasource_id=user_datasource_id) # 2. 找到问题日志 problem_log = logs.items[0] # 3. 查看详细信息 print(f"问题:{problem_log.question}") print(f"生成的 SQL:{problem_log.query['sql']}") print(f"错误信息:{problem_log.err_msg}") print(f"Trace ID:{problem_log.trace_id}") # 4. 在 Langfuse 中查看完整 trace # 包括:Prompt、LLM 响应、工具调用、中间结果
发现问题:
# 问题:上个月的销售额 # SQL:SELECT SUM(amount) FROM sales WHERE date >= '2024-02-01' # 错误:没有结束日期,查询了 2 月至今的数据
修复方案:
# 优化 Prompt,明确时间范围处理 # 修改后 SQL: # SELECT SUM(amount) FROM sales # WHERE date >= '2024-02-01' AND date < '2024-03-01'
目标:找出慢查询,优化性能
分析流程:
# 1. 查询耗时超过 5 秒的日志 slow_logs = await db_session.execute( select(Q2SModel) .filter(Q2SModel.duration > 5000) .order_by(Q2SModel.duration.desc()) .limit(100) ) # 2. 分析慢查询模式 for log in slow_logs: print(f"问题:{log.question}") print(f"耗时:{log.duration}ms") print(f"SQL:{log.query['sql']}") print("---") # 3. 发现问题 # - 某些查询没有使用索引 # - 某些查询扫描了全表 # - 某些查询 JOIN 了过多表 # 4. 优化方案 # - 添加索引 # - 优化 SQL 生成逻辑 # - 限制 JOIN 数量
目标:监控 SQL 生成质量
监控指标:
# 1. Success rate (a percentage, because of the * 100)
success_rate = (
    count(status == 'success') / count(total)
) * 100

# 2. Average duration
avg_duration = avg(duration)

# 3. Error distribution
error_distribution = group_by(err_msg).count()

# 4. Most popular questions
popular_questions = group_by(question).count().order_by(desc).limit(10)
告警规则:
# Success rate below 90%.
# success_rate is a percentage (0-100), so the threshold is 90, not 0.9.
if success_rate < 90:
    send_alert("Q2S success rate is low")

# Average duration above 3 seconds (duration is in milliseconds)
if avg_duration > 3000:
    send_alert("Q2S average duration is high")

# Error rate spike: more than double the previous hour's rate
if error_rate > last_hour_error_rate * 2:
    send_alert("Q2S error rate spike")
目标:了解用户如何使用系统
分析维度:
-- 1. Most frequently asked questions
SELECT question, COUNT(*) AS count
FROM q2s_logs
GROUP BY question
ORDER BY count DESC
LIMIT 20;

-- 2. Query patterns per role
SELECT role_id, COUNT(*) AS count, AVG(duration) AS avg_duration
FROM q2s_logs
GROUP BY role_id;

-- 3. Distribution of queries over the hours of the day
SELECT HOUR(created_at) AS hour, COUNT(*) AS count
FROM q2s_logs
GROUP BY hour
ORDER BY hour;

-- 4. Usage per data source
SELECT datasource_id, COUNT(*) AS count
FROM q2s_logs
GROUP BY datasource_id
ORDER BY count DESC;
# Periodic cleanup of old logs
async def cleanup_old_logs(days=90):
    """Delete Q2S logs whose ``created_at`` is more than ``days`` days old.

    Args:
        days: Age threshold in days; rows created before ``now - days`` are
            deleted.

    NOTE(review): ``db_session`` is not a parameter, so it has to come from an
    enclosing scope, unlike the other helpers which take it as their first
    argument -- confirm this is intended.
    NOTE(review): the cutoff uses naive local ``datetime.now()``; confirm it
    matches how ``created_at`` is stored.
    """
    oldest_kept = datetime.now() - timedelta(days=days)
    purge_stmt = delete(Q2SModel).where(Q2SModel.created_at < oldest_kept)
    await db_session.execute(purge_stmt)
def mask_sensitive_data(question: str) -> str:
    """Mask phone numbers and ID-card numbers that appear in a question.

    Args:
        question: The raw question text.

    Returns:
        The text with every 18-character ID number (17 digits followed by a
        digit or ``X``/``x``) replaced by 18 asterisks, and every remaining
        run of 11 digits replaced by 11 asterisks.
    """
    # Mask ID numbers first. If the 11-digit pass ran first it would consume
    # the leading 11 digits of an ID number and leave the last 7 exposed.
    question = re.sub(r'\d{17}[\dXx]', '*' * 18, question)
    # Then mask phone numbers (11 consecutive digits).
    question = re.sub(r'\d{11}', '*' * 11, question)
    return question


# Mask the question when creating a log entry:
#     q2s_log = Q2SModel(question=mask_sensitive_data(question), ...)
# Record the Q2S log ID on the Langfuse trace
from langfuse import Langfuse

langfuse = Langfuse()
trace = langfuse.trace(
    name="q2s",
    metadata={"q2s_id": q2s_log.id},
)

# Record the trace ID on the Q2S log
q2s_log.trace_id = trace.id
优势:
AskTable 的 Q2S 日志系统提供了完整的查询追踪能力:
通过 Q2S 日志,我们可以:
日志不仅是调试工具,更是产品优化的数据基础。