AskTable

AskTable Canvas 实时协同编辑架构:Replicache + PostgreSQL LISTEN/NOTIFY 的优雅实践

AskTable 团队
AskTable 团队 2026年3月5日

在现代数据分析场景中,团队协作已成为刚需。多个分析师需要同时在同一个 Canvas 画布上工作:有人在调整 SQL 查询,有人在优化图表样式,有人在添加 Python 分析节点。如何让这些操作实时同步,同时保证数据一致性和系统性能?

AskTable 的 Canvas 实时协同编辑架构给出了一个优雅的答案:Replicache + PostgreSQL LISTEN/NOTIFY + SSE。这套方案不仅实现了毫秒级的实时同步,还具备断点续传、跨实例扩展和低成本部署的优势。

本文将深入剖析这套架构的核心设计与实现细节。


一、为什么需要实时协同?Canvas 的独特挑战

1. Canvas 不是传统文档

传统的协同编辑(如 Google Docs)主要处理文本内容的 CRDT(无冲突复制数据类型)合并。但 Canvas 的复杂度远超文本:

2. 传统方案的局限

WebSocket 全双工方案

轮询方案

我们需要的是


二、技术选型:为什么是 Replicache + PostgreSQL?

1. Replicache:为协同而生的状态同步框架

Replicache 是一个专为实时协同设计的客户端状态管理框架,核心理念是 Optimistic UI + Server Reconciliation

核心优势

Replicache 的 Push/Pull 协议

加载图表中...

2. PostgreSQL LISTEN/NOTIFY:轻量级的跨实例广播

为什么不用 Redis Pub/Sub?

PostgreSQL LISTEN/NOTIFY 的优势

适用场景


三、架构设计:三层协同机制

AskTable 的实时协同架构分为三层:

加载图表中...

1. 客户端层:Replicache

2. 应用层:FastAPI + PokeBroadcaster

3. 数据层:PostgreSQL


四、核心实现:深入源码

1. 版本管理:FOR UPDATE 锁保证并发安全

每次 mutation 都需要递增全局版本号。如何在高并发下保证版本号的唯一性?

async def get_next_version(db_session: AsyncSession) -> int:
    """原子递增全局版本号,FOR UPDATE 锁行保证并发安全"""
    row = (
        await db_session.execute(
            text(
                "SELECT version FROM atserver.replicache_server WHERE id = 'global' FOR UPDATE"
            )
        )
    ).scalar_one()

    next_version = row + 1

    await db_session.execute(
        text("UPDATE atserver.replicache_server SET version = :v WHERE id = 'global'"),
        {"v": next_version},
    )

    return next_version

关键点

性能考量

2. 幂等性设计:避免重复处理

网络不可靠,客户端可能重复发送同一个 mutation。如何保证幂等性?

async def process_mutation(
    db_session: AsyncSession,
    client_group_id: str,
    client_id: str,
    mutation: dict,
    canvas_id: str,
) -> None:
    """处理单个 mutation:幂等检查 + 分发 + 容错"""
    # 获取或创建客户端记录
    client = (
        await db_session.execute(select(ReplicacheClientModel).filter_by(id=client_id))
    ).scalar_one_or_none()
    
    if client is None:
        client = ReplicacheClientModel(
            id=client_id,
            client_group_id=client_group_id,
            last_mutation_id=0,
            version=0,
        )
        db_session.add(client)

    mutation_id = mutation["id"]

    # 幂等:已处理过的 mutation 直接跳过
    if mutation_id <= client.last_mutation_id:
        return

    # Gap tolerance:记录警告但继续处理
    if mutation_id != client.last_mutation_id + 1:
        logger.warning(
            "Mutation gap: client=%s expected=%d got=%d",
            client_id,
            client.last_mutation_id + 1,
            mutation_id,
        )

    # 分发 mutation,失败时不更新 last_mutation_id 以便客户端重试
    try:
        await _dispatch_mutation(db_session, canvas_id, mutation, next_version)
        # 成功时更新客户端状态
        client.last_mutation_id = mutation_id
        client.version = next_version
    except Exception:
        logger.exception("Mutation failed: client=%s mutation=%s", client_id, mutation)
        raise

关键点

3. 增量同步:只传输变更数据

全量同步会浪费大量带宽。Replicache 的 Pull 协议支持增量同步:

async def build_pull_response(
    db_session: AsyncSession,
    client_group_id: str,
    canvas_id: str,
    from_version: int,
) -> dict:
    """构建 pull 响应:全量(from_version=0)或增量"""
    # 当前服务端版本(不递增)
    current_version = (
        await db_session.execute(
            text("SELECT version FROM atserver.replicache_server WHERE id = 'global'")
        )
    ).scalar_one()

    if from_version == 0:
        patch = await _build_initial_patch(db_session, canvas_id)
    else:
        patch = await _build_incremental_patch(db_session, canvas_id, from_version)

    # 查询变更的客户端
    clients = (
        (
            await db_session.execute(
                select(ReplicacheClientModel).filter(
                    ReplicacheClientModel.client_group_id == client_group_id,
                    ReplicacheClientModel.version > from_version,
                )
            )
        )
        .scalars()
        .all()
    )

    return {
        "cookie": max(current_version, 1),
        "lastMutationIDChanges": {c.id: c.last_mutation_id for c in clients},
        "patch": patch,
    }

增量同步的实现

async def _build_incremental_patch(
    db_session: AsyncSession, canvas_id: str, from_version: int
) -> list[dict]:
    """增量 pull:返回 version > from_version 的变更"""
    patch = []

    # Canvas 变更
    canvases = (
        (
            await db_session.execute(
                select(CanvasModel).filter(
                    CanvasModel.id == canvas_id, CanvasModel.version > from_version
                )
            )
        )
        .scalars()
        .all()
    )
    for canvas in canvases:
        if canvas.deleted:
            patch.append({"op": "del", "key": f"canvas/{canvas.id}"})
        else:
            patch.append(
                {
                    "op": "put",
                    "key": f"canvas/{canvas.id}",
                    "value": _serialize_canvas(canvas),
                }
            )

    # Node 变更(同理)
    nodes = (
        (
            await db_session.execute(
                select(NodeModel).filter(
                    NodeModel.canvas_id == canvas_id, NodeModel.version > from_version
                )
            )
        )
        .scalars()
        .all()
    )
    # ... 序列化 nodes
    
    return patch

关键点

4. 跨实例广播:PostgreSQL LISTEN/NOTIFY

当一个实例处理了 mutation,如何通知其他实例?

PokeBroadcaster 的设计

class PokeBroadcaster:
    """PG LISTEN/NOTIFY -> asyncio.Queue 广播器"""

    def __init__(self):
        self._listeners: dict[str, set[asyncio.Queue]] = defaultdict(set)
        self._pg_conn: asyncpg.Connection | None = None

    def subscribe(self, canvas_id: str) -> asyncio.Queue:
        """SSE 端点调用,订阅某 canvas 的 poke 信号"""
        queue: asyncio.Queue = asyncio.Queue(maxsize=1)
        self._listeners[canvas_id].add(queue)
        return queue

    def unsubscribe(self, canvas_id: str, queue: asyncio.Queue):
        """SSE 端点断开时调用"""
        self._listeners[canvas_id].discard(queue)
        if not self._listeners[canvas_id]:
            del self._listeners[canvas_id]

    async def start_listener(self, pg_dsn: str):
        """应用启动时调用一次,监听 canvas_poke 频道"""
        import asyncpg as _asyncpg

        try:
            self._pg_conn = await _asyncpg.connect(pg_dsn)
            await self._pg_conn.add_listener("canvas_poke", self._on_pg_notify)
            logger.info("PokeBroadcaster: listening on canvas_poke channel")
        except Exception:
            logger.exception("PokeBroadcaster: failed to start PG listener")

    def _on_pg_notify(self, conn: Any, pid: int, channel: str, canvas_id: str):
        """PG 通知回调 -> 转发给本进程订阅者"""
        self._poke_local(canvas_id)

    def _poke_local(self, canvas_id: str):
        """向本进程内所有该 canvas 的 SSE 队列发送 poke"""
        for queue in self._listeners.get(canvas_id, set()):
            try:
                queue.put_nowait(True)
            except asyncio.QueueFull:
                pass  # maxsize=1,合并快速连续 poke

    async def poke(self, canvas_id: str, db_session: Any):
        """写操作在事务内调用:PG NOTIFY 广播"""
        await db_session.execute(
            text("SELECT pg_notify('canvas_poke', :cid)"),
            {"cid": canvas_id},
        )

工作流程

  1. 应用启动:每个实例调用
    start_listener()
    ,监听
    canvas_poke
    频道
  2. 客户端订阅:SSE 端点调用
    subscribe(canvas_id)
    ,获取一个
    asyncio.Queue
  3. 写操作:mutation 处理完成后,调用
    poke(canvas_id, db_session)
  4. PostgreSQL 广播
    pg_notify
    在事务提交后广播给所有监听者
  5. 本地转发
    _on_pg_notify
    回调触发,向本进程的订阅队列发送信号
  6. SSE 推送:SSE 端点从队列读取信号,推送给客户端

关键设计


五、性能优化:从理论到实践

1. 批量加载 DataFrame Header

每个 Node 可能关联一个 DataFrame(查询结果)。如果逐个加载,会产生 N+1 查询问题。

优化方案

async def _load_dataframe_headers(
    db_session: AsyncSession, nodes: Sequence[NodeModel]
) -> dict[str, list[dict]]:
    """批量获取节点关联的 dataframe header"""
    df_ids = [n.dataframe_id for n in nodes if n.dataframe_id]
    if not df_ids:
        return {}
    rows = (
        await db_session.execute(
            select(DataframeModel.id, DataframeModel.header).filter(
                DataframeModel.id.in_(df_ids)
            )
        )
    ).all()
    return {r.id: r.header for r in rows}

效果

2. SSE 队列合并

如果 1 秒内有 100 次 mutation,是否需要推送 100 次 poke?

优化方案

效果

3. 版本号分片(未来优化)

当前全局版本号是串行递增的,可能成为瓶颈。

优化方向

挑战


六、对比其他方案

方案实时性横向扩展断点续传部署成本适用场景
Replicache + PG⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐中小规模协同
WebSocket + Redis⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐大规模实时应用
轮询⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐低实时性需求
CRDT (Yjs)⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐文本协同编辑

AskTable 选择 Replicache + PG 的原因


七、实战经验与踩坑

1. PostgreSQL NOTIFY 的限制

问题:NOTIFY 的 payload 最大 8000 字节。

解决方案

2. SSE 连接的生命周期管理

问题:客户端断线后,服务器如何清理订阅?

解决方案

async def sse_endpoint(canvas_id: str):
    queue = poke_broadcaster.subscribe(canvas_id)
    try:
        async for _ in queue:
            yield f"data: poke\n\n"
    finally:
        poke_broadcaster.unsubscribe(canvas_id, queue)

3. 版本号冲突的处理

问题:客户端 Pull 时,服务器版本号已经远超客户端。

解决方案


八、总结与展望

AskTable 的实时协同编辑架构,通过 Replicache + PostgreSQL LISTEN/NOTIFY + SSE 的组合,实现了:

毫秒级实时性:< 100ms 的端到端延迟
高可靠性:幂等性设计 + 断点续传
横向扩展:无状态应用层 + PostgreSQL 广播
低成本部署:零额外依赖,复用现有基础设施

未来优化方向

  1. 版本号分片:按 Canvas ID 分片,提升并发性能
  2. Conflict Resolution:支持更复杂的冲突解决策略(如 CRDT)
  3. 离线编辑:客户端完全离线时的本地编辑与同步
  4. 跨区域同步:多数据中心部署的一致性保证

开源计划

我们计划将 PokeBroadcasterReplicache 服务端实现 开源,帮助更多团队快速构建实时协同应用。敬请期待!


相关阅读

技术交流