
Schema Linking: Hybrid Vector + Full-Text Retrieval for Text-to-SQL

AskTable Team · 2026-03-05

In Text-to-SQL scenarios, the biggest challenge is not generating SQL syntax, but getting the AI to find the correct tables and fields. An enterprise database may have hundreds of tables and thousands of fields. How do you precisely locate the relevant metadata from the user's natural language question?

AskTable's Schema Linking engine achieves high-accuracy metadata retrieval through a hybrid strategy of vector search + full-text search + Training Pair reuse.

This article is a deep dive into the design and implementation of this engine.


1. What is Schema Linking?

1.1 Core Challenges of Text-to-SQL

Problem: User asks "Which product had the highest sales last month?"

Database:

  • 100 tables: orders, products, customers, sales_records, ...
  • 1000 fields: order_id, product_name, sale_amount, created_at, ...

AI needs to answer:

  • Which tables should be queried? (orders? sales_records?)
  • Which fields should be used? (sale_amount? total_price?)
  • Which field does "last month" correspond to? (created_at? order_date?)

Schema Linking's task: Find tables and fields related to the question from massive metadata.

1.2 Limitations of Traditional Approaches

Full Passing:

# Pass all tables and fields to LLM
prompt = f"Database has the following tables: {all_tables}\nQuestion: {question}"

Problems:

  • ❌ Huge token consumption (100 tables = 10K+ tokens)
  • ❌ LLM easily confused (information overload)
  • ❌ High cost, high latency

Keyword Matching:

# Simple string matching
relevant_tables = [t for t in all_tables if keyword in t.name]

Problems:

  • ❌ Cannot handle synonyms ("sales" vs "revenue")
  • ❌ Cannot handle abbreviations ("qty" vs "quantity")
  • ❌ Low recall
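The gap is easy to demonstrate. A minimal sketch of naive keyword matching (the table names are made up for illustration):

```python
# Naive keyword matching: pure string containment, no semantics.
tables = ["orders", "products", "revenue_records", "customers"]

def keyword_match(keyword: str, table_names: list[str]) -> list[str]:
    """Return tables whose name contains the keyword verbatim."""
    return [t for t in table_names if keyword in t]

# "sales" misses revenue_records even though it is semantically relevant.
print(keyword_match("sales", tables))    # []
print(keyword_match("revenue", tables))  # ['revenue_records']
```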

2. AskTable's Hybrid Search Strategy

2.1 Three-Layer Search Architecture

(Diagram: three-layer retrieval architecture — vector search via Qdrant, full-text search via Meilisearch, and Training Pair reuse)

2.2 Core Components

Qdrant (Vector Search):

  • Stores semantic vectors of fields
  • Supports similarity search
  • Good for handling synonyms and semantic relevance

Meilisearch (Full-text Search):

  • Stores field value indexes
  • Supports prefix matching and fuzzy search
  • Good for handling precise value queries

Training Pair (Example Reuse):

  • Stores historical question-SQL pairs
  • Supports similar question search
  • Provides Few-Shot learning samples

3. Core Implementation: Deep Dive into Source Code

3.1 Question Rewriting: Extract Keywords and Sub-queries

async def _rewrite_question(self, question: Question) -> None:
    """Rewrite user question into keywords and sub-queries"""
    if not question.subqueries:
        response = await prompt_generate(
            "query.extract_keywords_from_question",
            QUESTION=question.text,
            SPECIFICATION=question.specification,
            EVIDENCE=question.evidence,
        )
        question.keywords = response["keywords"]
        question.subqueries = response["subqueries"]
        log.info(f"extracted keywords: {question.keywords}")

Example:

# Input
question = "Which product had the highest sales last month?"

# Output
keywords = ["sales", "product", "last month"]
subqueries = [
    "Query sales records table",
    "Group by product and calculate sales amount",
    "Filter data for last month"
]

Key Points:

  • Keywords: Used for full-text search (finding field values)
  • Sub-queries: Used for vector search (finding related fields)

3.2 Vector Search: Find Related Fields

async def _retrieve_fields(self, queries: list[str]) -> list[RetrievedMetaEntity]:
    """Find related fields through vector search"""
    if not queries:
        log.warning("No subqueries provided, skipping field retrieval")
        return []
    return await self.ds.retrieve_fields_by_question(queries)

Qdrant Search Implementation:

async def retrieve_fields_by_question(
    self, queries: list[str]
) -> list[RetrievedMetaEntity]:
    """Vector search for fields"""
    # 1. Convert queries to vectors
    query_vectors = await self.embedding_model.encode(queries)

    # 2. Search in Qdrant
    results = await self.qdrant_client.search(
        collection_name=f"meta_{self.id}",
        query_vector=query_vectors[0],
        limit=20,
        score_threshold=0.7,
    )

    # 3. Return search results
    return [
        {
            "id": hit.id,
            "payload": hit.payload,
            "score": hit.score,
        }
        for hit in results
    ]

Key Points:

  • Semantic Similarity: Calculate relevance through vector distance
  • Threshold Filtering: score_threshold=0.7 filters low-correlation results
  • Top-K Search: Return the 20 most relevant fields
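To make the threshold and Top-K behavior concrete, here is a toy re-creation of what the vector store does internally. This is not Qdrant's code; the 2-d vectors and field names are fabricated for illustration, but the `limit` and `score_threshold` semantics mirror the call above:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

def toy_search(query_vec, field_vecs, limit=20, score_threshold=0.7):
    """Score every field, drop low-correlation hits, return top-k."""
    scored = [(name, cosine(query_vec, vec)) for name, vec in field_vecs.items()]
    hits = [(n, s) for n, s in scored if s >= score_threshold]
    return sorted(hits, key=lambda h: h[1], reverse=True)[:limit]

# "sale_amount" points roughly the same way as the query; "created_at" does not.
fields = {"sale_amount": [0.9, 0.1], "created_at": [0.1, 0.9]}
print(toy_search([1.0, 0.0], fields))
```

Only "sale_amount" survives the 0.7 threshold; "created_at" scores around 0.11 and is filtered out.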

3.3 Full-text Search: Find Field Values

async def _retrieve_values(self, keywords: list[str]) -> list[RetrievedMetaEntity]:
    """Find field values through full-text search"""
    if not config.aisearch_host or not config.aisearch_master_key:
        log.warning("Value index is not enabled, skipping value retrieval")
    elif not keywords:
        log.warning("No keywords provided, skipping value retrieval")
    else:
        values = await self.ds.retrieve_values_by_question(keywords)
        return values
    return []

Meilisearch Search Implementation:

async def retrieve_values_by_question(
    self, keywords: list[str]
) -> list[RetrievedMetaEntity]:
    """Full-text search for field values"""
    # 1. Build search query
    query = " ".join(keywords)

    # 2. Search in Meilisearch
    results = await self.meilisearch_client.index(f"values_{self.id}").search(
        query,
        limit=50,
        attributesToRetrieve=["schema_name", "table_name", "field_name", "value"],
    )

    # 3. Return search results
    return [
        {
            "id": hit["id"],
            "payload": {
                "schema_name": hit["schema_name"],
                "table_name": hit["table_name"],
                "field_name": hit["field_name"],
                "value": hit["value"],
                "type": "value",
            },
            "score": hit["_rankingScore"],
        }
        for hit in results["hits"]
    ]

Key Points:

  • Prefix Matching: Supports partial matching ("sales" matches "sales amount")
  • Fuzzy Search: Tolerates spelling errors
  • Field Value Index: Stores actual data values, helps AI understand field meaning
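Prefix and fuzzy matching can be illustrated with the standard library. Meilisearch implements both natively (with Levenshtein-based typo tolerance); `difflib` here is only a stand-in for the idea:

```python
import difflib

values = ["sales_amount", "sale_date", "customer_name"]

def prefix_match(query: str, candidates: list[str]) -> list[str]:
    """Prefix matching: the query hits any value that starts with it."""
    return [v for v in candidates if v.startswith(query)]

def fuzzy_match(query: str, candidates: list[str], cutoff: float = 0.8) -> list[str]:
    """Fuzzy matching: tolerate small spelling errors."""
    return difflib.get_close_matches(query, candidates, n=5, cutoff=cutoff)

print(prefix_match("sale", values))          # ['sales_amount', 'sale_date']
print(fuzzy_match("sales_ammount", values))  # ['sales_amount']
```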

3.4 Training Pair Search: Reuse Historical Examples

async def _retrieve_examples(self, query: str) -> list[TrainingPair]:
    """Search for similar historical question-SQL pairs"""
    translation_examples = await retrieve_training_pairs(
        datasource_id=self.ds.id,
        query=query,
        role_id=self.role.id if self.role else None,
    )
    return translation_examples

Training Pair Storage Structure:

class TrainingPair(TypedDict):
    question: str  # Historical question
    sql: str  # Corresponding SQL
    score: float  # Similarity score

Example:

# Current question
question = "Which product had the highest sales last month?"

# Retrieved similar questions
training_pairs = [
    {
        "question": "Which product had the highest sales this month?",
        "sql": "SELECT product_name, SUM(amount) FROM sales WHERE month = CURRENT_MONTH GROUP BY product_name ORDER BY SUM(amount) DESC LIMIT 1",
        "score": 0.92,
    },
    {
        "question": "Which product had the best sales last year?",
        "sql": "SELECT product_id, COUNT(*) FROM orders WHERE year = LAST_YEAR GROUP BY product_id ORDER BY COUNT(*) DESC LIMIT 1",
        "score": 0.85,
    },
]

Key Points:

  • Few-Shot Learning: Provide SQL examples for similar questions
  • Table Field Hints: Extract used tables and fields from historical SQL
  • Pattern Reuse: Learn query patterns (like GROUP BY + ORDER BY + LIMIT)
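The "Table Field Hints" idea can be sketched with a regex heuristic: collect the identifiers that follow FROM/JOIN in each historical SQL. A real implementation would use a SQL parser; `extract_table_hints` is a hypothetical helper:

```python
import re

def extract_table_hints(sql: str) -> set[str]:
    """Rough heuristic: collect identifiers appearing after FROM or JOIN."""
    pattern = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", re.IGNORECASE)
    return set(pattern.findall(sql))

sql = (
    "SELECT p.name, SUM(o.amount) FROM orders o "
    "JOIN products p ON o.product_id = p.id GROUP BY p.name"
)
print(extract_table_hints(sql))  # {'orders', 'products'} (set order may vary)
```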

3.5 Entity Merging: Deduplication and Aggregation

def _merge_values_fields(hits: list[RetrievedMetaEntity]) -> list[MetaEntity]:
    """Merge field and value search results"""
    fields_buckets: dict[tuple, set] = {}

    for hit in hits:
        index = (
            hit["payload"]["schema_name"],
            hit["payload"]["table_name"],
            hit["payload"]["field_name"],
        )
        if not fields_buckets.get(index):
            fields_buckets[index] = set()
        bucket = fields_buckets[index]

        if hit["payload"]["type"] == "value":
            bucket.add(hit["payload"]["value"])

    fields_list: list[MetaEntity] = []
    for index, values in fields_buckets.items():
        fields_list.append(
            {
                "schema_name": index[0],
                "table_name": index[1],
                "field_name": index[2],
                "sample_values": list(values),
            }
        )
    return fields_list

Key Points:

  • Group by Field: Merge multiple values of the same field
  • Deduplication: Avoid duplicate fields
  • Sample Values: Retain sample values for fields
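Run against made-up hits, the merging logic behaves like this. This is a self-contained restatement of `_merge_values_fields` above; samples are sorted here only to keep the output deterministic:

```python
def merge_values_fields(hits: list[dict]) -> list[dict]:
    """Group hits by (schema, table, field) and collect value samples."""
    buckets: dict[tuple, set] = {}
    for hit in hits:
        p = hit["payload"]
        key = (p["schema_name"], p["table_name"], p["field_name"])
        bucket = buckets.setdefault(key, set())
        if p["type"] == "value":
            bucket.add(p["value"])
    return [
        {"schema_name": s, "table_name": t, "field_name": f,
         "sample_values": sorted(values)}
        for (s, t, f), values in buckets.items()
    ]

hits = [
    {"payload": {"schema_name": "public", "table_name": "orders",
                 "field_name": "status", "type": "value", "value": "paid"}},
    {"payload": {"schema_name": "public", "table_name": "orders",
                 "field_name": "status", "type": "value", "value": "paid"}},
    {"payload": {"schema_name": "public", "table_name": "orders",
                 "field_name": "status", "type": "field"}},
]
print(merge_values_fields(hits))  # one field; the duplicate value is deduplicated
```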

3.6 Context Injection: Enhance Metadata Description

def _add_context_to_meta(meta: MetaAdmin, entities: list[MetaEntity]):
    """Inject retrieved field values into metadata description"""
    for entity in entities:
        if schema := meta.schemas.get(entity["schema_name"]):
            if table := schema.tables.get(entity["table_name"]):
                if field := table.fields.get(entity["field_name"]):
                    values = [f'"{v}"' for v in entity["sample_values"]]
                    if values:
                        if field.curr_desc:
                            field.curr_desc += f"(e.g. {','.join(values)})"
                        else:
                            field.curr_desc = f"(e.g. {','.join(values)})"

Effect:

# Original metadata
field = {
    "name": "status",
    "type": "VARCHAR",
    "description": "Order status"
}

# After context injection
field = {
    "name": "status",
    "type": "VARCHAR",
    "description": "Order status(e.g. \"completed\",\"pending\",\"cancelled\")"
}

Key Points:

  • Sample Values: Help LLM understand actual field meaning
  • Enumeration Values: Show possible value range of field
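The injection step reduces to a small string helper. `inject_samples` is a hypothetical name that mirrors the `curr_desc` logic above:

```python
def inject_samples(description: str, sample_values: list[str]) -> str:
    """Append quoted sample values to a field description."""
    if not sample_values:
        return description
    quoted = ",".join(f'"{v}"' for v in sample_values)
    suffix = f"(e.g. {quoted})"
    return description + suffix if description else suffix

print(inject_samples("Order status", ["completed", "pending"]))
# Order status(e.g. "completed","pending")
```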

3.7 Table Selection: LLM Rerank

async def _pick_tables(
    self,
    meta_candidate: MetaAdmin,
    specification: str,
    training_pairs: list[TrainingPair],
) -> list[tuple[str, str]]:
    """Select relevant tables through LLM reranking"""
    # 1. Let LLM select most relevant tables
    table_of_interest_ = await prompt_generate(
        "query.select_tables_by_question",
        meta_data=meta_candidate.to_markdown(),
        question=specification,
        translation_examples=dict_to_markdown(training_pairs),
    )
    table_of_interest = table_of_interest_["table_names"]

    if not table_of_interest:
        raise errors.NoDataToQuery(params={"message": "No data to query"})

    log.info(f"relevant table names: {table_of_interest}")

    # 2. Validate table name format ("schema.table") before splitting
    pairs: list[tuple[str, str]] = []
    for table_name in table_of_interest:
        if "." not in table_name:
            log.warning(f"skipping malformed table name: {table_name}")
            continue
        schema, table = table_name.split(".", 1)
        pairs.append((schema, table))

    return pairs

Key Points:

  • LLM Judgment: Leverages LLM's semantic understanding capability
  • Rich Context: Combines metadata and historical examples
  • Table Count Control: Avoid passing too many tables to SQL generation stage

4. Multi-Mode Adaptive Strategy

AskTable automatically selects the optimal Schema Linking mode based on database scale:

4.1 Naive Mode

Applicable Scenarios:

  • Table count ≤ 3
  • Field count ≤ 100

Strategy:

async def _naive_link(
    self, accessible_meta: MetaAdmin, question: Question
) -> MetaContext:
    """Naive mode: Full metadata passing"""
    # Only retrieve values and examples, don't filter tables
    values = await self._retrieve_values(question.keywords or [])
    pairs = await self._retrieve_examples(question.specification)

    entities = _merge_values_fields(values)
    _add_context_to_meta(accessible_meta, entities)

    return {"meta": accessible_meta, "training_pairs": pairs}

Advantages:

  • Simple and efficient
  • No complex search needed
  • Suitable for small databases

4.2 RAG Mode

Applicable Scenarios:

  • Table count ≤ 7
  • Field count ≤ 300

Strategy:

async def _rag_link(
    self, accessible_meta: MetaAdmin, question: Question
) -> MetaContext:
    """RAG mode: Vector search + Full-text search"""
    # 1. Search fields, values, and examples
    fields = await self._retrieve_fields(question.subqueries or [])
    values = await self._retrieve_values(question.keywords or [])
    pairs = await self._retrieve_examples(question.specification)

    # 2. Merge entities
    examples = _training_pair_to_entities(pairs, self.ds.dialect)
    entities = _merge_values_fields(values + fields + examples)
    _add_context_to_meta(accessible_meta, entities)

    # 3. Extract hit tables
    hit_table_names: set[tuple[str, str]] = set(
        [(e["schema_name"], e["table_name"]) for e in entities]
    )

    # 4. If too many tables hit, use LLM Rerank
    if len(hit_table_names) > 3:
        fields_full_names = _get_field_full_names_from_entities(entities)
        hit_fields = accessible_meta.filter_fields_by_names(
            [convert_full_name_to_tuple(f) for f in fields_full_names]
        )
        table_of_interest = await self._pick_tables(
            hit_fields, question.specification, pairs
        )
        meta = accessible_meta.filter_tables_by_names(table_of_interest)
    else:
        meta = accessible_meta.filter_tables_by_names(list(hit_table_names))

    return {"meta": meta, "training_pairs": pairs}

Advantages:

  • Precise search
  • Reduced token consumption
  • Improved SQL generation accuracy

4.3 Reasoning Mode

Applicable Scenarios:

  • Table count > 7
  • Field count > 300

Strategy:

async def _reasoning_link(
    self, accessible_meta: MetaAdmin, question: Question
) -> MetaContext:
    """Reasoning mode: LLM-led table selection"""
    # 1. Search values and examples
    values = await self._retrieve_values(question.keywords or [])
    pairs = await self._retrieve_examples(question.specification)

    # 2. Inject context
    entities = _merge_values_fields(values)
    _add_context_to_meta(accessible_meta, entities)

    # 3. Let LLM select relevant tables
    table_of_interest = await self._pick_tables(
        accessible_meta, question.specification, pairs
    )
    meta = accessible_meta.filter_tables_by_names(table_of_interest)

    return {"meta": meta, "training_pairs": pairs}

Advantages:

  • Suitable for large databases
  • LLM deep reasoning
  • Stronger semantic understanding

4.4 Auto Mode

async def link(self, question: Question) -> MetaContext:
    """Automatically select optimal mode"""
    accessible_meta = self._get_accessible_meta()

    if config.at_schema_linking_mode == SchemaLinkingMode.auto:
        if accessible_meta.table_count <= 3 and accessible_meta.field_count <= 100:
            return await self._naive_link(accessible_meta, question)
        elif accessible_meta.table_count <= 7 and accessible_meta.field_count <= 300:
            return await self._rag_link(accessible_meta, question)
        else:
            return await self._reasoning_link(accessible_meta, question)
    elif config.at_schema_linking_mode == SchemaLinkingMode.naive:
        return await self._naive_link(accessible_meta, question)
    elif config.at_schema_linking_mode == SchemaLinkingMode.rag:
        return await self._rag_link(accessible_meta, question)
    elif config.at_schema_linking_mode == SchemaLinkingMode.reasoning:
        return await self._reasoning_link(accessible_meta, question)
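The mode choice itself is a pure function of database scale. A sketch using the thresholds from the "Applicable Scenarios" subsections above (`pick_mode` is a hypothetical name):

```python
def pick_mode(table_count: int, field_count: int) -> str:
    """Map database scale to a Schema Linking mode."""
    if table_count <= 3 and field_count <= 100:
        return "naive"
    if table_count <= 7 and field_count <= 300:
        return "rag"
    return "reasoning"

print(pick_mode(2, 50))      # naive
print(pick_mode(5, 200))     # rag
print(pick_mode(100, 2000))  # reasoning
```

Note that both dimensions must fit a tier: 5 tables with 500 fields still falls through to the heavier mode.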

5. Performance Optimization

5.1 Vector Index Optimization

HNSW Index:

# Qdrant configuration
collection_config = {
    "vectors": {
        "size": 1536,  # OpenAI embedding dimension
        "distance": "Cosine",
    },
    "hnsw_config": {
        "m": 16,  # Connection count
        "ef_construct": 100,  # Search depth during construction
    },
}

Effect:

  • Original: Linear search 1000 fields = ~100ms
  • Optimized: HNSW index = ~10ms

5.2 Batch Parallel Search

# Batch search multiple queries
query_vectors = await self.embedding_model.encode(queries)
results = await asyncio.gather(*[
    self.qdrant_client.search(
        collection_name=f"meta_{self.id}",
        query_vector=vec,
        limit=20,
    )
    for vec in query_vectors
])

Effect:

  • Original: 3 queries serial = 30ms
  • Optimized: 3 queries parallel = 10ms

5.3 Cache Strategy

# Cache Embedding results (lru_cache cannot wrap a coroutine: it would
# cache the awaitable itself, so use an explicit dict as the memo)
_embedding_cache: dict[str, list[float]] = {}

async def get_embedding(text: str) -> list[float]:
    if text not in _embedding_cache:
        _embedding_cache[text] = await embedding_model.encode(text)
    return _embedding_cache[text]

Effect:

  • Original: Recalculate Embedding each time = 50ms
  • Optimized: Cache hit = < 1ms

6. Practical Cases

Case 1: Simple Query

# Question
question = "How many users are there?"

# Schema Linking result
meta = {
    "tables": [
        {
            "name": "users",
            "fields": [
                {"name": "id", "type": "INT"},
                {"name": "name", "type": "VARCHAR"},
            ]
        }
    ]
}

# Generated SQL
sql = "SELECT COUNT(*) FROM users"

Case 2: Complex Query

# Question
question = "Which product had the highest sales last month?"

# Schema Linking result
meta = {
    "tables": [
        {
            "name": "orders",
            "fields": [
                {"name": "product_id", "type": "INT"},
                {"name": "amount", "type": "DECIMAL", "description": "Sales amount(e.g. \"1000.00\",\"2500.50\")"},
                {"name": "created_at", "type": "TIMESTAMP"},
            ]
        },
        {
            "name": "products",
            "fields": [
                {"name": "id", "type": "INT"},
                {"name": "name", "type": "VARCHAR", "description": "Product name(e.g. \"iPhone\",\"MacBook\")"},
            ]
        }
    ],
    "training_pairs": [
        {
            "question": "Which product had the highest sales this month?",
            "sql": "SELECT p.name, SUM(o.amount) FROM orders o JOIN products p ON o.product_id = p.id WHERE MONTH(o.created_at) = MONTH(NOW()) GROUP BY p.name ORDER BY SUM(o.amount) DESC LIMIT 1"
        }
    ]
}

# Generated SQL
sql = """
SELECT p.name, SUM(o.amount) as total_amount
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.created_at >= DATE_SUB(NOW(), INTERVAL 1 MONTH)
GROUP BY p.name
ORDER BY total_amount DESC
LIMIT 1
"""

7. Summary and Outlook

AskTable's Schema Linking engine achieves:

  • ✅ High Accuracy: Multi-modal search improves recall and precision
  • ✅ Low Latency: HNSW index + batch search < 50ms
  • ✅ Adaptive: Automatically selects optimal mode based on database scale
  • ✅ Scalable: Supports large databases (1000+ tables)

Future Optimization Directions

  1. Graph Search: Utilize table relationships (foreign keys) to optimize search
  2. Reinforcement Learning: Optimize search strategy based on user feedback
  3. Multi-language Support: Support Chinese-English mixed queries
  4. Real-time Updates: Automatically update indexes after metadata changes
