AskTable
sidebar.freeTrial

AskTable Canvas 流式执行与断点续传:基于 Redis Streams 的事件总线架构

AskTable Team
AskTable Team 2026-03-05

在 AI 数据分析场景中,一个 SQL 查询可能耗时数秒,一个 Python 分析脚本可能运行数分钟。如何让用户实时看到执行进度?如何在网络断线后恢复推送?如何在多实例部署下保证事件不丢失?

AskTable 的 Canvas 流式执行架构,通过 Redis Streams + SSE + arq 的组合,实现了高性能、高可靠的实时推送系统。

本文将深入剖析这套架构的设计与实现。


一、为什么需要流式执行?

1. 传统轮询方案的痛点

短轮询(Polling)

// 客户端每秒轮询一次
setInterval(async () => {
    const status = await fetch(`/api/node/${nodeId}/status`);
    updateUI(status);
}, 1000);

问题

  • ❌ 延迟高(1-3 秒)
  • ❌ 服务器压力大(大量无效请求)
  • ❌ 无法实时推送(如 AI 生成的文本流)

长轮询(Long Polling)

# 服务器阻塞等待直到有新事件
async def get_status(node_id):
    while True:
        if has_new_event(node_id):
            return get_event(node_id)
        await asyncio.sleep(0.1)

问题

  • ❌ 连接数受限(每个客户端占用一个连接)
  • ❌ 断线后无法恢复(需要重新轮询)
  • ❌ 难以横向扩展(连接绑定到特定实例)

2. 我们需要的是什么?

实时性:毫秒级推送延迟 ✅ 断点续传:网络断线后自动恢复,不丢失事件 ✅ 横向扩展:支持多实例部署,事件跨实例广播 ✅ 低成本:复用现有 Redis,无需额外组件 ✅ 可靠性:事件持久化,支持历史回放


二、技术选型:为什么是 Redis Streams?

1. Redis Streams:为事件流而生

Redis Streams 是 Redis 5.0 引入的数据结构,专为消息队列和事件流设计。

核心优势

  • 持久化:事件写入 Redis 后持久化,支持历史回放
  • 消费者组:支持多消费者并行消费(我们暂未使用)
  • 阻塞读取XREAD BLOCK 支持长连接等待新事件
  • 范围查询XRANGE 支持按 ID 范围查询历史事件
  • 自动 ID:Redis 自动生成递增的 Stream ID(时间戳 + 序列号)

为什么不用 Redis Pub/Sub?

  • Pub/Sub 不持久化,订阅者离线时消息丢失
  • Pub/Sub 无法回放历史消息
  • Pub/Sub 无法实现断点续传

为什么不用 Kafka/RabbitMQ?

  • 部署成本高(需要额外组件)
  • 对于中小规模场景过于重量级
  • AskTable 已经使用 Redis,复用现有基础设施

2. SSE (Server-Sent Events):单向推送的最佳选择

SSE 是 HTML5 标准,专为服务器到客户端的单向推送设计。

核心优势

  • 原生支持:浏览器原生 EventSource API
  • 断点续传:通过 Last-Event-ID 自动重连
  • 简单高效:基于 HTTP,无需 WebSocket 握手
  • 跨域支持:支持 CORS

SSE vs WebSocket

特性SSEWebSocket
方向单向(服务器 → 客户端)双向
协议HTTPWebSocket
断点续传原生支持(Last-Event-ID)需要自己实现
浏览器支持所有现代浏览器所有现代浏览器
复杂度

AskTable 的场景

  • Canvas 节点执行是单向推送(服务器 → 客户端)
  • 不需要客户端实时发送数据
  • SSE 更简单、更可靠

三、架构设计:三层事件流

AskTable 的流式执行架构分为三层:

加载图表中...

工作流程

  1. 用户触发:用户在 Canvas 中点击"运行"按钮
  2. 创建 Stream:FastAPI 创建 Redis Stream(at:canvas:stream:{canvas_id}:{node_id}
  3. 提交任务:FastAPI 将任务提交到 arq 队列
  4. SSE 订阅:客户端通过 SSE 订阅事件流
  5. 异步执行:arq worker 执行 Agent,将事件写入 Redis Streams
  6. 实时推送:SSE 端点从 Redis Streams 读取事件,推送给客户端
  7. 持久化:任务完成后,结果持久化到 PostgreSQL
  8. TTL 清理:Stream 设置 1 小时 TTL,自动过期

四、核心实现:深入源码

1. Redis Streams 事件存储

class NodeStreamStore:
    STREAM_TTL_SECONDS = 3600  # stream 完成后保留 1 小时
    BLOCK_MS = 2000  # XREAD 阻塞等待时间

    def _stream_key(self, canvas_id: str, node_id: str) -> str:
        """生成 Stream Key"""
        return f"at:canvas:stream:{canvas_id}:{node_id}"

    async def start_stream(self, canvas_id: str, node_id: str):
        """开始新 stream,清除旧数据并创建空 stream"""
        key = self._stream_key(canvas_id, node_id)
        async with self._redis.pipeline(transaction=True) as pipe:
            pipe.delete(key)  # 清除旧数据
            # 种子消息让 key 立即存在,subscribe 跳过 _init 事件
            pipe.xadd(key, {"event": StreamEvent.INIT, "data": ""})
            await pipe.execute()

    async def append(self, canvas_id: str, node_id: str, event: str, data: str) -> str:
        """追加事件到 stream,返回 stream ID"""
        key = self._stream_key(canvas_id, node_id)
        stream_id = await self._redis.xadd(
            key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
        )
        return stream_id

    async def complete(self, canvas_id: str, node_id: str):
        """标记 stream 完成,设置 TTL"""
        key = self._stream_key(canvas_id, node_id)
        await self._redis.expire(key, self.STREAM_TTL_SECONDS)

关键点

  • Stream Key 设计at:canvas:stream:{canvas_id}:{node_id},每个节点独立 Stream
  • 种子消息_init 事件让 Stream 立即存在,避免订阅时 key 不存在
  • MAXLEN 限制:限制 Stream 最大长度(10000 条),避免内存溢出
  • TTL 清理:完成后设置 1 小时 TTL,自动过期

2. 事件类型定义

class StreamEvent(StrEnum):
    INIT = "_init"  # 种子事件(不推送给客户端)
    START = "start"  # 任务开始
    DELTA = "delta"  # 增量数据(如 AI 生成的文本)
    DONE = "done"  # 任务完成
    ERROR = "error"  # 任务失败

_TERMINAL_EVENTS = frozenset((StreamEvent.DONE, StreamEvent.ERROR))

事件生命周期

INIT → START → DELTA × N → DONE/ERROR

3. 断点续传:XRANGE + XREAD

async def subscribe(
    self, canvas_id: str, node_id: str, from_id: str = "0-0"
) -> AsyncGenerator[tuple[str, str, str], None]:
    """订阅 stream 事件,支持断点续传。

    1. XRANGE 重放 from_id 之后的历史消息
    2. 如果未遇到终止事件(done/error),切换到 XREAD BLOCK 等待实时消息
    3. 遇到 done/error 终止

    from_id: "0-0" 从头读取,或传入 Last-Event-ID 实现断点重续
    """
    key = self._stream_key(canvas_id, node_id)

    # Phase 1: 重放历史(XRANGE)
    range_start = f"({from_id}" if from_id != "0-0" else "-"
    raw_entries = await self._redis.xrange(key, min=range_start)

    last_id = from_id
    for stream_id, event, data, terminal in self._iter_entries(raw_entries):
        last_id = stream_id
        yield (stream_id, event, data)
        if terminal:
            return

    # Phase 2: 实时等待(XREAD BLOCK)
    while True:
        results = await self._redis.xread(
            {key: last_id}, block=self.BLOCK_MS, count=10
        )
        if not results:
            # 超时,检查 stream 是否还存在
            if not await self._redis.exists(key):
                return
            continue

        for _stream_name, entries in results:
            for stream_id, event, data, terminal in self._iter_entries(entries):
                last_id = stream_id
                yield (stream_id, event, data)
                if terminal:
                    return

关键点

  • 两阶段读取
    1. XRANGE:重放历史消息(支持断点续传)
    2. XREAD BLOCK:阻塞等待新消息(实时推送)
  • Exclusive Range({from_id} 跳过 from_id 本身,避免重复推送
  • 终止检测:遇到 DONE/ERROR 事件后停止订阅
  • 超时处理XREAD 超时后检查 Stream 是否还存在

断点续传示例

# 客户端首次连接
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
    print(f"{stream_id}: {event} - {data}")
    # 假设在 stream_id="1234567890-0" 时断线

# 客户端重连(传入 Last-Event-ID)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="1234567890-0"):
    print(f"{stream_id}: {event} - {data}")
    # 从 "1234567890-0" 之后继续推送,无遗漏

4. SSE 端点实现

@router.get("/canvas/{canvas_id}/node/{node_id}/stream")
async def stream_node_events(canvas_id: str, node_id: str, last_event_id: str = "0-0"):
    """SSE 端点:推送节点执行事件"""

    async def event_generator():
        try:
            async for stream_id, event, data in stream_store.subscribe(
                canvas_id, node_id, from_id=last_event_id
            ):
                # SSE 格式:id + event + data
                yield f"id: {stream_id}\n"
                yield f"event: {event}\n"
                yield f"data: {data}\n\n"
        except Exception as e:
            # 错误事件
            yield f"event: error\n"
            yield f"data: {json.dumps({'message': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用 Nginx 缓冲
        },
    )

关键点

  • SSE 格式id + event + data + 空行
  • Last-Event-ID:客户端断线重连时自动传递
  • 流式响应StreamingResponse 支持异步生成器
  • Nginx 兼容X-Accel-Buffering: no 禁用 Nginx 缓冲

客户端代码

const eventSource = new EventSource(
    `/api/canvas/${canvasId}/node/${nodeId}/stream`
);

eventSource.addEventListener("start", (e) => {
    console.log("Task started");
});

eventSource.addEventListener("delta", (e) => {
    const data = JSON.parse(e.data);
    updateUI(data);  // 实时更新 UI
});

eventSource.addEventListener("done", (e) => {
    console.log("Task completed");
    eventSource.close();
});

eventSource.addEventListener("error", (e) => {
    console.error("Task failed", e.data);
    eventSource.close();
});

// 断线自动重连(浏览器原生支持)
eventSource.onerror = () => {
    console.log("Connection lost, reconnecting...");
    // EventSource 会自动重连,并传递 Last-Event-ID
};

5. arq 异步任务执行

async def run_agent_task(canvas_id: str, node_id: str, message: str):
    """后台执行 Agent,事件写入 Redis Stream"""
    try:
        async with node_status_handler(canvas_id, node_id):
            # 1. 写入 START 事件
            await stream_store.append(canvas_id, node_id, StreamEvent.START, "null")

            # 2. 创建 Agent 会话
            async with unit_of_work() as db:
                fresh_node = await service.get_node(db, canvas_id, node_id)
                handler = get_handler(fresh_node.type)
                agent = await _create_agent_session(db, fresh_node, handler, canvas_id)

                # 3. 流式执行 Agent
                async for event in agent.run(message):
                    await stream_store.append(
                        canvas_id, node_id, StreamEvent.DELTA, json.dumps(event)
                    )

                # 4. 持久化结果
                agent_result = agent.get_result()
                streaming_result = handler.format_result(fresh_node, agent_result)

                if streaming_result is not None:
                    merged_data = {**fresh_node.data, **streaming_result.data}
                    df_id = await persist_node_dataframe(merged_data, fresh_node.title, db)
                    await service.update_node(
                        db_session=db,
                        canvas_id=canvas_id,
                        node_id=node_id,
                        data=merged_data,
                        dataframe_id=df_id,
                    )

        # 5. 写入 DONE 事件
        await stream_store.append(canvas_id, node_id, StreamEvent.DONE, "null")
    except Exception as e:
        # 6. 写入 ERROR 事件
        await stream_store.append(
            canvas_id, node_id, StreamEvent.ERROR, json.dumps({"message": str(e)[:200]})
        )
        raise
    finally:
        # 7. 设置 TTL
        await stream_store.complete(canvas_id, node_id)

关键点

  • 异步执行:arq worker 在后台执行,不阻塞 API 请求
  • 流式写入:Agent 每生成一段内容,立即写入 Stream
  • 结果持久化:任务完成后,结果写入 PostgreSQL
  • 错误处理:异常时写入 ERROR 事件,客户端可以感知
  • TTL 清理:无论成功失败,都设置 TTL

五、性能优化与可靠性保障

1. 性能优化

XREAD 批量读取

results = await self._redis.xread(
    {key: last_id}, block=self.BLOCK_MS, count=10
)
  • count=10:每次最多读取 10 条消息
  • 减少网络往返次数

MAXLEN 限制

stream_id = await self._redis.xadd(
    key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
)
  • maxlen=10000:限制 Stream 最大长度
  • approximate=True:允许近似裁剪,性能更好

TTL 自动清理

await self._redis.expire(key, self.STREAM_TTL_SECONDS)
  • 完成后 1 小时自动过期
  • 避免 Redis 内存泄漏

2. 可靠性保障

事件持久化

  • Redis Streams 持久化到磁盘(AOF/RDB)
  • 即使 Redis 重启,事件不丢失

断点续传

  • 客户端断线后,通过 Last-Event-ID 恢复
  • XRANGE 重放历史消息,无遗漏

跨实例广播

  • Redis Streams 是全局的,所有实例共享
  • 任意实例的 SSE 端点都可以读取事件

幂等性

  • Stream ID 是唯一的,客户端可以去重
  • 重复推送不会导致数据错误

六、复杂场景处理

1. 长时间任务

# 任务可能运行数分钟
async for event in agent.run(message):
    await stream_store.append(canvas_id, node_id, StreamEvent.DELTA, json.dumps(event))
    # 每生成一段内容,立即推送

优势

  • 用户实时看到进度
  • 不需要等待任务完成

2. 多客户端订阅

# 客户端 A 订阅
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
    print(f"Client A: {event}")

# 客户端 B 订阅(同时)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
    print(f"Client B: {event}")

优势

  • 多个客户端可以同时订阅同一个 Stream
  • 每个客户端独立读取,互不影响

3. 历史回放

# 任务完成后,新客户端连接
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
    print(f"{event}: {data}")
    # 完整回放所有历史事件

优势

  • 支持"重播"功能
  • 用户可以查看任务的完整执行过程

七、对比其他方案

方案实时性断点续传横向扩展部署成本可靠性
Redis Streams + SSE⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
WebSocket + Redis Pub/Sub⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
短轮询⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
长轮询⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Kafka + WebSocket⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

AskTable 选择 Redis Streams + SSE 的原因

  • 实时性满足需求(< 100ms)
  • 原生支持断点续传
  • 横向扩展简单(无状态 SSE 端点)
  • 零额外依赖(复用 Redis)
  • 高可靠性(事件持久化)

八、实战经验与踩坑

1. Nginx 缓冲问题

问题:Nginx 默认会缓冲响应,导致 SSE 延迟。

解决方案

headers={
    "X-Accel-Buffering": "no",  # 禁用 Nginx 缓冲
}

2. XREAD 超时处理

问题XREAD BLOCK 超时后,如何判断 Stream 是否还存在?

解决方案

if not results:
    # 超时,检查 stream 是否还存在
    if not await self._redis.exists(key):
        return  # Stream 已删除,停止订阅
    continue  # Stream 还在,继续等待

3. Stream ID 格式

问题:Redis Stream ID 格式是 {timestamp}-{sequence},如何正确解析?

解决方案

  • 不需要解析,直接作为字符串使用
  • Redis 保证 ID 严格递增
  • XRANGEXREAD 都支持字符串比较

4. 事件去重

问题:客户端可能收到重复事件(如网络抖动)。

解决方案

  • 客户端记录已处理的 stream_id
  • 使用 Set 去重
const processedIds = new Set();

eventSource.addEventListener("delta", (e) => {
    if (processedIds.has(e.lastEventId)) {
        return;  // 已处理,跳过
    }
    processedIds.add(e.lastEventId);
    updateUI(JSON.parse(e.data));
});

九、总结与展望

AskTable 的 Canvas 流式执行架构,通过 Redis Streams + SSE + arq 的组合,实现了:

实时性:毫秒级推送延迟 ✅ 断点续传:网络断线后自动恢复,无遗漏 ✅ 横向扩展:支持多实例部署,事件跨实例广播 ✅ 低成本:复用 Redis,无需额外组件 ✅ 高可靠性:事件持久化,支持历史回放

未来优化方向

  1. 消费者组:使用 Redis Streams 的消费者组功能,实现负载均衡
  2. 事件压缩:对大数据量事件进行压缩,减少网络传输
  3. 多租户隔离:按租户分片 Stream,提升性能
  4. 监控告警:监控 Stream 长度、延迟等指标

开源计划

我们计划将 NodeStreamStoreSSE 端点实现 开源,帮助更多团队构建实时推送系统。敬请期待!


相关阅读

技术交流

cta.readyToSimplify

sidebar.noProgrammingNeededsidebar.startFreeTrial

cta.noCreditCard
cta.quickStart
cta.dbSupport