
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在元数据检索中,单纯的向量检索或全文检索都有局限性。向量检索擅长语义理解但可能漏掉精确匹配,全文检索擅长精确匹配但不理解语义。AskTable 通过双重检索策略,结合两者优势,实现了更准确的元数据检索。
场景:用户问"北京地区的订单"
# 向量检索 query_embedding = embed("北京地区的订单") results = vector_search(query_embedding) # 可能返回: # - region 字段(语义相关) # - area 字段(语义相关) # - province 字段(语义相关) # 但可能漏掉: # - 值为"北京"的具体记录
问题:
场景:用户问"销售情况"
# 全文检索 results = fulltext_search("销售情况") # 可能返回: # - 包含"销售"的字段 # - 包含"情况"的字段 # 但可能漏掉: # - revenue 字段(英文,语义相关但关键词不匹配) # - amount 字段(语义相关但关键词不匹配)
问题:
结合两者:
# 1. 向量检索:找相关字段 field_results = vector_search(["销售额", "订单金额"]) # 返回:sales.amount, orders.revenue # 2. 全文检索:找具体值 value_results = fulltext_search(["北京", "华东"]) # 返回:region='北京', area='华东' # 3. 合并结果 final_results = merge(field_results, value_results)
加载图表中...
上图展示了双重检索的完整流程:将用户问题分解为语义概念和具体值,分别使用向量检索和全文检索,最后合并结果。这种策略既能理解语义(销售额≈revenue),又能精确匹配(北京="北京")。
优势:
async def retrieve_entities(
    meta: MetaAdmin,
    datasource: DataSourceAdmin,
    subqueries: list[str],  # semantic queries, used for vector retrieval
    keywords: list[str],  # keyword queries, used for full-text retrieval
) -> MetaAdmin:
    """Dual retrieval: vector search for relevant fields + full-text search
    for concrete values, merged into a filtered metadata view.

    Returns the subset of ``meta`` restricted to the tables that were hit,
    with retrieved context attached to their fields.
    Raises ``errors.NoDataToQuery`` when neither strategy found anything.
    """
    values: list[RetrievedMetaEntity] = []

    # 1. Vector retrieval: find semantically relevant fields.
    fields: list[RetrievedMetaEntity] = await datasource.retrieve_fields_by_question(subqueries)
    log.info(f"retrieved {len(fields)} possible relevant fields")

    # 2. Full-text retrieval: find concrete values. Degrade gracefully when
    #    the value index is not configured or no keywords were extracted.
    if not config.aisearch_host or not config.aisearch_master_key:
        log.warning("Value index is not enabled, skipping value retrieval")
    elif not keywords:
        log.warning("No keywords provided, skipping value retrieval")
    else:
        values = await datasource.retrieve_values_by_question(keywords)
        log.info(f"retrieved {len(values)} possible relevant values")

    # 3. Merge both result sets (value hits precede field hits in the input).
    entities = _merge_values_fields(values + fields)

    # Fail fast: no point filtering metadata when nothing was retrieved.
    if not entities:
        raise errors.NoDataToQuery()

    # 4. Restrict the metadata to the tables that were actually hit.
    hit_tables = meta.filter_tables_by_names(
        list({(entity["schema_name"], entity["table_name"]) for entity in entities})
    )

    # 5. Attach the retrieved context (e.g. sample values) to those tables.
    _add_context_to_meta(meta=hit_tables, entities=entities)
    return hit_tables
设计亮点:
subqueries: list[str] # 语义查询:["销售额", "订单金额", "地区"] keywords: list[str] # 关键词查询:["北京", "华东", "2024"]
为什么要分解?
subqueries:用于向量检索,找相关字段
keywords:用于全文检索,找具体值
示例:
# 用户问题:"北京地区 2024 年的销售额" # 分解为: subqueries = ["销售额", "地区", "年份"] # 语义概念 keywords = ["北京", "2024"] # 具体值
if not config.aisearch_host or not config.aisearch_master_key: log.warning("Value index is not enabled, skipping value retrieval") elif not keywords: log.warning("No keywords provided, skipping value retrieval") else: values = await datasource.retrieve_values_by_question(keywords)
优雅降级:
async def retrieve_fields_by_question(
    self, subqueries: list[str], accessible_meta=None
) -> list[RetrievedMetaEntity]:
    """Find relevant fields for the given subqueries via vector retrieval.

    NOTE(review): the original ``if accessible_meta`` branch was byte-identical
    to the ``else`` branch, so ``accessible_meta`` is effectively ignored. The
    parameter is kept for backward compatibility; TODO confirm whether it was
    meant to scope the retrieval (e.g. be passed down to ``retrieve_meta``).
    """
    return await retrieve_meta(subqueries=subqueries, ds_id=self.id)
调用 Qdrant:
# 在 meta/qdrant.py 中 async def retrieve( subqueries: list[str], ds_id: str, meta: MetaAdmin | None = None, top_k: int = 12, threshold: float = 0.4, ): """向量检索元数据""" # 1. 生成查询向量 query_embeddings = await embed_docs(subqueries) # 2. 构建搜索请求 search_queries = [ SearchRequest( vector=embedding, filter=filter, limit=top_k, score_threshold=threshold, with_payload=True, ) for embedding in query_embeddings ] # 3. 批量搜索 results = await db.search_batch( collection_name=collection_name, requests=search_queries, ) # 4. 返回结果 hits = [point for result in results for point in result] return unpack_qdrant_points(hits=hits)
返回格式:
[ { "id": "uuid-1", "payload": { "schema_name": "public", "table_name": "sales", "field_name": "amount", "type": "curr_desc", "value": "销售金额,单位:元" }, "score": 0.85 }, ... ]
async def retrieve_values_by_question(
    self, keywords: list[str], accessible_meta=None
) -> list[RetrievedMetaEntity]:
    """Find concrete values matching any of *keywords* via full-text search.

    Fans out one lookup per indexed field across the whole brain meta and
    flattens the per-field results into a single list.
    """
    meta = self.brain_meta

    async def _query_values(schema_name, table_name, field_name):
        """Look up this field's distinct values that match the keywords.

        NOTE(review): ``query_values`` appears to be synchronous, so this
        coroutine blocks the event loop while it runs — confirm whether it
        should be offloaded (e.g. via ``asyncio.to_thread``).
        """
        return query_values(self.id, schema_name, table_name, field_name, keywords)

    # Only indexed (per the source, high-cardinality) fields are queried.
    tasks = [
        _query_values(schema.name, table.name, field.name)
        for schema in meta.schemas.values()
        for table in schema.tables.values()
        for field in table.fields.values()
        if field.is_index
    ]

    # Run all lookups concurrently, then flatten the per-field result lists.
    results = await asyncio.gather(*tasks)
    return [item for sublist in results for item in sublist]
全文检索实现:
def query_values(
    datasource_id: str,
    schema_name: str,
    table_name: str,
    field_name: str,
    keywords: list[str]
) -> list[RetrievedMetaEntity]:
    """Query concrete values from Azure AI Search or Elasticsearch.

    Matches any of *keywords* against the value index, scoped to a single
    datasource/schema/table/field, and returns the hits as retrieval
    entities (payload + relevance score).
    """
    # Any-keyword match, e.g. "北京 OR 上海 OR 广州".
    search_text = " OR ".join(keywords)

    # Scope the search to exactly one field of one table.
    scope_filter = (
        f"datasource_id eq '{datasource_id}' and "
        f"schema_name eq '{schema_name}' and "
        f"table_name eq '{table_name}' and "
        f"field_name eq '{field_name}'"
    )

    # Execute the query (top 10 hits).
    hits = search_client.search(
        search=search_text,
        filter=scope_filter,
        top=10,
    )

    # Shape each hit into a retrieval entity.
    entities = []
    for hit in hits:
        entities.append(
            {
                "id": hit["id"],
                "payload": {
                    "schema_name": schema_name,
                    "table_name": table_name,
                    "field_name": field_name,
                    "type": "value",
                    "value": hit["value"],
                },
                "score": hit["@search.score"],
            }
        )
    return entities
返回格式:
[ { "id": "value-1", "payload": { "schema_name": "public", "table_name": "sales", "field_name": "region", "type": "value", "value": "北京" }, "score": 0.95 }, ... ]
def _merge_values_fields(entities: list[RetrievedMetaEntity]) -> list[dict]: """ 合并字段检索和值检索的结果 """ # 按表分组 table_entities = {} for entity in entities: table_key = (entity["payload"]["schema_name"], entity["payload"]["table_name"]) if table_key not in table_entities: table_entities[table_key] = [] table_entities[table_key].append(entity) # 去重和排序 merged = [] for table_key, table_entities_list in table_entities.items(): # 按 score 排序 sorted_entities = sorted( table_entities_list, key=lambda x: x["score"], reverse=True ) # 去重(同一字段只保留最高分) seen_fields = set() for entity in sorted_entities: field_key = ( entity["payload"]["schema_name"], entity["payload"]["table_name"], entity["payload"]["field_name"] ) if field_key not in seen_fields: merged.append(entity["payload"]) seen_fields.add(field_key) return merged
合并策略:
def _add_context_to_meta(meta: MetaAdmin, entities: list[dict]): """ 将检索到的上下文添加到元数据中 """ for entity in entities: schema_name = entity["schema_name"] table_name = entity["table_name"] field_name = entity["field_name"] # 找到对应的字段 if schema := meta.schemas.get(schema_name): if table := schema.tables.get(table_name): if field := table.fields.get(field_name): # 添加检索上下文 if entity["type"] == "value": # 添加示例值 field.sample_data = entity["value"] elif entity["type"] == "curr_desc": # 已有描述,不需要添加 pass
上下文类型:
用途:
用户问题:"北京地区的销售额"
检索过程:
# 1. 分解查询 subqueries = ["销售额", "地区"] keywords = ["北京"] # 2. 向量检索字段 field_results = [ {"table": "sales", "field": "amount", "score": 0.85}, {"table": "sales", "field": "region", "score": 0.80}, ] # 3. 全文检索值 value_results = [ {"table": "sales", "field": "region", "value": "北京", "score": 0.95}, ] # 4. 合并结果 # 表:sales # 字段:amount(销售金额), region(地区) # 示例值:region='北京' # 5. 生成 SQL # SELECT SUM(amount) FROM sales WHERE region = '北京'
用户问题:"2024 年 1 月的订单数量"
检索过程:
# 1. 分解查询 subqueries = ["订单数量", "时间", "年份", "月份"] keywords = ["2024", "1月", "01"] # 2. 向量检索字段 field_results = [ {"table": "orders", "field": "order_count", "score": 0.90}, {"table": "orders", "field": "created_at", "score": 0.85}, {"table": "orders", "field": "order_date", "score": 0.82}, ] # 3. 全文检索值 value_results = [ {"table": "orders", "field": "order_date", "value": "2024-01-15", "score": 0.92}, {"table": "orders", "field": "order_date", "value": "2024-01-20", "score": 0.91}, ] # 4. 合并结果 # 表:orders # 字段:order_count, order_date # 示例值:order_date='2024-01-15' # 5. 生成 SQL # SELECT COUNT(*) FROM orders # WHERE order_date >= '2024-01-01' AND order_date < '2024-02-01'
# 并行执行向量检索和全文检索 field_task = datasource.retrieve_fields_by_question(subqueries) value_task = datasource.retrieve_values_by_question(keywords) fields, values = await asyncio.gather(field_task, value_task)
# 缓存常见查询的结果 cache_key = f"{datasource_id}:{hash(tuple(subqueries))}:{hash(tuple(keywords))}" if cache_key in cache: return cache[cache_key] results = await retrieve_entities(...) cache[cache_key] = results return results
# 只在索引字段上进行全文检索 for field in table.fields.values(): if field.is_index: # 高基数字段 tasks.append(_query_values(...))
# ✅ 好的分解 subqueries = ["销售额", "地区"] # 语义概念 keywords = ["北京"] # 具体值 # ❌ 差的分解 subqueries = ["北京地区的销售额"] # 混在一起 keywords = []
# 设置合理的阈值 vector_threshold = 0.4 # 向量检索阈值 fulltext_threshold = 0.5 # 全文检索阈值 # 限制结果数量 top_k = 12 # 每个查询最多返回 12 个结果
# 全文检索不可用时,只用向量检索 if not fulltext_available: log.warning("Fulltext search not available, using vector search only") return await vector_search_only(subqueries)
AskTable 通过双重检索策略,结合向量检索和全文检索的优势:
通过双重检索,我们实现了:
这种策略不仅适用于元数据检索,也可以推广到其他需要结合语义理解和精确匹配的场景。