AskTable

AskTable Canvas 流式执行与断点续传:基于 Redis Streams 的事件总线架构

AskTable 团队
AskTable 团队 2026年3月5日

在 AI 数据分析场景中,一个 SQL 查询可能耗时数秒,一个 Python 分析脚本可能运行数分钟。如何让用户实时看到执行进度?如何在网络断线后恢复推送?如何在多实例部署下保证事件不丢失?

AskTable 的 Canvas 流式执行架构,通过 Redis Streams + SSE + arq 的组合,实现了高性能、高可靠的实时推送系统。

本文将深入剖析这套架构的设计与实现。


一、为什么需要流式执行?

1. 传统轮询方案的痛点

短轮询(Polling)

// 客户端每秒轮询一次
setInterval(async () => {
    const status = await fetch(`/api/node/${nodeId}/status`);
    updateUI(status);
}, 1000);

问题

长轮询(Long Polling)

# 服务器阻塞等待直到有新事件
async def get_status(node_id):
    while True:
        if has_new_event(node_id):
            return get_event(node_id)
        await asyncio.sleep(0.1)

问题

2. 我们需要的是什么?

实时性:毫秒级推送延迟 ✅ 断点续传:网络断线后自动恢复,不丢失事件 ✅ 横向扩展:支持多实例部署,事件跨实例广播 ✅ 低成本:复用现有 Redis,无需额外组件 ✅ 可靠性:事件持久化,支持历史回放


二、技术选型:为什么是 Redis Streams?

1. Redis Streams:为事件流而生

Redis Streams 是 Redis 5.0 引入的数据结构,专为消息队列和事件流设计。

核心优势

为什么不用 Redis Pub/Sub?

为什么不用 Kafka/RabbitMQ?

2. SSE (Server-Sent Events):单向推送的最佳选择

SSE 是 HTML5 标准,专为服务器到客户端的单向推送设计。

核心优势

SSE vs WebSocket

特性SSEWebSocket
方向单向(服务器 → 客户端)双向
协议HTTPWebSocket
断点续传原生支持(Last-Event-ID)需要自己实现
浏览器支持所有现代浏览器所有现代浏览器
复杂度

AskTable 的场景


三、架构设计:三层事件流

AskTable 的流式执行架构分为三层:

加载图表中...

工作流程

  1. 用户触发:用户在 Canvas 中点击"运行"按钮
  2. 创建 Stream:FastAPI 创建 Redis Stream(
    at:canvas:stream:{canvas_id}:{node_id}
  3. 提交任务:FastAPI 将任务提交到 arq 队列
  4. SSE 订阅:客户端通过 SSE 订阅事件流
  5. 异步执行:arq worker 执行 Agent,将事件写入 Redis Streams
  6. 实时推送:SSE 端点从 Redis Streams 读取事件,推送给客户端
  7. 持久化:任务完成后,结果持久化到 PostgreSQL
  8. TTL 清理:Stream 设置 1 小时 TTL,自动过期

四、核心实现:深入源码

1. Redis Streams 事件存储

class NodeStreamStore:
    STREAM_TTL_SECONDS = 3600  # stream 完成后保留 1 小时
    BLOCK_MS = 2000  # XREAD 阻塞等待时间

    def _stream_key(self, canvas_id: str, node_id: str) -> str:
        """生成 Stream Key"""
        return f"at:canvas:stream:{canvas_id}:{node_id}"

    async def start_stream(self, canvas_id: str, node_id: str):
        """开始新 stream,清除旧数据并创建空 stream"""
        key = self._stream_key(canvas_id, node_id)
        async with self._redis.pipeline(transaction=True) as pipe:
            pipe.delete(key)  # 清除旧数据
            # 种子消息让 key 立即存在,subscribe 跳过 _init 事件
            pipe.xadd(key, {"event": StreamEvent.INIT, "data": ""})
            await pipe.execute()

    async def append(self, canvas_id: str, node_id: str, event: str, data: str) -> str:
        """追加事件到 stream,返回 stream ID"""
        key = self._stream_key(canvas_id, node_id)
        stream_id = await self._redis.xadd(
            key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
        )
        return stream_id

    async def complete(self, canvas_id: str, node_id: str):
        """标记 stream 完成,设置 TTL"""
        key = self._stream_key(canvas_id, node_id)
        await self._redis.expire(key, self.STREAM_TTL_SECONDS)

关键点

2. 事件类型定义

class StreamEvent(StrEnum):
    INIT = "_init"  # 种子事件(不推送给客户端)
    START = "start"  # 任务开始
    DELTA = "delta"  # 增量数据(如 AI 生成的文本)
    DONE = "done"  # 任务完成
    ERROR = "error"  # 任务失败

_TERMINAL_EVENTS = frozenset((StreamEvent.DONE, StreamEvent.ERROR))

事件生命周期

INIT → START → DELTA × N → DONE/ERROR

3. 断点续传:XRANGE + XREAD

async def subscribe(
    self, canvas_id: str, node_id: str, from_id: str = "0-0"
) -> AsyncGenerator[tuple[str, str, str], None]:
    """订阅 stream 事件,支持断点续传。

    1. XRANGE 重放 from_id 之后的历史消息
    2. 如果未遇到终止事件(done/error),切换到 XREAD BLOCK 等待实时消息
    3. 遇到 done/error 终止

    from_id: "0-0" 从头读取,或传入 Last-Event-ID 实现断点重续
    """
    key = self._stream_key(canvas_id, node_id)

    # Phase 1: 重放历史(XRANGE)
    range_start = f"({from_id}" if from_id != "0-0" else "-"
    raw_entries = await self._redis.xrange(key, min=range_start)

    last_id = from_id
    for stream_id, event, data, terminal in self._iter_entries(raw_entries):
        last_id = stream_id
        yield (stream_id, event, data)
        if terminal:
            return

    # Phase 2: 实时等待(XREAD BLOCK)
    while True:
        results = await self._redis.xread(
            {key: last_id}, block=self.BLOCK_MS, count=10
        )
        if not results:
            # 超时,检查 stream 是否还存在
            if not await self._redis.exists(key):
                return
            continue

        for _stream_name, entries in results:
            for stream_id, event, data, terminal in self._iter_entries(entries):
                last_id = stream_id
                yield (stream_id, event, data)
                if terminal:
                    return

关键点

断点续传示例

# 客户端首次连接
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
    print(f"{stream_id}: {event} - {data}")
    # 假设在 stream_id="1234567890-0" 时断线

# 客户端重连(传入 Last-Event-ID)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="1234567890-0"):
    print(f"{stream_id}: {event} - {data}")
    # 从 "1234567890-0" 之后继续推送,无遗漏

4. SSE 端点实现

@router.get("/canvas/{canvas_id}/node/{node_id}/stream")
async def stream_node_events(canvas_id: str, node_id: str, last_event_id: str = "0-0"):
    """SSE 端点:推送节点执行事件"""

    async def event_generator():
        try:
            async for stream_id, event, data in stream_store.subscribe(
                canvas_id, node_id, from_id=last_event_id
            ):
                # SSE 格式:id + event + data
                yield f"id: {stream_id}\n"
                yield f"event: {event}\n"
                yield f"data: {data}\n\n"
        except Exception as e:
            # 错误事件
            yield f"event: error\n"
            yield f"data: {json.dumps({'message': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用 Nginx 缓冲
        },
    )

关键点

客户端代码

const eventSource = new EventSource(
    `/api/canvas/${canvasId}/node/${nodeId}/stream`
);

eventSource.addEventListener("start", (e) => {
    console.log("Task started");
});

eventSource.addEventListener("delta", (e) => {
    const data = JSON.parse(e.data);
    updateUI(data);  // 实时更新 UI
});

eventSource.addEventListener("done", (e) => {
    console.log("Task completed");
    eventSource.close();
});

eventSource.addEventListener("error", (e) => {
    console.error("Task failed", e.data);
    eventSource.close();
});

// 断线自动重连(浏览器原生支持)
eventSource.onerror = () => {
    console.log("Connection lost, reconnecting...");
    // EventSource 会自动重连,并传递 Last-Event-ID
};

5. arq 异步任务执行

async def run_agent_task(canvas_id: str, node_id: str, message: str):
    """后台执行 Agent,事件写入 Redis Stream"""
    try:
        async with node_status_handler(canvas_id, node_id):
            # 1. 写入 START 事件
            await stream_store.append(canvas_id, node_id, StreamEvent.START, "null")

            # 2. 创建 Agent 会话
            async with unit_of_work() as db:
                fresh_node = await service.get_node(db, canvas_id, node_id)
                handler = get_handler(fresh_node.type)
                agent = await _create_agent_session(db, fresh_node, handler, canvas_id)

                # 3. 流式执行 Agent
                async for event in agent.run(message):
                    await stream_store.append(
                        canvas_id, node_id, StreamEvent.DELTA, json.dumps(event)
                    )

                # 4. 持久化结果
                agent_result = agent.get_result()
                streaming_result = handler.format_result(fresh_node, agent_result)

                if streaming_result is not None:
                    merged_data = {**fresh_node.data, **streaming_result.data}
                    df_id = await persist_node_dataframe(merged_data, fresh_node.title, db)
                    await service.update_node(
                        db_session=db,
                        canvas_id=canvas_id,
                        node_id=node_id,
                        data=merged_data,
                        dataframe_id=df_id,
                    )

        # 5. 写入 DONE 事件
        await stream_store.append(canvas_id, node_id, StreamEvent.DONE, "null")
    except Exception as e:
        # 6. 写入 ERROR 事件
        await stream_store.append(
            canvas_id, node_id, StreamEvent.ERROR, json.dumps({"message": str(e)[:200]})
        )
        raise
    finally:
        # 7. 设置 TTL
        await stream_store.complete(canvas_id, node_id)

关键点


五、性能优化与可靠性保障

1. 性能优化

XREAD 批量读取

results = await self._redis.xread(
    {key: last_id}, block=self.BLOCK_MS, count=10
)

MAXLEN 限制

stream_id = await self._redis.xadd(
    key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
)

TTL 自动清理

await self._redis.expire(key, self.STREAM_TTL_SECONDS)

2. 可靠性保障

事件持久化

断点续传

跨实例广播

幂等性


六、复杂场景处理

1. 长时间任务

# 任务可能运行数分钟
async for event in agent.run(message):
    await stream_store.append(canvas_id, node_id, StreamEvent.DELTA, json.dumps(event))
    # 每生成一段内容,立即推送

优势

2. 多客户端订阅

# 客户端 A 订阅
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
    print(f"Client A: {event}")

# 客户端 B 订阅(同时)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
    print(f"Client B: {event}")

优势

3. 历史回放

# 任务完成后,新客户端连接
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
    print(f"{event}: {data}")
    # 完整回放所有历史事件

优势


七、对比其他方案

方案实时性断点续传横向扩展部署成本可靠性
Redis Streams + SSE⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
WebSocket + Redis Pub/Sub⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
短轮询⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
长轮询⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Kafka + WebSocket⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

AskTable 选择 Redis Streams + SSE 的原因


八、实战经验与踩坑

1. Nginx 缓冲问题

问题:Nginx 默认会缓冲响应,导致 SSE 延迟。

解决方案

headers={
    "X-Accel-Buffering": "no",  # 禁用 Nginx 缓冲
}

2. XREAD 超时处理

问题

XREAD BLOCK
超时后,如何判断 Stream 是否还存在?

解决方案

if not results:
    # 超时,检查 stream 是否还存在
    if not await self._redis.exists(key):
        return  # Stream 已删除,停止订阅
    continue  # Stream 还在,继续等待

3. Stream ID 格式

问题:Redis Stream ID 格式是

{timestamp}-{sequence}
,如何正确解析?

解决方案

4. 事件去重

问题:客户端可能收到重复事件(如网络抖动)。

解决方案

const processedIds = new Set();

eventSource.addEventListener("delta", (e) => {
    if (processedIds.has(e.lastEventId)) {
        return;  // 已处理,跳过
    }
    processedIds.add(e.lastEventId);
    updateUI(JSON.parse(e.data));
});

九、总结与展望

AskTable 的 Canvas 流式执行架构,通过 Redis Streams + SSE + arq 的组合,实现了:

实时性:毫秒级推送延迟 ✅ 断点续传:网络断线后自动恢复,无遗漏 ✅ 横向扩展:支持多实例部署,事件跨实例广播 ✅ 低成本:复用 Redis,无需额外组件 ✅ 高可靠性:事件持久化,支持历史回放

未来优化方向

  1. 消费者组:使用 Redis Streams 的消费者组功能,实现负载均衡
  2. 事件压缩:对大数据量事件进行压缩,减少网络传输
  3. 多租户隔离:按租户分片 Stream,提升性能
  4. 监控告警:监控 Stream 长度、延迟等指标

开源计划

我们计划将 NodeStreamStoreSSE 端点实现 开源,帮助更多团队构建实时推送系统。敬请期待!


相关阅读

技术交流