
In enterprise data-analysis scenarios, a database often contains hundreds or even thousands of tables, each with dozens of columns. When a user asks a question in natural language, how do you quickly locate the relevant tables and fields? Traditional keyword matching usually falls short here; AskTable uses Qdrant vector retrieval to achieve genuine semantic understanding.
Suppose a user asks: "How were sales in East China last month?"

Problems with keyword matching:

- "Sales" in the question might be stored as `revenue`, `sales_amount`, or `order_total`
- "East China" might map to a `region`, `area`, or `province` column

Problems with the brute-force alternative (dumping the entire schema into the prompt): with hundreds of tables, the metadata exceeds the LLM's context window, and token cost grows with every irrelevant table.
AskTable uses the Qdrant vector database for semantic retrieval:
*(Diagram: end-to-end flow from metadata vectorization to retrieval.)*

The diagram shows the complete pipeline: offline vectorization on the left, online retrieval on the right. Using vector-similarity computation, the system finds the most relevant fields among a huge number of columns in milliseconds.
AskTable converts database metadata (table names, field names, descriptions) into vectors and stores them in Qdrant:
```python
async def create(meta: MetaAdmin):
    """
    - get or create project collection in db
    - upsert fields data to vector
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    await create_collection_if_not_exist(collection_name)

    docs = []
    for schema in meta.schemas.values():
        for table in schema.tables.values():
            for field in table.fields.values():
                if field.curr_desc and field.curr_desc.strip():
                    assert table.curr_desc is not None
                    assert meta.datasource_id is not None
                    metadata = {
                        "datasource_id": meta.datasource_id,
                        "schema_name": schema.name,
                        "table_name": table.name,
                        "field_name": field.name,
                        "type": "curr_desc",
                        "value": field.curr_desc,
                    }
                    docs.append({
                        "page_content": table.curr_desc + field.curr_desc,
                        "metadata": metadata,
                        "id": _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        ),
                    })
    await upload_batch(collection_name, docs, with_id=True)
```
Design highlights:
"page_content": table.curr_desc + field.curr_desc
Why combine the table description with the field description?
A field like `amount` is ambiguous on its own, but prefixed with its table description, "订单表.订单金额" ("orders table . order amount"), it becomes unambiguous.

```python
def _gen_id_by_field(
    datasource_id: str, schema_name: str, table_name: str, field_name: str
) -> str:
    """
    Generate a unique ID from datasource_id, schema_name, table_name, field_name.
    Uses uuid5 to derive a UUID from the input string.
    """
    parts = [datasource_id, schema_name, table_name, field_name]
    input_string = "-".join(parts)
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, input_string))
```
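As a quick check of this property, `uuid5` is fully deterministic: the same four-part key always yields the same point ID. A minimal standalone sketch (the local `gen_id_by_field` here mirrors the function above):

```python
import uuid

def gen_id_by_field(datasource_id: str, schema_name: str,
                    table_name: str, field_name: str) -> str:
    # Deterministic: uuid5 hashes the joined string, so the same
    # field coordinates always produce the same point ID.
    parts = [datasource_id, schema_name, table_name, field_name]
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, "-".join(parts)))

id_a = gen_id_by_field("ds_42", "public", "orders", "amount")
id_b = gen_id_by_field("ds_42", "public", "orders", "amount")
id_c = gen_id_by_field("ds_42", "public", "orders", "region")

print(id_a == id_b)  # True: re-indexing overwrites, never duplicates
print(id_a == id_c)  # False: different fields get different IDs
```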
Why UUID5? Because it is deterministic: the same (datasource, schema, table, field) tuple always produces the same ID, so re-indexing updates points in place instead of creating duplicates.
```python
metadata = {
    "datasource_id": meta.datasource_id,
    "schema_name": schema.name,
    "table_name": table.name,
    "field_name": field.name,
    "type": "curr_desc",
    "value": field.curr_desc,
}
```
What the metadata is for: it travels with each vector as the Qdrant payload, so queries can filter by `datasource_id` and hits can be mapped back to a concrete schema, table, and field.
When a user asks a question, AskTable runs a vector search to find the most relevant fields:
```python
async def retrieve(
    subqueries: list[str],
    ds_id: str,
    meta: MetaAdmin | None = None,
    top_k: int = 12,
    threshold: float = 0.4,
):
    """
    retrieve meta from vector db
    filter either by ds_id or by meta
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    db = get_vector_db()
    if not await db.collection_exists(collection_name):
        log.info(f"Collection {collection_name} does not exist; skipping query.")
        return []

    # Convert the query texts to vectors
    query_embeddings = await embed_docs(subqueries)

    # Build the filter
    if meta and meta.datasource_id:
        # If meta is provided, search only within the allowed fields
        allow_ids = []
        for schema in meta.schemas.values():
            for table in schema.tables.values():
                for field in table.fields.values():
                    allow_ids.append(
                        _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        )
                    )
        filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
    else:
        # Otherwise filter by datasource ID
        filter = Filter(
            must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
        )

    # Build the batch of search requests
    search_queries = [
        SearchRequest(
            vector=embedding,
            filter=filter,
            limit=top_k,
            score_threshold=threshold,
            with_payload=True,
        )
        for embedding in query_embeddings
    ]

    # Batch search
    results = await db.search_batch(
        collection_name=collection_name,
        requests=search_queries,
    )
    hits = [point for result in results for point in result]
    return unpack_qdrant_points(hits=hits)
```
Design highlights:
```python
query_embeddings = await embed_docs(subqueries)
search_queries = [
    SearchRequest(vector=embedding, ...)
    for embedding in query_embeddings
]
results = await db.search_batch(...)
```
Why multiple queries?
User question: "How were sales in East China last month?"

This can be decomposed into several sub-queries, for example "sales amount", "East China region", and "last month's date range".

Each sub-query is retrieved independently, and the results are then merged, which improves recall.
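The merge step can be sketched like this (a toy `Hit` shape for illustration; the real result shape comes from `unpack_qdrant_points`):

```python
from dataclasses import dataclass

@dataclass
class Hit:
    id: str       # deterministic field ID
    score: float  # similarity score

def merge_hits(per_query_hits: list[list[Hit]]) -> list[Hit]:
    """Union hits from all sub-queries, dedupe by ID, keep the best score."""
    best: dict[str, Hit] = {}
    for hits in per_query_hits:
        for h in hits:
            if h.id not in best or h.score > best[h.id].score:
                best[h.id] = h
    return sorted(best.values(), key=lambda h: h.score, reverse=True)

merged = merge_hits([
    [Hit("orders.amount", 0.81), Hit("orders.region", 0.55)],   # sub-query 1
    [Hit("orders.region", 0.72), Hit("orders.order_date", 0.61)],  # sub-query 2
])
print([h.id for h in merged])
# ['orders.amount', 'orders.region', 'orders.order_date']
```

Note how `orders.region`, found by both sub-queries, appears once with its best score: the union raises recall without duplicating fields.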
```python
if meta and meta.datasource_id:
    # Search only within fields the user is allowed to see
    allow_ids = [...]
    filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
```
Security guarantee: fields outside the caller's permission scope are excluded at the vector-search layer, so they can never surface in retrieval results, let alone in generated SQL.
```python
score_threshold=threshold  # default 0.4
```
Why a threshold? A plain top-k search always returns k results, even when none are truly relevant; the score threshold cuts off low-similarity noise.
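A minimal illustration of the effect, using toy 3-dimensional vectors rather than real embeddings: an unrelated field still gets a nonzero similarity, and the cutoff is what removes it.

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) *
                  math.sqrt(sum(x * x for x in b)))

query = [1.0, 0.2, 0.0]
fields = {
    "orders.amount":    [0.9, 0.3, 0.1],  # semantically close to the query
    "users.avatar_url": [0.0, 0.1, 1.0],  # unrelated, but still scores above zero
}
scores = {name: cosine(query, vec) for name, vec in fields.items()}

# Without the cutoff, a top-k search would return both fields;
# the threshold drops the low-similarity noise.
kept = {name: s for name, s in scores.items() if s >= 0.4}
```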
AskTable implements efficient batched embedding:
```python
async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
    _begin = time.time()
    text_list = [text.replace("\n", " ") for text in text_list]

    # Split into batches
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]
    log.info(f"total batches: {len(batches)}")

    embeddings = []
    # Process chunks of batches concurrently
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        _begin_chunk = time.time()
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
        tasks = [self._embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])
        log.info(
            f"embedding_chunks: {len(chunk)} batches, "
            f"time: {time.time() - _begin_chunk:.2f}s"
        )

    log.info(
        f"embedding_batch: {len(text_list)} texts, "
        f"time: {time.time() - _begin:.2f}s"
    )
    return embeddings
```
Performance optimization strategies:
```python
EMBEDDING_BATCH_SIZE = 100  # texts per API request
EMBEDDING_CHUNK_SIZE = 100  # concurrent requests
```
Why two levels? The batch size respects the embedding API's per-request payload limit, while the chunk size caps how many requests are in flight at once so rate limits are not exceeded.
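How the two levels interact can be shown with toy numbers (a small chunk size here so the split is visible; production uses 100 for both constants): 250 texts become three API-sized batches, fired in two concurrent rounds.

```python
EMBEDDING_BATCH_SIZE = 100  # texts per API request (payload cap)
EMBEDDING_CHUNK_SIZE = 2    # requests in flight at once (rate cap; toy value)

texts = [f"text-{i}" for i in range(250)]

# Level 1: cut the texts into API-sized batches -> 100 / 100 / 50
batches = [
    texts[i : i + EMBEDDING_BATCH_SIZE]
    for i in range(0, len(texts), EMBEDDING_BATCH_SIZE)
]

# Level 2: group batches into chunks fired concurrently -> 2 rounds (2 + 1)
chunks = [
    batches[i : i + EMBEDDING_CHUNK_SIZE]
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE)
]

print([len(b) for b in batches])  # [100, 100, 50]
print([len(c) for c in chunks])   # [2, 1]
```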
```python
tasks = [self._embedding_batch(batch) for batch in chunk]
chunk_results = await asyncio.gather(*tasks)
```
Performance gain: the batches in a chunk run concurrently under `asyncio.gather`, so a chunk takes roughly as long as its slowest request rather than the sum of all of them.
```python
text_list = [text.replace("\n", " ") for text in text_list]
```
Why replace newlines? Some embedding APIs have historically performed worse on texts containing newlines (OpenAI's embedding guides, for instance, recommended stripping them), and flattening to single-line text keeps the inputs uniform.
Background:

Retrieval flow:

1. Sub-query decomposition
2. Vector retrieval (top_k=12 per sub-query), hitting for example:
   - product.product_line, sales.category, dim_product.line_name
   - finance.gross_margin, sales.profit_rate, report.margin_pct
   - orders.order_date, sales.sale_date, fact_sales.date_key
3. Merge and deduplicate: the candidate tables narrow down to product, sales, finance
4. Pass the surviving metadata to the LLM
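The final hand-off can be sketched as formatting the surviving fields into a compact, prompt-ready schema snippet. The `format_schema_context` helper and the flat dict shape are hypothetical; in AskTable the real payloads come from `unpack_qdrant_points`.

```python
def format_schema_context(hits: list[dict]) -> str:
    """Group retrieved fields by table and render a prompt-ready snippet."""
    tables: dict[str, list[str]] = {}
    for h in hits:
        tables.setdefault(h["table_name"], []).append(
            f'  - {h["field_name"]}: {h["value"]}'
        )
    lines = []
    for table, fields in tables.items():
        lines.append(f"Table {table}:")
        lines.extend(fields)
    return "\n".join(lines)

context = format_schema_context([
    {"table_name": "product", "field_name": "product_line", "value": "产品线名称"},
    {"table_name": "finance", "field_name": "gross_margin", "value": "毛利率"},
])
print(context)
```

Only the handful of tables and fields that survived retrieval reach the prompt, instead of the entire schema.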
Background:

Implementation:
```python
# One independent collection per tenant
collection_name = f"meta_{project_id}"

# Filter by datasource_id at query time
filter = Filter(
    must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
)
```
Advantages: tenant data never mixes, since each project has its own collection, and cross-datasource leakage within a project is blocked by the payload filter.
Background:

Implementation:
```python
# Get the metadata this user's role can access
accessible_meta = role.get_accessible_meta(datasource)

# Search only within the accessible fields
allow_ids = [
    _gen_id_by_field(...)
    for field in accessible_meta.all_fields()
]
filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
```
Security guarantees:
```python
# Isolate collections by project
collection_name = PROJECT_COLLECTION_PREFIX + project_id
```
Advantages: a project's queries can only ever touch its own collection.
```python
# Deterministic IDs make updates idempotent
id = _gen_id_by_field(datasource_id, schema_name, table_name, field_name)
docs.append({"id": id, "page_content": ..., "metadata": ...})
await upload_batch(collection_name, docs, with_id=True)
```
Advantages: re-running a metadata sync updates vectors in place, with no duplicate cleanup needed.
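The idempotency is easy to demonstrate with an in-memory stand-in for the collection: because the point ID is derived from the field coordinates, uploading twice overwrites rather than duplicates (a toy model, not the Qdrant client):

```python
import uuid

store: dict[str, str] = {}  # point_id -> page_content (stand-in for a collection)

def upsert(datasource_id: str, schema: str, table: str, field: str, content: str):
    pid = str(uuid.uuid5(uuid.NAMESPACE_DNS,
                         "-".join([datasource_id, schema, table, field])))
    store[pid] = content  # same key -> overwrite, never a duplicate

upsert("ds_1", "public", "orders", "amount", "订单金额,单位:元")
upsert("ds_1", "public", "orders", "amount", "订单金额,单位:元,不含税")  # re-index

print(len(store))  # 1: the description was updated in place
```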
```python
async def delete_by_field_names(ds_id: str, fields_names: list[tuple[str, str, str]]):
    """Delete the vectors for the given fields."""
    filter = Filter(
        should=[
            Filter(
                must=[
                    FieldCondition(key="datasource_id", match=MatchValue(value=ds_id)),
                    FieldCondition(key="field_name", match=MatchValue(value=field[2])),
                    FieldCondition(key="table_name", match=MatchValue(value=field[1])),
                    FieldCondition(key="schema_name", match=MatchValue(value=field[0])),
                ]
            )
            for field in fields_names
        ]
    )
    await db.delete(collection_name, filter)
```
Advantages: stale vectors for removed fields can be deleted precisely, without rebuilding the collection.
Recommended:
```python
# Combine table and field descriptions
page_content = f"{table.curr_desc} {field.curr_desc}"
```
Not recommended:
```python
# Field name only
page_content = field.name  # too little semantic information

# Field description only
page_content = field.curr_desc  # missing the table's context
```
```python
# Precise queries (e.g. report generation)
threshold = 0.6  # high threshold: only highly relevant fields

# Exploratory queries (e.g. data exploration)
threshold = 0.3  # low threshold: return more potentially relevant fields

# Default
threshold = 0.4  # balances precision and recall
```
```python
# Simple queries
top_k = 5   # less noise

# Complex queries
top_k = 20  # higher recall

# Default
top_k = 12  # empirical value
```
Characteristics of high-quality metadata:
Example:
```python
# Good descriptions
table.curr_desc = "订单表,记录所有客户订单信息"  # orders table, records all customer orders
field.curr_desc = "订单金额,单位:元,不含税"      # order amount in CNY, tax excluded

# Poor descriptions
table.curr_desc = "orders"  # no business meaning
field.curr_desc = "amount"  # not enough information
```
| Feature | Qdrant | Pinecone | Milvus | Weaviate |
|---|---|---|---|---|
| Open source | ✅ | ❌ | ✅ | ✅ |
| Self-hosted deployment | ✅ | ❌ | ✅ | ✅ |
| Filtering performance | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| Batch search | ✅ | ✅ | ✅ | ✅ |
| Python SDK | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| Documentation quality | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
Qdrant's advantages: open source and self-hostable, strong filtering performance, batch search support, and a first-rate Python SDK with excellent documentation.

AskTable achieves efficient database metadata retrieval through Qdrant: semantic search over table and field descriptions, multi-query retrieval for recall, permission-aware filtering, and idempotent indexing.

This architecture is not limited to Text-to-SQL; the same pattern applies to any scenario that must retrieve a small relevant subset from a large body of structured data.

With vector retrieval, the AI can truly "understand" the data rather than merely matching keywords.