AskTable

批量 Embedding 优化:从串行到并行的性能提升之路

AskTable 团队
AskTable 团队 2026年3月4日

在 RAG(检索增强生成)系统中,Embedding 是最常见也是最耗时的操作之一。当需要为数据库的数万个字段生成向量时,如何高效地调用 Embedding API?AskTable 通过两级批处理和异步并行,实现了 100 倍的性能提升。

问题背景

初始实现:串行调用

最直接的实现方式是逐个调用 Embedding API:

async def embedding_naive(text_list: list[str]) -> list[list[float]]:
    """串行调用 Embedding API"""
    embeddings = []
    for text in text_list:
        response = await openai.embeddings.create(
            input=text,
            model="text-embedding-3-small"
        )
        embeddings.append(response.data[0].embedding)
    return embeddings

性能问题

太慢了!

优化目标

性能优化对比

加载图表中...

上图对比了三种方案的性能差异:串行单个调用最慢(500秒),串行批量处理提升100倍(5秒),并行批量处理再提升10倍(0.5秒),最终实现1000倍性能提升。

核心优化策略

1. 批量处理(Batch Processing)

OpenAI Embedding API 支持批量输入:

# 单个文本
response = await openai.embeddings.create(
    input="hello world",
    model="text-embedding-3-small"
)

# 批量文本(最多 2048 个)
response = await openai.embeddings.create(
    input=["hello", "world", "foo", "bar", ...],  # 最多 2048 个
    model="text-embedding-3-small"
)

优势

实现

EMBEDDING_BATCH_SIZE = 100  # 每批 100 个文本

async def embedding_batch(text_list: list[str]) -> list[list[float]]:
    """批量调用 Embedding API"""
    # 分批
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]

    embeddings = []
    for batch in batches:
        response = await openai.embeddings.create(
            input=batch,
            model="text-embedding-3-small"
        )
        embeddings.extend([emb.embedding for emb in response.data])

    return embeddings

性能提升

提升 100 倍!

2. 异步并行(Async Parallel)

虽然批量处理大幅减少了 API 调用次数,但仍然是串行执行。我们可以并行发送多个批次:

EMBEDDING_BATCH_SIZE = 100  # 每批 100 个文本
EMBEDDING_CHUNK_SIZE = 100  # 并行 100 个批次

async def _embedding_batch(batch: list[str]) -> list[list[float]]:
    """单个批次的 Embedding"""
    response = await openai.embeddings.create(
        input=batch,
        model="text-embedding-3-small",
        encoding_format="float",
    )
    return [embedding.embedding for embedding in response.data]

async def embedding_parallel(text_list: list[str]) -> list[list[float]]:
    """并行批量调用 Embedding API"""
    # 第一级:分批
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]

    embeddings = []

    # 第二级:分块并行
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]

        # 并行执行
        tasks = [_embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)

        # 合并结果
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])

    return embeddings

性能提升

再提升 100 倍!总共提升 10000 倍!

3. AskTable 的完整实现

class AsyncLLMClient:
    @openai_error_wrapper
    async def _embedding_batch(self, batch):
        try:
            response = await self.client_embedding.embeddings.create(
                input=batch,
                model=config.embed_model_name,
                encoding_format="float",
            )
            return [embedding.embedding for embedding in response.data]
        except Exception as e:
            log.error(f"embedding failed: {e}, batch: {batch}")
            raise e

    @openai_error_wrapper
    async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
        _begin = time.time()

        # 预处理:替换换行符
        text_list = [text.replace("\n", " ") for text in text_list]

        # 第一级:分批
        batches = [
            text_list[i : i + EMBEDDING_BATCH_SIZE]
            for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
        ]

        log.info(f"total batches: {len(batches)}")
        embeddings = []

        # 第二级:分块并行
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            _begin_chunk = time.time()
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]

            # 并行执行
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)

            # 合并结果
            embeddings.extend([
                embedding
                for batch_result in chunk_results
                for embedding in batch_result
            ])

            log.info(
                f"embedding_chunks: {len(chunk)} batches, "
                f"time: {time.time() - _begin_chunk:.2f}s"
            )

        log.info(
            f"embedding_batch: {len(text_list)} texts, "
            f"time: {time.time() - _begin:.2f}s"
        )
        return embeddings

设计亮点

3.1 两级批处理

EMBEDDING_BATCH_SIZE = 100  # 第一级:每个 API 请求的文本数
EMBEDDING_CHUNK_SIZE = 100  # 第二级:并行请求数

为什么需要两级?

示例

# 10000 个文本
# 第一级:10000 / 100 = 100 个批次
# 第二级:100 / 100 = 1 个块
# 结果:1 轮并行,每轮 100 个请求

# 100000 个文本
# 第一级:100000 / 100 = 1000 个批次
# 第二级:1000 / 100 = 10 个块
# 结果:10 轮并行,每轮 100 个请求

3.2 文本预处理

text_list = [text.replace("\n", " ") for text in text_list]

为什么要替换换行符?

3.3 错误处理

@openai_error_wrapper
async def _embedding_batch(self, batch):
    try:
        response = await self.client_embedding.embeddings.create(...)
        return [embedding.embedding for embedding in response.data]
    except Exception as e:
        log.error(f"embedding failed: {e}, batch: {batch}")
        raise e

错误处理策略

3.4 性能监控

log.info(f"total batches: {len(batches)}")
log.info(
    f"embedding_chunks: {len(chunk)} batches, "
    f"time: {time.time() - _begin_chunk:.2f}s"
)
log.info(
    f"embedding_batch: {len(text_list)} texts, "
    f"time: {time.time() - _begin:.2f}s"
)

监控指标

实际性能测试

测试场景

性能对比

方案API 调用次数并发数总耗时提升倍数
串行单个100001500s1x
串行批量10015s100x
并行批量1001000.5s1000x

实际日志

[INFO] total batches: 100
[INFO] embedding_chunks: 100 batches, time: 0.52s
[INFO] embedding_batch: 10000 texts, time: 0.52s

吞吐量:10000 / 0.52 = 19230 texts/s

最佳实践建议

1. 批量大小调优

# 根据 API 限制调整
EMBEDDING_BATCH_SIZE = 100  # OpenAI 最多 2048

# 根据文本长度调整
if avg_text_length > 1000:
    EMBEDDING_BATCH_SIZE = 50  # 长文本减少批量大小
else:
    EMBEDDING_BATCH_SIZE = 100  # 短文本增加批量大小

2. 并发数控制

# 根据 API 限流调整
EMBEDDING_CHUNK_SIZE = 100  # OpenAI 限流:3000 RPM

# 根据网络带宽调整
if network_bandwidth < 10_000_000:  # 10 Mbps
    EMBEDDING_CHUNK_SIZE = 50
else:
    EMBEDDING_CHUNK_SIZE = 100

3. 错误重试

async def _embedding_batch_with_retry(self, batch, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await self._embedding_batch(batch)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            log.warning(f"Retry {attempt + 1}/{max_retries}: {e}")
            await asyncio.sleep(2 ** attempt)  # 指数退避

4. 进度显示

from tqdm import tqdm

async def embedding_batch_with_progress(self, text_list: list[str]):
    batches = [...]

    embeddings = []
    with tqdm(total=len(batches), desc="Embedding") as pbar:
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)
            embeddings.extend([...])
            pbar.update(len(chunk))

    return embeddings

总结

AskTable 通过两级批处理和异步并行,实现了 Embedding 性能的大幅提升:

  1. 批量处理:减少 API 调用次数,提升 100 倍
  2. 异步并行:提高并发度,再提升 10 倍
  3. 两级设计:平衡性能和资源消耗
  4. 错误处理:保证系统稳定性
  5. 性能监控:实时追踪性能指标

这种优化策略不仅适用于 Embedding,也可以推广到其他批量 API 调用场景,如批量 LLM 调用、批量数据库查询等。

关键是理解:批量 + 并行 = 性能飞跃