
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在构建 LLM 应用时,不同模型提供商的 API 格式差异是一个常见的痛点。OpenAI 和 Anthropic 虽然都支持工具调用和流式响应,但消息格式却大相径庭。AskTable 的 ChatMessageBuilder 提供了一个优雅的解决方案:统一的内部消息格式 + 双向转换器。
messages = [ {"role": "system", "content": "You are a helpful assistant"}, {"role": "user", "content": "What's the weather?"}, { "role": "assistant", "content": "Let me check", "tool_calls": [{ "id": "call_123", "type": "function", "function": {"name": "get_weather", "arguments": '{"city": "Beijing"}'} }] }, {"role": "tool", "tool_call_id": "call_123", "content": "Sunny, 25°C"} ]
messages = [ { "role": "user", "content": [{"type": "text", "text": "What's the weather?"}] }, { "role": "assistant", "content": [ {"type": "text", "text": "Let me check"}, {"type": "tool_use", "id": "call_123", "name": "get_weather", "input": '{"city": "Beijing"}'} ] }, { "role": "user", "content": [{"type": "tool_result", "tool_use_id": "call_123", "content": "Sunny, 25°C"}] } ]
| 特性 | OpenAI | Anthropic |
|---|---|---|
| System Prompt | 独立的 system 消息 | 作为 API 参数传递 |
| 内容格式 | 字符串 | Content Block 数组 |
| 工具调用 | tool_calls 字段 | tool_use Content Block |
| 工具结果 | 独立的 tool 角色消息 | tool_result Content Block |
| Thinking | 不支持(部分模型支持 reasoning) | 原生支持 thinking Block |
加载图表中...
ChatMessageBuilder 使用类 Anthropic 的 Content Block 格式作为内部表示:
# 内部消息格式 InternalMessage = { "role": "assistant" | "user", "content": [ {"type": "text", "text": "..."}, {"type": "thinking", "thinking": "..."}, {"type": "tool_use", "id": "...", "name": "...", "input": "..."}, {"type": "tool_result", "tool_use_id": "...", "content": "..."} ] }
def append_openai_message(self, message: "ChatCompletionMessageParam") -> None:
    """Convert one OpenAI-format message into internal content blocks.

    - user    -> a user message with a single text block
    - assistant -> text block (if any) plus one tool_use block per tool call
    - tool    -> a tool_result block, merged into a trailing user message
                 when one exists, otherwise wrapped in a new user message

    Fix over the original: the tool branch peeked at self._messages[-1]
    without checking the list was non-empty, raising IndexError when a tool
    message arrived first; now guarded with `self._messages and ...`.
    """
    role = message["role"]
    if role == "user":
        # User message: content is coerced to a single text block.
        # NOTE(review): OpenAI user content may also be a list of parts;
        # str() would stringify that — confirm callers only pass strings.
        self._messages.append({
            "role": "user",
            "content": [{"type": "text", "text": str(message["content"])}],
        })
    elif role == "assistant":
        blocks: list = []  # ContentBlock items
        # Text content first, then tool calls, matching OpenAI field order.
        content = message.get("content")
        if isinstance(content, str) and content:
            blocks.append({"type": "text", "text": content})
        tool_calls = message.get("tool_calls")
        if tool_calls:
            for tc in tool_calls:
                blocks.append({
                    "type": "tool_use",
                    "id": tc["id"],
                    "name": tc["function"]["name"],
                    "input": tc["function"]["arguments"],
                })
        # An assistant message with neither text nor tool calls is dropped.
        if blocks:
            self._messages.append({"role": "assistant", "content": blocks})
    elif role == "tool":
        tool_call_id = message.get("tool_call_id")
        content = message.get("content", "")
        # A tool message without a tool_call_id is silently ignored
        # (nothing to correlate it with).
        if tool_call_id:
            result_block = {
                "type": "tool_result",
                "tool_use_id": tool_call_id,
                "content": str(content),
            }
            # Guard against an empty history before peeking at the tail.
            if self._messages and self._messages[-1]["role"] == "user":
                # Append to the existing trailing user message.
                self._messages[-1]["content"].append(result_block)
            else:
                # Otherwise start a new user message for the result.
                self._messages.append({"role": "user", "content": [result_block]})
流式响应需要增量构建消息:
def append_openai_delta(self, chunk: ChatCompletionChunk) -> StreamEvent | None:
    """Fold one OpenAI streaming chunk into the message history.

    Incrementally builds the trailing assistant message block-by-block and
    returns the stream event to forward, or None when nothing should be
    emitted yet (e.g. while tool-call arguments are still accumulating).
    """
    if not chunk.choices:
        return None
    choice = chunk.choices[0]
    delta = choice.delta
    # Make sure a trailing assistant message exists to accumulate into.
    if not self._messages or self._messages[-1]["role"] != "assistant":
        self._messages.append({"role": "assistant", "content": []})
    blocks = self._messages[-1]["content"]
    # Text content: extend the current text block or start a new one.
    if delta.content:
        if blocks and blocks[-1]["type"] == "text":
            # Append to the existing text block
            blocks[-1]["text"] += delta.content
        else:
            # Start a new text block
            blocks.append({"type": "text", "text": delta.content})
        return AssistantStreamEvent(
            role="assistant",
            content=TextDelta(type="text", text=delta.content)
        )
    # Thinking/reasoning: providers expose it under different attribute
    # names — OpenAI-style reasoning_details, OpenRouter reasoning,
    # Qwen reasoning_content; first match wins.
    reasoning_text = None
    if hasattr(delta, "reasoning_details") and delta.reasoning_details:
        reasoning_text = delta.reasoning_details[0].get("text", "")
    elif hasattr(delta, "reasoning") and delta.reasoning is not None:
        reasoning_text = delta.reasoning
    elif hasattr(delta, "reasoning_content") and delta.reasoning_content is not None:
        reasoning_text = delta.reasoning_content
    if reasoning_text:
        if blocks and blocks[-1]["type"] == "thinking":
            blocks[-1]["thinking"] += reasoning_text
        else:
            blocks.append({"type": "thinking", "thinking": reasoning_text})
        return AssistantStreamEvent(
            role="assistant",
            content=ThinkingDelta(type="thinking", thinking=reasoning_text)
        )
    # Tool calls arrive fragmented: id/name/argument pieces keyed by index.
    if delta.tool_calls:
        for tc_delta in delta.tool_calls:
            idx = tc_delta.index if tc_delta.index is not None else 0
            tool_use_block = self._get_or_create_tool_use_block(blocks, idx)
            if tc_delta.id:
                tool_use_block["id"] = tc_delta.id
            if tc_delta.function:
                if tc_delta.function.name:
                    tool_use_block["name"] = tc_delta.function.name
                if tc_delta.function.arguments:
                    tool_use_block["input"] += tc_delta.function.arguments
        return None  # events are emitted only once the tool calls complete
    # finish_reason: the stream is done — emit the accumulated tool calls.
    if choice.finish_reason:
        tool_use_blocks = [b for b in blocks if b["type"] == "tool_use"]
        if tool_use_blocks:
            events = [
                AssistantStreamEvent(
                    role="assistant",
                    content=ToolUse(
                        type="tool_use",
                        id=b["id"],
                        name=b["name"],
                        input=b["input"],
                    ),
                )
                for b in tool_use_blocks
            ]
            # Single event is returned bare; multiple as a list.
            return events if len(events) > 1 else events[0]
    return None
def dump_openai(self, cache_control: bool = False) -> list[ChatCompletionMessageParam]:
    """Export the internal history as OpenAI chat-completions messages.

    The system prompt becomes a leading system message; thinking blocks
    are dropped (no OpenAI equivalent); tool_use blocks become tool_calls
    on the assistant message; tool_result blocks become standalone tool
    messages. With cache_control=True, a trailing string-content user
    message is rewrapped with an ephemeral cache_control marker
    (Anthropic-compatible OpenAI endpoints).
    """
    out = []
    if self.system_prompt is not None:
        out.append({"role": "system", "content": self.system_prompt})
    for msg in self._messages:
        # Bucket the blocks of this message by type.
        texts, tool_uses, tool_results = [], [], []
        for block in msg["content"]:
            kind = block["type"]
            if kind == "text":
                texts.append(block["text"])
            elif kind == "tool_use":
                tool_uses.append(block)
            elif kind == "tool_result":
                tool_results.append(block)
            # "thinking" blocks have no OpenAI counterpart — skipped.
        if msg["role"] == "assistant":
            entry = {"role": "assistant", "content": "".join(texts)}
            if tool_uses:
                entry["tool_calls"] = [
                    {
                        "id": t["id"],
                        "type": "function",
                        "function": {"name": t["name"], "arguments": t["input"]},
                    }
                    for t in tool_uses
                ]
            out.append(entry)
        elif msg["role"] == "user" and texts:
            out.append({"role": "user", "content": "".join(texts)})
        # NOTE(review): results are emitted after any user text from the
        # same internal message; OpenAI expects tool messages to directly
        # follow the assistant tool_calls turn — verify mixed messages
        # (text + tool_result) cannot occur, or reorder here.
        for result in tool_results:
            out.append({
                "role": "tool",
                "tool_call_id": result["tool_use_id"],
                "content": str(result["content"]),
            })
    if cache_control and out:
        tail = out[-1]
        if tail["role"] == "user":
            body = tail.get("content", "")
            if isinstance(body, str):
                tail["content"] = [{
                    "type": "text",
                    "text": body,
                    "cache_control": {"type": "ephemeral"},
                }]
    return out
def dump_anthropic(self) -> list[InternalMessage]:
    """Export as Anthropic format (the internal format is returned as-is).

    NOTE(review): this returns the live internal list, not a copy —
    callers that mutate the result mutate the builder's state.
    """
    return self._messages
ChatMessageBuilder 可以追踪哪些工具调用还没有返回结果:
def get_unresolved_tool_use_blocks(self) -> list[ContentBlock]:
    """Return tool_use blocks of the latest assistant message that have
    no matching tool_result anywhere in the history."""
    for message in reversed(self._messages):
        # Only the most recent assistant message is examined.
        if message["role"] != "assistant":
            continue
        pending = [b for b in message["content"] if b["type"] == "tool_use"]
        if not pending:
            return []
        # IDs already answered by a tool_result in any user message.
        answered = {
            b["tool_use_id"]
            for m in self._messages
            if m["role"] == "user"
            for b in m["content"]
            if b["type"] == "tool_result"
        }
        return [b for b in pending if b["id"] not in answered]
    return []
def append_tool_result(self, tool_call_id: str, content: str) -> StreamEvent:
    """Record a tool execution result and return the matching stream event.

    The tool_result block rides in a user message (Anthropic convention):
    it is appended to a trailing user message when one exists, otherwise a
    new user message is created.

    Fix over the original: self._messages[-1] was read without checking the
    list was non-empty, raising IndexError on an empty history; now guarded.
    """
    tool_result_block = {
        "type": "tool_result",
        "tool_use_id": tool_call_id,
        "content": content,
    }
    # Guard the tail access — the history may be empty.
    if self._messages and self._messages[-1]["role"] == "user":
        self._messages[-1]["content"].append(tool_result_block)
    else:
        self._messages.append({"role": "user", "content": [tool_result_block]})
    return StreamUserEvent(
        role="user",
        content=ToolResult(
            type="tool_result", tool_use_id=tool_call_id, content=content
        ),
    )
# 初始化 builder = ChatMessageBuilder(system_prompt="You are a helpful assistant") # 添加用户消息 builder.append_openai_message({ "role": "user", "content": "What's the weather in Beijing?" }) # 使用 OpenAI API openai_messages = builder.dump_openai() response = openai.chat.completions.create( model="gpt-4", messages=openai_messages ) # 或使用 Anthropic API anthropic_messages = builder.dump_anthropic() response = anthropic.messages.create( model="claude-3-5-sonnet-20241022", system=builder.system_prompt, messages=anthropic_messages )
builder = ChatMessageBuilder() # 流式接收 OpenAI 响应 stream = openai.chat.completions.create( model="gpt-4", messages=messages, stream=True ) for chunk in stream: event = builder.append_openai_delta(chunk) if event: # 发送给前端 yield event
builder = ChatMessageBuilder() # 用户消息 builder.append_openai_message({ "role": "user", "content": "What's the weather?" }) # LLM 响应(包含工具调用) builder.append_openai_message({ "role": "assistant", "content": "Let me check", "tool_calls": [{ "id": "call_123", "type": "function", "function": {"name": "get_weather", "arguments": '{"city": "Beijing"}'} }] }) # 检查未解决的工具调用 unresolved = builder.get_unresolved_tool_use_blocks() print(unresolved) # [{"type": "tool_use", "id": "call_123", ...}] # 添加工具结果 builder.append_tool_result("call_123", "Sunny, 25°C") # 继续对话 messages = builder.dump_openai()
ChatMessageBuilder 支持多种 thinking/reasoning 格式:
# OpenAI o1 格式 delta.reasoning_details = [{"text": "Let me think..."}] # OpenRouter 格式 delta.reasoning = "Let me think..." # Qwen 格式 delta.reasoning_content = "Let me think..."
所有格式都会被转换为统一的 thinking block:
{"type": "thinking", "thinking": "Let me think..."}
流式处理时增量构建消息,避免重复解析:
# 增量追加文本 if blocks and blocks[-1]["type"] == "text": blocks[-1]["text"] += delta.content
只在需要时才导出为特定格式:
# 内部格式保持不变 builder._messages # 始终是统一格式 # 按需导出 openai_messages = builder.dump_openai() # 仅在调用时转换
对于相同的消息历史,可以缓存导出结果:
def dump_openai_cached(self, messages_hash: str):
    """Return dump_openai() output, memoized per instance by messages_hash.

    Fix over the original: functools.lru_cache on an instance method keys
    the cache on `self` and keeps every instance alive for the cache's
    lifetime (ruff B019 — an instance leak). A plain per-instance dict
    gives the same memoization and is garbage-collected with the instance.
    """
    # Lazily create the per-instance cache on first use.
    cache = getattr(self, "_dump_openai_cache", None)
    if cache is None:
        cache = {}
        self._dump_openai_cache = cache
    if messages_hash not in cache:
        cache[messages_hash] = self.dump_openai()
    return cache[messages_hash]
ChatMessageBuilder 通过统一的内部格式和双向转换器,优雅地解决了多模型 API 兼容性问题:
这种设计不仅简化了多模型集成,还为未来支持更多模型提供了可扩展的基础。