
In enterprise data analysis scenarios, databases often contain hundreds or even thousands of tables, each with dozens of fields. When users ask questions in natural language, how do you quickly find the relevant tables and fields? Traditional keyword matching is often inadequate, and AskTable achieves true semantic understanding through Qdrant vector search.
Suppose the user asks: "How were sales in the East China region last month?"
Problems with keyword matching:
- The word "sales" in the question could correspond to fields named `revenue`, `sales_amount`, or `order_total`
- "East China" could correspond to `region`, `area`, or `province`

Problems with the brute-force approach (dumping all metadata into the prompt): with hundreds of tables and thousands of fields, the schema quickly exhausts the LLM's context window while adding cost and noise.
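To make the gap concrete, here is a minimal sketch (field names illustrative) showing how naive substring matching misses semantically related fields:

```python
# Candidate field names in the database (illustrative)
fields = ["revenue", "sales_amount", "order_total", "region", "province"]

# Naive keyword matching: substring containment only
query = "sales"
matches = [f for f in fields if query in f]

print(matches)  # only "sales_amount" survives; "revenue" and "order_total" are missed
```

A semantic index would rank `revenue` and `order_total` highly for the same query, because their embeddings sit close to "sales" in vector space.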
AskTable uses Qdrant vector database for semantic search:
The diagram above shows the complete flow from metadata vectorization to retrieval: the left side is the offline vectorization process, the right side is the online retrieval process. Through vector similarity calculation, the system can find the most relevant data from massive fields in milliseconds.
AskTable converts database metadata (table names, field names, descriptions) to vectors and stores them in Qdrant:
async def create(meta: MetaAdmin):
    """
    - get or create project collection in db
    - upsert fields data to vector
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    await create_collection_if_not_exist(collection_name)

    docs = []
    for schema in meta.schemas.values():
        for table in schema.tables.values():
            for field in table.fields.values():
                if field.curr_desc and field.curr_desc.strip():
                    assert table.curr_desc is not None
                    assert meta.datasource_id is not None
                    metadata = {
                        "datasource_id": meta.datasource_id,
                        "schema_name": schema.name,
                        "table_name": table.name,
                        "field_name": field.name,
                        "type": "curr_desc",
                        "value": field.curr_desc,
                    }
                    docs.append({
                        "page_content": table.curr_desc + field.curr_desc,
                        "metadata": metadata,
                        "id": _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        ),
                    })

    await upload_batch(collection_name, docs, with_id=True)
Design Highlights:
"page_content": table.curr_desc + field.curr_desc
Why combine table description and field description?
A field description like `amount` alone is vague; prefixed with its table context, `orders.order_amount` is clear.

def _gen_id_by_field(
    datasource_id: str, schema_name: str, table_name: str, field_name: str
) -> str:
    """
    Generate unique ID based on datasource_id, schema_name, table_name, field_name
    Use uuid5 to generate UUID based on input string
    """
    parts = [datasource_id, schema_name, table_name, field_name]
    input_string = "-".join(parts)
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, input_string))
Why use UUID5?
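UUID5 is a name-based, deterministic UUID: the same input string always yields the same ID, so re-vectorizing a field overwrites its existing point instead of creating a duplicate. A standalone sketch of the helper above (illustrative IDs) shows the property:

```python
import uuid

def gen_id_by_field(datasource_id: str, schema_name: str,
                    table_name: str, field_name: str) -> str:
    # uuid5 hashes the name within a namespace; no randomness involved
    input_string = "-".join([datasource_id, schema_name, table_name, field_name])
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, input_string))

a = gen_id_by_field("ds_1", "public", "orders", "order_amount")
b = gen_id_by_field("ds_1", "public", "orders", "order_amount")
c = gen_id_by_field("ds_1", "public", "orders", "order_date")

print(a == b)  # True: same field always maps to the same vector ID
print(a == c)  # False: different fields get distinct IDs
```

This determinism is what makes the `upload_batch(..., with_id=True)` call above an idempotent upsert.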
metadata = {
    "datasource_id": meta.datasource_id,
    "schema_name": schema.name,
    "table_name": table.name,
    "field_name": field.name,
    "type": "curr_desc",
    "value": field.curr_desc,
}
Role of metadata: the payload stored with each vector lets the system filter searches (e.g., by `datasource_id`) and map every hit back to a concrete schema, table, and field without a separate lookup.
When users ask questions, AskTable uses vector search to find the most relevant fields:
async def retrieve(
    subqueries: list[str],
    ds_id: str,
    meta: MetaAdmin | None = None,
    top_k: int = 12,
    threshold: float = 0.4,
):
    """
    retrieve meta from vector db
    filter either by ds_id or by meta
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id

    db = get_vector_db()
    if not await db.collection_exists(collection_name):
        log.info(f"Collection {collection_name} does not exist, skipping query.")
        return []

    # Convert query text to vectors
    query_embeddings = await embed_docs(subqueries)

    # Build filter conditions
    if meta and meta.datasource_id:
        # If meta is provided, only search in allowed fields
        allow_ids = []
        for schema in meta.schemas.values():
            for table in schema.tables.values():
                for field in table.fields.values():
                    allow_ids.append(
                        _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        )
                    )
        filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
    else:
        # Otherwise filter by datasource ID
        filter = Filter(
            must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
        )

    # Build batch search requests
    search_requests = [
        SearchRequest(
            vector=embedding,
            filter=filter,
            limit=top_k,
            score_threshold=threshold,
            with_payload=True,
        )
        for embedding in query_embeddings
    ]

    # Batch search
    results = await db.search_batch(
        collection_name=collection_name,
        requests=search_requests,
    )
    hits = [point for result in results for point in result]
    return unpack_qdrant_points(hits=hits)
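The final `unpack_qdrant_points` helper is not shown here; a simplified stand-in, assuming each hit carries the payload written at index time, might look like this:

```python
def unpack_points(hits: list[dict]) -> list[dict]:
    # Each hit's payload is the metadata dict stored at upsert time
    return [
        {
            "schema": h["payload"]["schema_name"],
            "table": h["payload"]["table_name"],
            "field": h["payload"]["field_name"],
            "description": h["payload"]["value"],
            "score": h["score"],
        }
        for h in hits
    ]

# Illustrative hit shaped like the metadata built during vectorization
hits = [{
    "score": 0.78,
    "payload": {
        "datasource_id": "ds_1",
        "schema_name": "public",
        "table_name": "orders",
        "field_name": "order_amount",
        "type": "curr_desc",
        "value": "Order amount, unit: yuan, excluding tax",
    },
}]
print(unpack_points(hits)[0]["table"])  # "orders"
```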
Design Highlights:
query_embeddings = await embed_docs(subqueries)
search_requests = [
    SearchRequest(vector=embedding, ...)
    for embedding in query_embeddings
]
results = await db.search_batch(...)
Why use multiple queries?
User question: "How were sales in the East China region last month?"
This can be decomposed into multiple sub-queries, one per aspect of the question: the metric (sales amount), the dimension (East China region), and the time range (last month). Each sub-query searches independently, then results are merged to improve recall.
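The merge step can be sketched as follows (a simplified illustration, not AskTable's actual merging code), assuming each sub-query returns (field_id, score) pairs and duplicates keep their best score:

```python
def merge_hits(per_query_hits: list[list[tuple[str, float]]]) -> list[tuple[str, float]]:
    # Keep the highest score seen for each field across all sub-queries
    best: dict[str, float] = {}
    for hits in per_query_hits:
        for field_id, score in hits:
            if score > best.get(field_id, 0.0):
                best[field_id] = score
    # Rank merged results by score, descending
    return sorted(best.items(), key=lambda kv: kv[1], reverse=True)

merged = merge_hits([
    [("orders.order_amount", 0.81), ("sales.region", 0.55)],
    [("sales.region", 0.72), ("orders.order_date", 0.61)],
])
print(merged)  # sales.region appears once, with its best score 0.72
```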
if meta and meta.datasource_id:
    # Only search in fields user has permission for
    allow_ids = [...]
    filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
Security Assurance:
score_threshold=threshold # Default 0.4
Why is threshold needed?
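A top_k search always returns the k nearest points, however weak the match; for an off-topic question that means k entries of pure noise. The score threshold cuts these off. A minimal sketch of the effect (scores illustrative):

```python
# Hypothetical search hits as (field, cosine similarity score)
hits = [
    ("orders.order_amount", 0.82),
    ("sales.region", 0.58),
    ("users.avatar_url", 0.21),  # barely related; top_k alone would still return it
]

threshold = 0.4
relevant = [(f, s) for f, s in hits if s >= threshold]
print(relevant)  # avatar_url is filtered out
```

In Qdrant this filtering happens server-side via the `score_threshold` parameter shown above, so low-relevance points never reach the application.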
AskTable implements efficient batch Embedding processing:
async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
    _begin = time.time()
    text_list = [text.replace("\n", " ") for text in text_list]

    # Batch processing
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]
    log.info(f"total batches: {len(batches)}")

    embeddings = []
    # Chunk parallel processing
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        _begin_chunk = time.time()
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
        tasks = [self._embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])
        log.info(
            f"embedding_chunks: {len(chunk)} batches, "
            f"time: {time.time() - _begin_chunk:.2f}s"
        )

    log.info(
        f"embedding_batch: {len(text_list)} texts, "
        f"time: {time.time() - _begin:.2f}s"
    )
    return embeddings
Performance Optimization Strategies:
EMBEDDING_BATCH_SIZE = 100 # Number of texts per API request
EMBEDDING_CHUNK_SIZE = 100 # Number of parallel requests
Why two levels?
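The batch size caps how many texts go into a single embedding API request (keeping payloads within provider limits), while the chunk size caps how many of those requests are in flight concurrently (respecting rate limits). The arithmetic can be sketched with the constants above:

```python
EMBEDDING_BATCH_SIZE = 100   # texts per API request
EMBEDDING_CHUNK_SIZE = 100   # concurrent requests per chunk

def plan(n_texts: int) -> tuple[int, int]:
    # Ceil division: total API requests, then sequential waves of parallel requests
    n_batches = -(-n_texts // EMBEDDING_BATCH_SIZE)
    n_chunks = -(-n_batches // EMBEDDING_CHUNK_SIZE)
    return n_batches, n_chunks

print(plan(25_000))  # (250, 3): 250 requests, issued as 3 waves of up to 100 in parallel
```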
tasks = [self._embedding_batch(batch) for batch in chunk]
chunk_results = await asyncio.gather(*tasks)
Performance Improvement:
text_list = [text.replace("\n", " ") for text in text_list]
Why replace newlines? Some embedding models produce lower-quality vectors for text containing line breaks (OpenAI's earlier embedding models explicitly recommended replacing newlines with spaces), so the input is normalized to single-line text first.
Background:

Search Flow:
1. Sub-query decomposition: the question is split into sub-queries for the product-line, margin, and date aspects.
2. Vector search (each sub-query top_k=12):
   - product-line sub-query hits: product.product_line, sales.category, dim_product.line_name
   - margin sub-query hits: finance.gross_margin, sales.profit_rate, report.margin_pct
   - date sub-query hits: orders.order_date, sales.sale_date, fact_sales.date_key
3. Result merging and deduplication: the matched fields resolve to the product, sales, and finance tables.
4. Pass to LLM: only the schemas of these relevant tables are included in the SQL-generation prompt.
Background:
Implementation:
# Each tenant has an independent collection
collection_name = f"meta_{project_id}"
# Filter by datasource_id during search
filter = Filter(
    must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
)
Advantages:
Background:
Implementation:
# Get accessible metadata based on user role
accessible_meta = role.get_accessible_meta(datasource)
# Only search in accessible fields
allow_ids = [
    _gen_id_by_field(...)
    for field in accessible_meta.all_fields()
]
filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
Security Assurance:
# Isolate collection by project
collection_name = PROJECT_COLLECTION_PREFIX + project_id
Advantages:
# Use deterministic ID to support idempotent updates
id = _gen_id_by_field(datasource_id, schema_name, table_name, field_name)
docs.append({"id": id, "page_content": ..., "metadata": ...})
await upload_batch(collection_name, docs, with_id=True)
Advantages:
async def delete_by_field_names(ds_id: str, fields_names: list[tuple[str, str, str]]):
    """Delete vectors of specified fields; each tuple is (schema_name, table_name, field_name)"""
    # db and collection_name are obtained the same way as in retrieve() above
    filter = Filter(
        should=[
            Filter(
                must=[
                    FieldCondition(key="datasource_id", match=MatchValue(value=ds_id)),
                    FieldCondition(key="field_name", match=MatchValue(value=field[2])),
                    FieldCondition(key="table_name", match=MatchValue(value=field[1])),
                    FieldCondition(key="schema_name", match=MatchValue(value=field[0])),
                ]
            )
            for field in fields_names
        ]
    )
    await db.delete(collection_name, filter)
Advantages:
Recommended:
# Combine table description and field description
page_content = f"{table.curr_desc} {field.curr_desc}"
Not Recommended:
# Only use field name
page_content = field.name # Too little semantic information
# Only use field description
page_content = field.curr_desc # Missing table context
# Precise query scenario (like report generation)
threshold = 0.6 # High threshold, only return highly relevant fields
# Exploratory query scenario (like data exploration)
threshold = 0.3 # Low threshold, return more possibly relevant fields
# Default scenario
threshold = 0.4 # Balance accuracy and recall
# Simple query
top_k = 5 # Reduce noise
# Complex query
top_k = 20 # Improve recall
# Default
top_k = 12 # Empirical value
Characteristics of high-quality metadata: descriptions should state the business meaning, units, and caveats of a table or field, not merely restate its name.
Examples:
# Good description
table.curr_desc = "Orders table, records all customer order information"
field.curr_desc = "Order amount, unit: yuan, excluding tax"
# Bad description
table.curr_desc = "orders" # No business meaning
field.curr_desc = "amount" # Insufficient information
| Feature | Qdrant | Pinecone | Milvus | Weaviate |
|---|---|---|---|---|
| Open Source | ✅ | ❌ | ✅ | ✅ |
| On-premise Deployment | ✅ | ❌ | ✅ | ✅ |
| Filtering Performance | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| Batch Search | ✅ | ✅ | ✅ | ✅ |
| Python SDK | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| Documentation Quality | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
Advantages of Qdrant:
AskTable achieves efficient database metadata retrieval through Qdrant vector search:
This architecture is not limited to Text-to-SQL; it can be extended to any scenario that requires semantic retrieval over massive structured data.
Through vector search, we can make AI truly "understand" data, not just match keywords.