
In modern data analysis, team collaboration has become a hard requirement. Several analysts need to work on the same Canvas at once: one is tweaking a SQL query, another is polishing chart styles, a third is adding a Python analysis node. How do you synchronize these operations in real time while preserving data consistency and system performance?

AskTable's real-time collaborative editing architecture for Canvas offers an elegant answer: Replicache + PostgreSQL LISTEN/NOTIFY + SSE. This combination delivers millisecond-level real-time sync, plus resumable synchronization after disconnects, cross-instance scaling, and low deployment cost.

This post takes a deep look at the core design and implementation details of this architecture.
Traditional collaborative editing (e.g. Google Docs) mostly deals with CRDT (conflict-free replicated data type) merging of text. A Canvas is far more complex than text:

The WebSocket full-duplex approach:

The polling approach:

What we need:
Replicache is a client-side state management framework built specifically for real-time collaboration. Its core idea is Optimistic UI + Server Reconciliation.

Core advantages:

Replicache's Push/Pull protocol:
(Diagram: Replicache Push/Pull protocol flow)
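To make the protocol concrete, the sketch below shows the two payloads as Python literals. The field names follow Replicache's published push/pull protocol; the mutator name updateNode and all values are invented for illustration.

```python
# Push request: the client replays its locally applied mutations to the server.
push_request = {
    "pushVersion": 1,
    "clientGroupID": "cg-123",
    "mutations": [
        {
            "clientID": "client-abc",
            "id": 42,                            # per-client, monotonically increasing
            "name": "updateNode",                # illustrative mutator name
            "args": {"nodeId": "n1", "x": 120},
            "timestamp": 1710000000000,
        }
    ],
}

# Pull response: the server returns a cookie (version), acked mutation ids, and a patch.
pull_response = {
    "cookie": 17,
    "lastMutationIDChanges": {"client-abc": 42},
    "patch": [
        {"op": "put", "key": "node/n1", "value": {"x": 120}},
    ],
}
```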
Why not Redis Pub/Sub?

Advantages of PostgreSQL LISTEN/NOTIFY:
- pg_notify only broadcasts after the transaction commits, which guarantees consistency.

Applicable scenarios:

AskTable's real-time collaboration architecture is split into three layers:
(Diagram: the three-layer real-time collaboration architecture)
Every mutation must increment a global version number (stored in the replicache_server table). How do we keep that number unique and monotonic under high concurrency?
```python
async def get_next_version(db_session: AsyncSession) -> int:
    """Atomically increment the global version; a FOR UPDATE row lock guarantees concurrency safety."""
    row = (
        await db_session.execute(
            text(
                "SELECT version FROM atserver.replicache_server WHERE id = 'global' FOR UPDATE"
            )
        )
    ).scalar_one()
    next_version = row + 1
    await db_session.execute(
        text("UPDATE atserver.replicache_server SET version = :v WHERE id = 'global'"),
        {"v": next_version},
    )
    return next_version
```
Key points:
- FOR UPDATE locks the global row of the replicache_server table, so concurrent version bumps are serialized.

Performance considerations:
The network is unreliable, so a client may send the same mutation more than once. How do we guarantee idempotency?
```python
async def process_mutation(
    db_session: AsyncSession,
    client_group_id: str,
    client_id: str,
    mutation: dict,
    canvas_id: str,
    next_version: int,
) -> None:
    """Process a single mutation: idempotency check + dispatch + fault tolerance."""
    # Fetch or create the client record
    client = (
        await db_session.execute(select(ReplicacheClientModel).filter_by(id=client_id))
    ).scalar_one_or_none()
    if client is None:
        client = ReplicacheClientModel(
            id=client_id,
            client_group_id=client_group_id,
            last_mutation_id=0,
            version=0,
        )
        db_session.add(client)

    mutation_id = mutation["id"]

    # Idempotency: skip mutations that have already been processed
    if mutation_id <= client.last_mutation_id:
        return

    # Gap tolerance: log a warning but keep processing
    if mutation_id != client.last_mutation_id + 1:
        logger.warning(
            "Mutation gap: client=%s expected=%d got=%d",
            client_id,
            client.last_mutation_id + 1,
            mutation_id,
        )

    # Dispatch the mutation; on failure, last_mutation_id is not advanced so the client can retry
    try:
        await _dispatch_mutation(db_session, canvas_id, mutation, next_version)
        # Update client state only on success
        client.last_mutation_id = mutation_id
        client.version = next_version
    except Exception:
        logger.exception("Mutation failed: client=%s mutation=%s", client_id, mutation)
        raise
```
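For context, here is a minimal sketch of how a push handler might tie get_next_version and process_mutation together. The handle_push name, the request field access, and the final commit are assumptions; only the calls to the functions shown in this post are taken from it.

```python
async def handle_push(canvas_id: str, push: dict, db_session: AsyncSession) -> None:
    """Hypothetical push handler: one version bump per push, then per-mutation processing."""
    # One global version bump per push; FOR UPDATE serializes concurrent pushes.
    next_version = await get_next_version(db_session)
    for mutation in push["mutations"]:
        await process_mutation(
            db_session,
            client_group_id=push["clientGroupID"],
            client_id=mutation["clientID"],
            mutation=mutation,
            canvas_id=canvas_id,
            next_version=next_version,
        )
    # NOTIFY while still inside the transaction; listeners only see it after commit.
    await poke_broadcaster.poke(canvas_id, db_session)
    await db_session.commit()
```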
Key points:
- Idempotency is keyed on last_mutation_id: a mutation whose id is not greater than the stored last_mutation_id is skipped, so retransmissions are harmless.

A full sync on every pull would waste a lot of bandwidth. Replicache's Pull protocol supports incremental sync:
```python
async def build_pull_response(
    db_session: AsyncSession,
    client_group_id: str,
    canvas_id: str,
    from_version: int,
) -> dict:
    """Build the pull response: full sync (from_version=0) or incremental."""
    # Current server version (read only, not incremented here)
    current_version = (
        await db_session.execute(
            text("SELECT version FROM atserver.replicache_server WHERE id = 'global'")
        )
    ).scalar_one()

    if from_version == 0:
        patch = await _build_initial_patch(db_session, canvas_id)
    else:
        patch = await _build_incremental_patch(db_session, canvas_id, from_version)

    # Clients whose state changed since from_version
    clients = (
        (
            await db_session.execute(
                select(ReplicacheClientModel).filter(
                    ReplicacheClientModel.client_group_id == client_group_id,
                    ReplicacheClientModel.version > from_version,
                )
            )
        )
        .scalars()
        .all()
    )

    return {
        "cookie": max(current_version, 1),
        "lastMutationIDChanges": {c.id: c.last_mutation_id for c in clients},
        "patch": patch,
    }
```
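A hypothetical pull handler wrapping build_pull_response might look like the sketch below; the key step is turning the cookie the client sends back into from_version (the handle_pull name and request shape are assumptions).

```python
async def handle_pull(canvas_id: str, pull: dict, db_session: AsyncSession) -> dict:
    """Hypothetical pull handler: the cookie the client sends back becomes from_version."""
    from_version = int(pull.get("cookie") or 0)  # null / missing cookie -> full sync
    return await build_pull_response(
        db_session,
        client_group_id=pull["clientGroupID"],
        canvas_id=canvas_id,
        from_version=from_version,
    )
```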
The incremental sync implementation:
```python
async def _build_incremental_patch(
    db_session: AsyncSession, canvas_id: str, from_version: int
) -> list[dict]:
    """Incremental pull: return changes with version > from_version."""
    patch = []

    # Canvas changes
    canvases = (
        (
            await db_session.execute(
                select(CanvasModel).filter(
                    CanvasModel.id == canvas_id, CanvasModel.version > from_version
                )
            )
        )
        .scalars()
        .all()
    )
    for canvas in canvases:
        if canvas.deleted:
            patch.append({"op": "del", "key": f"canvas/{canvas.id}"})
        else:
            patch.append(
                {
                    "op": "put",
                    "key": f"canvas/{canvas.id}",
                    "value": _serialize_canvas(canvas),
                }
            )

    # Node changes (same pattern)
    nodes = (
        (
            await db_session.execute(
                select(NodeModel).filter(
                    NodeModel.canvas_id == canvas_id, NodeModel.version > from_version
                )
            )
        )
        .scalars()
        .all()
    )
    # ... serialize nodes

    return patch
```
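As a made-up example, an incremental patch covering one updated node and one deleted canvas would look like this:

```python
# Illustrative values only; keys follow the "entity/id" convention used above.
example_patch = [
    {"op": "put", "key": "node/n1", "value": {"id": "n1", "type": "sql", "version": 18}},
    {"op": "del", "key": "canvas/c9"},
]
```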
Key points:
- Every entity (Canvas, Node) carries a version field.
- The incremental query returns only records with version > from_version.
- Each patch entry has the shape {"op": "put" | "del", "key": "...", "value": {...}}.

When one instance has processed a mutation, how do the other instances find out?
The design of PokeBroadcaster:
```python
import asyncio
import logging
from collections import defaultdict
from typing import Any

import asyncpg
from sqlalchemy import text

logger = logging.getLogger(__name__)


class PokeBroadcaster:
    """PG LISTEN/NOTIFY -> asyncio.Queue broadcaster."""

    def __init__(self):
        self._listeners: dict[str, set[asyncio.Queue]] = defaultdict(set)
        self._pg_conn: asyncpg.Connection | None = None

    def subscribe(self, canvas_id: str) -> asyncio.Queue:
        """Called by the SSE endpoint to subscribe to poke signals for a canvas."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=1)
        self._listeners[canvas_id].add(queue)
        return queue

    def unsubscribe(self, canvas_id: str, queue: asyncio.Queue):
        """Called when the SSE endpoint disconnects."""
        self._listeners[canvas_id].discard(queue)
        if not self._listeners[canvas_id]:
            del self._listeners[canvas_id]

    async def start_listener(self, pg_dsn: str):
        """Called once at application startup; listens on the canvas_poke channel."""
        try:
            self._pg_conn = await asyncpg.connect(pg_dsn)
            await self._pg_conn.add_listener("canvas_poke", self._on_pg_notify)
            logger.info("PokeBroadcaster: listening on canvas_poke channel")
        except Exception:
            logger.exception("PokeBroadcaster: failed to start PG listener")

    def _on_pg_notify(self, conn: Any, pid: int, channel: str, canvas_id: str):
        """PG notification callback -> forward to subscribers in this process."""
        self._poke_local(canvas_id)

    def _poke_local(self, canvas_id: str):
        """Send a poke to every SSE queue for this canvas in the current process."""
        for queue in self._listeners.get(canvas_id, set()):
            try:
                queue.put_nowait(True)
            except asyncio.QueueFull:
                pass  # maxsize=1: coalesce rapid consecutive pokes

    async def poke(self, canvas_id: str, db_session: Any):
        """Called by write operations inside their transaction: broadcast via PG NOTIFY."""
        await db_session.execute(
            text("SELECT pg_notify('canvas_poke', :cid)"),
            {"cid": canvas_id},
        )
```
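Before walking through the workflow, here is a minimal sketch of how the broadcaster might be wired into a FastAPI application; the poke_broadcaster singleton, the PG_DSN constant, and the lifespan wiring are assumptions, not AskTable's actual startup code.

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

# Assumed DSN; in a real service this would come from configuration.
PG_DSN = "postgresql://user:pass@localhost:5432/asktable"

# One broadcaster instance per process, shared by SSE endpoints and write paths.
poke_broadcaster = PokeBroadcaster()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Open the single LISTEN connection when the application starts.
    await poke_broadcaster.start_listener(PG_DSN)
    yield


app = FastAPI(lifespan=lifespan)
```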
Workflow:
1. At application startup, start_listener() is called once and begins listening on the canvas_poke channel.
2. Each SSE connection calls subscribe(canvas_id) and receives an asyncio.Queue.
3. A write operation calls poke(canvas_id, db_session) inside its transaction.
4. pg_notify broadcasts to all listeners after the transaction commits.
5. The _on_pg_notify callback fires and pushes a signal into the subscriber queues of its own process.

Key design decisions:
- A queue with maxsize=1: coalesces rapid consecutive pokes and avoids an SSE push storm.
- put_nowait with QueueFull ignored: a full queue means a poke is already pending.
- pg_notify inside the transaction: the notification and the data change stay atomic.

Each Node may be associated with a DataFrame (its query result). Loading them one by one would cause an N+1 query problem.
The optimization:
```python
async def _load_dataframe_headers(
    db_session: AsyncSession, nodes: Sequence[NodeModel]
) -> dict[str, list[dict]]:
    """Batch-fetch the dataframe headers associated with the given nodes."""
    df_ids = [n.dataframe_id for n in nodes if n.dataframe_id]
    if not df_ids:
        return {}
    rows = (
        await db_session.execute(
            select(DataframeModel.id, DataframeModel.header).filter(
                DataframeModel.id.in_(df_ids)
            )
        )
    ).all()
    return {r.id: r.header for r in rows}
```
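A sketch of how the batch loader might be used while serializing nodes for a patch; the _serialize_nodes helper and the dataframeHeader field name are illustrative assumptions.

```python
async def _serialize_nodes(
    db_session: AsyncSession, nodes: Sequence[NodeModel]
) -> list[dict]:
    """Hypothetical helper: attach batch-loaded headers while serializing nodes."""
    headers = await _load_dataframe_headers(db_session, nodes)  # one IN query, not N
    return [
        {
            "id": node.id,
            "dataframeHeader": headers.get(node.dataframe_id),  # field name is illustrative
        }
        for node in nodes
    ]
```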
Result:
If there are 100 mutations within one second, do we need to push 100 pokes?

The optimization:
- maxsize=1: at most one poke is buffered per subscriber queue, so rapid bursts collapse into a single push.

Result:

The global version number currently increments serially, which could become a bottleneck.

Possible optimizations:

Challenges:
| Approach | Real-time | Horizontal scaling | Resumable sync | Deployment cost | Best for |
|---|---|---|---|---|---|
| Replicache + PG | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Small-to-medium-scale collaboration |
| WebSocket + Redis | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | Large-scale real-time apps |
| Polling | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Low real-time requirements |
| CRDT (Yjs) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Collaborative text editing |
Why AskTable chose Replicache + PG:

Problem: a NOTIFY payload is limited to 8000 bytes.

Solution:
- Send only the canvas_id in the payload (a few dozen bytes); clients fetch the actual changes through Pull.

Problem: after a client disconnects, how does the server clean up its subscription?

Solution:
- The async generator behind StreamingResponse runs its finally block when the connection closes.
- Call unsubscribe() inside finally.

```python
async def sse_endpoint(canvas_id: str):
    queue = poke_broadcaster.subscribe(canvas_id)
    try:
        while True:
            await queue.get()          # wait for the next poke signal
            yield "data: poke\n\n"     # SSE frame; the client reacts by pulling
    finally:
        poke_broadcaster.unsubscribe(canvas_id, queue)
```
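A sketch of exposing that generator through FastAPI's StreamingResponse; the route path is an assumption, while text/event-stream is the standard SSE media type.

```python
from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()


@router.get("/canvases/{canvas_id}/poke")  # illustrative route path
async def poke_stream(canvas_id: str):
    # text/event-stream keeps the connection open so the browser's EventSource can receive pokes.
    return StreamingResponse(sse_endpoint(canvas_id), media_type="text/event-stream")
```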
Problem: when a client pulls, the server version number may already be far ahead of the client's.

Solution:
- A single incremental pull returns every change with version > from_version, so the client catches up in one round trip.

AskTable's real-time collaborative editing architecture, built on Replicache + PostgreSQL LISTEN/NOTIFY + SSE, delivers:
✅ Millisecond-level real-time sync: end-to-end latency under 100 ms
✅ High reliability: idempotent mutation handling + resumable synchronization
✅ Horizontal scalability: a stateless application layer + PostgreSQL broadcast
✅ Low deployment cost: zero extra dependencies, reusing existing infrastructure
We plan to open-source the PokeBroadcaster and our Replicache server implementation to help more teams build real-time collaborative applications quickly. Stay tuned!