AskTable

双重检索:向量搜索 + 全文检索的完美结合

AskTable 团队 · 2026年3月4日

在元数据检索中,单纯的向量检索或全文检索都有局限性。向量检索擅长语义理解但可能漏掉精确匹配,全文检索擅长精确匹配但不理解语义。AskTable 通过双重检索策略,结合两者优势,实现了更准确的元数据检索。

为什么需要双重检索?

单一检索的局限

向量检索的问题

场景:用户问"北京地区的订单"

# 向量检索
query_embedding = embed("北京地区的订单")
results = vector_search(query_embedding)

# 可能返回:
# - region 字段(语义相关)
# - area 字段(语义相关)
# - province 字段(语义相关)

# 但可能漏掉:
# - 值为"北京"的具体记录

问题

全文检索的问题

场景:用户问"销售情况"

# 全文检索
results = fulltext_search("销售情况")

# 可能返回:
# - 包含"销售"的字段
# - 包含"情况"的字段

# 但可能漏掉:
# - revenue 字段(英文,语义相关但关键词不匹配)
# - amount 字段(语义相关但关键词不匹配)

问题

双重检索的优势

结合两者

# 1. 向量检索:找相关字段
field_results = vector_search(["销售额", "订单金额"])
# 返回:sales.amount, orders.revenue

# 2. 全文检索:找具体值
value_results = fulltext_search(["北京", "华东"])
# 返回:region='北京', area='华东'

# 3. 合并结果
final_results = merge(field_results, value_results)
(图:双重检索流程示意图)

上图展示了双重检索的完整流程:将用户问题分解为语义概念和具体值,分别使用向量检索和全文检索,最后合并结果。这种策略既能理解语义(销售额≈revenue),又能精确匹配(北京="北京")。

优势

核心实现

1. 双重检索流程

async def retrieve_entities(
    meta: MetaAdmin,
    datasource: DataSourceAdmin,
    subqueries: list[str],  # semantic sub-queries, fed to vector search
    keywords: list[str],    # literal keywords, fed to full-text search
) -> MetaAdmin:
    """Dual retrieval: vector search for fields + full-text search for values.

    Runs vector retrieval over *subqueries* to find relevant fields, then
    (when the value index is configured and keywords exist) full-text
    retrieval over *keywords* to find concrete values. Merged hits are used
    to filter *meta* down to the tables that were actually hit, and the
    retrieved context is attached to those tables.

    Returns:
        The filtered metadata containing only hit tables, with context added.

    Raises:
        errors.NoDataToQuery: when neither search returns any entity.
    """
    values: list[RetrievedMetaEntity] = []

    # 1. Vector search: find semantically relevant fields.
    fields = await datasource.retrieve_fields_by_question(subqueries)
    log.info(f"retrieved {len(fields)} possible relevant fields")

    # 2. Full-text search: find concrete values. Degrades gracefully when
    #    the value index is not configured or there is nothing to look up.
    if not config.aisearch_host or not config.aisearch_master_key:
        log.warning("Value index is not enabled, skipping value retrieval")
    elif not keywords:
        log.warning("No keywords provided, skipping value retrieval")
    else:
        values = await datasource.retrieve_values_by_question(keywords)
        log.info(f"retrieved {len(values)} possible relevant values")

    # 3. Merge both result sets (dedup + per-table ranking).
    entities = _merge_values_fields(values + fields)

    # Fail fast: no point filtering or annotating metadata with zero hits.
    if not entities:
        raise errors.NoDataToQuery()

    # 4. Keep only the tables that were actually hit.
    hit_tables = meta.filter_tables_by_names(
        list({(entity["schema_name"], entity["table_name"]) for entity in entities})
    )

    # 5. Attach retrieved context (e.g. sample values) to the hit tables.
    _add_context_to_meta(meta=hit_tables, entities=entities)

    return hit_tables

设计亮点

1.1 查询分解

subqueries: list[str]  # 语义查询:["销售额", "订单金额", "地区"]
keywords: list[str]    # 关键词查询:["北京", "华东", "2024"]

为什么要分解?

示例

# 用户问题:"北京地区 2024 年的销售额"

# 分解为:
subqueries = ["销售额", "地区", "年份"]  # 语义概念
keywords = ["北京", "2024"]  # 具体值

1.2 条件检索

if not config.aisearch_host or not config.aisearch_master_key:
    log.warning("Value index is not enabled, skipping value retrieval")
elif not keywords:
    log.warning("No keywords provided, skipping value retrieval")
else:
    values = await datasource.retrieve_values_by_question(keywords)

优雅降级

2. 字段检索(向量)

async def retrieve_fields_by_question(
    self, subqueries: list[str], accessible_meta=None
) -> list[RetrievedMetaEntity]:
    """Vector search for fields relevant to the given sub-queries.

    NOTE(review): `accessible_meta` is accepted but currently has no
    effect — the original code's two branches issued the identical call,
    so the conditional was dead. It presumably should be forwarded to
    restrict the search scope; confirm against `retrieve_meta`'s
    signature before wiring it through.
    """
    return await retrieve_meta(subqueries=subqueries, ds_id=self.id)

调用 Qdrant

# 在 meta/qdrant.py 中
async def retrieve(
    subqueries: list[str],
    ds_id: str,
    meta: MetaAdmin | None = None,
    top_k: int = 12,
    threshold: float = 0.4,
):
    """Vector-retrieve metadata entities from Qdrant, one search per sub-query.

    Args:
        subqueries: semantic sub-queries; each is embedded and searched
            independently, and the per-query hits are flattened together.
        ds_id: datasource id. NOTE(review): not referenced in this body —
            presumably meant to drive the search filter; confirm.
        meta: optional metadata scope — not used in this body.
        top_k: maximum hits returned per sub-query.
        threshold: minimum similarity score for a hit to be kept.
    """
    # 1. Embed every sub-query (one vector per sub-query).
    query_embeddings = await embed_docs(subqueries)

    # 2. Build one search request per embedding.
    # NOTE(review): `filter` is neither a parameter nor a local here — as
    # written it resolves to the `filter` builtin (or a module-level name
    # not shown in this excerpt). It likely should be constructed from
    # `ds_id`/`meta`; verify against the full module source.
    search_queries = [
        SearchRequest(
            vector=embedding,
            filter=filter,
            limit=top_k,
            score_threshold=threshold,
            with_payload=True,
        )
        for embedding in query_embeddings
    ]

    # 3. Execute all requests in a single batched Qdrant call.
    #    `collection_name` and `db` come from module scope (not shown here).
    results = await db.search_batch(
        collection_name=collection_name,
        requests=search_queries,
    )

    # 4. Flatten the per-request hit lists and unpack Qdrant payloads.
    hits = [point for result in results for point in result]
    return unpack_qdrant_points(hits=hits)

返回格式

[
    {
        "id": "uuid-1",
        "payload": {
            "schema_name": "public",
            "table_name": "sales",
            "field_name": "amount",
            "type": "curr_desc",
            "value": "销售金额,单位:元"
        },
        "score": 0.85
    },
    ...
]

3. 值检索(全文)

async def retrieve_values_by_question(
    self, keywords: list[str], accessible_meta=None
) -> list[RetrievedMetaEntity]:
    """Full-text search for concrete values matching the given keywords.

    Only indexed (high-cardinality) fields are queried. `query_values` is
    a blocking synchronous call, so each per-field query is dispatched to
    a worker thread via `asyncio.to_thread`; the original async wrapper
    gave `asyncio.gather` nothing to overlap and blocked the event loop.

    `accessible_meta` is accepted for interface parity but not used here.
    """
    meta = self.brain_meta

    # One query task per indexed field across all schemas and tables.
    tasks = [
        asyncio.to_thread(
            query_values,
            self.id,
            schema.name,
            table.name,
            field.name,
            keywords,
        )
        for schema in meta.schemas.values()
        for table in schema.tables.values()
        for field in table.fields.values()
        if field.is_index  # only indexed (high-cardinality) fields
    ]

    # Run all field queries concurrently.
    results = await asyncio.gather(*tasks)

    # Flatten the per-field result lists into a single list.
    return [entity for field_values in results for entity in field_values]

全文检索实现

def query_values(
    datasource_id: str,
    schema_name: str,
    table_name: str,
    field_name: str,
    keywords: list[str]
) -> list[RetrievedMetaEntity]:
    """Query concrete field values from Azure AI Search or Elasticsearch.

    Searches the value index for documents matching any of *keywords*,
    scoped to one (datasource, schema, table, field), and returns them as
    retrieved-entity dicts with a `type` of ``"value"``.
    """
    def _odata_str(value: str) -> str:
        # OData string literals escape a single quote by doubling it.
        # Without this, a quote inside an identifier breaks — or injects
        # into — the filter expression.
        return value.replace("'", "''")

    # Build the search query; e.g. search = "北京 OR 上海 OR 广州" matches
    # any of the keywords. NOTE(review): keywords are interpolated into
    # the search expression verbatim; if they can contain query-syntax
    # characters, consider quoting/escaping them as well.
    query = {
        "search": " OR ".join(keywords),
        "filter": f"datasource_id eq '{_odata_str(datasource_id)}' and "
                  f"schema_name eq '{_odata_str(schema_name)}' and "
                  f"table_name eq '{_odata_str(table_name)}' and "
                  f"field_name eq '{_odata_str(field_name)}'",
        "top": 10
    }

    # Execute the query against the search service.
    results = search_client.search(**query)

    # Shape each hit into the common retrieved-entity structure.
    return [
        {
            "id": result["id"],
            "payload": {
                "schema_name": schema_name,
                "table_name": table_name,
                "field_name": field_name,
                "type": "value",
                "value": result["value"]
            },
            "score": result["@search.score"]
        }
        for result in results
    ]

返回格式

[
    {
        "id": "value-1",
        "payload": {
            "schema_name": "public",
            "table_name": "sales",
            "field_name": "region",
            "type": "value",
            "value": "北京"
        },
        "score": 0.95
    },
    ...
]

4. 结果合并

def _merge_values_fields(entities: list[RetrievedMetaEntity]) -> list[dict]:
    """
    合并字段检索和值检索的结果
    """
    # 按表分组
    table_entities = {}
    for entity in entities:
        table_key = (entity["payload"]["schema_name"], entity["payload"]["table_name"])
        if table_key not in table_entities:
            table_entities[table_key] = []
        table_entities[table_key].append(entity)

    # 去重和排序
    merged = []
    for table_key, table_entities_list in table_entities.items():
        # 按 score 排序
        sorted_entities = sorted(
            table_entities_list,
            key=lambda x: x["score"],
            reverse=True
        )

        # 去重(同一字段只保留最高分)
        seen_fields = set()
        for entity in sorted_entities:
            field_key = (
                entity["payload"]["schema_name"],
                entity["payload"]["table_name"],
                entity["payload"]["field_name"]
            )
            if field_key not in seen_fields:
                merged.append(entity["payload"])
                seen_fields.add(field_key)

    return merged

合并策略

  1. 按表分组:同一表的字段放在一起
  2. 按分数排序:高分在前
  3. 去重:同一字段只保留最高分的结果

5. 上下文增强

def _add_context_to_meta(meta: MetaAdmin, entities: list[dict]):
    """
    将检索到的上下文添加到元数据中
    """
    for entity in entities:
        schema_name = entity["schema_name"]
        table_name = entity["table_name"]
        field_name = entity["field_name"]

        # 找到对应的字段
        if schema := meta.schemas.get(schema_name):
            if table := schema.tables.get(table_name):
                if field := table.fields.get(field_name):
                    # 添加检索上下文
                    if entity["type"] == "value":
                        # 添加示例值
                        field.sample_data = entity["value"]
                    elif entity["type"] == "curr_desc":
                        # 已有描述,不需要添加
                        pass

上下文类型

用途

实际应用场景

场景 1:地区查询

用户问题:"北京地区的销售额"

检索过程

# 1. 分解查询
subqueries = ["销售额", "地区"]
keywords = ["北京"]

# 2. 向量检索字段
field_results = [
    {"table": "sales", "field": "amount", "score": 0.85},
    {"table": "sales", "field": "region", "score": 0.80},
]

# 3. 全文检索值
value_results = [
    {"table": "sales", "field": "region", "value": "北京", "score": 0.95},
]

# 4. 合并结果
# 表:sales
# 字段:amount(销售金额), region(地区)
# 示例值:region='北京'

# 5. 生成 SQL
# SELECT SUM(amount) FROM sales WHERE region = '北京'

场景 2:时间查询

用户问题:"2024 年 1 月的订单数量"

检索过程

# 1. 分解查询
subqueries = ["订单数量", "时间", "年份", "月份"]
keywords = ["2024", "1月", "01"]

# 2. 向量检索字段
field_results = [
    {"table": "orders", "field": "order_count", "score": 0.90},
    {"table": "orders", "field": "created_at", "score": 0.85},
    {"table": "orders", "field": "order_date", "score": 0.82},
]

# 3. 全文检索值
value_results = [
    {"table": "orders", "field": "order_date", "value": "2024-01-15", "score": 0.92},
    {"table": "orders", "field": "order_date", "value": "2024-01-20", "score": 0.91},
]

# 4. 合并结果
# 表:orders
# 字段:order_count, order_date
# 示例值:order_date='2024-01-15'

# 5. 生成 SQL
# SELECT COUNT(*) FROM orders
# WHERE order_date >= '2024-01-01' AND order_date < '2024-02-01'

性能优化

1. 并行检索

# 并行执行向量检索和全文检索
field_task = datasource.retrieve_fields_by_question(subqueries)
value_task = datasource.retrieve_values_by_question(keywords)

fields, values = await asyncio.gather(field_task, value_task)

2. 缓存策略

# 缓存常见查询的结果
cache_key = f"{datasource_id}:{hash(tuple(subqueries))}:{hash(tuple(keywords))}"

if cache_key in cache:
    return cache[cache_key]

results = await retrieve_entities(...)
cache[cache_key] = results
return results

3. 索引优化

# 只在索引字段上进行全文检索
for field in table.fields.values():
    if field.is_index:  # 高基数字段
        tasks.append(_query_values(...))

最佳实践

1. 查询分解

# ✅ 好的分解
subqueries = ["销售额", "地区"]  # 语义概念
keywords = ["北京"]  # 具体值

# ❌ 差的分解
subqueries = ["北京地区的销售额"]  # 混在一起
keywords = []

2. 结果过滤

# 设置合理的阈值
vector_threshold = 0.4  # 向量检索阈值
fulltext_threshold = 0.5  # 全文检索阈值

# 限制结果数量
top_k = 12  # 每个查询最多返回 12 个结果

3. 降级策略

# 全文检索不可用时,只用向量检索
if not fulltext_available:
    log.warning("Fulltext search not available, using vector search only")
    return await vector_search_only(subqueries)

总结

AskTable 通过双重检索策略,结合向量检索和全文检索的优势:

  1. 向量检索:理解语义,找相关字段
  2. 全文检索:精确匹配,找具体值
  3. 结果合并:去重、排序、上下文增强
  4. 优雅降级:单一检索不可用时自动降级
  5. 性能优化:并行检索、缓存、索引

通过双重检索,我们同时获得了语义理解与精确匹配两方面的能力,提升了元数据检索的准确率与召回率。

这种策略不仅适用于元数据检索,也可以推广到其他需要结合语义理解和精确匹配的场景。