
In RAG (Retrieval-Augmented Generation) systems, embedding generation is one of the most frequent and time-consuming operations. When you need to generate vectors for tens of thousands of database fields, how do you call the Embedding API efficiently? AskTable achieved a 1000x speedup through two-level batch processing and async parallelism.

The most straightforward implementation calls the Embedding API one text at a time:
```python
import openai

client = openai.AsyncOpenAI()

async def embedding_naive(text_list: list[str]) -> list[list[float]]:
    """Call the Embedding API serially, one text per request."""
    embeddings = []
    for text in text_list:
        response = await client.embeddings.create(
            input=text,
            model="text-embedding-3-small",
        )
        embeddings.append(response.data[0].embedding)
    return embeddings
```
Performance Problems:
Every text costs a full network round trip. At roughly 50 ms per request, 10,000 texts take about 500 seconds, with the process idle most of that time waiting on the network.
The diagram compares the three approaches: serial single calls are slowest (500 s); serial batching is 100x faster (5 s); parallel batching is another 10x faster (0.5 s), for a total 1000x improvement.
The OpenAI Embedding API supports batch input:

```python
# Single text
response = await client.embeddings.create(
    input="hello world",
    model="text-embedding-3-small",
)

# Batch of texts (max 2048 per request)
response = await client.embeddings.create(
    input=["hello", "world", "foo", "bar", ...],  # up to 2048 items
    model="text-embedding-3-small",
)
```
Advantages:
- One request can carry up to 2048 texts, cutting the API call count by orders of magnitude
- Per-request overhead (connection setup, headers, one network round trip) is paid once per batch instead of once per text
Implementation:
```python
EMBEDDING_BATCH_SIZE = 100  # texts per API request

async def embedding_batch(text_list: list[str]) -> list[list[float]]:
    """Call the Embedding API in batches."""
    # Split the input into fixed-size batches
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]
    embeddings = []
    for batch in batches:
        response = await client.embeddings.create(
            input=batch,
            model="text-embedding-3-small",
        )
        embeddings.extend([emb.embedding for emb in response.data])
    return embeddings
```
Performance Improvement:
For 10,000 texts, the call count drops from 10,000 to 100 and total time from ~500 s to ~5 s: a 100x speedup.
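The effect on call count can be reproduced with a mocked API (no network; `fake_embed` and its counter are assumptions for illustration, not real OpenAI calls):

```python
import asyncio

CALLS = {"n": 0}  # request counter

async def fake_embed(texts: list[str]) -> list[list[float]]:
    """Mocked Embedding API: counts requests, returns dummy vectors."""
    CALLS["n"] += 1
    await asyncio.sleep(0)  # stand-in for network latency
    return [[0.0] for _ in texts]

async def naive(texts: list[str]) -> list[list[float]]:
    """One request per text (the slow baseline)."""
    return [e for t in texts for e in await fake_embed([t])]

async def batched(texts: list[str], size: int = 100) -> list[list[float]]:
    """One request per batch of `size` texts."""
    out: list[list[float]] = []
    for i in range(0, len(texts), size):
        out.extend(await fake_embed(texts[i : i + size]))
    return out

texts = ["hello"] * 1000
asyncio.run(naive(texts))
naive_calls = CALLS["n"]           # 1000 requests
CALLS["n"] = 0
asyncio.run(batched(texts))
batched_calls = CALLS["n"]         # 10 requests
print(naive_calls, batched_calls)  # 1000 10
```

With a real network, each avoided request also avoids one round trip, which is where the wall-clock win comes from.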
Although batch processing greatly reduces the number of API calls, the batches still run serially. We can send multiple batches in parallel:

```python
import asyncio

EMBEDDING_BATCH_SIZE = 100  # texts per API request
EMBEDDING_CHUNK_SIZE = 100  # batches sent in parallel

async def _embedding_batch(batch: list[str]) -> list[list[float]]:
    """Embed a single batch."""
    response = await client.embeddings.create(
        input=batch,
        model="text-embedding-3-small",
        encoding_format="float",
    )
    return [embedding.embedding for embedding in response.data]

async def embedding_parallel(text_list: list[str]) -> list[list[float]]:
    """Call the Embedding API in parallel batches."""
    # First level: split into batches
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]
    embeddings = []
    # Second level: process batches chunk by chunk
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
        # Fire all requests in this chunk concurrently
        tasks = [_embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)
        # Flatten results, preserving input order
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])
    return embeddings
```
Performance Improvement:
Another 10x on top of batching: 1000x in total!
The production version in AskTable adds logging, timing, and error handling:

```python
class AsyncLLMClient:
    @openai_error_wrapper
    async def _embedding_batch(self, batch):
        try:
            response = await self.client_embedding.embeddings.create(
                input=batch,
                model=config.embed_model_name,
                encoding_format="float",
            )
            return [embedding.embedding for embedding in response.data]
        except Exception as e:
            log.error(f"embedding failed: {e}, batch: {batch}")
            raise

    @openai_error_wrapper
    async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
        _begin = time.time()
        # Preprocess: replace newlines with spaces
        text_list = [text.replace("\n", " ") for text in text_list]
        # First level: split into batches
        batches = [
            text_list[i : i + EMBEDDING_BATCH_SIZE]
            for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
        ]
        log.info(f"total batches: {len(batches)}")
        embeddings = []
        # Second level: process chunks of batches in parallel
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            _begin_chunk = time.time()
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
            # Fire all requests in this chunk concurrently
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)
            # Flatten results, preserving input order
            embeddings.extend([
                embedding
                for batch_result in chunk_results
                for embedding in batch_result
            ])
            log.info(
                f"embedding_chunks: {len(chunk)} batches, "
                f"time: {time.time() - _begin_chunk:.2f}s"
            )
        log.info(
            f"embedding_batch: {len(text_list)} texts, "
            f"time: {time.time() - _begin:.2f}s"
        )
        return embeddings
```
Design Highlights:

```python
EMBEDDING_BATCH_SIZE = 100  # First level: texts per API request
EMBEDDING_CHUNK_SIZE = 100  # Second level: parallel requests per round
```
Why two levels?
First Level (Batch): reduce the API call count — 100 texts ride on a single request
Second Level (Chunk): control concurrency — cap the number of in-flight requests so you stay within rate limits and connection budgets
Example:

```python
# 10,000 texts
# First level:  10000 / 100 = 100 batches
# Second level: 100 / 100 = 1 chunk
# Result: 1 parallel round of 100 requests

# 100,000 texts
# First level:  100000 / 100 = 1000 batches
# Second level: 1000 / 100 = 10 chunks
# Result: 10 parallel rounds, 100 requests per round
```
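Chunked `gather` waits for a whole round to finish before starting the next one. An alternative way to cap concurrency (a sketch, not AskTable's implementation; names are illustrative) is an `asyncio.Semaphore`, which keeps a rolling window of at most N requests in flight:

```python
import asyncio

MAX_CONCURRENCY = 100  # plays the role of EMBEDDING_CHUNK_SIZE

async def embed_with_semaphore(batches, embed_one):
    """Run embed_one over all batches with at most MAX_CONCURRENCY in flight.

    embed_one is any coroutine taking one batch and returning one
    embedding per text, e.g. _embedding_batch from above.
    """
    sem = asyncio.Semaphore(MAX_CONCURRENCY)

    async def guarded(batch):
        async with sem:  # blocks while the window is full
            return await embed_one(batch)

    # gather preserves argument order, so results align with batches
    results = await asyncio.gather(*(guarded(b) for b in batches))
    return [emb for batch_result in results for emb in batch_result]
```

Unlike fixed chunks, a new request starts the moment any slot frees up, so one slow outlier no longer stalls an entire round.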
```python
text_list = [text.replace("\n", " ") for text in text_list]
```

Why replace newlines? OpenAI's documentation has recommended replacing newlines with a space in embedding inputs, because newlines were observed to degrade embedding quality with older embedding models. The substitution is cheap and harmless, so it is kept as a defensive default.
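A slightly fuller preprocessing pass might also guard against empty strings, which the API can reject as invalid input. A hedged sketch (the stripping and the single-space placeholder are assumptions, not part of AskTable's code):

```python
def preprocess(texts: list[str]) -> list[str]:
    """Normalize texts before embedding.

    - Replace newlines with spaces (historically recommended by OpenAI).
    - Strip surrounding whitespace (assumption: not semantically meaningful).
    - Substitute a single space for empty strings so the request is not
      rejected (placeholder choice is an assumption).
    """
    cleaned = []
    for t in texts:
        t = t.replace("\n", " ").strip()
        cleaned.append(t if t else " ")
    return cleaned

print(preprocess(["hello\nworld", "", " x "]))  # ['hello world', ' ', 'x']
```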
```python
@openai_error_wrapper
async def _embedding_batch(self, batch):
    try:
        response = await self.client_embedding.embeddings.create(...)
        return [embedding.embedding for embedding in response.data]
    except Exception as e:
        log.error(f"embedding failed: {e}, batch: {batch}")
        raise
```
Error Handling Strategy:
- Log the error together with the failing batch, so problem inputs can be reproduced offline
- Re-raise the exception so the decorator and the caller decide whether to retry or fail the job
```python
log.info(f"total batches: {len(batches)}")
log.info(
    f"embedding_chunks: {len(chunk)} batches, "
    f"time: {time.time() - _begin_chunk:.2f}s"
)
log.info(
    f"embedding_batch: {len(text_list)} texts, "
    f"time: {time.time() - _begin:.2f}s"
)
```
Monitoring Metrics:
- Total batch count: job size at a glance
- Per-chunk latency: spots slow rounds and rate limiting
- End-to-end time: overall throughput per call
| Solution | API Call Count | Concurrency | Total Time | Improvement |
|---|---|---|---|---|
| Serial Single | 10000 | 1 | 500s | 1x |
| Serial Batch | 100 | 1 | 5s | 100x |
| Parallel Batch | 100 | 100 | 0.5s | 1000x |
```text
[INFO] total batches: 100
[INFO] embedding_chunks: 100 batches, time: 0.52s
[INFO] embedding_batch: 10000 texts, time: 0.52s
```

Throughput: 10000 / 0.52 ≈ 19,230 texts/s
```python
# Adjust based on API limits
EMBEDDING_BATCH_SIZE = 100  # OpenAI allows up to 2048 inputs per request

# Adjust based on text length
if avg_text_length > 1000:
    EMBEDDING_BATCH_SIZE = 50   # long texts: smaller batches
else:
    EMBEDDING_BATCH_SIZE = 100  # short texts: larger batches

# Adjust based on API rate limits
EMBEDDING_CHUNK_SIZE = 100  # e.g. within a 3000 RPM limit

# Adjust based on network bandwidth
if network_bandwidth < 10_000_000:  # 10 Mbps
    EMBEDDING_CHUNK_SIZE = 50
else:
    EMBEDDING_CHUNK_SIZE = 100
```
```python
async def _embedding_batch_with_retry(self, batch, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await self._embedding_batch(batch)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            log.warning(f"Retry {attempt + 1}/{max_retries}: {e}")
            await asyncio.sleep(2 ** attempt)  # exponential backoff
```
```python
from tqdm import tqdm

async def embedding_batch_with_progress(self, text_list: list[str]):
    batches = [...]
    embeddings = []
    with tqdm(total=len(batches), desc="Embedding") as pbar:
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)
            embeddings.extend([...])
            pbar.update(len(chunk))
    return embeddings
```
AskTable achieved a substantial Embedding speedup through two-level batch processing and async parallelism.
This strategy is not limited to Embedding; it extends to any batch API scenario, such as batch LLM calls or bulk database queries.
The key insight: Batch + Parallel = Performance Leap
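As a sketch of that generalization (the name `batched_parallel` and its defaults are illustrative, not AskTable's API), the same two-level pattern can wrap any async batch function:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def batched_parallel(
    items: list[T],
    fn: Callable[[list[T]], Awaitable[list[R]]],
    batch_size: int = 100,
    chunk_size: int = 100,
) -> list[R]:
    """Apply fn to items in batches, running chunk_size batches at a time.

    fn takes one batch and returns one result per item, in order --
    e.g. an Embedding call, a batch LLM request, or a bulk DB query.
    """
    # First level: split the items into batches
    batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
    results: list[R] = []
    # Second level: run each chunk of batches concurrently
    for i in range(0, len(batches), chunk_size):
        chunk = batches[i : i + chunk_size]
        chunk_results = await asyncio.gather(*(fn(b) for b in chunk))
        for batch_result in chunk_results:
            results.extend(batch_result)  # gather preserves order
    return results
```

Any coroutine with the "list in, aligned list out" shape slots in unchanged.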