
In metadata retrieval, pure vector search or full-text search alone has limitations. Vector search excels at semantic understanding but may miss exact matches, while full-text search excels at exact matching but doesn't understand semantics. AskTable uses a dual retrieval strategy, combining the strengths of both to achieve more accurate metadata retrieval.
Scenario: User asks about "orders from Beijing"
# Vector retrieval
query_embedding = embed("北京地区的订单")
results = vector_search(query_embedding)
# Might return:
# - region field (semantically related)
# - area field (semantically related)
# - province field (semantically related)
# But might miss:
# - specific records with value "北京"
Problem: semantic similarity surfaces related fields but does not guarantee exact matches, so records whose value is exactly "北京" (Beijing) may never be retrieved.
Scenario: User asks about "sales situation"
# Full-text retrieval
results = fulltext_search("销售情况")
# Might return:
# - fields containing "销售"
# - fields containing "情况"
# But might miss:
# - revenue field (English, semantically related but keyword doesn't match)
# - amount field (semantically related but keyword doesn't match)
Problem: keyword matching misses semantically equivalent fields whose names use other words or another language, such as revenue or amount.
Combining both:
# 1. Vector retrieval: find related fields
field_results = vector_search(["销售额", "订单金额"])
# Returns: sales.amount, orders.revenue
# 2. Full-text retrieval: find specific values
value_results = fulltext_search(["北京", "华东"])
# Returns: region='北京', area='华东'
# 3. Merge results
final_results = merge(field_results, value_results)
The diagram above shows the complete dual retrieval process: decomposing user questions into semantic concepts and specific values, using vector search and full-text search respectively, then merging results. This strategy both understands semantics (销售额 ≈ revenue) and matches precisely (北京 = "北京").
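The process above can be sketched end to end with tiny in-memory indexes. This is an illustrative toy, not AskTable's implementation: the "vector" side uses a bag-of-words cosine as a stand-in for real embeddings, and all index contents are invented for the demo.

```python
# Toy dual retrieval: a cosine stand-in for vector search plus an exact-match
# value index. All data and names here are illustrative.
from collections import Counter
from math import sqrt

FIELD_DOCS = {  # field -> description text used for semantic matching
    "sales.amount": "sales amount revenue money",
    "sales.region": "region area province location",
}
VALUE_INDEX = {  # field -> indexed distinct values (full-text side)
    "sales.region": ["北京", "上海", "华东"],
}

def _cos(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    na = sqrt(sum(v * v for v in a.values()))
    nb = sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0

def vector_search(subqueries, threshold=0.1):
    hits = []
    for q in subqueries:
        qv = Counter(q.split())
        for field, doc in FIELD_DOCS.items():
            score = _cos(qv, Counter(doc.split()))
            if score > threshold:
                hits.append({"field": field, "score": score})
    return hits

def fulltext_search(keywords):
    return [
        {"field": field, "value": v, "score": 1.0}
        for field, vals in VALUE_INDEX.items()
        for v in vals if v in keywords
    ]

def dual_retrieve(subqueries, keywords):
    # Merge: every field touched by either retriever survives
    return vector_search(subqueries) + fulltext_search(keywords)

hits = dual_retrieve(["sales amount"], ["北京"])
```

Even this toy shows the division of labor: the semantic side finds `sales.amount` without any keyword overlap requirement on values, while the exact side pins down `region = '北京'`.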
Advantages: semantic coverage from vector retrieval, exact matching from full-text retrieval, and graceful degradation when either side is unavailable.
async def retrieve_entities(
    meta: MetaAdmin,
    datasource: DataSourceAdmin,
    subqueries: list[str],  # Semantic queries
    keywords: list[str],    # Keyword queries
) -> MetaAdmin:
    """
    Dual retrieval: vector search + full-text search
    """
    fields: list[RetrievedMetaEntity] = []
    values: list[RetrievedMetaEntity] = []

    # 1. Vector retrieval: find related fields
    fields = await datasource.retrieve_fields_by_question(subqueries)
    log.info(f"retrieved {len(fields)} possible relevant fields")

    # 2. Full-text retrieval: find specific values
    if not config.aisearch_host or not config.aisearch_master_key:
        log.warning("Value index is not enabled, skipping value retrieval")
    elif not keywords:
        log.warning("No keywords provided, skipping value retrieval")
    else:
        values = await datasource.retrieve_values_by_question(keywords)
        log.info(f"retrieved {len(values)} possible relevant values")

    # 3. Merge results
    entities = _merge_values_fields(values + fields)

    # 4. Filter metadata down to the tables that were hit
    hit_tables = meta.filter_tables_by_names(
        list({(entity["schema_name"], entity["table_name"]) for entity in entities})
    )

    # 5. Add retrieval context to the surviving tables
    _add_context_to_meta(meta=hit_tables, entities=entities)

    if len(entities) == 0:
        raise errors.NoDataToQuery()
    return hit_tables
Design Highlights:
subqueries: list[str] # Semantic queries: ["销售额", "订单金额", "地区"]
keywords: list[str] # Keyword queries: ["北京", "华东", "2024"]
Why decompose?
subqueries: Used for vector retrieval, finding related fields
keywords: Used for full-text retrieval, finding specific values
Example:
# User question: "Beijing region 2024 sales"
# Decomposed into:
subqueries = ["销售额", "地区", "年份"] # Semantic concepts
keywords = ["北京", "2024"] # Specific values
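In practice this split is typically produced by an LLM, but the intent can be sketched with a simple heuristic: tokens that look like literal values (years, dates, known entity names) become keywords, and everything else becomes a semantic subquery. The gazetteer and function name below are our own illustrative assumptions.

```python
# Heuristic sketch of query decomposition (illustrative; an LLM usually does
# this in the real pipeline).
import re

KNOWN_VALUES = {"北京", "上海", "华东"}  # assumed gazetteer of known literal values

def decompose(tokens: list[str]) -> tuple[list[str], list[str]]:
    subqueries, keywords = [], []
    for tok in tokens:
        if re.fullmatch(r"\d{4}(-\d{2})?", tok) or tok in KNOWN_VALUES:
            keywords.append(tok)    # looks like a literal value
        else:
            subqueries.append(tok)  # treat as a semantic concept
    return subqueries, keywords

subs, keys = decompose(["销售额", "地区", "北京", "2024"])
```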
if not config.aisearch_host or not config.aisearch_master_key:
    log.warning("Value index is not enabled, skipping value retrieval")
elif not keywords:
    log.warning("No keywords provided, skipping value retrieval")
else:
    values = await datasource.retrieve_values_by_question(keywords)
Graceful Degradation: if the value index is not configured, or no keywords were extracted, the pipeline logs a warning and proceeds with vector retrieval alone instead of failing.
async def retrieve_fields_by_question(
    self, subqueries: list[str], accessible_meta=None
) -> list[RetrievedMetaEntity]:
    """Use vector search to find related fields"""
    if accessible_meta:
        # Restrict the search to metadata the caller is allowed to see
        return await retrieve_meta(
            subqueries=subqueries, ds_id=self.id, meta=accessible_meta
        )
    return await retrieve_meta(subqueries=subqueries, ds_id=self.id)
Calling Qdrant:
# In meta/qdrant.py
async def retrieve(
    subqueries: list[str],
    ds_id: str,
    meta: MetaAdmin | None = None,
    top_k: int = 12,
    threshold: float = 0.4,
):
    """Vector search for metadata"""
    # 1. Generate query vectors
    query_embeddings = await embed_docs(subqueries)

    # 2. Build one search request per subquery, scoped to this datasource
    #    (the payload key used in the filter is illustrative)
    ds_filter = Filter(
        must=[FieldCondition(key="ds_id", match=MatchValue(value=ds_id))]
    )
    search_queries = [
        SearchRequest(
            vector=embedding,
            filter=ds_filter,
            limit=top_k,
            score_threshold=threshold,
            with_payload=True,
        )
        for embedding in query_embeddings
    ]

    # 3. Batch search
    results = await db.search_batch(
        collection_name=collection_name,
        requests=search_queries,
    )

    # 4. Flatten and unpack the per-request hit lists
    hits = [point for result in results for point in result]
    return unpack_qdrant_points(hits=hits)
Return Format:
[
    {
        "id": "uuid-1",
        "payload": {
            "schema_name": "public",
            "table_name": "sales",
            "field_name": "amount",
            "type": "curr_desc",
            "value": "销售金额,单位:元"  # "sales amount, unit: yuan"
        },
        "score": 0.85
    },
    ...
]
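The entries above can be given a typed shape. This sketch assumes the field names shown in the format above; the real `RetrievedMetaEntity` definition may differ:

```python
from typing import TypedDict

class EntityPayload(TypedDict):
    schema_name: str
    table_name: str
    field_name: str
    type: str   # "curr_desc" for field descriptions, "value" for cell values
    value: str

class RetrievedMetaEntity(TypedDict):
    id: str
    payload: EntityPayload
    score: float

def table_key(entity: RetrievedMetaEntity) -> tuple[str, str]:
    """The key used later when grouping hits by table."""
    p = entity["payload"]
    return (p["schema_name"], p["table_name"])

hit: RetrievedMetaEntity = {
    "id": "uuid-1",
    "payload": {"schema_name": "public", "table_name": "sales",
                "field_name": "amount", "type": "curr_desc",
                "value": "销售金额,单位:元"},
    "score": 0.85,
}
```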
async def retrieve_values_by_question(
    self, keywords: list[str], accessible_meta=None
) -> list[RetrievedMetaEntity]:
    """Use full-text search to find specific values"""
    meta = self.brain_meta

    async def _query_values(schema_name, table_name, field_name):
        """Query a field's distinct values that match the keywords"""
        return query_values(
            self.id, schema_name, table_name, field_name, keywords
        )

    # Traverse all fields, querying only those that are indexed
    tasks = []
    for schema in meta.schemas.values():
        for table in schema.tables.values():
            for field in table.fields.values():
                if field.is_index:  # Only query indexed fields
                    tasks.append(
                        _query_values(schema.name, table.name, field.name)
                    )

    # Run the per-field queries concurrently
    results = await asyncio.gather(*tasks)

    # Flatten the per-field result lists
    return [item for sublist in results for item in sublist]
Full-Text Search Implementation:
def query_values(
    datasource_id: str,
    schema_name: str,
    table_name: str,
    field_name: str,
    keywords: list[str],
) -> list[RetrievedMetaEntity]:
    """
    Query values from Azure AI Search or Elasticsearch
    """
    # Build the query: match any keyword, scoped to one field of one table
    query = {
        "search": " OR ".join(keywords),  # e.g. "北京 OR 上海 OR 广州"
        "filter": f"datasource_id eq '{datasource_id}' and "
                  f"schema_name eq '{schema_name}' and "
                  f"table_name eq '{table_name}' and "
                  f"field_name eq '{field_name}'",
        "top": 10,
    }

    # Execute the query
    results = search_client.search(**query)

    # Repackage hits into the shared entity format
    return [
        {
            "id": result["id"],
            "payload": {
                "schema_name": schema_name,
                "table_name": table_name,
                "field_name": field_name,
                "type": "value",
                "value": result["value"],
            },
            "score": result["@search.score"],
        }
        for result in results
    ]
Return Format:
[
    {
        "id": "value-1",
        "payload": {
            "schema_name": "public",
            "table_name": "sales",
            "field_name": "region",
            "type": "value",
            "value": "北京"
        },
        "score": 0.95
    },
    ...
]
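One detail worth care in `query_values` above: interpolating raw strings into the OData filter breaks (or worse, can be abused) when a value contains a single quote. A hedged sketch of a safer query builder; the helper name is ours, and OData escapes a single quote by doubling it:

```python
def build_value_query(datasource_id: str, schema_name: str, table_name: str,
                      field_name: str, keywords: list[str], top: int = 10) -> dict:
    def q(s: str) -> str:
        # OData string literals escape ' as ''
        return s.replace("'", "''")
    return {
        "search": " OR ".join(keywords),
        "filter": (
            f"datasource_id eq '{q(datasource_id)}' and "
            f"schema_name eq '{q(schema_name)}' and "
            f"table_name eq '{q(table_name)}' and "
            f"field_name eq '{q(field_name)}'"
        ),
        "top": top,
    }

query = build_value_query("ds1", "public", "sales", "region", ["北京", "上海"])
```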
def _merge_values_fields(entities: list[RetrievedMetaEntity]) -> list[dict]:
    """
    Merge field retrieval and value retrieval results
    """
    # Group entities by table
    table_entities: dict[tuple[str, str], list] = {}
    for entity in entities:
        table_key = (entity["payload"]["schema_name"], entity["payload"]["table_name"])
        table_entities.setdefault(table_key, []).append(entity)

    # Deduplicate and sort within each table
    merged = []
    for table_entities_list in table_entities.values():
        # Sort by score, best first
        sorted_entities = sorted(
            table_entities_list, key=lambda x: x["score"], reverse=True
        )
        # Deduplicate: keep only the highest-scoring entity per field
        seen_fields = set()
        for entity in sorted_entities:
            field_key = (
                entity["payload"]["schema_name"],
                entity["payload"]["table_name"],
                entity["payload"]["field_name"],
            )
            if field_key not in seen_fields:
                merged.append(entity["payload"])
                seen_fields.add(field_key)
    return merged
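The deduplication behavior is easy to verify with a self-contained restatement of the same strategy: when a field is hit by both retrievers, only its highest-scoring evidence survives.

```python
# Minimal re-statement of the merge strategy for demonstration purposes.
def merge_by_field(entities: list[dict]) -> list[dict]:
    """Keep only the highest-scoring entity per (schema, table, field)."""
    best = {}
    for e in sorted(entities, key=lambda x: x["score"], reverse=True):
        p = e["payload"]
        key = (p["schema_name"], p["table_name"], p["field_name"])
        if key not in best:
            best[key] = p
    return list(best.values())

entities = [
    {"payload": {"schema_name": "public", "table_name": "sales",
                 "field_name": "region", "type": "curr_desc", "value": "地区"},
     "score": 0.80},
    {"payload": {"schema_name": "public", "table_name": "sales",
                 "field_name": "region", "type": "value", "value": "北京"},
     "score": 0.95},
]
merged = merge_by_field(entities)
```

Here the `region` field was matched both semantically (score 0.80) and by the exact value "北京" (score 0.95); the merged output keeps the single 0.95 entry.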
Merging Strategy: group hits by table, sort by score within each table, then keep only the highest-scoring entity per field, so a field matched by both retrievers appears once with its best evidence.
def _add_context_to_meta(meta: MetaAdmin, entities: list[dict]):
    """
    Add retrieved context to metadata
    """
    for entity in entities:
        schema_name = entity["schema_name"]
        table_name = entity["table_name"]
        field_name = entity["field_name"]
        # Find the corresponding field
        if schema := meta.schemas.get(schema_name):
            if table := schema.tables.get(table_name):
                if field := table.fields.get(field_name):
                    # Attach retrieval context
                    if entity["type"] == "value":
                        # Attach the matched value as sample data
                        field.sample_data = entity["value"]
                    elif entity["type"] == "curr_desc":
                        # The field already carries its description
                        pass
Context Types: a "value" entity carries a concrete cell value that matched a keyword; a "curr_desc" entity carries the field's existing description.
Purpose: attaching sample values to the filtered metadata lets the SQL generator write exact predicates such as region = '北京' instead of guessing at value formats.
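The context-attachment step can be exercised with toy metadata objects. These dataclasses are illustrative stand-ins for `MetaAdmin` and its nested schema/table/field types, which we have not seen in full:

```python
# Toy stand-ins for the metadata hierarchy, for demonstration only.
from dataclasses import dataclass, field as dc_field
from typing import Optional

@dataclass
class FieldMeta:
    name: str
    sample_data: Optional[str] = None

@dataclass
class TableMeta:
    fields: dict = dc_field(default_factory=dict)

@dataclass
class SchemaMeta:
    tables: dict = dc_field(default_factory=dict)

@dataclass
class Meta:
    schemas: dict = dc_field(default_factory=dict)

def add_context(meta: Meta, entities: list[dict]) -> None:
    for e in entities:
        schema = meta.schemas.get(e["schema_name"])
        table = schema.tables.get(e["table_name"]) if schema else None
        fld = table.fields.get(e["field_name"]) if table else None
        if fld and e["type"] == "value":
            fld.sample_data = e["value"]  # attach the matched cell value

meta = Meta(schemas={"public": SchemaMeta(tables={"sales": TableMeta(
    fields={"region": FieldMeta("region")})})})
add_context(meta, [{"schema_name": "public", "table_name": "sales",
                    "field_name": "region", "type": "value", "value": "北京"}])
```

After the call, the `region` field carries `sample_data='北京'`, which is exactly what the SQL generator needs to write `WHERE region = '北京'`.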
User Question: "Sales for Beijing region"
Retrieval Process:
# 1. Decompose the query
subqueries = ["销售额", "地区"]  # "sales amount", "region"
keywords = ["北京"]              # "Beijing"

# 2. Vector retrieval for fields
field_results = [
    {"table": "sales", "field": "amount", "score": 0.85},
    {"table": "sales", "field": "region", "score": 0.80},
]

# 3. Full-text retrieval for values
value_results = [
    {"table": "sales", "field": "region", "value": "北京", "score": 0.95},
]

# 4. Merge results
#    Table: sales
#    Fields: amount (sales amount), region (region)
#    Sample value: region='北京'

# 5. Generate SQL
#    SELECT SUM(amount) FROM sales WHERE region = '北京'
User Question: "Order count for January 2024"
Retrieval Process:
# 1. Decompose the query
subqueries = ["订单数量", "时间", "年份", "月份"]  # "order count", "time", "year", "month"
keywords = ["2024", "1月", "01"]                   # "1月" = "January"

# 2. Vector retrieval for fields
field_results = [
    {"table": "orders", "field": "order_count", "score": 0.90},
    {"table": "orders", "field": "created_at", "score": 0.85},
    {"table": "orders", "field": "order_date", "score": 0.82},
]

# 3. Full-text retrieval for values
value_results = [
    {"table": "orders", "field": "order_date", "value": "2024-01-15", "score": 0.92},
    {"table": "orders", "field": "order_date", "value": "2024-01-20", "score": 0.91},
]

# 4. Merge results
#    Table: orders
#    Fields: order_count, order_date
#    Sample value: order_date='2024-01-15'

# 5. Generate SQL
#    SELECT COUNT(*) FROM orders
#    WHERE order_date >= '2024-01-01' AND order_date < '2024-02-01'
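The half-open date range in the generated SQL above (>= start, < next month) is the standard way to cover a month without off-by-one errors at month boundaries. A small sketch of how such a range could be derived from a parsed year and month (helper name is ours):

```python
import datetime as dt

def month_range(year: int, month: int) -> tuple[str, str]:
    """Half-open [start, end) range covering one month, as ISO date strings."""
    start = dt.date(year, month, 1)
    # December rolls over into January of the next year
    end = dt.date(year + 1, 1, 1) if month == 12 else dt.date(year, month + 1, 1)
    return start.isoformat(), end.isoformat()

lo, hi = month_range(2024, 1)
sql = f"SELECT COUNT(*) FROM orders WHERE order_date >= '{lo}' AND order_date < '{hi}'"
```

Using `< '2024-02-01'` rather than `<= '2024-01-31'` also keeps the predicate correct when `order_date` is a timestamp with a time-of-day component.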
# Parallel execution of vector search and full-text search
field_task = datasource.retrieve_fields_by_question(subqueries)
value_task = datasource.retrieve_values_by_question(keywords)
fields, values = await asyncio.gather(field_task, value_task)
# Cache common query results
cache_key = f"{datasource_id}:{hash(tuple(subqueries))}:{hash(tuple(keywords))}"
if cache_key in cache:
return cache[cache_key]
results = await retrieve_entities(...)
cache[cache_key] = results
return results
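Two caveats about the cache key above: Python's built-in `hash()` is salted per process for strings, so it is unsuitable for any cache shared across processes, and hashing the raw tuples makes the key depend on query order. A stable-key sketch (function name is ours):

```python
import hashlib
import json

def stable_cache_key(datasource_id: str, subqueries: list[str],
                     keywords: list[str]) -> str:
    # Sort so that query order does not fragment the cache; sha256 is stable
    # across processes, unlike Python's salted built-in hash().
    blob = json.dumps(
        {"ds": datasource_id, "sub": sorted(subqueries), "kw": sorted(keywords)},
        ensure_ascii=False, sort_keys=True,
    )
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()

k1 = stable_cache_key("ds1", ["地区", "销售额"], ["北京"])
k2 = stable_cache_key("ds1", ["销售额", "地区"], ["北京"])
```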
# Only perform full-text search on indexed fields
for field in table.fields.values():
if field.is_index: # High cardinality fields
tasks.append(_query_values(...))
# ✅ Good decomposition
subqueries = ["销售额", "地区"] # Semantic concepts
keywords = ["北京"] # Specific values
# ❌ Poor decomposition
subqueries = ["北京地区的销售额"] # Mixed together
keywords = []
# Set reasonable thresholds
vector_threshold = 0.4 # Vector retrieval threshold
fulltext_threshold = 0.5 # Full-text retrieval threshold
# Limit result count
top_k = 12 # Maximum 12 results per query
# When full-text search is unavailable, fall back to vector search only
if not fulltext_available:
    log.warning("Fulltext search not available, using vector search only")
    return await vector_search_only(subqueries)
AskTable's dual retrieval strategy combines the complementary strengths of vector search and full-text search: vector retrieval understands semantics (销售额 ≈ revenue), while full-text retrieval matches exact values (北京 = "北京"). Together they yield metadata retrieval that is both semantically broad and precise, and that degrades gracefully when either side is unavailable.
This strategy is not limited to metadata retrieval; it extends to any scenario that needs both semantic understanding and exact matching.