AskTable
sidebar.freeTrial

Qdrant 向量检索:让 AI 精准理解数据库结构

AskTable Team
AskTable Team 2026-03-04

在企业级数据分析场景中,数据库往往包含数百甚至上千张表,每张表又有几十个字段。当用户用自然语言提问时,如何快速找到相关的表和字段?传统的关键词匹配往往力不从心,而 AskTable 通过 Qdrant 向量检索实现了真正的语义理解。

为什么需要向量检索?

传统方法的局限

假设用户问:"上个月华东地区的销售情况如何?"

关键词匹配的问题

  • 用户说"销售情况",但表字段可能叫 revenuesales_amountorder_total
  • 用户说"华东地区",但字段可能叫 regionareaprovince
  • 无法理解同义词、缩写、业务术语

暴力方案的问题

  • 将所有表和字段信息都塞给 LLM?
    • 1000 张表 × 50 个字段 = 50000 个字段
    • 每个字段描述 50 tokens = 2.5M tokens
    • 成本高昂,响应缓慢,容易超出上下文限制

向量检索的优势

AskTable 使用 Qdrant 向量数据库实现语义检索:

  1. 语义理解:理解"销售情况"和"revenue"的语义相似性
  2. 高效检索:从 50000 个字段中毫秒级找到最相关的 10-20 个
  3. 成本优化:只将相关字段传给 LLM,大幅降低 token 消耗
  4. 准确性提升:减少无关信息干扰,提高 SQL 生成准确率

向量检索完整流程

加载图表中...

上图展示了从元数据向量化到检索的完整流程:左侧是离线的向量化过程,右侧是在线的检索过程。通过向量相似度计算,系统能在毫秒级从海量字段中找到最相关的数据。

核心架构设计

1. 元数据向量化

AskTable 将数据库元数据(表名、字段名、描述)转换为向量并存储到 Qdrant:

async def create(meta: MetaAdmin):
    """
    - get or create project collection in db
    - upsert fields data to vector
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    await create_collection_if_not_exist(collection_name)

    docs = []
    for schema in meta.schemas.values():
        for table in schema.tables.values():
            for field in table.fields.values():
                if field.curr_desc and field.curr_desc.strip():
                    assert table.curr_desc is not None
                    assert meta.datasource_id is not None
                    metadata = {
                        "datasource_id": meta.datasource_id,
                        "schema_name": schema.name,
                        "table_name": table.name,
                        "field_name": field.name,
                        "type": "curr_desc",
                        "value": field.curr_desc,
                    }
                    docs.append({
                        "page_content": table.curr_desc + field.curr_desc,
                        "metadata": metadata,
                        "id": _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        ),
                    })
    await upload_batch(collection_name, docs, with_id=True)

设计亮点

1.1 组合式文本构建

"page_content": table.curr_desc + field.curr_desc

为什么要组合表描述和字段描述?

  • 上下文增强:字段 amount 单独看很模糊,但 订单表.订单金额 就很清晰
  • 语义完整性:表描述提供业务背景,字段描述提供具体含义
  • 检索准确性:用户问"订单金额"时,能同时匹配表和字段的语义

1.2 确定性 ID 生成

def _gen_id_by_field(
    datasource_id: str, schema_name: str, table_name: str, field_name: str
) -> str:
    """
    根据datasource_id, schema_name, table_name, field_name生成唯一ID
    使用 uuid5 来生成基于输入字符串的 UUID
    """
    parts = [datasource_id, schema_name, table_name, field_name]
    input_string = "-".join(parts)
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, input_string))

为什么使用 UUID5?

  • 确定性:相同输入永远生成相同 UUID,支持幂等更新
  • 唯一性:不同字段生成不同 UUID,避免冲突
  • 可追溯:可以根据字段信息反向生成 ID,便于调试

1.3 丰富的元数据

metadata = {
    "datasource_id": meta.datasource_id,
    "schema_name": schema.name,
    "table_name": table.name,
    "field_name": field.name,
    "type": "curr_desc",
    "value": field.curr_desc,
}

元数据的作用:

  • 精确过滤:按数据源、Schema、表名过滤
  • 权限控制:结合用户权限过滤可访问的字段
  • 结果解析:检索结果可以直接定位到具体字段

2. 语义检索实现

当用户提问时,AskTable 使用向量检索找到最相关的字段:

async def retrieve(
    subqueries: list[str],
    ds_id: str,
    meta: MetaAdmin | None = None,
    top_k: int = 12,
    threshold: float = 0.4,
):
    """
    retrieve meta from vector db
    filter either by ds_id or by meta
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    db = get_vector_db()

    if not await db.collection_exists(collection_name):
        log.info(f"集合 {collection_name} 不存在,跳过查询。")
        return []

    # 将查询文本转换为向量
    query_embeddings = await embed_docs(subqueries)

    # 构建过滤条件
    if meta and meta.datasource_id:
        # 如果提供了 meta,只在允许的字段中搜索
        allow_ids = []
        for schema in meta.schemas.values():
            for table in schema.tables.values():
                for field in table.fields.values():
                    allow_ids.append(
                        _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        )
                    )
        filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
    else:
        # 否则按数据源 ID 过滤
        filter = Filter(
            must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
        )

    # 构建批量搜索请求
    search_queries = [
        SearchRequest(
            vector=embedding,
            filter=filter,
            limit=top_k,
            score_threshold=threshold,
            with_payload=True,
        )
        for embedding in query_embeddings
    ]

    # 批量搜索
    results = await db.search_batch(
        collection_name=collection_name,
        requests=search_queries,
    )

    hits = [point for result in results for point in result]
    return unpack_qdrant_points(hits=hits)

设计亮点

2.1 多查询并行检索

query_embeddings = await embed_docs(subqueries)
search_queries = [
    SearchRequest(vector=embedding, ...)
    for embedding in query_embeddings
]
results = await db.search_batch(...)

为什么使用多查询?

用户问题:"上个月华东地区的销售情况如何?"

可以分解为多个子查询:

  • "销售金额"
  • "订单数量"
  • "地区信息"
  • "时间字段"

每个子查询独立检索,然后合并结果,提高召回率。

2.2 权限感知的过滤

if meta and meta.datasource_id:
    # 只在用户有权限的字段中搜索
    allow_ids = [...]
    filter = Filter(must=[HasIdCondition(has_id=allow_ids)])

安全性保障

  • 用户只能检索到有权限访问的字段
  • 即使向量相似度很高,无权限的字段也不会返回
  • 在向量检索层面就实现了权限控制,而不是事后过滤

2.3 相似度阈值过滤

score_threshold=threshold  # 默认 0.4

为什么需要阈值?

  • 过滤低相关性结果,避免误导 LLM
  • 如果没有相关字段,返回空结果,让 LLM 告知用户
  • 阈值可以根据业务场景调整(精确查询用高阈值,探索性查询用低阈值)

3. Embedding 批量优化

AskTable 实现了高效的批量 Embedding 处理:

async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
    _begin = time.time()
    text_list = [text.replace("\n", " ") for text in text_list]

    # 分批处理
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]

    log.info(f"total batches: {len(batches)}")
    embeddings = []

    # 分块并行处理
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        _begin_chunk = time.time()
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
        tasks = [self._embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])
        log.info(
            f"embedding_chunks: {len(chunk)} batches, "
            f"time: {time.time() - _begin_chunk:.2f}s"
        )

    log.info(
        f"embedding_batch: {len(text_list)} texts, "
        f"time: {time.time() - _begin:.2f}s"
    )
    return embeddings

性能优化策略

3.1 两级批处理

EMBEDDING_BATCH_SIZE = 100  # 每个 API 请求的文本数
EMBEDDING_CHUNK_SIZE = 100  # 并行请求数

为什么需要两级?

  • Batch:减少 API 调用次数,提高吞吐量
  • Chunk:控制并发数,避免过载和超时
  • 示例:10000 个文本 → 100 个 batch → 每次并行 100 个 batch

3.2 异步并行处理

tasks = [self._embedding_batch(batch) for batch in chunk]
chunk_results = await asyncio.gather(*tasks)

性能提升

  • 串行:100 个 batch × 0.5s = 50s
  • 并行:100 个 batch / 100 并发 × 0.5s = 0.5s
  • 提速 100 倍

3.3 文本预处理

text_list = [text.replace("\n", " ") for text in text_list]

为什么要替换换行符?

  • 某些 Embedding 模型对换行符敏感
  • 统一格式,提高向量质量
  • 避免因格式问题导致的检索偏差

实际应用场景

场景 1:大型企业数据仓库

背景

  • 500 张表,平均每张表 40 个字段 = 20000 个字段
  • 用户问:"上个月各产品线的毛利率是多少?"

检索流程

  1. 子查询分解

    • "产品线"
    • "毛利率"
    • "上个月"
  2. 向量检索(每个子查询 top_k=12):

    • "产品线" → product.product_line, sales.category, dim_product.line_name
    • "毛利率" → finance.gross_margin, sales.profit_rate, report.margin_pct
    • "上个月" → orders.order_date, sales.sale_date, fact_sales.date_key
  3. 结果合并去重

    • 涉及 3 张表:product, sales, finance
    • 相关字段:8 个
  4. 传给 LLM

    • 原始:20000 个字段 × 50 tokens = 1M tokens
    • 优化后:8 个字段 × 50 tokens = 400 tokens
    • Token 减少 99.96%

场景 2:多租户 SaaS 平台

背景

  • 100 个租户,每个租户 50 张表
  • 需要隔离不同租户的数据

实现方案

# 每个租户一个独立的 collection
collection_name = f"meta_{project_id}"

# 检索时按 datasource_id 过滤
filter = Filter(
    must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
)

优势

  • 数据隔离:租户 A 无法检索到租户 B 的元数据
  • 性能优化:只在当前租户的数据中检索
  • 扩展性好:新增租户不影响现有租户

场景 3:权限控制

背景

  • 财务数据只有财务部门可以访问
  • 销售数据只有销售部门可以访问

实现方案

# 根据用户角色获取可访问的元数据
accessible_meta = role.get_accessible_meta(datasource)

# 只在可访问的字段中检索
allow_ids = [
    _gen_id_by_field(...)
    for field in accessible_meta.all_fields()
]
filter = Filter(must=[HasIdCondition(has_id=allow_ids)])

安全保障

  • 向量检索层面就过滤了无权限字段
  • 即使用户猜到字段名也无法访问
  • 审计日志记录所有检索请求

性能优化实践

1. Collection 设计

# 按项目隔离 collection
collection_name = PROJECT_COLLECTION_PREFIX + project_id

优势

  • 避免单个 collection 过大
  • 删除项目时可以直接删除 collection
  • 不同项目可以使用不同的向量维度和配置

2. 增量更新

# 使用确定性 ID 支持幂等更新
id = _gen_id_by_field(datasource_id, schema_name, table_name, field_name)
docs.append({"id": id, "page_content": ..., "metadata": ...})
await upload_batch(collection_name, docs, with_id=True)

优势

  • 字段描述更新时,直接覆盖旧向量
  • 避免重复向量
  • 支持部分更新,不需要重建整个 collection

3. 删除策略

async def delete_by_field_names(ds_id: str, fields_names: list[tuple[str, str, str]]):
    """删除指定字段的向量"""
    filter = Filter(
        should=[
            Filter(
                must=[
                    FieldCondition(key="datasource_id", match=MatchValue(value=ds_id)),
                    FieldCondition(key="field_name", match=MatchValue(value=field[2])),
                    FieldCondition(key="table_name", match=MatchValue(value=field[1])),
                    FieldCondition(key="schema_name", match=MatchValue(value=field[0])),
                ]
            )
            for field in fields_names
        ]
    )
    await db.delete(collection_name, filter)

优势

  • 精确删除,不影响其他字段
  • 支持批量删除,提高效率
  • 通过元数据过滤,无需知道向量 ID

最佳实践建议

1. 文本构建策略

推荐

# 组合表描述和字段描述
page_content = f"{table.curr_desc} {field.curr_desc}"

不推荐

# 只用字段名
page_content = field.name  # 语义信息太少

# 只用字段描述
page_content = field.curr_desc  # 缺少表的上下文

2. 相似度阈值调优

# 精确查询场景(如报表生成)
threshold = 0.6  # 高阈值,只返回高度相关的字段

# 探索性查询场景(如数据探索)
threshold = 0.3  # 低阈值,返回更多可能相关的字段

# 默认场景
threshold = 0.4  # 平衡准确率和召回率

3. Top-K 设置

# 简单查询
top_k = 5  # 减少噪音

# 复杂查询
top_k = 20  # 提高召回率

# 默认
top_k = 12  # 经验值

4. 元数据质量

高质量元数据的特征

  • 表描述:简洁明了,说明业务含义
  • 字段描述:包含业务术语、单位、取值范围
  • 一致性:同类字段使用统一的描述风格

示例

# 好的描述
table.curr_desc = "订单表,记录所有客户订单信息"
field.curr_desc = "订单金额,单位:元,不含税"

# 差的描述
table.curr_desc = "orders"  # 没有业务含义
field.curr_desc = "amount"  # 信息不足

技术选型对比

为什么选择 Qdrant?

特性QdrantPineconeMilvusWeaviate
开源
本地部署
过滤性能⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
批量搜索
Python SDK⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
文档质量⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

Qdrant 的优势

  • 过滤性能:支持复杂的元数据过滤,性能优秀
  • 易用性:API 设计简洁,文档完善
  • 灵活部署:支持本地、Docker、云端多种部署方式
  • 成本:开源免费,可以自建

总结

AskTable 通过 Qdrant 向量检索实现了高效的数据库元数据检索:

  1. 语义理解:理解用户问题和字段描述的语义相似性
  2. 高效检索:从海量字段中毫秒级找到最相关的字段
  3. 成本优化:大幅减少传给 LLM 的 token 数量
  4. 权限控制:在向量检索层面就实现了权限过滤
  5. 性能优化:批量处理、并行检索、增量更新

这种架构不仅适用于 Text-to-SQL,也可以推广到其他需要从大量结构化数据中检索的场景,如:

  • 知识库问答
  • 文档检索
  • 代码搜索
  • 产品推荐

通过向量检索,我们可以让 AI 真正"理解"数据,而不仅仅是匹配关键词。

cta.readyToSimplify

sidebar.noProgrammingNeededsidebar.startFreeTrial

cta.noCreditCard
cta.quickStart
cta.dbSupport