AskTable

Q2S 日志追踪:让每一次查询都可追溯

AskTable 团队
AskTable 团队 2026年3月4日

在 Text-to-SQL 系统中,用户提出问题后,系统如何生成 SQL?生成的 SQL 是否正确?耗时多久?如果出错了如何追踪?AskTable 通过 Q2S(Question-to-SQL)日志系统,记录了从问题到 SQL 的完整转换过程。

为什么需要 Q2S 日志?

问题场景

用户反馈:"我问了'上个月的销售额',但结果不对"

没有日志时的困境

有日志时的优势

Q2S 日志生命周期

加载图表中...

上图展示了 Q2S 日志的完整生命周期:从创建(processing)到成功(success)或失败(failed)的状态转换,每个状态都记录了关键信息,确保完整的可追溯性。

核心设计

1. Q2S 数据模型

class Q2SModel(Base):
    __tablename__ = "q2s_logs"

    # 基础信息
    id: str  # q2s_xxx
    project_id: str  # 项目 ID
    datasource_id: str  # 数据源 ID

    # 输入
    question: str  # 用户问题
    role_id: str | None  # 角色 ID
    role_variables: dict | None  # 角色变量

    # 输出
    query: dict | None  # 生成的查询(包含 SQL、图表配置等)
    err_msg: str | None  # 错误信息

    # 性能指标
    duration: int  # 耗时(毫秒)
    trace_id: str | None  # Langfuse trace ID

    # 状态
    status: str  # processing | success | failed
    STATUS_PROCESSING = "processing"
    STATUS_SUCCESS = "success"
    STATUS_FAILED = "failed"

    # 时间戳
    created_at: datetime
    updated_at: datetime

设计亮点

1.1 完整的输入输出

# 输入
question: str  # "上个月的销售额是多少?"
role_id: str  # "sales_manager"
role_variables: dict  # {"region": "华东"}

# 输出
query: dict  # {
#     "sql": "SELECT SUM(amount) FROM sales WHERE ...",
#     "chart_type": "number",
#     "title": "上个月销售额"
# }

用途

1.2 性能追踪

duration: int  # 2500 (毫秒)
trace_id: str  # Langfuse trace ID

用途

1.3 状态管理

status: str  # processing | success | failed

状态流转

创建 → processing
  ↓
成功 → success
  ↓
失败 → failed

2. 日志创建

async def create_q2s_log(
    db_session: AsyncSession,
    datasource_id: str,
    question: str,
    role_id: str | None = None,
    role_variables: dict | None = None,
) -> Q2SModel:
    """创建 Q2S 日志"""
    q2s_log = Q2SModel(
        id=gen_id("q2s"),  # 生成唯一 ID
        project_id=project_id_var.get(),  # 从上下文获取
        datasource_id=datasource_id,
        question=question,
        role_id=role_id,
        role_variables=role_variables,
        duration=0,
        status=Q2SModel.STATUS_PROCESSING,  # 初始状态
    )

    db_session.add(q2s_log)
    await db_session.flush()  # 立即写入数据库

    # 刷新以获取服务器端时间戳
    await db_session.refresh(q2s_log)

    return q2s_log

设计亮点

2.1 立即创建

await db_session.flush()  # 立即写入

为什么要立即写入?

2.2 上下文变量

project_id=project_id_var.get()  # 从上下文获取

使用 contextvars

from contextvars import ContextVar

project_id_var: ContextVar[str] = ContextVar("project_id")

# 在请求开始时设置
project_id_var.set(current_project_id)

# 在任何地方都可以获取
project_id = project_id_var.get()

优势

3. 日志更新

async def update_q2s_log(
    db_session: AsyncSession,
    q2s_id: str,
    query: dict | None = None,
    err_msg: str | None = None,
    duration: int = 0,
    trace_id: str | None = None,
) -> Q2SModel:
    """更新 Q2S 日志"""
    # 查询日志
    q2s_log = (
        await db_session.execute(select(Q2SModel).filter_by(id=q2s_id))
    ).scalar_one()

    # 更新状态
    if err_msg:
        q2s_log.status = Q2SModel.STATUS_FAILED
        q2s_log.err_msg = err_msg[:128]  # 限制长度
    else:
        q2s_log.query = query
        q2s_log.status = Q2SModel.STATUS_SUCCESS

    # 更新性能指标
    q2s_log.duration = duration
    q2s_log.trace_id = trace_id

    await db_session.flush()
    await db_session.refresh(q2s_log)  # 刷新时间戳

    return q2s_log

设计亮点

3.1 错误信息截断

q2s_log.err_msg = err_msg[:128]  # 限制长度

为什么要截断?

3.2 条件更新

if err_msg:
    q2s_log.status = Q2SModel.STATUS_FAILED
    q2s_log.err_msg = err_msg
else:
    q2s_log.query = query
    q2s_log.status = Q2SModel.STATUS_SUCCESS

清晰的状态管理

4. 日志查询

async def get_q2s_logs(
    db_session: AsyncSession,
    datasource_id=None
) -> Page[Q2SModel]:
    """查询 Q2S 日志(分页)"""
    project_id = project_id_var.get()

    # 构建查询
    if datasource_id:
        stmt = select(Q2SModel).filter_by(
            project_id=project_id,
            datasource_id=datasource_id
        )
    else:
        stmt = select(Q2SModel).filter_by(project_id=project_id)

    # 按时间倒序
    stmt = stmt.order_by(Q2SModel.created_at.desc())

    # 分页
    return await paginate(db_session, stmt)

设计亮点

4.1 自动分页

from fastapi_pagination import Page
from fastapi_pagination.ext.sqlalchemy import paginate

# 自动处理分页参数(page, size)
return await paginate(db_session, stmt)

返回格式

{
  "items": [...],
  "total": 1000,
  "page": 1,
  "size": 20,
  "pages": 50
}

4.2 多维度过滤

# 按项目过滤
filter_by(project_id=project_id)

# 按数据源过滤
filter_by(datasource_id=datasource_id)

# 按时间排序
order_by(Q2SModel.created_at.desc())

实际应用场景

场景 1:问题诊断

用户反馈:"查询结果不对"

诊断流程

# 1. 查询用户的 Q2S 日志
logs = await get_q2s_logs(db_session, datasource_id=user_datasource_id)

# 2. 找到问题日志
problem_log = logs.items[0]

# 3. 查看详细信息
print(f"问题:{problem_log.question}")
print(f"生成的 SQL:{problem_log.query['sql']}")
print(f"错误信息:{problem_log.err_msg}")
print(f"Trace ID:{problem_log.trace_id}")

# 4. 在 Langfuse 中查看完整 trace
# 包括:Prompt、LLM 响应、工具调用、中间结果

发现问题

# 问题:上个月的销售额
# SQL:SELECT SUM(amount) FROM sales WHERE date >= '2024-02-01'
# 错误:没有结束日期,查询了 2 月至今的数据

修复方案

# 优化 Prompt,明确时间范围处理
# 修改后 SQL:
# SELECT SUM(amount) FROM sales
# WHERE date >= '2024-02-01' AND date < '2024-03-01'

场景 2:性能分析

目标:找出慢查询,优化性能

分析流程

# 1. 查询耗时超过 5 秒的日志
slow_logs = await db_session.execute(
    select(Q2SModel)
    .filter(Q2SModel.duration > 5000)
    .order_by(Q2SModel.duration.desc())
    .limit(100)
)

# 2. 分析慢查询模式
for log in slow_logs:
    print(f"问题:{log.question}")
    print(f"耗时:{log.duration}ms")
    print(f"SQL:{log.query['sql']}")
    print("---")

# 3. 发现问题
# - 某些查询没有使用索引
# - 某些查询扫描了全表
# - 某些查询 JOIN 了过多表

# 4. 优化方案
# - 添加索引
# - 优化 SQL 生成逻辑
# - 限制 JOIN 数量

场景 3:质量监控

目标:监控 SQL 生成质量

监控指标

# 1. 成功率
success_rate = (
    count(status == 'success') / count(total)
) * 100

# 2. 平均耗时
avg_duration = avg(duration)

# 3. 错误分布
error_distribution = group_by(err_msg).count()

# 4. 热门问题
popular_questions = group_by(question).count().order_by(desc).limit(10)

告警规则

# 成功率低于 90%
if success_rate < 0.9:
    send_alert("Q2S success rate is low")

# 平均耗时超过 3 秒
if avg_duration > 3000:
    send_alert("Q2S average duration is high")

# 错误率突增
if error_rate > last_hour_error_rate * 2:
    send_alert("Q2S error rate spike")

场景 4:用户行为分析

目标:了解用户如何使用系统

分析维度

# 1. 最常问的问题
SELECT question, COUNT(*) as count
FROM q2s_logs
GROUP BY question
ORDER BY count DESC
LIMIT 20

# 2. 不同角色的查询模式
SELECT role_id, COUNT(*) as count, AVG(duration) as avg_duration
FROM q2s_logs
GROUP BY role_id

# 3. 查询时间分布
SELECT HOUR(created_at) as hour, COUNT(*) as count
FROM q2s_logs
GROUP BY hour
ORDER BY hour

# 4. 数据源使用情况
SELECT datasource_id, COUNT(*) as count
FROM q2s_logs
GROUP BY datasource_id
ORDER BY count DESC

最佳实践

1. 日志保留策略

# 定期清理旧日志
async def cleanup_old_logs(days=90):
    cutoff_date = datetime.now() - timedelta(days=days)
    await db_session.execute(
        delete(Q2SModel).where(Q2SModel.created_at < cutoff_date)
    )

2. 敏感信息脱敏

def mask_sensitive_data(question: str) -> str:
    """脱敏敏感信息"""
    # 脱敏手机号
    question = re.sub(r'\d{11}', '***********', question)
    # 脱敏身份证号
    question = re.sub(r'\d{18}', '******************', question)
    return question

# 创建日志时脱敏
q2s_log = Q2SModel(
    question=mask_sensitive_data(question),
    ...
)

3. 关联 Langfuse Trace

# 在 Langfuse 中记录 Q2S ID
from langfuse import Langfuse

langfuse = Langfuse()
trace = langfuse.trace(
    name="q2s",
    metadata={"q2s_id": q2s_log.id}
)

# 在 Q2S 日志中记录 Trace ID
q2s_log.trace_id = trace.id

优势

总结

AskTable 的 Q2S 日志系统提供了完整的查询追踪能力:

  1. 完整记录:问题、SQL、结果、错误、耗时
  2. 状态管理:processing → success/failed
  3. 性能追踪:耗时分析、慢查询识别
  4. 问题诊断:完整复现用户场景
  5. 质量监控:成功率、错误分布、用户行为

通过 Q2S 日志,我们可以:

日志不仅是调试工具,更是产品优化的数据基础。