AskTable

流式响应与工具调用:打造实时交互的 AI 体验

AskTable 团队
AskTable 团队 2026年3月4日

在 AI 应用中,用户体验的关键在于响应速度。等待 10 秒看到完整答案,远不如逐字显示来得友好。AskTable 通过流式响应和工具调用的巧妙结合,实现了真正的实时交互体验。

为什么需要流式响应?

传统方式的问题

# ❌ 传统方式:等待完整响应
response = await openai.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "分析上个月的销售数据"}]
)

# 用户需要等待 10 秒才能看到结果
print(response.choices[0].message.content)

用户体验差

流式响应的优势

# ✅ 流式响应:逐 token 返回
stream = await openai.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "分析上个月的销售数据"}],
    stream=True
)

async for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)  # 实时显示

用户体验好

核心实现

1. 基础流式处理

async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    """执行一步 Agent 推理"""
    self.completion_count += 1

    # 创建流式请求
    completion = await self.llm_client.chat.completions.create(
        messages=self.message_builder.dump_openai(),
        model=self.model,
        tools=self.tools,
        stream=True,  # 关键:启用流式
    )

    # 逐 chunk 处理
    async for chunk in completion:
        delta = self.message_builder.append_openai_delta(chunk)
        if delta:
            if isinstance(delta, list):
                for event in delta:
                    yield event  # 实时返回事件
            else:
                yield delta

设计亮点

1.1 事件驱动架构

@dataclass
class StreamEvent:
    type: str  # "text" | "tool_call" | "tool_result"
    content: str
    metadata: dict

事件类型

示例流程

# 用户提问:"上个月的销售额是多少?"

# Event 1: text
{"type": "text", "content": "让我查询一下"}

# Event 2: tool_call
{"type": "tool_call", "name": "search_metadata", "args": {"queries": ["销售额"]}}

# Event 3: tool_result
{"type": "tool_result", "content": "找到字段:sales.amount"}

# Event 4: tool_call
{"type": "tool_call", "name": "execute_sql", "args": {"sql": "SELECT SUM(amount) FROM sales WHERE ..."}}

# Event 5: tool_result
{"type": "tool_result", "content": "结果:1,234,567 元"}

# Event 6: text
{"type": "text", "content": "上个月的销售额是 1,234,567 元"}

1.2 增量消息构建

class ChatMessageBuilder:
    def append_openai_delta(self, chunk) -> StreamEvent | list[StreamEvent] | None:
        """处理流式 chunk,返回事件"""
        delta = chunk.choices[0].delta

        # 文本内容
        if delta.content:
            self._current_message["content"] += delta.content
            return StreamEvent(type="text", content=delta.content)

        # 工具调用
        if delta.tool_calls:
            for tool_call in delta.tool_calls:
                # 累积工具调用参数
                self._accumulate_tool_call(tool_call)

            # 工具调用完整时返回事件
            if self._is_tool_call_complete():
                return StreamEvent(
                    type="tool_call",
                    name=self._current_tool_call["name"],
                    args=self._current_tool_call["arguments"]
                )

        return None

关键点

2. 工具调用集成

async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    # ... 流式接收 LLM 响应 ...

    # 执行工具调用
    for tool_use in self.message_builder.get_unresolved_tool_use_blocks():
        func = self.actions[tool_use["name"]]
        params = json.loads(tool_use["input"])

        try:
            # 执行工具(支持同步和异步)
            func_handler = func(**params)
            if iscoroutine(func_handler):
                result = await func_handler
            else:
                result = func_handler
        except Exception as e:
            result = json.dumps({"error": str(e)})

        self.tool_called_count += 1

        # 返回工具结果事件
        yield self.message_builder.append_tool_result(tool_use["id"], result)

设计亮点

2.1 同步/异步兼容

# 同步工具
def show_table(self, table_names: list[str]) -> str:
    return json.dumps([...])

# 异步工具
async def search_metadata(self, queries: list[str]) -> str:
    results = await retrieve_entities(...)
    return json.dumps(results)

# 自动检测并正确调用
if iscoroutine(func_handler):
    result = await func_handler  # 异步
else:
    result = func_handler  # 同步

2.2 错误处理

try:
    result = await func(**params)
except Exception as e:
    # 将错误返回给 LLM,让它尝试其他方案
    result = json.dumps({"error": str(e)})

优势

3. 自动循环执行

async def run(
    self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
    """持续执行直到得到最终答案"""
    self.tool_called_count = 0
    self.completion_count = 0

    # 第一步:处理用户输入
    async for chunk in self.step(user_input):
        yield chunk

    # 持续执行直到完成
    while True:
        if self.completion_count >= self.max_completions:
            raise CannotHandle("Completion count exceeds max")

        last_message = self.message_builder.dump_anthropic()[-1]

        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                # 得到最终答案,结束
                break

        elif last_message["role"] == "user":
            last_block = last_message["content"][-1]
            if last_block["type"] == "tool_result":
                if self.tool_called_count >= self.max_tool_calls:
                    raise CannotHandle("Tool called count exceeds max")

                # 继续执行下一步
                async for chunk in self.step():
                    yield chunk

执行流程

用户输入
  ↓
LLM 推理(流式)
  ↓
工具调用?
  ├─ 是 → 执行工具 → 返回结果 → LLM 推理(流式)→ ...
  └─ 否 → 返回最终答案 → 结束

安全限制

self.max_tool_calls = 10  # 最多调用 10 次工具
self.max_completions = 20  # 最多 20 次 LLM 请求

防止无限循环消耗资源。

实际应用场景

场景 1:复杂数据分析

用户问题:"分析上个月各地区销售额,找出增长最快的前 3 个地区"

流式响应过程

[文本] 让我先查询相关的数据表...
[工具调用] search_metadata(queries=["销售额", "地区"])
[工具结果] 找到表:sales, regions
[文本] 现在获取表的详细结构...
[工具调用] show_table(table_names=["sales", "regions"])
[工具结果] 返回字段信息
[文本] 查询上个月各地区销售额...
[工具调用] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...")
[工具结果] 返回查询结果
[文本] 查询上上个月数据用于对比...
[工具调用] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...")
[工具结果] 返回查询结果
[文本] 根据数据分析,增长最快的前 3 个地区是:
1. 华东地区:增长 45%
2. 华南地区:增长 38%
3. 西南地区:增长 32%

用户体验

场景 2:错误恢复

用户问题:"查询 users 表的数据"

流式响应过程

[文本] 让我查询 users 表...
[工具调用] execute_sql(sql="SELECT * FROM users")
[工具结果] 错误:表不存在
[文本] 抱歉,users 表不存在。让我搜索相关的表...
[工具调用] search_metadata(queries=["用户"])
[工具结果] 找到表:user_info, customer
[文本] 我找到了 user_info 表,是否查询这个表?

优势

性能优化

1. 事件合并

# 避免过于频繁的事件
class EventBuffer:
    def __init__(self, flush_interval=0.1):
        self.buffer = []
        self.last_flush = time.time()
        self.flush_interval = flush_interval

    def add(self, event):
        self.buffer.append(event)
        if time.time() - self.last_flush > self.flush_interval:
            return self.flush()
        return None

    def flush(self):
        if not self.buffer:
            return None
        events = self.buffer
        self.buffer = []
        self.last_flush = time.time()
        return events

2. 背压控制

# 控制生产速度,避免消费者跟不上
class BackpressureStream:
    def __init__(self, max_buffer_size=100):
        self.queue = asyncio.Queue(maxsize=max_buffer_size)

    async def produce(self, event):
        await self.queue.put(event)  # 队列满时会阻塞

    async def consume(self):
        while True:
            event = await self.queue.get()
            yield event

最佳实践

1. 事件设计

# ✅ 好的事件设计
@dataclass
class StreamEvent:
    type: str
    content: str
    metadata: dict  # 额外信息

# ❌ 差的事件设计
@dataclass
class StreamEvent:
    data: str  # 类型不明确

2. 错误处理

# ✅ 优雅降级
try:
    result = await tool(**params)
except Exception as e:
    result = {"error": str(e), "suggestion": "try another approach"}
    yield StreamEvent(type="error", content=str(e))

# ❌ 直接抛出异常
result = await tool(**params)  # 可能中断整个流程

3. 用户反馈

# ✅ 提供进度信息
yield StreamEvent(type="progress", content="正在查询数据库...")
yield StreamEvent(type="progress", content="正在分析结果...")

# ❌ 长时间无反馈
# 用户不知道系统在做什么

总结

AskTable 通过流式响应和工具调用的结合,实现了流畅的实时交互体验:

  1. 真正的流式:逐 token 返回,而不是等待完整响应
  2. 事件驱动:清晰的事件类型,便于前端渲染
  3. 工具集成:在流式过程中无缝执行工具调用
  4. 自动循环:持续执行直到得到最终答案
  5. 错误恢复:优雅处理错误,不中断流程

流式响应不仅是技术实现,更是用户体验的关键。通过让用户实时看到 AI 的思考过程,我们构建了更加透明、可信的 AI 应用。