
sidebar.wechat

sidebar.feishu
sidebar.chooseYourWayToJoin

sidebar.scanToAddConsultant
在现代数据分析场景中,团队协作已成为刚需。多个分析师需要同时在同一个 Canvas 画布上工作:有人在调整 SQL 查询,有人在优化图表样式,有人在添加 Python 分析节点。如何让这些操作实时同步,同时保证数据一致性和系统性能?
AskTable 的 Canvas 实时协同编辑架构给出了一个优雅的答案:Replicache + PostgreSQL LISTEN/NOTIFY + SSE。这套方案不仅实现了毫秒级的实时同步,还具备断点续传、跨实例扩展和低成本部署的优势。
本文将深入剖析这套架构的核心设计与实现细节。
传统的协同编辑(如 Google Docs)主要处理文本内容的 CRDT(无冲突复制数据类型)合并。但 Canvas 的复杂度远超文本:
WebSocket 全双工方案:
轮询方案:
我们需要的是:
Replicache 是一个专为实时协同设计的客户端状态管理框架,核心理念是 Optimistic UI + Server Reconciliation。
核心优势:
Replicache 的 Push/Pull 协议:
为什么不用 Redis Pub/Sub?
PostgreSQL LISTEN/NOTIFY 的优势:
pg_notify 在事务提交后才广播,保证一致性。

适用场景:
AskTable 的实时协同架构分为三层:
replicache_server 表)。

每次 mutation 都需要递增全局版本号。如何在高并发下保证版本号的唯一性?
async def get_next_version(db_session: AsyncSession) -> int:
    """Atomically bump and return the global Replicache version.

    SELECT ... FOR UPDATE locks the single 'global' row, so concurrent
    mutations serialize on this counter and each one gets a unique version.
    """
    result = await db_session.execute(
        text(
            "SELECT version FROM atserver.replicache_server WHERE id = 'global' FOR UPDATE"
        )
    )
    bumped = result.scalar_one() + 1
    # Persist the incremented value; the row lock is held until commit.
    await db_session.execute(
        text("UPDATE atserver.replicache_server SET version = :v WHERE id = 'global'"),
        {"v": bumped},
    )
    return bumped
关键点:
FOR UPDATE 锁住 replicache_server 表的 global 行。

性能考量:
网络不可靠,客户端可能重复发送同一个 mutation。如何保证幂等性?
async def process_mutation(
    db_session: AsyncSession,
    client_group_id: str,
    client_id: str,
    mutation: dict,
    canvas_id: str,
    next_version: int | None = None,
) -> None:
    """Process a single mutation: idempotency check + dispatch + fault tolerance.

    Args:
        db_session: Active async DB session; transaction is managed by the caller.
        client_group_id: Replicache client group the client belongs to.
        client_id: Unique id of the pushing client.
        mutation: Raw mutation dict; must contain a monotonically increasing "id".
        canvas_id: Canvas the mutation targets.
        next_version: Global version to stamp on this change. When None (the
            default, keeping existing call sites working), it is obtained via
            get_next_version().

    Raises:
        Exception: re-raises any dispatch failure so the caller can roll back.
            last_mutation_id is deliberately NOT advanced on failure, so the
            client will retry the same mutation.
    """
    # BUG FIX: the original body used `next_version` without defining it
    # anywhere; it is now an explicit, optional parameter.
    if next_version is None:
        next_version = await get_next_version(db_session)
    # Fetch or lazily create the per-client bookkeeping row.
    client = (
        await db_session.execute(select(ReplicacheClientModel).filter_by(id=client_id))
    ).scalar_one_or_none()
    if client is None:
        client = ReplicacheClientModel(
            id=client_id,
            client_group_id=client_group_id,
            last_mutation_id=0,
            version=0,
        )
        db_session.add(client)
    mutation_id = mutation["id"]
    # Idempotency: the network may redeliver; skip anything already applied.
    if mutation_id <= client.last_mutation_id:
        return
    # Gap tolerance: log a warning but keep processing out-of-sequence ids.
    if mutation_id != client.last_mutation_id + 1:
        logger.warning(
            "Mutation gap: client=%s expected=%d got=%d",
            client_id,
            client.last_mutation_id + 1,
            mutation_id,
        )
    # Dispatch; on failure leave last_mutation_id untouched so the client retries.
    try:
        await _dispatch_mutation(db_session, canvas_id, mutation, next_version)
        # Success: advance the client's cursor and version together.
        client.last_mutation_id = mutation_id
        client.version = next_version
    except Exception:
        logger.exception("Mutation failed: client=%s mutation=%s", client_id, mutation)
        raise
关键点:
last_mutation_id。

全量同步会浪费大量带宽。Replicache 的 Pull 协议支持增量同步:
async def build_pull_response(
    db_session: AsyncSession,
    client_group_id: str,
    canvas_id: str,
    from_version: int,
) -> dict:
    """Assemble a Replicache pull response.

    A from_version of 0 yields a full snapshot; any other value yields an
    incremental patch of records newer than from_version.
    """
    # Read the current server-side version without incrementing it.
    version_result = await db_session.execute(
        text("SELECT version FROM atserver.replicache_server WHERE id = 'global'")
    )
    current_version = version_result.scalar_one()
    # Choose full vs incremental patch.
    if from_version == 0:
        patch = await _build_initial_patch(db_session, canvas_id)
    else:
        patch = await _build_incremental_patch(db_session, canvas_id, from_version)
    # Clients in this group whose state changed since from_version.
    changed_result = await db_session.execute(
        select(ReplicacheClientModel).filter(
            ReplicacheClientModel.client_group_id == client_group_id,
            ReplicacheClientModel.version > from_version,
        )
    )
    changed_clients = changed_result.scalars().all()
    return {
        "cookie": max(current_version, 1),
        "lastMutationIDChanges": {
            c.id: c.last_mutation_id for c in changed_clients
        },
        "patch": patch,
    }
增量同步的实现:
async def _build_incremental_patch(
    db_session: AsyncSession, canvas_id: str, from_version: int
) -> list[dict]:
    """Incremental pull: emit put/del ops for records with version > from_version."""
    patch: list[dict] = []
    # Changed canvases: deleted rows become "del" ops, the rest "put" ops.
    canvas_result = await db_session.execute(
        select(CanvasModel).filter(
            CanvasModel.id == canvas_id, CanvasModel.version > from_version
        )
    )
    for canvas in canvas_result.scalars().all():
        entry_key = f"canvas/{canvas.id}"
        if canvas.deleted:
            patch.append({"op": "del", "key": entry_key})
        else:
            patch.append(
                {
                    "op": "put",
                    "key": entry_key,
                    "value": _serialize_canvas(canvas),
                }
            )
    # Changed nodes are handled the same way.
    node_result = await db_session.execute(
        select(NodeModel).filter(
            NodeModel.canvas_id == canvas_id, NodeModel.version > from_version
        )
    )
    nodes = node_result.scalars().all()
    # ... serialize nodes
    return patch
关键点:
- 每张表带 version 字段
- 增量查询只返回 version > from_version 的记录
- patch 项格式为 {"op": "put/del", "key": "...", "value": {...}}

当一个实例处理了 mutation,如何通知其他实例?
PokeBroadcaster 的设计:
class PokeBroadcaster:
    """Bridges PostgreSQL LISTEN/NOTIFY into per-process asyncio.Queue pokes."""

    def __init__(self):
        # canvas_id -> set of subscriber queues (this process only).
        self._listeners: dict[str, set[asyncio.Queue]] = defaultdict(set)
        # Dedicated LISTEN connection; stays None until start_listener() succeeds.
        self._pg_conn: "asyncpg.Connection | None" = None

    def subscribe(self, canvas_id: str) -> asyncio.Queue:
        """Called by the SSE endpoint to receive poke signals for a canvas."""
        q: asyncio.Queue = asyncio.Queue(maxsize=1)
        self._listeners[canvas_id].add(q)
        return q

    def unsubscribe(self, canvas_id: str, queue: asyncio.Queue):
        """Called when an SSE connection closes; drops empty listener sets."""
        subscribers = self._listeners[canvas_id]
        subscribers.discard(queue)
        if not subscribers:
            del self._listeners[canvas_id]

    async def start_listener(self, pg_dsn: str):
        """Call once at application startup; LISTENs on the canvas_poke channel."""
        import asyncpg as _asyncpg
        try:
            self._pg_conn = await _asyncpg.connect(pg_dsn)
            await self._pg_conn.add_listener("canvas_poke", self._on_pg_notify)
            logger.info("PokeBroadcaster: listening on canvas_poke channel")
        except Exception:
            # Best-effort: the app keeps running even without cross-instance pokes.
            logger.exception("PokeBroadcaster: failed to start PG listener")

    def _on_pg_notify(self, conn: Any, pid: int, channel: str, canvas_id: str):
        """asyncpg notification callback -> fan out to local subscribers."""
        self._poke_local(canvas_id)

    def _poke_local(self, canvas_id: str):
        """Push a poke onto every local SSE queue subscribed to this canvas."""
        for subscriber in self._listeners.get(canvas_id, set()):
            try:
                subscriber.put_nowait(True)
            except asyncio.QueueFull:
                # maxsize=1: a pending poke already covers this change.
                pass

    async def poke(self, canvas_id: str, db_session: Any):
        """Called inside the write transaction; NOTIFY fires only on commit."""
        await db_session.execute(
            text("SELECT pg_notify('canvas_poke', :cid)"),
            {"cid": canvas_id},
        )
工作流程:
1. 应用启动时调用 start_listener(),监听 canvas_poke 频道
2. SSE 端点调用 subscribe(canvas_id),获取一个 asyncio.Queue
3. 写操作在事务内调用 poke(canvas_id, db_session)
4. pg_notify 在事务提交后广播给所有监听者
5. _on_pg_notify 回调触发,向本进程的订阅队列发送信号

关键设计:
- maxsize=1 的队列:合并快速连续的 poke,避免 SSE 推送风暴
- put_nowait + QueueFull 忽略:如果队列已满,说明已有待推送的 poke
- pg_notify:保证通知与数据变更的原子性

每个 Node 可能关联一个 DataFrame(查询结果)。如果逐个加载,会产生 N+1 查询问题。
优化方案:
async def _load_dataframe_headers(
    db_session: AsyncSession, nodes: Sequence[NodeModel]
) -> dict[str, list[dict]]:
    """Batch-load the dataframe headers referenced by *nodes*.

    One IN-query instead of a per-node lookup avoids the N+1 problem.
    Returns a mapping of dataframe id -> header.
    """
    wanted_ids = [node.dataframe_id for node in nodes if node.dataframe_id]
    if not wanted_ids:
        # Nothing to fetch; skip the round trip entirely.
        return {}
    result = await db_session.execute(
        select(DataframeModel.id, DataframeModel.header).filter(
            DataframeModel.id.in_(wanted_ids)
        )
    )
    return {row.id: row.header for row in result.all()}
效果:
如果 1 秒内有 100 次 mutation,是否需要推送 100 次 poke?
优化方案:
maxsize=1:最多缓存 1 个 poke。

效果:
当前全局版本号是串行递增的,可能成为瓶颈。
优化方向:
挑战:
| 方案 | 实时性 | 横向扩展 | 断点续传 | 部署成本 | 适用场景 |
|---|---|---|---|---|---|
| Replicache + PG | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 中小规模协同 |
| WebSocket + Redis | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | 大规模实时应用 |
| 轮询 | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 低实时性需求 |
| CRDT (Yjs) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 文本协同编辑 |
AskTable 选择 Replicache + PG 的原因:
问题:NOTIFY 的 payload 最大 8000 字节。
解决方案:
canvas_id(几十字节)。

问题:客户端断线后,服务器如何清理订阅?
解决方案:
StreamingResponse 支持 finally 块finally 中调用 unsubscribe()async def sse_endpoint(canvas_id: str):
queue = poke_broadcaster.subscribe(canvas_id)
try:
async for _ in queue:
yield f"data: poke\n\n"
finally:
poke_broadcaster.unsubscribe(canvas_id, queue)
问题:客户端 Pull 时,服务器版本号已经远超客户端。
解决方案:
version > from_version 的变更。

AskTable 的实时协同编辑架构,通过 Replicache + PostgreSQL LISTEN/NOTIFY + SSE 的组合,实现了:
✅ 毫秒级实时性:< 100ms 的端到端延迟
✅ 高可靠性:幂等性设计 + 断点续传
✅ 横向扩展:无状态应用层 + PostgreSQL 广播
✅ 低成本部署:零额外依赖,复用现有基础设施
我们计划将 PokeBroadcaster 和 Replicache 服务端实现 开源,帮助更多团队快速构建实时协同应用。敬请期待!
相关阅读:
技术交流:
sidebar.noProgrammingNeeded
sidebar.startFreeTrial