在 RAG(检索增强生成)系统中,Embedding 是最常见也是最耗时的操作之一。当需要为数据库的数万个字段生成向量时,如何高效地调用 Embedding API?AskTable 通过两级批处理和异步并行,实现了 1000 倍的性能提升。
最直接的实现方式是逐个调用 Embedding API:
```python
from openai import AsyncOpenAI

# 沿用文中的 openai 命名,实际为异步客户端实例
openai = AsyncOpenAI()


async def embedding_naive(text_list: list[str]) -> list[list[float]]:
    """串行调用 Embedding API"""
    embeddings = []
    for text in text_list:
        response = await openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
        )
        embeddings.append(response.data[0].embedding)
    return embeddings
```
性能问题:以 10000 个文本为例,需要 10000 次 API 调用;即使单次调用只需约 50ms,总耗时也高达 500 秒。太慢了!
(图表:三种方案性能对比)
图中对比了三种方案的性能差异:串行单个调用最慢(500 秒);串行批量处理提升 100 倍(5 秒);并行批量处理再提升 10 倍(0.5 秒),最终实现 1000 倍性能提升。
OpenAI Embedding API 支持批量输入:
```python
# 单个文本
response = await openai.embeddings.create(
    input="hello world",
    model="text-embedding-3-small",
)

# 批量文本(最多 2048 个)
response = await openai.embeddings.create(
    input=["hello", "world", "foo", "bar", ...],  # 最多 2048 个
    model="text-embedding-3-small",
)
```
优势:单次请求最多可携带 2048 个文本;API 调用次数从每文本一次降为每批一次,网络往返开销随之大幅减少。
实现:
```python
EMBEDDING_BATCH_SIZE = 100  # 每批 100 个文本


async def embedding_batch(text_list: list[str]) -> list[list[float]]:
    """批量调用 Embedding API"""
    # 分批
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]

    embeddings = []
    for batch in batches:
        response = await openai.embeddings.create(
            input=batch,
            model="text-embedding-3-small",
        )
        embeddings.extend([emb.embedding for emb in response.data])
    return embeddings
```
性能提升:10000 个文本的 API 调用次数从 10000 次降到 100 次,总耗时从 500 秒降到约 5 秒,提升 100 倍!
虽然批量处理大幅减少了 API 调用次数,但仍然是串行执行。我们可以并行发送多个批次:
```python
import asyncio

EMBEDDING_BATCH_SIZE = 100  # 每批 100 个文本
EMBEDDING_CHUNK_SIZE = 100  # 并行 100 个批次


async def _embedding_batch(batch: list[str]) -> list[list[float]]:
    """单个批次的 Embedding"""
    response = await openai.embeddings.create(
        input=batch,
        model="text-embedding-3-small",
        encoding_format="float",
    )
    return [embedding.embedding for embedding in response.data]


async def embedding_parallel(text_list: list[str]) -> list[list[float]]:
    """并行批量调用 Embedding API"""
    # 第一级:分批
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]

    embeddings = []
    # 第二级:分块并行
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
        # 并行执行
        tasks = [_embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)
        # 合并结果
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])
    return embeddings
```
性能提升:100 个批次在一轮内并行发出,总耗时从约 5 秒降到约 0.5 秒,再提升 10 倍!总共提升 1000 倍!
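下面给出一个最小的调用示意(本文补充的示例,非 AskTable 原始代码):假设上文的 embedding_parallel 已定义、OpenAI API Key 已通过环境变量配置,用 10000 条构造文本验证端到端耗时,实际数字取决于网络与限流。

```python
import asyncio
import time


async def main():
    # 构造 10000 条示例文本(内容为虚构的字段描述)
    texts = [f"column_{i}: 用户表字段描述" for i in range(10000)]

    begin = time.time()
    embeddings = await embedding_parallel(texts)
    print(f"{len(embeddings)} 个向量,耗时 {time.time() - begin:.2f}s")


asyncio.run(main())
```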
下面是 AskTable 的生产实现,在并行批量的基础上增加了预处理、日志和统一的错误封装:

```python
import asyncio
import time

# openai_error_wrapper、config、log 均为 AskTable 内部模块


class AsyncLLMClient:
    @openai_error_wrapper
    async def _embedding_batch(self, batch):
        try:
            response = await self.client_embedding.embeddings.create(
                input=batch,
                model=config.embed_model_name,
                encoding_format="float",
            )
            return [embedding.embedding for embedding in response.data]
        except Exception as e:
            log.error(f"embedding failed: {e}, batch: {batch}")
            raise e

    @openai_error_wrapper
    async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
        _begin = time.time()

        # 预处理:替换换行符
        text_list = [text.replace("\n", " ") for text in text_list]

        # 第一级:分批
        batches = [
            text_list[i : i + EMBEDDING_BATCH_SIZE]
            for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
        ]
        log.info(f"total batches: {len(batches)}")

        embeddings = []
        # 第二级:分块并行
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            _begin_chunk = time.time()
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]

            # 并行执行
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)

            # 合并结果
            embeddings.extend([
                embedding
                for batch_result in chunk_results
                for embedding in batch_result
            ])
            log.info(
                f"embedding_chunks: {len(chunk)} batches, "
                f"time: {time.time() - _begin_chunk:.2f}s"
            )

        log.info(
            f"embedding_batch: {len(text_list)} texts, "
            f"time: {time.time() - _begin:.2f}s"
        )
        return embeddings
```
设计亮点:
```python
EMBEDDING_BATCH_SIZE = 100  # 第一级:每个 API 请求的文本数
EMBEDDING_CHUNK_SIZE = 100  # 第二级:并行请求数
```
为什么需要两级?
第一级(Batch):减少 API 调用次数,把多个文本合并进同一次请求。
第二级(Chunk):控制并发数,一轮最多同时发出 EMBEDDING_CHUNK_SIZE 个请求,避免触发 API 限流或占用过多本地连接。
示例:
```python
# 10000 个文本
# 第一级:10000 / 100 = 100 个批次
# 第二级:100 / 100 = 1 个块
# 结果:1 轮并行,每轮 100 个请求

# 100000 个文本
# 第一级:100000 / 100 = 1000 个批次
# 第二级:1000 / 100 = 10 个块
# 结果:10 轮并行,每轮 100 个请求
```
```python
text_list = [text.replace("\n", " ") for text in text_list]
```
为什么要替换换行符?OpenAI 早期的 Embedding 文档曾建议把换行符替换为空格,因为换行符可能影响部分模型的向量质量;统一替换也让输入格式更规范。
```python
@openai_error_wrapper
async def _embedding_batch(self, batch):
    try:
        response = await self.client_embedding.embeddings.create(...)
        return [embedding.embedding for embedding in response.data]
    except Exception as e:
        log.error(f"embedding failed: {e}, batch: {batch}")
        raise e
```
错误处理策略:先用 log.error 记录失败批次的内容,便于定位是哪些文本导致失败;随后重新抛出异常,交给外层的 @openai_error_wrapper 统一处理。
log.info(f"total batches: {len(batches)}") log.info( f"embedding_chunks: {len(chunk)} batches, " f"time: {time.time() - _begin_chunk:.2f}s" ) log.info( f"embedding_batch: {len(text_list)} texts, " f"time: {time.time() - _begin:.2f}s" )
监控指标:批次总数(total batches)、每个并行块的耗时(embedding_chunks)、整批文本的总耗时(embedding_batch)。以 10000 个文本为场景,三种方案的性能对比如下:
| 方案 | API 调用次数 | 并发数 | 总耗时 | 提升倍数 |
|---|---|---|---|---|
| 串行单个 | 10000 | 1 | 500s | 1x |
| 串行批量 | 100 | 1 | 5s | 100x |
| 并行批量 | 100 | 100 | 0.5s | 1000x |
处理 10000 个文本时的实际日志输出:

```text
[INFO] total batches: 100
[INFO] embedding_chunks: 100 batches, time: 0.52s
[INFO] embedding_batch: 10000 texts, time: 0.52s
```
吞吐量:10000 / 0.52 ≈ 19230 texts/s
EMBEDDING_BATCH_SIZE 的调整思路:

```python
# 根据 API 限制调整
EMBEDDING_BATCH_SIZE = 100  # OpenAI 单次请求最多 2048 个输入

# 根据文本长度调整(avg_text_length 为平均文本长度,示意变量)
if avg_text_length > 1000:
    EMBEDDING_BATCH_SIZE = 50   # 长文本减少批量大小
else:
    EMBEDDING_BATCH_SIZE = 100  # 短文本增加批量大小
```
EMBEDDING_CHUNK_SIZE 的调整思路:

```python
# 根据 API 限流调整
EMBEDDING_CHUNK_SIZE = 100  # OpenAI 限流:3000 RPM

# 根据网络带宽调整(network_bandwidth 为示意变量)
if network_bandwidth < 10_000_000:  # 10 Mbps
    EMBEDDING_CHUNK_SIZE = 50
else:
    EMBEDDING_CHUNK_SIZE = 100
```
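分块并行还有一种常见的等价写法:用 asyncio.Semaphore 限制在途请求数(本文补充的示意,非 AskTable 的实现)。它的好处是任一请求完成后立刻放行下一个,不必像分块那样等待当前块中最慢的请求。

```python
import asyncio

MAX_CONCURRENCY = 100  # 假设的并发上限,按限流情况调整


async def embedding_with_semaphore(batches: list[list[str]]) -> list[list[float]]:
    """用信号量限制并发:一次性创建所有任务,但最多 MAX_CONCURRENCY 个请求在途"""
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

    async def _limited(batch: list[str]) -> list[list[float]]:
        async with semaphore:
            return await _embedding_batch(batch)  # 复用前文的单批次函数

    chunk_results = await asyncio.gather(*[_limited(b) for b in batches])
    return [emb for batch_result in chunk_results for emb in batch_result]
```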
为单个批次增加失败重试(指数退避):

```python
async def _embedding_batch_with_retry(self, batch, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await self._embedding_batch(batch)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            log.warning(f"Retry {attempt + 1}/{max_retries}: {e}")
            await asyncio.sleep(2 ** attempt)  # 指数退避
```
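启用重试后,只需在分块并行处替换任务的构造方式(沿用前文 embedding_batch 中的循环,示意):

```python
# 将原来的 self._embedding_batch 换成带重试的版本
tasks = [self._embedding_batch_with_retry(batch) for batch in chunk]
chunk_results = await asyncio.gather(*tasks)
```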
为大批量任务增加进度展示:

```python
from tqdm import tqdm


async def embedding_batch_with_progress(self, text_list: list[str]):
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]
    embeddings = []
    with tqdm(total=len(batches), desc="Embedding") as pbar:
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)
            embeddings.extend([
                emb for batch_result in chunk_results for emb in batch_result
            ])
            pbar.update(len(chunk))
    return embeddings
```
AskTable 通过两级批处理和异步并行,实现了 Embedding 性能的大幅提升:
批量调用:API 调用次数从 10000 次降到 100 次,提升 100 倍;
并行执行:总耗时从 5 秒降到 0.5 秒,再提升 10 倍;
整体:500 秒 → 0.5 秒,提升 1000 倍。
这种优化策略不仅适用于 Embedding,也可以推广到其他批量 API 调用场景,如批量 LLM 调用、批量数据库查询等。
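按这个思路可以抽出一个通用的两级批处理工具函数(本文补充的示意实现,函数名与签名均为假设):

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def batched_parallel(
    items: list[T],
    call_batch: Callable[[list[T]], Awaitable[list[R]]],
    batch_size: int = 100,
    chunk_size: int = 100,
) -> list[R]:
    """通用两级批处理:batch_size 控制单次请求大小,chunk_size 控制并发数"""
    batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
    results: list[R] = []
    for i in range(0, len(batches), chunk_size):
        chunk = batches[i : i + chunk_size]
        chunk_results = await asyncio.gather(*(call_batch(b) for b in chunk))
        for batch_result in chunk_results:
            results.extend(batch_result)
    return results
```

例如 `embeddings = await batched_parallel(texts, _embedding_batch)`;换一个 call_batch 实现,就能复用在批量 LLM 调用或批量数据库查询上。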
关键是理解:批量 + 并行 = 性能飞跃