
sidebar.wechat

sidebar.feishu
sidebar.chooseYourWayToJoin

sidebar.scanToAddConsultant
In Text-to-SQL scenarios, the biggest challenge is not generating SQL syntax, but how to make AI find the correct tables and fields. An enterprise database may have hundreds of tables and thousands of fields. How do you precisely locate relevant metadata from the user's natural language question?
AskTable's Schema Linking engine achieves high-accuracy metadata retrieval through a hybrid strategy of vector search + full-text search + Training Pair reuse.
This article deeply analyzes the design and implementation of this engine.
Problem: User asks "Which product had the highest sales last month?"
Database:
orders, products, customers, sales_records, ...order_id, product_name, sale_amount, created_at, ...AI needs to answer:
orders? sales_records?)sale_amount? total_price?)created_at? order_date?)Schema Linking's task: Find tables and fields related to the question from massive metadata.
Full Passing:
# Pass all tables and fields to LLM
prompt = f"Database has the following tables: {all_tables}\nQuestion: {question}"
Problems:
Keyword Matching:
# Simple string matching
relevant_tables = [t for t in all_tables if keyword in t.name]
Problems:
Qdrant (Vector Search):
Meilisearch (Full-text Search):
Training Pair (Example Reuse):
async def _rewrite_question(self, question: Question) -> None:
"""Rewrite user question into keywords and sub-queries"""
if not question.subqueries:
response = await prompt_generate(
"query.extract_keywords_from_question",
QUESTION=question.text,
SPECIFICATION=question.specification,
EVIDENCE=question.evidence,
)
question.keywords = response["keywords"]
question.subqueries = response["subqueries"]
log.info(f"extracted keywords: {question.keywords}")
Example:
# Input
question = "Which product had the highest sales last month?"
# Output
keywords = ["sales", "product", "last month"]
subqueries = [
"Query sales records table",
"Group by product and calculate sales amount",
"Filter data for last month"
]
Key Points:
async def _retrieve_fields(self, queries: list[str]) -> list[RetrievedMetaEntity]:
"""Find related fields through vector search"""
if not queries:
log.warning("No subqueries provided, skipping field retrieval")
return []
return await self.ds.retrieve_fields_by_question(queries)
Qdrant Search Implementation:
async def retrieve_fields_by_question(
self, queries: list[str]
) -> list[RetrievedMetaEntity]:
"""Vector search for fields"""
# 1. Convert queries to vectors
query_vectors = await self.embedding_model.encode(queries)
# 2. Search in Qdrant
results = await self.qdrant_client.search(
collection_name=f"meta_{self.id}",
query_vector=query_vectors[0],
limit=20,
score_threshold=0.7,
)
# 3. Return search results
return [
{
"id": hit.id,
"payload": hit.payload,
"score": hit.score,
}
for hit in results
]
Key Points:
score_threshold=0.7 filters low-correlation resultsasync def _retrieve_values(self, keywords: list[str]) -> list[RetrievedMetaEntity]:
"""Find field values through full-text search"""
if not config.aisearch_host or not config.aisearch_master_key:
log.warning("Value index is not enabled, skipping value retrieval")
elif not keywords:
log.warning("No keywords provided, skipping value retrieval")
else:
values = await self.ds.retrieve_values_by_question(keywords)
return values
return []
Meilisearch Search Implementation:
async def retrieve_values_by_question(
self, keywords: list[str]
) -> list[RetrievedMetaEntity]:
"""Full-text search for field values"""
# 1. Build search query
query = " ".join(keywords)
# 2. Search in Meilisearch
results = await self.meilisearch_client.index(f"values_{self.id}").search(
query,
limit=50,
attributesToRetrieve=[" schema_name", "table_name", "field_name", "value"],
)
# 3. Return search results
return [
{
"id": hit["id"],
"payload": {
"schema_name": hit["schema_name"],
"table_name": hit["table_name"],
"field_name": hit["field_name"],
"value": hit["value"],
"type": "value",
},
"score": hit["_rankingScore"],
}
for hit in results["hits"]
]
Key Points:
async def _retrieve_examples(self, query: str) -> list[TrainingPair]:
"""Search for similar historical question-SQL pairs"""
translation_examples = await retrieve_training_pairs(
datasource_id=self.ds.id,
query=query,
role_id=self.role.id if self.role else None,
)
return translation_examples
Training Pair Storage Structure:
class TrainingPair(TypedDict):
question: str # Historical question
sql: str # Corresponding SQL
score: float # Similarity score
Example:
# Current question
question = "Which product had the highest sales last month?"
# Retrieved similar questions
training_pairs = [
{
"question": "Which product had the highest sales this month?",
"sql": "SELECT product_name, SUM(amount) FROM sales WHERE month = CURRENT_MONTH GROUP BY product_name ORDER BY SUM(amount) DESC LIMIT 1",
"score": 0.92,
},
{
"question": "Which product had the best sales last year?",
"sql": "SELECT product_id, COUNT(*) FROM orders WHERE year = LAST_YEAR GROUP BY product_id ORDER BY COUNT(*) DESC LIMIT 1",
"score": 0.85,
},
]
Key Points:
def _merge_values_fields(hits: list[RetrievedMetaEntity]) -> list[MetaEntity]:
"""Merge field and value search results"""
fields_buckets: dict[tuple, set] = {}
for hit in hits:
index = (
hit["payload"]["schema_name"],
hit["payload"]["table_name"],
hit["payload"]["field_name"],
)
if not fields_buckets.get(index):
fields_buckets[index] = set()
bucket = fields_buckets[index]
if hit["payload"]["type"] == "value":
bucket.add(hit["payload"]["value"])
fields_list: list[MetaEntity] = []
for index, values in fields_buckets.items():
fields_list.append(
{
"schema_name": index[0],
"table_name": index[1],
"field_name": index[2],
"sample_values": list(values),
}
)
return fields_list
Key Points:
def _add_context_to_meta(meta: MetaAdmin, entities: list[MetaEntity]):
"""Inject retrieved field values into metadata description"""
for entity in entities:
if schema := meta.schemas.get(entity["schema_name"]):
if table := schema.tables.get(entity["table_name"]):
if field := table.fields.get(entity["field_name"]):
values = [f'"{v}"' for v in entity["sample_values"]]
if values:
if field.curr_desc:
field.curr_desc += f"(e.g. {','.join(values)})"
else:
field.curr_desc = f"(e.g. {','.join(values)})"
Effect:
# Original metadata
field = {
"name": "status",
"type": "VARCHAR",
"description": "Order status"
}
# After context injection
field = {
"name": "status",
"type": "VARCHAR",
"description": "Order status(e.g. \"completed\",\"pending\",\"cancelled\")"
}
Key Points:
async def _pick_tables(
self,
meta_candidate: MetaAdmin,
specification: str,
training_pairs: list[TrainingPair],
) -> list[tuple[str, str]]:
"""Select relevant tables through LLM reranking"""
# 1. Let LLM select most relevant tables
table_of_interest_ = await prompt_generate(
"query.select_tables_by_question",
meta_data=meta_candidate.to_markdown(),
question=specification,
translation_examples=dict_to_markdown(training_pairs),
)
table_of_interest = table_of_interest_["table_names"]
if not table_of_interest:
raise errors.NoDataToQuery(params={"message": "No data to query"})
log.info(f"relevant table names: {table_of_interest}")
# 2. Validate table name format
pairs: list[tuple[str, str]] = []
for table_name in table_of_interest:
schema, table = table_name.split(".", 1)
pairs.append((schema, table))
return pairs
Key Points:
AskTable automatically selects the optimal Schema Linking mode based on database scale:
Applicable Scenarios:
Strategy:
async def _naive_link(
self, accessible_meta: MetaAdmin, question: Question
) -> MetaContext:
"""Naive mode: Full metadata passing"""
# Only retrieve values and examples, don't filter tables
values = await self._retrieve_values(question.keywords or [])
pairs = await self._retrieve_examples(question.specification)
entities = _merge_values_fields(values)
_add_context_to_meta(accessible_meta, entities)
return {"meta": accessible_meta, "training_pairs": pairs}
Advantages:
Applicable Scenarios:
Strategy:
async def _rag_link(
self, accessible_meta: MetaAdmin, question: Question
) -> MetaContext:
"""RAG mode: Vector search + Full-text search"""
# 1. Search fields, values, and examples
fields = await self._retrieve_fields(question.subqueries or [])
values = await self._retrieve_values(question.keywords or [])
pairs = await self._retrieve_examples(question.specification)
# 2. Merge entities
examples = _training_pair_to_entities(pairs, self.ds.dialect)
entities = _merge_values_fields(values + fields + examples)
_add_context_to_meta(accessible_meta, entities)
# 3. Extract hit tables
hit_table_names: set[tuple[str, str]] = set(
[(e["schema_name"], e["table_name"]) for e in entities]
)
# 4. If too many tables hit, use LLM Rerank
if len(hit_table_names) > 3:
fields_full_names = _get_field_full_names_from_entities(entities)
hit_fields = accessible_meta.filter_fields_by_names(
[convert_full_name_to_tuple(f) for f in fields_full_names]
)
table_of_interest = await self._pick_tables(
hit_fields, question.specification, pairs
)
meta = accessible_meta.filter_tables_by_names(table_of_interest)
else:
meta = accessible_meta.filter_tables_by_names(list(hit_table_names))
return {"meta": meta, "training_pairs": pairs}
Advantages:
Applicable Scenarios:
Strategy:
async def _reasoning_link(
self, accessible_meta: MetaAdmin, question: Question
) -> MetaContext:
"""Reasoning mode: LLM-led table selection"""
# 1. Search values and examples
values = await self._retrieve_values(question.keywords or [])
pairs = await self._retrieve_examples(question.specification)
# 2. Inject context
entities = _merge_values_fields(values)
_add_context_to_meta(accessible_meta, entities)
# 3. Let LLM select relevant tables
table_of_interest = await self._pick_tables(
accessible_meta, question.specification, pairs
)
meta = accessible_meta.filter_tables_by_names(table_of_interest)
return {"meta": meta, "training_pairs": pairs}
Advantages:
async def link(self, question: Question) -> MetaContext:
"""Automatically select optimal mode"""
accessible_meta = self._get_accessible_meta()
if config.at_schema_linking_mode == SchemaLinkingMode.auto:
if accessible_meta.table_count <= 3 and accessible_meta.field_count <= 100:
return await self._naive_link(accessible_meta, question)
elif accessible_meta.table_count <= 7 and accessible_meta.field_count <= 300:
return await self._reasoning_link(accessible_meta, question)
else:
return await self._rag_link(accessible_meta, question)
elif config.at_schema_linking_mode == SchemaLinkingMode.naive:
return await self._naive_link(accessible_meta, question)
elif config.at_schema_linking_mode == SchemaLinkingMode.rag:
return await self._rag_link(accessible_meta, question)
elif config.at_schema_linking_mode == SchemaLinkingMode.reasoning:
return await self._reasoning_link(accessible_meta, question)
HNSW Index:
# Qdrant configuration
collection_config = {
"vectors": {
"size": 1536, # OpenAI embedding dimension
"distance": "Cosine",
},
"hnsw_config": {
"m": 16, # Connection count
"ef_construct": 100, # Search depth during construction
},
}
Effect:
# Batch search multiple queries
query_vectors = await self.embedding_model.encode(queries)
results = await asyncio.gather(*[
self.qdrant_client.search(
collection_name=f"meta_{self.id}",
query_vector=vec,
limit=20,
)
for vec in query_vectors
])
Effect:
# Cache Embedding results
@lru_cache(maxsize=1000)
async def get_embedding(text: str) -> list[float]:
return await embedding_model.encode(text)
Effect:
# Question
question = "How many users are there?"
# Schema Linking result
meta = {
"tables": [
{
"name": "users",
"fields": [
{"name": "id", "type": "INT"},
{"name": "name", "type": "VARCHAR"},
]
}
]
}
# Generated SQL
sql = "SELECT COUNT(*) FROM users"
# Question
question = "Which product had the highest sales last month?"
# Schema Linking result
meta = {
"tables": [
{
"name": "orders",
"fields": [
{"name": "product_id", "type": "INT"},
{"name": "amount", "type": "DECIMAL", "description": "Sales amount(e.g. \"1000.00\",\"2500.50\")"},
{"name": "created_at", "type": "TIMESTAMP"},
]
},
{
"name": "products",
"fields": [
{"name": "id", "type": "INT"},
{"name": "name", "type": "VARCHAR", "description": "Product name(e.g. \"iPhone\",\"MacBook\")"},
]
}
],
"training_pairs": [
{
"question": "Which product had the highest sales this month?",
"sql": "SELECT p.name, SUM(o.amount) FROM orders o JOIN products p ON o.product_id = p.id WHERE MONTH(o.created_at) = MONTH(NOW()) GROUP BY p.name ORDER BY SUM(o.amount) DESC LIMIT 1"
}
]
}
# Generated SQL
sql = """
SELECT p.name, SUM(o.amount) as total_amount
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.created_at >= DATE_SUB(NOW(), INTERVAL 1 MONTH)
GROUP BY p.name
ORDER BY total_amount DESC
LIMIT 1
"""
AskTable's Schema Linking engine achieves:
✅ High Accuracy: Multi-modal search improves recall and precision ✅ Low Latency: HNSW index + batch search < 50ms ✅ Adaptive: Automatically selects optimal mode based on database scale ✅ Scalable: Supports large databases (1000+ tables)
Related Reading:
Technical Exchange:
sidebar.noProgrammingNeeded
sidebar.startFreeTrial