AskTable

Message Format Converter: A Unified Abstraction over OpenAI and Anthropic

AskTable Team · March 4, 2026


When building LLM applications, the differences between model providers' API formats are a common pain point. OpenAI and Anthropic both support tool calls and streaming responses, yet their message formats diverge substantially. AskTable's ChatMessageBuilder offers an elegant solution: a unified internal message format plus bidirectional converters.

Background

OpenAI Message Format

messages = [
    {"role": "system", "content": "You are a helpful assistant"},
    {"role": "user", "content": "What's the weather?"},
    {
        "role": "assistant",
        "content": "Let me check",
        "tool_calls": [{
            "id": "call_123",
            "type": "function",
            "function": {"name": "get_weather", "arguments": '{"city": "Beijing"}'}
        }]
    },
    {"role": "tool", "tool_call_id": "call_123", "content": "Sunny, 25°C"}
]

Anthropic Message Format

messages = [
    {
        "role": "user",
        "content": [{"type": "text", "text": "What's the weather?"}]
    },
    {
        "role": "assistant",
        "content": [
            {"type": "text", "text": "Let me check"},
            {"type": "tool_use", "id": "call_123", "name": "get_weather", "input": {"city": "Beijing"}}
        ]
    },
    {
        "role": "user",
        "content": [{"type": "tool_result", "tool_use_id": "call_123", "content": "Sunny, 25°C"}]
    }
]

Core Differences

| Feature        | OpenAI                                       | Anthropic                  |
| -------------- | -------------------------------------------- | -------------------------- |
| System prompt  | Standalone system message                    | Passed as an API parameter |
| Content format | String                                       | Array of content blocks    |
| Tool calls     | tool_calls field                             | tool_use content block     |
| Tool results   | Standalone tool-role message                 | tool_result content block  |
| Thinking       | Not supported (some models expose reasoning) | Native thinking block      |

ChatMessageBuilder Architecture


Unified Internal Format

ChatMessageBuilder uses an Anthropic-style content-block format as its internal representation:

# Internal message format
InternalMessage = {
    "role": "assistant" | "user",
    "content": [
        {"type": "text", "text": "..."},
        {"type": "thinking", "thinking": "..."},
        {"type": "tool_use", "id": "...", "name": "...", "input": "..."},
        {"type": "tool_result", "tool_use_id": "...", "content": "..."}
    ]
}

Why an Anthropic-Style Format?

  1. More flexible: a content-block array can mix multiple block types
  2. Clearer: role and content type are kept separate
  3. More powerful: native support for thinking, tool_use, and tool_result
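To make these points concrete, here is a minimal, self-contained sketch of the two conversion directions for a single assistant message. The function names `to_internal` and `to_openai` are illustrative only, not part of ChatMessageBuilder's API:

```python
def to_internal(openai_msg: dict) -> dict:
    """Convert one OpenAI assistant message into the internal block format."""
    blocks = []
    if isinstance(openai_msg.get("content"), str) and openai_msg["content"]:
        blocks.append({"type": "text", "text": openai_msg["content"]})
    for tc in openai_msg.get("tool_calls", []):
        blocks.append({
            "type": "tool_use",
            "id": tc["id"],
            "name": tc["function"]["name"],
            "input": tc["function"]["arguments"],  # kept as a raw JSON string
        })
    return {"role": "assistant", "content": blocks}

def to_openai(internal_msg: dict) -> dict:
    """Convert an internal assistant message back to OpenAI format."""
    msg = {"role": "assistant", "content": ""}
    tool_calls = []
    for block in internal_msg["content"]:
        if block["type"] == "text":
            msg["content"] += block["text"]
        elif block["type"] == "tool_use":
            tool_calls.append({
                "id": block["id"],
                "type": "function",
                "function": {"name": block["name"], "arguments": block["input"]},
            })
    if tool_calls:
        msg["tool_calls"] = tool_calls
    return msg

original = {
    "role": "assistant",
    "content": "Let me check",
    "tool_calls": [{
        "id": "call_123",
        "type": "function",
        "function": {"name": "get_weather", "arguments": '{"city": "Beijing"}'},
    }],
}
assert to_openai(to_internal(original)) == original  # lossless round trip
```

Keeping tool arguments as the raw JSON string inside the internal format is what makes the OpenAI round trip lossless.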

Core Implementation

1. Importing OpenAI Messages

def append_openai_message(self, message: ChatCompletionMessageParam) -> None:
    role = message["role"]

    if role == "user":
        # User message
        self._messages.append({
            "role": "user",
            "content": [{"type": "text", "text": str(message["content"])}],
        })

    elif role == "assistant":
        # Assistant message
        blocks: list[ContentBlock] = []

        # Add text content
        content = message.get("content")
        if isinstance(content, str) and content:
            blocks.append({"type": "text", "text": content})

        # Add tool calls
        tool_calls = message.get("tool_calls")
        if tool_calls:
            for tc in tool_calls:
                blocks.append({
                    "type": "tool_use",
                    "id": tc["id"],
                    "name": tc["function"]["name"],
                    "input": tc["function"]["arguments"],
                })

        if blocks:
            self._messages.append({"role": "assistant", "content": blocks})

    elif role == "tool":
        # Tool result message
        tool_call_id = message.get("tool_call_id")
        content = message.get("content", "")

        if tool_call_id:
            # If the previous message is a user message, append to it
            if self._messages and self._messages[-1]["role"] == "user":
                self._messages[-1]["content"].append({
                    "type": "tool_result",
                    "tool_use_id": tool_call_id,
                    "content": str(content),
                })
            else:
                # Otherwise create a new user message
                self._messages.append({
                    "role": "user",
                    "content": [{
                        "type": "tool_result",
                        "tool_use_id": tool_call_id,
                        "content": str(content),
                    }],
                })

2. Handling Streaming Deltas

Streaming responses must be built up incrementally:

def append_openai_delta(self, chunk: ChatCompletionChunk) -> StreamEvent | None:
    if not chunk.choices:
        return None

    choice = chunk.choices[0]
    delta = choice.delta

    # Ensure an assistant message exists
    if not self._messages or self._messages[-1]["role"] != "assistant":
        self._messages.append({"role": "assistant", "content": []})

    blocks = self._messages[-1]["content"]

    # Handle text content
    if delta.content:
        if blocks and blocks[-1]["type"] == "text":
            # Append to the existing text block
            blocks[-1]["text"] += delta.content
        else:
            # Create a new text block
            blocks.append({"type": "text", "text": delta.content})

        return AssistantStreamEvent(
            role="assistant",
            content=TextDelta(type="text", text=delta.content)
        )

    # Handle thinking/reasoning
    reasoning_text = None
    if hasattr(delta, "reasoning_details") and delta.reasoning_details:
        reasoning_text = delta.reasoning_details[0].get("text", "")
    elif hasattr(delta, "reasoning") and delta.reasoning is not None:
        reasoning_text = delta.reasoning
    elif hasattr(delta, "reasoning_content") and delta.reasoning_content is not None:
        reasoning_text = delta.reasoning_content

    if reasoning_text:
        if blocks and blocks[-1]["type"] == "thinking":
            blocks[-1]["thinking"] += reasoning_text
        else:
            blocks.append({"type": "thinking", "thinking": reasoning_text})

        return AssistantStreamEvent(
            role="assistant",
            content=ThinkingDelta(type="thinking", thinking=reasoning_text)
        )

    # Handle tool calls
    if delta.tool_calls:
        for tc_delta in delta.tool_calls:
            idx = tc_delta.index if tc_delta.index is not None else 0
            tool_use_block = self._get_or_create_tool_use_block(blocks, idx)

            if tc_delta.id:
                tool_use_block["id"] = tc_delta.id
            if tc_delta.function:
                if tc_delta.function.name:
                    tool_use_block["name"] = tc_delta.function.name
                if tc_delta.function.arguments:
                    tool_use_block["input"] += tc_delta.function.arguments

        return None  # Emit tool-call events only once the calls are complete

    # Handle finish_reason: emit tool-call events
    if choice.finish_reason:
        tool_use_blocks = [b for b in blocks if b["type"] == "tool_use"]
        if tool_use_blocks:
            events = [
                AssistantStreamEvent(
                    role="assistant",
                    content=ToolUse(
                        type="tool_use",
                        id=b["id"],
                        name=b["name"],
                        input=b["input"],
                    ),
                )
                for b in tool_use_blocks
            ]
            return events if len(events) > 1 else events[0]

    return None
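The delta-merging behavior above can be exercised without any network calls. The following standalone sketch reproduces only the text path, with SimpleNamespace objects standing in for the real delta objects:

```python
from types import SimpleNamespace

def apply_text_delta(messages: list, delta_text: str) -> None:
    """Merge one streamed text fragment into the last assistant message."""
    if not messages or messages[-1]["role"] != "assistant":
        messages.append({"role": "assistant", "content": []})
    blocks = messages[-1]["content"]
    if blocks and blocks[-1]["type"] == "text":
        blocks[-1]["text"] += delta_text  # extend the open text block
    else:
        blocks.append({"type": "text", "text": delta_text})  # open a new block

# Simulate a response arriving in three chunks
messages: list = []
for delta in (SimpleNamespace(content="Sun"),
              SimpleNamespace(content="ny, "),
              SimpleNamespace(content="25°C")):
    apply_text_delta(messages, delta.content)

assert messages == [{"role": "assistant",
                     "content": [{"type": "text", "text": "Sunny, 25°C"}]}]
```

All fragments accumulate into a single text block, so the final message is identical to what a non-streaming response would have produced.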

3. Exporting to OpenAI Format

def dump_openai(self, cache_control: bool = False) -> list[ChatCompletionMessageParam]:
    openai_messages = []

    # Add the system prompt
    if self.system_prompt is not None:
        openai_messages.append({"role": "system", "content": self.system_prompt})

    for msg in self._messages:
        content_blocks = msg["content"]

        # Separate blocks by type
        text_parts = []
        tool_uses = []
        tool_results = []

        for block in content_blocks:
            if block["type"] == "text":
                text_parts.append(block["text"])
            elif block["type"] == "thinking":
                # OpenAI has no thinking blocks; skip
                pass
            elif block["type"] == "tool_use":
                tool_uses.append(block)
            elif block["type"] == "tool_result":
                tool_results.append(block)

        # Build assistant messages
        if msg["role"] == "assistant":
            assistant_msg = {
                "role": "assistant",
                "content": "".join(text_parts),
            }

            if tool_uses:
                assistant_msg["tool_calls"] = [
                    {
                        "id": tool["id"],
                        "type": "function",
                        "function": {
                            "name": tool["name"],
                            "arguments": tool["input"],
                        },
                    }
                    for tool in tool_uses
                ]

            openai_messages.append(assistant_msg)

        # Build user messages
        elif msg["role"] == "user" and text_parts:
            openai_messages.append({"role": "user", "content": "".join(text_parts)})

        # Build tool messages
        for tool_result in tool_results:
            content = tool_result["content"]
            openai_messages.append({
                "role": "tool",
                "tool_call_id": tool_result["tool_use_id"],
                "content": str(content),
            })

    # Add cache control (for Anthropic-compatible OpenAI APIs)
    if cache_control and openai_messages:
        last_msg = openai_messages[-1]
        if last_msg["role"] == "user":
            content = last_msg.get("content", "")
            if isinstance(content, str):
                last_msg["content"] = [{
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"},
                }]

    return openai_messages

4. Exporting to Anthropic Format

def dump_anthropic(self) -> list[InternalMessage]:
    """
    Export in Anthropic format (the internal representation is returned directly).
    """
    return self._messages

Tool Call Management

Tracking Unresolved Tool Calls

ChatMessageBuilder can track which tool calls have not yet received a result:

def get_unresolved_tool_use_blocks(self) -> list[ContentBlock]:
    """Find unresolved tool_use blocks in the last assistant message."""
    for msg in reversed(self._messages):
        if msg["role"] == "assistant":
            tool_use_blocks = [
                block for block in msg["content"] if block["type"] == "tool_use"
            ]
            if not tool_use_blocks:
                return []

            # Collect the IDs of resolved tool_use blocks
            resolved_ids = {
                block["tool_use_id"]
                for m in self._messages
                if m["role"] == "user"
                for block in m["content"]
                if block["type"] == "tool_result"
            }

            return [b for b in tool_use_blocks if b["id"] not in resolved_ids]
    return []

Adding Tool Results

def append_tool_result(self, tool_call_id: str, content: str) -> StreamEvent:
    # Create a tool_result block
    tool_result_block = {
        "type": "tool_result",
        "tool_use_id": tool_call_id,
        "content": content,
    }

    # Append as (or into) a user message
    if self._messages and self._messages[-1]["role"] == "user":
        self._messages[-1]["content"].append(tool_result_block)
    else:
        self._messages.append({"role": "user", "content": [tool_result_block]})

    return StreamUserEvent(
        role="user",
        content=ToolResult(
            type="tool_result",
            tool_use_id=tool_call_id,
            content=content
        ),
    )

Practical Examples

Example 1: Switching Between Models

# Initialize
builder = ChatMessageBuilder(system_prompt="You are a helpful assistant")

# Add a user message
builder.append_openai_message({
    "role": "user",
    "content": "What's the weather in Beijing?"
})

# Call the OpenAI API
openai_messages = builder.dump_openai()
response = openai.chat.completions.create(
    model="gpt-4",
    messages=openai_messages
)

# Or call the Anthropic API
anthropic_messages = builder.dump_anthropic()
response = anthropic.messages.create(
    model="claude-3-5-sonnet-20241022",
    system=builder.system_prompt,
    messages=anthropic_messages
)

Example 2: Streaming

builder = ChatMessageBuilder()

# Stream the OpenAI response
stream = openai.chat.completions.create(
    model="gpt-4",
    messages=messages,
    stream=True
)

for chunk in stream:
    event = builder.append_openai_delta(chunk)
    if event:
        # Forward to the frontend
        yield event

Example 3: Tool Calls

builder = ChatMessageBuilder()

# User message
builder.append_openai_message({
    "role": "user",
    "content": "What's the weather?"
})

# LLM response (contains a tool call)
builder.append_openai_message({
    "role": "assistant",
    "content": "Let me check",
    "tool_calls": [{
        "id": "call_123",
        "type": "function",
        "function": {"name": "get_weather", "arguments": '{"city": "Beijing"}'}
    }]
})

# Check for unresolved tool calls
unresolved = builder.get_unresolved_tool_use_blocks()
print(unresolved)  # [{"type": "tool_use", "id": "call_123", ...}]

# Add the tool result
builder.append_tool_result("call_123", "Sunny, 25°C")

# Continue the conversation
messages = builder.dump_openai()

Thinking Block Support

ChatMessageBuilder supports several thinking/reasoning delta formats:

# OpenAI o1 format
delta.reasoning_details = [{"text": "Let me think..."}]

# OpenRouter format
delta.reasoning = "Let me think..."

# Qwen format
delta.reasoning_content = "Let me think..."

All of these are converted into a unified thinking block:

{"type": "thinking", "thinking": "Let me think..."}

Performance Optimizations

1. Incremental Building

During streaming, messages are built incrementally, avoiding repeated parsing:

# Incrementally append text
if blocks and blocks[-1]["type"] == "text":
    blocks[-1]["text"] += delta.content

2. Lazy Export

Export to a specific format happens only when needed:

# The internal format stays unchanged
builder._messages  # always the unified format

# Export on demand
openai_messages = builder.dump_openai()  # converted only when called

3. Caching

For an unchanged message history, the exported result can be cached. functools.lru_cache is a poor fit here (on an instance method it would key the cache on self and keep the builder alive), so a snapshot-based cache is safer:

def dump_openai_cached(self):
    key = json.dumps(self._messages, sort_keys=True)
    if getattr(self, "_dump_key", None) != key:  # history changed?
        self._dump_key, self._dump_cache = key, self.dump_openai()
    return self._dump_cache

Summary

With a unified internal format and bidirectional converters, ChatMessageBuilder elegantly solves the multi-model API compatibility problem:

  1. Unified abstraction: an Anthropic-style content-block format
  2. Bidirectional conversion: converts between OpenAI and Anthropic formats
  3. Streaming support: messages are built incrementally and converted in real time
  4. Tool calls: unified tool-call management
  5. Thinking support: compatible with multiple reasoning formats

This design not only simplifies multi-model integration, but also provides an extensible foundation for supporting more models in the future.

Related Resources