
Batch Embedding Optimization: Performance Improvement Journey from Serial to Parallel

AskTable Team · 2026-03-04

In RAG (Retrieval-Augmented Generation) systems, embedding is one of the most common and time-consuming operations. When you need to generate vectors for tens of thousands of database fields, how do you call the Embedding API efficiently? AskTable achieved roughly a 1000x performance improvement through two-level batch processing and async parallelism.

Problem Background

Initial Implementation: Serial Calls

The most straightforward implementation is calling the Embedding API one by one:

# Assumes an AsyncOpenAI client instance, e.g.:
# from openai import AsyncOpenAI
# openai = AsyncOpenAI()

async def embedding_naive(text_list: list[str]) -> list[list[float]]:
    """Call the Embedding API serially, one text at a time"""
    embeddings = []
    for text in text_list:
        response = await openai.embeddings.create(
            input=text,
            model="text-embedding-3-small"
        )
        embeddings.append(response.data[0].embedding)
    return embeddings

Performance Problems:

  • 10,000 texts
  • 50ms per API call
  • Total time: 10000 × 0.05s = 500s = 8.3 minutes

Too slow!

Optimization Goals

  • Reduce API call count
  • Increase concurrency
  • Control resource consumption
  • Goal: Complete 10,000 texts within 5 seconds

Performance Optimization Comparison

[Chart: performance comparison of the three approaches]

The chart compares the three approaches: serial single calls are the slowest (500 seconds); serial batching is 100x faster (5 seconds); parallel batching adds another 10x (0.5 seconds), for a roughly 1000x overall improvement.

Core Optimization Strategies

1. Batch Processing

OpenAI Embedding API supports batch input:

# Single text
response = await openai.embeddings.create(
    input="hello world",
    model="text-embedding-3-small"
)

# Batch texts (max 2048)
response = await openai.embeddings.create(
    input=["hello", "world", "foo", "bar", ...],  # Max 2048
    model="text-embedding-3-small"
)

Advantages:

  • Reduce network round trips
  • Reduce API call overhead
  • Improve throughput

Implementation:

EMBEDDING_BATCH_SIZE = 100  # 100 texts per batch

async def embedding_batch(text_list: list[str]) -> list[list[float]]:
    """Batch call Embedding API"""
    # Split into batches
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]

    embeddings = []
    for batch in batches:
        response = await openai.embeddings.create(
            input=batch,
            model="text-embedding-3-small"
        )
        embeddings.extend([emb.embedding for emb in response.data])

    return embeddings

Performance Improvement:

  • 10,000 texts → 100 batches
  • 50ms per API call
  • Total time: 100 × 0.05s = 5s

100x improvement!

2. Async Parallel

Although batch processing greatly reduces the number of API calls, the batches still execute serially. We can send multiple batches in parallel:

import asyncio

EMBEDDING_BATCH_SIZE = 100  # texts per batch
EMBEDDING_CHUNK_SIZE = 100  # batches sent in parallel per round

async def _embedding_batch(batch: list[str]) -> list[list[float]]:
    """Single batch Embedding"""
    response = await openai.embeddings.create(
        input=batch,
        model="text-embedding-3-small",
        encoding_format="float",
    )
    return [embedding.embedding for embedding in response.data]

async def embedding_parallel(text_list: list[str]) -> list[list[float]]:
    """Parallel batch call Embedding API"""
    # First level: split into batches
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]

    embeddings = []

    # Second level: chunk parallel
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]

        # Execute in parallel
        tasks = [_embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)

        # Merge results
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])

    return embeddings

Performance Improvement:

  • 10,000 texts → 100 batches
  • 100 batches sent in one parallel round
  • Total rounds: 100 / 100 = 1
  • Ideal time: 1 × 0.05s = 0.05s; in practice, client and server concurrency limits bring it to roughly 0.5s

Another 10x improvement, for roughly 1000x overall!

3. AskTable's Complete Implementation

class AsyncLLMClient:
    @openai_error_wrapper
    async def _embedding_batch(self, batch):
        try:
            response = await self.client_embedding.embeddings.create(
                input=batch,
                model=config.embed_model_name,
                encoding_format="float",
            )
            return [embedding.embedding for embedding in response.data]
        except Exception as e:
            log.error(f"embedding failed: {e}, batch: {batch}")
            raise e

    @openai_error_wrapper
    async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
        _begin = time.time()

        # Preprocess: replace newlines
        text_list = [text.replace("\n", " ") for text in text_list]

        # First level: split into batches
        batches = [
            text_list[i : i + EMBEDDING_BATCH_SIZE]
            for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
        ]

        log.info(f"total batches: {len(batches)}")
        embeddings = []

        # Second level: chunk parallel
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            _begin_chunk = time.time()
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]

            # Execute in parallel
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)

            # Merge results
            embeddings.extend([
                embedding
                for batch_result in chunk_results
                for embedding in batch_result
            ])

            log.info(
                f"embedding_chunks: {len(chunk)} batches, "
                f"time: {time.time() - _begin_chunk:.2f}s"
            )

        log.info(
            f"embedding_batch: {len(text_list)} texts, "
            f"time: {time.time() - _begin:.2f}s"
        )
        return embeddings

Design Highlights:

3.1 Two-Level Batch Processing

EMBEDDING_BATCH_SIZE = 100  # First level: texts per API request
EMBEDDING_CHUNK_SIZE = 100  # Second level: parallel request count

Why two levels?

  • First Level (Batch): Reduce API call count

    • Single request contains multiple texts
    • Leverage API's batch processing capability
    • Reduce network overhead
  • Second Level (Chunk): Control concurrency

    • Avoid sending too many requests simultaneously
    • Prevent API rate limiting
    • Control memory consumption

Example:

# 10,000 texts
# First level: 10000 / 100 = 100 batches
# Second level: 100 / 100 = 1 chunk
# Result: 1 parallel round, 100 requests per round

# 100,000 texts
# First level: 100000 / 100 = 1000 batches
# Second level: 1000 / 100 = 10 chunks
# Result: 10 parallel rounds, 100 requests per round
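The two-level arithmetic above can be sanity-checked with a short helper (`plan` is an illustrative name, not part of AskTable's code):

```python
import math

EMBEDDING_BATCH_SIZE = 100  # first level: texts per API request
EMBEDDING_CHUNK_SIZE = 100  # second level: requests per parallel round

def plan(total_texts: int) -> tuple[int, int]:
    """Return (batch_count, round_count) for a given corpus size."""
    batches = math.ceil(total_texts / EMBEDDING_BATCH_SIZE)
    rounds = math.ceil(batches / EMBEDDING_CHUNK_SIZE)
    return batches, rounds

print(plan(10_000))   # (100, 1)
print(plan(100_000))  # (1000, 10)
```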

3.2 Text Preprocessing

text_list = [text.replace("\n", " ") for text in text_list]

Why replace newlines?

  • Some Embedding models are sensitive to newlines
  • Unified format improves vector quality
  • Avoid retrieval bias due to formatting issues
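For illustration, a slightly stricter normalizer that also collapses runs of whitespace — an assumption going beyond the one-line replace above, not AskTable's actual preprocessing:

```python
def normalize(text: str) -> str:
    # Replace newlines with spaces, then collapse any run of
    # whitespace down to a single space.
    return " ".join(text.replace("\n", " ").split())

print(normalize("hello\nworld\n\n  foo"))  # hello world foo
```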

3.3 Error Handling

@openai_error_wrapper
async def _embedding_batch(self, batch):
    try:
        response = await self.client_embedding.embeddings.create(...)
        return [embedding.embedding for embedding in response.data]
    except Exception as e:
        log.error(f"embedding failed: {e}, batch: {batch}")
        raise e

Error Handling Strategy:

  • Record failed batches
  • Support retry mechanism
  • Avoid total failure
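One way to implement "avoid total failure" is `asyncio.gather(..., return_exceptions=True)`, which collects per-batch errors instead of cancelling the sibling tasks. A sketch with a fake embedding call standing in for the real API (`fake_embed` and `embed_chunk_tolerant` are illustrative names, not AskTable's code):

```python
import asyncio

async def fake_embed(batch: list[str]) -> list[list[float]]:
    # Hypothetical stand-in for the real Embedding API call.
    if "bad" in batch:
        raise RuntimeError("simulated API failure")
    return [[0.0] * 3 for _ in batch]

async def embed_chunk_tolerant(chunk: list[list[str]]):
    """Run one chunk of batches; record failures instead of aborting the run."""
    results = await asyncio.gather(
        *(fake_embed(b) for b in chunk), return_exceptions=True
    )
    embeddings, failed = [], []
    for batch, result in zip(chunk, results):
        if isinstance(result, Exception):
            failed.append(batch)  # keep the batch for a later retry pass
        else:
            embeddings.extend(result)
    return embeddings, failed

ok, failed = asyncio.run(embed_chunk_tolerant([["a", "b"], ["bad"], ["c"]]))
print(len(ok), len(failed))  # 3 1
```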

3.4 Performance Monitoring

log.info(f"total batches: {len(batches)}")
log.info(
    f"embedding_chunks: {len(chunk)} batches, "
    f"time: {time.time() - _begin_chunk:.2f}s"
)
log.info(
    f"embedding_batch: {len(text_list)} texts, "
    f"time: {time.time() - _begin:.2f}s"
)

Monitoring Metrics:

  • Total batch count
  • Time per chunk
  • Total time
  • Throughput (texts/s)

Actual Performance Testing

Test Scenario

  • Text count: 10,000
  • Text length: average 50 characters
  • Embedding model: text-embedding-3-small
  • Network latency: 50ms

Performance Comparison

| Solution        | API Call Count | Concurrency | Total Time | Improvement |
| --------------- | -------------- | ----------- | ---------- | ----------- |
| Serial Single   | 10000          | 1           | 500s       | 1x          |
| Serial Batch    | 100            | 1           | 5s         | 100x        |
| Parallel Batch  | 100            | 100         | 0.5s       | 1000x       |

Actual Logs

[INFO] total batches: 100
[INFO] embedding_chunks: 100 batches, time: 0.52s
[INFO] embedding_batch: 10000 texts, time: 0.52s

Throughput: 10000 / 0.52 = 19230 texts/s

Best Practice Recommendations

1. Batch Size Tuning

# Adjust based on API limits
EMBEDDING_BATCH_SIZE = 100  # OpenAI max 2048

# Adjust based on text length
if avg_text_length > 1000:
    EMBEDDING_BATCH_SIZE = 50  # Long texts reduce batch size
else:
    EMBEDDING_BATCH_SIZE = 100  # Short texts increase batch size

2. Concurrency Control

# Adjust based on API rate limiting
EMBEDDING_CHUNK_SIZE = 100  # OpenAI rate limit: 3000 RPM

# Adjust based on network bandwidth
if network_bandwidth < 10_000_000:  # 10 Mbps
    EMBEDDING_CHUNK_SIZE = 50
else:
    EMBEDDING_CHUNK_SIZE = 100
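As an alternative to fixed-size chunks, an `asyncio.Semaphore` caps the number of in-flight requests while letting a new request start as soon as any slot frees up, avoiding the case where one slow batch stalls a whole round. A sketch (`fake_embed` stands in for the real API call):

```python
import asyncio

MAX_CONCURRENCY = 100  # cap on simultaneous in-flight requests

async def fake_embed(batch: list[str]) -> list[list[float]]:
    await asyncio.sleep(0.01)  # stands in for the network round trip
    return [[0.0] * 3 for _ in batch]

async def embed_all(batches: list[list[str]]) -> list[list[float]]:
    """Semaphore variant: requests are throttled individually rather
    than waiting for a whole chunk of 100 to finish before the next."""
    sem = asyncio.Semaphore(MAX_CONCURRENCY)

    async def guarded(batch):
        async with sem:
            return await fake_embed(batch)

    # gather preserves input order, so results line up with batches
    results = await asyncio.gather(*(guarded(b) for b in batches))
    return [emb for batch_result in results for emb in batch_result]

embs = asyncio.run(embed_all([["a"], ["b", "c"]]))
print(len(embs))  # 3
```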

3. Error Retry

async def _embedding_batch_with_retry(self, batch, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await self._embedding_batch(batch)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            log.warning(f"Retry {attempt + 1}/{max_retries}: {e}")
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

4. Progress Display

from tqdm import tqdm

async def embedding_batch_with_progress(self, text_list: list[str]):
    batches = [...]

    embeddings = []
    with tqdm(total=len(batches), desc="Embedding") as pbar:
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)
            embeddings.extend([...])
            pbar.update(len(chunk))

    return embeddings

Summary

AskTable achieved significant Embedding performance improvement through two-level batch processing and async parallel:

  1. Batch Processing: Reduce API call count, 100x improvement
  2. Async Parallel: Increase concurrency, another 10x improvement
  3. Two-Level Design: Balance performance and resource consumption
  4. Error Handling: Ensure system stability
  5. Performance Monitoring: Real-time tracking of performance metrics

This optimization strategy is not only applicable to Embedding but can also be extended to other batch API call scenarios, such as batch LLM calls, batch database queries, etc.

The key insight is: Batch + Parallel = Performance Leap
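The pattern generalizes beyond Embedding. A minimal generic driver (`batch_parallel` and `fake_api` are illustrative names, not AskTable's API) that accepts any async function mapping a batch of inputs to a batch of outputs:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def batch_parallel(
    items: list[T],
    fn: Callable[[list[T]], Awaitable[list[R]]],
    batch_size: int = 100,
    chunk_size: int = 100,
) -> list[R]:
    """Two-level batch + parallel driver: first level groups items into
    batches, second level fans out chunk_size batches per round."""
    batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
    results: list[R] = []
    for i in range(0, len(batches), chunk_size):
        chunk = batches[i : i + chunk_size]
        chunk_results = await asyncio.gather(*(fn(b) for b in chunk))
        for batch_result in chunk_results:
            results.extend(batch_result)
    return results

# Example: a toy "API" that uppercases, 3 texts per request, 2 requests per round
async def fake_api(batch: list[str]) -> list[str]:
    return [t.upper() for t in batch]

out = asyncio.run(batch_parallel(list("abcdefgh"), fake_api, batch_size=3, chunk_size=2))
print(out)  # ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
```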
