
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 AI 数据分析场景中,一个 SQL 查询可能耗时数秒,一个 Python 分析脚本可能运行数分钟。如何让用户实时看到执行进度?如何在网络断线后恢复推送?如何在多实例部署下保证事件不丢失?
AskTable 的 Canvas 流式执行架构,通过 Redis Streams + SSE + arq 的组合,实现了高性能、高可靠的实时推送系统。
本文将深入剖析这套架构的设计与实现。
短轮询(Polling):
// 客户端每秒轮询一次 setInterval(async () => { const status = await fetch(`/api/node/${nodeId}/status`); updateUI(status); }, 1000);
问题:
长轮询(Long Polling):
# 服务器阻塞等待直到有新事件 async def get_status(node_id): while True: if has_new_event(node_id): return get_event(node_id) await asyncio.sleep(0.1)
问题:
✅ 实时性:毫秒级推送延迟 ✅ 断点续传:网络断线后自动恢复,不丢失事件 ✅ 横向扩展:支持多实例部署,事件跨实例广播 ✅ 低成本:复用现有 Redis,无需额外组件 ✅ 可靠性:事件持久化,支持历史回放
Redis Streams 是 Redis 5.0 引入的数据结构,专为消息队列和事件流设计。
核心优势:
XREAD BLOCK 支持长连接等待新事件XRANGE 支持按 ID 范围查询历史事件为什么不用 Redis Pub/Sub?
为什么不用 Kafka/RabbitMQ?
SSE 是 HTML5 标准,专为服务器到客户端的单向推送设计。
核心优势:
EventSource APILast-Event-ID 自动重连SSE vs WebSocket:
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 单向(服务器 → 客户端) | 双向 |
| 协议 | HTTP | WebSocket |
| 断点续传 | 原生支持(Last-Event-ID) | 需要自己实现 |
| 浏览器支持 | 所有现代浏览器 | 所有现代浏览器 |
| 复杂度 | 低 | 中 |
AskTable 的场景:
AskTable 的流式执行架构分为三层:
加载图表中...
at:canvas:stream:{canvas_id}:{node_id})class NodeStreamStore: STREAM_TTL_SECONDS = 3600 # stream 完成后保留 1 小时 BLOCK_MS = 2000 # XREAD 阻塞等待时间 def _stream_key(self, canvas_id: str, node_id: str) -> str: """生成 Stream Key""" return f"at:canvas:stream:{canvas_id}:{node_id}" async def start_stream(self, canvas_id: str, node_id: str): """开始新 stream,清除旧数据并创建空 stream""" key = self._stream_key(canvas_id, node_id) async with self._redis.pipeline(transaction=True) as pipe: pipe.delete(key) # 清除旧数据 # 种子消息让 key 立即存在,subscribe 跳过 _init 事件 pipe.xadd(key, {"event": StreamEvent.INIT, "data": ""}) await pipe.execute() async def append(self, canvas_id: str, node_id: str, event: str, data: str) -> str: """追加事件到 stream,返回 stream ID""" key = self._stream_key(canvas_id, node_id) stream_id = await self._redis.xadd( key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True ) return stream_id async def complete(self, canvas_id: str, node_id: str): """标记 stream 完成,设置 TTL""" key = self._stream_key(canvas_id, node_id) await self._redis.expire(key, self.STREAM_TTL_SECONDS)
关键点:
at:canvas:stream:{canvas_id}:{node_id},每个节点独立 Stream_init 事件让 Stream 立即存在,避免订阅时 key 不存在class StreamEvent(StrEnum): INIT = "_init" # 种子事件(不推送给客户端) START = "start" # 任务开始 DELTA = "delta" # 增量数据(如 AI 生成的文本) DONE = "done" # 任务完成 ERROR = "error" # 任务失败 _TERMINAL_EVENTS = frozenset((StreamEvent.DONE, StreamEvent.ERROR))
事件生命周期:
INIT → START → DELTA × N → DONE/ERROR
async def subscribe( self, canvas_id: str, node_id: str, from_id: str = "0-0" ) -> AsyncGenerator[tuple[str, str, str], None]: """订阅 stream 事件,支持断点续传。 1. XRANGE 重放 from_id 之后的历史消息 2. 如果未遇到终止事件(done/error),切换到 XREAD BLOCK 等待实时消息 3. 遇到 done/error 终止 from_id: "0-0" 从头读取,或传入 Last-Event-ID 实现断点重续 """ key = self._stream_key(canvas_id, node_id) # Phase 1: 重放历史(XRANGE) range_start = f"({from_id}" if from_id != "0-0" else "-" raw_entries = await self._redis.xrange(key, min=range_start) last_id = from_id for stream_id, event, data, terminal in self._iter_entries(raw_entries): last_id = stream_id yield (stream_id, event, data) if terminal: return # Phase 2: 实时等待(XREAD BLOCK) while True: results = await self._redis.xread( {key: last_id}, block=self.BLOCK_MS, count=10 ) if not results: # 超时,检查 stream 是否还存在 if not await self._redis.exists(key): return continue for _stream_name, entries in results: for stream_id, event, data, terminal in self._iter_entries(entries): last_id = stream_id yield (stream_id, event, data) if terminal: return
关键点:
XRANGE:重放历史消息(支持断点续传)XREAD BLOCK:阻塞等待新消息(实时推送)({from_id} 跳过 from_id 本身,避免重复推送DONE/ERROR 事件后停止订阅XREAD 超时后检查 Stream 是否还存在断点续传示例:
# 客户端首次连接 async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"): print(f"{stream_id}: {event} - {data}") # 假设在 stream_id="1234567890-0" 时断线 # 客户端重连(传入 Last-Event-ID) async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="1234567890-0"): print(f"{stream_id}: {event} - {data}") # 从 "1234567890-0" 之后继续推送,无遗漏
@router.get("/canvas/{canvas_id}/node/{node_id}/stream") async def stream_node_events(canvas_id: str, node_id: str, last_event_id: str = "0-0"): """SSE 端点:推送节点执行事件""" async def event_generator(): try: async for stream_id, event, data in stream_store.subscribe( canvas_id, node_id, from_id=last_event_id ): # SSE 格式:id + event + data yield f"id: {stream_id}\n" yield f"event: {event}\n" yield f"data: {data}\n\n" except Exception as e: # 错误事件 yield f"event: error\n" yield f"data: {json.dumps({'message': str(e)})}\n\n" return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # 禁用 Nginx 缓冲 }, )
关键点:
id + event + data + 空行StreamingResponse 支持异步生成器X-Accel-Buffering: no 禁用 Nginx 缓冲客户端代码:
const eventSource = new EventSource( `/api/canvas/${canvasId}/node/${nodeId}/stream` ); eventSource.addEventListener("start", (e) => { console.log("Task started"); }); eventSource.addEventListener("delta", (e) => { const data = JSON.parse(e.data); updateUI(data); // 实时更新 UI }); eventSource.addEventListener("done", (e) => { console.log("Task completed"); eventSource.close(); }); eventSource.addEventListener("error", (e) => { console.error("Task failed", e.data); eventSource.close(); }); // 断线自动重连(浏览器原生支持) eventSource.onerror = () => { console.log("Connection lost, reconnecting..."); // EventSource 会自动重连,并传递 Last-Event-ID };
async def run_agent_task(canvas_id: str, node_id: str, message: str): """后台执行 Agent,事件写入 Redis Stream""" try: async with node_status_handler(canvas_id, node_id): # 1. 写入 START 事件 await stream_store.append(canvas_id, node_id, StreamEvent.START, "null") # 2. 创建 Agent 会话 async with unit_of_work() as db: fresh_node = await service.get_node(db, canvas_id, node_id) handler = get_handler(fresh_node.type) agent = await _create_agent_session(db, fresh_node, handler, canvas_id) # 3. 流式执行 Agent async for event in agent.run(message): await stream_store.append( canvas_id, node_id, StreamEvent.DELTA, json.dumps(event) ) # 4. 持久化结果 agent_result = agent.get_result() streaming_result = handler.format_result(fresh_node, agent_result) if streaming_result is not None: merged_data = {**fresh_node.data, **streaming_result.data} df_id = await persist_node_dataframe(merged_data, fresh_node.title, db) await service.update_node( db_session=db, canvas_id=canvas_id, node_id=node_id, data=merged_data, dataframe_id=df_id, ) # 5. 写入 DONE 事件 await stream_store.append(canvas_id, node_id, StreamEvent.DONE, "null") except Exception as e: # 6. 写入 ERROR 事件 await stream_store.append( canvas_id, node_id, StreamEvent.ERROR, json.dumps({"message": str(e)[:200]}) ) raise finally: # 7. 设置 TTL await stream_store.complete(canvas_id, node_id)
关键点:
XREAD 批量读取:
results = await self._redis.xread( {key: last_id}, block=self.BLOCK_MS, count=10 )
count=10:每次最多读取 10 条消息MAXLEN 限制:
stream_id = await self._redis.xadd( key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True )
maxlen=10000:限制 Stream 最大长度approximate=True:允许近似裁剪,性能更好TTL 自动清理:
await self._redis.expire(key, self.STREAM_TTL_SECONDS)
事件持久化:
断点续传:
Last-Event-ID 恢复XRANGE 重放历史消息,无遗漏跨实例广播:
幂等性:
# 任务可能运行数分钟 async for event in agent.run(message): await stream_store.append(canvas_id, node_id, StreamEvent.DELTA, json.dumps(event)) # 每生成一段内容,立即推送
优势:
# 客户端 A 订阅 async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id): print(f"Client A: {event}") # 客户端 B 订阅(同时) async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id): print(f"Client B: {event}")
优势:
# 任务完成后,新客户端连接 async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"): print(f"{event}: {data}") # 完整回放所有历史事件
优势:
| 方案 | 实时性 | 断点续传 | 横向扩展 | 部署成本 | 可靠性 |
|---|---|---|---|---|---|
| Redis Streams + SSE | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| WebSocket + Redis Pub/Sub | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| 短轮询 | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 长轮询 | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Kafka + WebSocket | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
AskTable 选择 Redis Streams + SSE 的原因:
问题:Nginx 默认会缓冲响应,导致 SSE 延迟。
解决方案:
headers={ "X-Accel-Buffering": "no", # 禁用 Nginx 缓冲 }
问题:
XREAD BLOCK 超时后,如何判断 Stream 是否还存在?
解决方案:
if not results: # 超时,检查 stream 是否还存在 if not await self._redis.exists(key): return # Stream 已删除,停止订阅 continue # Stream 还在,继续等待
问题:Redis Stream ID 格式是
{timestamp}-{sequence},如何正确解析?
解决方案:
XRANGE 和 XREAD 都支持字符串比较问题:客户端可能收到重复事件(如网络抖动)。
解决方案:
stream_idSet 去重const processedIds = new Set(); eventSource.addEventListener("delta", (e) => { if (processedIds.has(e.lastEventId)) { return; // 已处理,跳过 } processedIds.add(e.lastEventId); updateUI(JSON.parse(e.data)); });
AskTable 的 Canvas 流式执行架构,通过 Redis Streams + SSE + arq 的组合,实现了:
✅ 实时性:毫秒级推送延迟 ✅ 断点续传:网络断线后自动恢复,无遗漏 ✅ 横向扩展:支持多实例部署,事件跨实例广播 ✅ 低成本:复用 Redis,无需额外组件 ✅ 高可靠性:事件持久化,支持历史回放
我们计划将 NodeStreamStore 和 SSE 端点实现 开源,帮助更多团队构建实时推送系统。敬请期待!
相关阅读:
技术交流: