AskTable

Qdrant 向量检索:让 AI 精准理解数据库结构

AskTable 团队
AskTable 团队 2026年3月4日

在企业级数据分析场景中,数据库往往包含数百甚至上千张表,每张表又有几十个字段。当用户用自然语言提问时,如何快速找到相关的表和字段?传统的关键词匹配往往力不从心,而 AskTable 通过 Qdrant 向量检索实现了真正的语义理解。

为什么需要向量检索?

传统方法的局限

假设用户问:"上个月华东地区的销售情况如何?"

关键词匹配的问题

暴力方案的问题

向量检索的优势

AskTable 使用 Qdrant 向量数据库实现语义检索:

  1. 语义理解:理解"销售情况"和"revenue"的语义相似性
  2. 高效检索:从 50000 个字段中毫秒级找到最相关的 10-20 个
  3. 成本优化:只将相关字段传给 LLM,大幅降低 token 消耗
  4. 准确性提升:减少无关信息干扰,提高 SQL 生成准确率

向量检索完整流程

加载图表中...

上图展示了从元数据向量化到检索的完整流程:左侧是离线的向量化过程,右侧是在线的检索过程。通过向量相似度计算,系统能在毫秒级从海量字段中找到最相关的数据。

核心架构设计

1. 元数据向量化

AskTable 将数据库元数据(表名、字段名、描述)转换为向量并存储到 Qdrant:

async def create(meta: MetaAdmin):
    """
    - get or create project collection in db
    - upsert fields data to vector
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    await create_collection_if_not_exist(collection_name)

    docs = []
    for schema in meta.schemas.values():
        for table in schema.tables.values():
            for field in table.fields.values():
                if field.curr_desc and field.curr_desc.strip():
                    assert table.curr_desc is not None
                    assert meta.datasource_id is not None
                    metadata = {
                        "datasource_id": meta.datasource_id,
                        "schema_name": schema.name,
                        "table_name": table.name,
                        "field_name": field.name,
                        "type": "curr_desc",
                        "value": field.curr_desc,
                    }
                    docs.append({
                        "page_content": table.curr_desc + field.curr_desc,
                        "metadata": metadata,
                        "id": _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        ),
                    })
    await upload_batch(collection_name, docs, with_id=True)

设计亮点

1.1 组合式文本构建

"page_content": table.curr_desc + field.curr_desc

为什么要组合表描述和字段描述?

1.2 确定性 ID 生成

def _gen_id_by_field(
    datasource_id: str, schema_name: str, table_name: str, field_name: str
) -> str:
    """
    根据datasource_id, schema_name, table_name, field_name生成唯一ID
    使用 uuid5 来生成基于输入字符串的 UUID
    """
    parts = [datasource_id, schema_name, table_name, field_name]
    input_string = "-".join(parts)
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, input_string))

为什么使用 UUID5?

1.3 丰富的元数据

metadata = {
    "datasource_id": meta.datasource_id,
    "schema_name": schema.name,
    "table_name": table.name,
    "field_name": field.name,
    "type": "curr_desc",
    "value": field.curr_desc,
}

元数据的作用:

2. 语义检索实现

当用户提问时,AskTable 使用向量检索找到最相关的字段:

async def retrieve(
    subqueries: list[str],
    ds_id: str,
    meta: MetaAdmin | None = None,
    top_k: int = 12,
    threshold: float = 0.4,
):
    """
    retrieve meta from vector db
    filter either by ds_id or by meta
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    db = get_vector_db()

    if not await db.collection_exists(collection_name):
        log.info(f"集合 {collection_name} 不存在,跳过查询。")
        return []

    # 将查询文本转换为向量
    query_embeddings = await embed_docs(subqueries)

    # 构建过滤条件
    if meta and meta.datasource_id:
        # 如果提供了 meta,只在允许的字段中搜索
        allow_ids = []
        for schema in meta.schemas.values():
            for table in schema.tables.values():
                for field in table.fields.values():
                    allow_ids.append(
                        _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        )
                    )
        filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
    else:
        # 否则按数据源 ID 过滤
        filter = Filter(
            must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
        )

    # 构建批量搜索请求
    search_queries = [
        SearchRequest(
            vector=embedding,
            filter=filter,
            limit=top_k,
            score_threshold=threshold,
            with_payload=True,
        )
        for embedding in query_embeddings
    ]

    # 批量搜索
    results = await db.search_batch(
        collection_name=collection_name,
        requests=search_queries,
    )

    hits = [point for result in results for point in result]
    return unpack_qdrant_points(hits=hits)

设计亮点

2.1 多查询并行检索

query_embeddings = await embed_docs(subqueries)
search_queries = [
    SearchRequest(vector=embedding, ...)
    for embedding in query_embeddings
]
results = await db.search_batch(...)

为什么使用多查询?

用户问题:"上个月华东地区的销售情况如何?"

可以分解为多个子查询:

每个子查询独立检索,然后合并结果,提高召回率。

2.2 权限感知的过滤

if meta and meta.datasource_id:
    # 只在用户有权限的字段中搜索
    allow_ids = [...]
    filter = Filter(must=[HasIdCondition(has_id=allow_ids)])

安全性保障

2.3 相似度阈值过滤

score_threshold=threshold  # 默认 0.4

为什么需要阈值?

3. Embedding 批量优化

AskTable 实现了高效的批量 Embedding 处理:

async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
    _begin = time.time()
    text_list = [text.replace("\n", " ") for text in text_list]

    # 分批处理
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]

    log.info(f"total batches: {len(batches)}")
    embeddings = []

    # 分块并行处理
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        _begin_chunk = time.time()
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
        tasks = [self._embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])
        log.info(
            f"embedding_chunks: {len(chunk)} batches, "
            f"time: {time.time() - _begin_chunk:.2f}s"
        )

    log.info(
        f"embedding_batch: {len(text_list)} texts, "
        f"time: {time.time() - _begin:.2f}s"
    )
    return embeddings

性能优化策略

3.1 两级批处理

EMBEDDING_BATCH_SIZE = 100  # 每个 API 请求的文本数
EMBEDDING_CHUNK_SIZE = 100  # 并行请求数

为什么需要两级?

3.2 异步并行处理

tasks = [self._embedding_batch(batch) for batch in chunk]
chunk_results = await asyncio.gather(*tasks)

性能提升

3.3 文本预处理

text_list = [text.replace("\n", " ") for text in text_list]

为什么要替换换行符?

实际应用场景

场景 1:大型企业数据仓库

背景

检索流程

  1. 子查询分解

    • "产品线"
    • "毛利率"
    • "上个月"
  2. 向量检索(每个子查询 top_k=12):

    • "产品线" →
      product.product_line
      ,
      sales.category
      ,
      dim_product.line_name
    • "毛利率" →
      finance.gross_margin
      ,
      sales.profit_rate
      ,
      report.margin_pct
    • "上个月" →
      orders.order_date
      ,
      sales.sale_date
      ,
      fact_sales.date_key
  3. 结果合并去重

    • 涉及 3 张表:
      product
      ,
      sales
      ,
      finance
    • 相关字段:8 个
  4. 传给 LLM

    • 原始:20000 个字段 × 50 tokens = 1M tokens
    • 优化后:8 个字段 × 50 tokens = 400 tokens
    • Token 减少 99.96%

场景 2:多租户 SaaS 平台

背景

实现方案

# 每个租户一个独立的 collection
collection_name = f"meta_{project_id}"

# 检索时按 datasource_id 过滤
filter = Filter(
    must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
)

优势

场景 3:权限控制

背景

实现方案

# 根据用户角色获取可访问的元数据
accessible_meta = role.get_accessible_meta(datasource)

# 只在可访问的字段中检索
allow_ids = [
    _gen_id_by_field(...)
    for field in accessible_meta.all_fields()
]
filter = Filter(must=[HasIdCondition(has_id=allow_ids)])

安全保障

性能优化实践

1. Collection 设计

# 按项目隔离 collection
collection_name = PROJECT_COLLECTION_PREFIX + project_id

优势

2. 增量更新

# 使用确定性 ID 支持幂等更新
id = _gen_id_by_field(datasource_id, schema_name, table_name, field_name)
docs.append({"id": id, "page_content": ..., "metadata": ...})
await upload_batch(collection_name, docs, with_id=True)

优势

3. 删除策略

async def delete_by_field_names(ds_id: str, fields_names: list[tuple[str, str, str]]):
    """删除指定字段的向量"""
    filter = Filter(
        should=[
            Filter(
                must=[
                    FieldCondition(key="datasource_id", match=MatchValue(value=ds_id)),
                    FieldCondition(key="field_name", match=MatchValue(value=field[2])),
                    FieldCondition(key="table_name", match=MatchValue(value=field[1])),
                    FieldCondition(key="schema_name", match=MatchValue(value=field[0])),
                ]
            )
            for field in fields_names
        ]
    )
    await db.delete(collection_name, filter)

优势

最佳实践建议

1. 文本构建策略

推荐

# 组合表描述和字段描述
page_content = f"{table.curr_desc} {field.curr_desc}"

不推荐

# 只用字段名
page_content = field.name  # 语义信息太少

# 只用字段描述
page_content = field.curr_desc  # 缺少表的上下文

2. 相似度阈值调优

# 精确查询场景(如报表生成)
threshold = 0.6  # 高阈值,只返回高度相关的字段

# 探索性查询场景(如数据探索)
threshold = 0.3  # 低阈值,返回更多可能相关的字段

# 默认场景
threshold = 0.4  # 平衡准确率和召回率

3. Top-K 设置

# 简单查询
top_k = 5  # 减少噪音

# 复杂查询
top_k = 20  # 提高召回率

# 默认
top_k = 12  # 经验值

4. 元数据质量

高质量元数据的特征

示例

# 好的描述
table.curr_desc = "订单表,记录所有客户订单信息"
field.curr_desc = "订单金额,单位:元,不含税"

# 差的描述
table.curr_desc = "orders"  # 没有业务含义
field.curr_desc = "amount"  # 信息不足

技术选型对比

为什么选择 Qdrant?

特性QdrantPineconeMilvusWeaviate
开源
本地部署
过滤性能⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
批量搜索
Python SDK⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
文档质量⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

Qdrant 的优势

总结

AskTable 通过 Qdrant 向量检索实现了高效的数据库元数据检索:

  1. 语义理解:理解用户问题和字段描述的语义相似性
  2. 高效检索:从海量字段中毫秒级找到最相关的字段
  3. 成本优化:大幅减少传给 LLM 的 token 数量
  4. 权限控制:在向量检索层面就实现了权限过滤
  5. 性能优化:批量处理、并行检索、增量更新

这种架构不仅适用于 Text-to-SQL,也可以推广到其他需要从大量结构化数据中检索的场景,如:

通过向量检索,我们可以让 AI 真正"理解"数据,而不仅仅是匹配关键词。