
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 AI 应用中,用户体验的关键在于响应速度。等待 10 秒看到完整答案,远不如逐字显示来得友好。AskTable 通过流式响应和工具调用的巧妙结合,实现了真正的实时交互体验。
# ❌ 传统方式:等待完整响应 response = await openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": "分析上个月的销售数据"}] ) # 用户需要等待 10 秒才能看到结果 print(response.choices[0].message.content)
用户体验差:
# ✅ 流式响应:逐 token 返回 stream = await openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": "分析上个月的销售数据"}], stream=True ) async for chunk in stream: delta = chunk.choices[0].delta.content if delta: print(delta, end="", flush=True) # 实时显示
用户体验好:
async def _step(self) -> AsyncGenerator[StreamEvent, None]: """执行一步 Agent 推理""" self.completion_count += 1 # 创建流式请求 completion = await self.llm_client.chat.completions.create( messages=self.message_builder.dump_openai(), model=self.model, tools=self.tools, stream=True, # 关键:启用流式 ) # 逐 chunk 处理 async for chunk in completion: delta = self.message_builder.append_openai_delta(chunk) if delta: if isinstance(delta, list): for event in delta: yield event # 实时返回事件 else: yield delta
设计亮点:
@dataclass class StreamEvent: type: str # "text" | "tool_call" | "tool_result" content: str metadata: dict
事件类型:
示例流程:
# 用户提问:"上个月的销售额是多少?" # Event 1: text {"type": "text", "content": "让我查询一下"} # Event 2: tool_call {"type": "tool_call", "name": "search_metadata", "args": {"queries": ["销售额"]}} # Event 3: tool_result {"type": "tool_result", "content": "找到字段:sales.amount"} # Event 4: tool_call {"type": "tool_call", "name": "execute_sql", "args": {"sql": "SELECT SUM(amount) FROM sales WHERE ..."}} # Event 5: tool_result {"type": "tool_result", "content": "结果:1,234,567 元"} # Event 6: text {"type": "text", "content": "上个月的销售额是 1,234,567 元"}
class ChatMessageBuilder: def append_openai_delta(self, chunk) -> StreamEvent | list[StreamEvent] | None: """处理流式 chunk,返回事件""" delta = chunk.choices[0].delta # 文本内容 if delta.content: self._current_message["content"] += delta.content return StreamEvent(type="text", content=delta.content) # 工具调用 if delta.tool_calls: for tool_call in delta.tool_calls: # 累积工具调用参数 self._accumulate_tool_call(tool_call) # 工具调用完整时返回事件 if self._is_tool_call_complete(): return StreamEvent( type="tool_call", name=self._current_tool_call["name"], args=self._current_tool_call["arguments"] ) return None
关键点:
async def _step(self) -> AsyncGenerator[StreamEvent, None]: # ... 流式接收 LLM 响应 ... # 执行工具调用 for tool_use in self.message_builder.get_unresolved_tool_use_blocks(): func = self.actions[tool_use["name"]] params = json.loads(tool_use["input"]) try: # 执行工具(支持同步和异步) func_handler = func(**params) if iscoroutine(func_handler): result = await func_handler else: result = func_handler except Exception as e: result = json.dumps({"error": str(e)}) self.tool_called_count += 1 # 返回工具结果事件 yield self.message_builder.append_tool_result(tool_use["id"], result)
设计亮点:
# 同步工具 def show_table(self, table_names: list[str]) -> str: return json.dumps([...]) # 异步工具 async def search_metadata(self, queries: list[str]) -> str: results = await retrieve_entities(...) return json.dumps(results) # 自动检测并正确调用 if iscoroutine(func_handler): result = await func_handler # 异步 else: result = func_handler # 同步
try: result = await func(**params) except Exception as e: # 将错误返回给 LLM,让它尝试其他方案 result = json.dumps({"error": str(e)})
优势:
async def run( self, user_input: str | None = None ) -> AsyncGenerator[StreamEvent, None]: """持续执行直到得到最终答案""" self.tool_called_count = 0 self.completion_count = 0 # 第一步:处理用户输入 async for chunk in self.step(user_input): yield chunk # 持续执行直到完成 while True: if self.completion_count >= self.max_completions: raise CannotHandle("Completion count exceeds max") last_message = self.message_builder.dump_anthropic()[-1] if last_message["role"] == "assistant": last_block = last_message["content"][-1] if last_block["type"] == "text": # 得到最终答案,结束 break elif last_message["role"] == "user": last_block = last_message["content"][-1] if last_block["type"] == "tool_result": if self.tool_called_count >= self.max_tool_calls: raise CannotHandle("Tool called count exceeds max") # 继续执行下一步 async for chunk in self.step(): yield chunk
执行流程:
用户输入 ↓ LLM 推理(流式) ↓ 工具调用? ├─ 是 → 执行工具 → 返回结果 → LLM 推理(流式)→ ... └─ 否 → 返回最终答案 → 结束
安全限制:
self.max_tool_calls = 10 # 最多调用 10 次工具 self.max_completions = 20 # 最多 20 次 LLM 请求
防止无限循环消耗资源。
用户问题:"分析上个月各地区销售额,找出增长最快的前 3 个地区"
流式响应过程:
[文本] 让我先查询相关的数据表... [工具调用] search_metadata(queries=["销售额", "地区"]) [工具结果] 找到表:sales, regions [文本] 现在获取表的详细结构... [工具调用] show_table(table_names=["sales", "regions"]) [工具结果] 返回字段信息 [文本] 查询上个月各地区销售额... [工具调用] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...") [工具结果] 返回查询结果 [文本] 查询上上个月数据用于对比... [工具调用] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...") [工具结果] 返回查询结果 [文本] 根据数据分析,增长最快的前 3 个地区是: 1. 华东地区:增长 45% 2. 华南地区:增长 38% 3. 西南地区:增长 32%
用户体验:
用户问题:"查询 users 表的数据"
流式响应过程:
[文本] 让我查询 users 表... [工具调用] execute_sql(sql="SELECT * FROM users") [工具结果] 错误:表不存在 [文本] 抱歉,users 表不存在。让我搜索相关的表... [工具调用] search_metadata(queries=["用户"]) [工具结果] 找到表:user_info, customer [文本] 我找到了 user_info 表,是否查询这个表?
优势:
# 避免过于频繁的事件 class EventBuffer: def __init__(self, flush_interval=0.1): self.buffer = [] self.last_flush = time.time() self.flush_interval = flush_interval def add(self, event): self.buffer.append(event) if time.time() - self.last_flush > self.flush_interval: return self.flush() return None def flush(self): if not self.buffer: return None events = self.buffer self.buffer = [] self.last_flush = time.time() return events
# 控制生产速度,避免消费者跟不上 class BackpressureStream: def __init__(self, max_buffer_size=100): self.queue = asyncio.Queue(maxsize=max_buffer_size) async def produce(self, event): await self.queue.put(event) # 队列满时会阻塞 async def consume(self): while True: event = await self.queue.get() yield event
# ✅ 好的事件设计 @dataclass class StreamEvent: type: str content: str metadata: dict # 额外信息 # ❌ 差的事件设计 @dataclass class StreamEvent: data: str # 类型不明确
# ✅ 优雅降级 try: result = await tool(**params) except Exception as e: result = {"error": str(e), "suggestion": "try another approach"} yield StreamEvent(type="error", content=str(e)) # ❌ 直接抛出异常 result = await tool(**params) # 可能中断整个流程
# ✅ 提供进度信息 yield StreamEvent(type="progress", content="正在查询数据库...") yield StreamEvent(type="progress", content="正在分析结果...") # ❌ 长时间无反馈 # 用户不知道系统在做什么
AskTable 通过流式响应和工具调用的结合,实现了流畅的实时交互体验:
流式响应不仅是技术实现,更是用户体验的关键。通过让用户实时看到 AI 的思考过程,我们构建了更加透明、可信的 AI 应用。