In modern data analysis scenarios, team collaboration has become a necessity. Multiple analysts need to work on the same Canvas simultaneously: someone is adjusting SQL queries, someone is optimizing chart styles, someone is adding Python analysis nodes. How do we synchronize these operations in real-time while ensuring data consistency and system performance?
AskTable's Canvas real-time collaborative editing architecture provides an elegant answer: Replicache + PostgreSQL LISTEN/NOTIFY + SSE. This solution not only achieves millisecond-level real-time synchronization but also has advantages in breakpoint resume, cross-instance scaling, and low-cost deployment.
This article deeply analyzes the core design and implementation details of this architecture.
Traditional collaborative editing (like Google Docs) mainly handles CRDT (Conflict-free Replicated Data Type) merging of text content. But Canvas is far more complex than text:
WebSocket Full-Duplex Solution:
Polling Solution:
What We Need:
Replicache is a client-side state management framework designed specifically for real-time collaboration. Its core concept is Optimistic UI + Server Reconciliation.
Core Advantages:
Replicache's Push/Pull Protocol:
Why not use Redis Pub/Sub?
Advantages of PostgreSQL LISTEN/NOTIFY:
- `pg_notify` broadcasts only after the transaction commits, ensuring consistency

Applicable Scenarios:
AskTable's real-time collaboration architecture is divided into three layers:
- Global version number (`replicache_server` table)

Each mutation needs to increment the global version number. How do we keep version numbers unique under high concurrency?
```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def get_next_version(db_session: AsyncSession) -> int:
    """Atomically increment the global version number; FOR UPDATE ensures concurrent safety"""
    current = (
        await db_session.execute(
            text(
                "SELECT version FROM atserver.replicache_server WHERE id = 'global' FOR UPDATE"
            )
        )
    ).scalar_one()
    next_version = current + 1
    await db_session.execute(
        text("UPDATE atserver.replicache_server SET version = :v WHERE id = 'global'"),
        {"v": next_version},
    )
    return next_version
```
Key Points:

- `FOR UPDATE` locks the global row of the `replicache_server` table

Performance Considerations:
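One consideration: the `SELECT ... FOR UPDATE` plus `UPDATE` pair costs two round trips per mutation. A single atomic statement is a possible alternative (our assumption, not necessarily AskTable's production code):

```python
# Hypothetical single-statement version bump: one atomic UPDATE with
# RETURNING replaces the SELECT ... FOR UPDATE + UPDATE pair.
# PostgreSQL row-level locking still serializes concurrent callers.
NEXT_VERSION_SQL = (
    "UPDATE atserver.replicache_server "
    "SET version = version + 1 "
    "WHERE id = 'global' "
    "RETURNING version"
)
```

Executed via `db_session.execute(text(NEXT_VERSION_SQL))`, this halves the round trips but does not change the serialization model: concurrent writers still queue on the same row lock.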
Networks are unreliable, so clients may send the same mutation more than once. How do we ensure idempotency?
```python
async def process_mutation(
    db_session: AsyncSession,
    client_group_id: str,
    client_id: str,
    mutation: dict,
    canvas_id: str,
    next_version: int,
) -> None:
    """Process a single mutation: idempotency check + dispatch + fault tolerance"""
    # Get or create the client record
    client = (
        await db_session.execute(select(ReplicacheClientModel).filter_by(id=client_id))
    ).scalar_one_or_none()
    if client is None:
        client = ReplicacheClientModel(
            id=client_id,
            client_group_id=client_group_id,
            last_mutation_id=0,
            version=0,
        )
        db_session.add(client)
    mutation_id = mutation["id"]
    # Idempotent: skip mutations that were already processed
    if mutation_id <= client.last_mutation_id:
        return
    # Gap tolerance: log a warning but continue processing
    if mutation_id != client.last_mutation_id + 1:
        logger.warning(
            "Mutation gap: client=%s expected=%d got=%d",
            client_id,
            client.last_mutation_id + 1,
            mutation_id,
        )
    # Dispatch the mutation; on failure last_mutation_id is left untouched so the client retries
    try:
        await _dispatch_mutation(db_session, canvas_id, mutation, next_version)
        # Update client state only on success
        client.last_mutation_id = mutation_id
        client.version = next_version
    except Exception:
        logger.exception("Mutation failed: client=%s mutation=%s", client_id, mutation)
        raise
```
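The idempotency rule above boils down to a simple comparison against `last_mutation_id`; a stripped-down, in-memory sketch (not the actual DB-backed code):

```python
def filter_new_mutations(last_mutation_id: int, incoming_ids: list[int]) -> list[int]:
    """Return the mutation ids that should actually be applied:
    anything <= last_mutation_id is a duplicate delivery and is skipped."""
    applied = []
    for mid in incoming_ids:
        if mid <= last_mutation_id:
            continue  # already processed: a retry or duplicate
        applied.append(mid)
        last_mutation_id = mid
    return applied

# A retried batch [3, 4, 4, 5] against last_mutation_id=3 applies only [4, 5].
```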
Key Points:

- `last_mutation_id` is only advanced on success, so a failed mutation will be retried by the client

Full synchronization wastes a lot of bandwidth. Replicache's Pull protocol supports incremental synchronization:
```python
async def build_pull_response(
    db_session: AsyncSession,
    client_group_id: str,
    canvas_id: str,
    from_version: int,
) -> dict:
    """Build the pull response: full (from_version=0) or incremental"""
    # Current server version (read-only, not incremented)
    current_version = (
        await db_session.execute(
            text("SELECT version FROM atserver.replicache_server WHERE id = 'global'")
        )
    ).scalar_one()
    if from_version == 0:
        patch = await _build_initial_patch(db_session, canvas_id)
    else:
        patch = await _build_incremental_patch(db_session, canvas_id, from_version)
    # Query the clients that changed
    clients = (
        (
            await db_session.execute(
                select(ReplicacheClientModel).filter(
                    ReplicacheClientModel.client_group_id == client_group_id,
                    ReplicacheClientModel.version > from_version,
                )
            )
        )
        .scalars()
        .all()
    )
    return {
        "cookie": max(current_version, 1),
        "lastMutationIDChanges": {c.id: c.last_mutation_id for c in clients},
        "patch": patch,
    }
```
Implementation of Incremental Synchronization:
```python
async def _build_incremental_patch(
    db_session: AsyncSession, canvas_id: str, from_version: int
) -> list[dict]:
    """Incremental pull: return changes where version > from_version"""
    patch = []
    # Canvas changes
    canvases = (
        (
            await db_session.execute(
                select(CanvasModel).filter(
                    CanvasModel.id == canvas_id, CanvasModel.version > from_version
                )
            )
        )
        .scalars()
        .all()
    )
    for canvas in canvases:
        if canvas.deleted:
            patch.append({"op": "del", "key": f"canvas/{canvas.id}"})
        else:
            patch.append(
                {
                    "op": "put",
                    "key": f"canvas/{canvas.id}",
                    "value": _serialize_canvas(canvas),
                }
            )
    # Node changes (similar)
    nodes = (
        (
            await db_session.execute(
                select(NodeModel).filter(
                    NodeModel.canvas_id == canvas_id, NodeModel.version > from_version
                )
            )
        )
        .scalars()
        .all()
    )
    # ... serialize nodes
    return patch
```
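On the client side, Replicache applies such a patch to its local key-value cache. The put/del semantics can be sketched in a few lines (an illustrative sketch, not Replicache's actual client code):

```python
def apply_patch(cache: dict, patch: list[dict]) -> dict:
    """Apply a Replicache-style patch to a client-side key-value cache:
    'put' inserts or overwrites a key, 'del' removes it if present."""
    for entry in patch:
        if entry["op"] == "put":
            cache[entry["key"]] = entry["value"]
        elif entry["op"] == "del":
            cache.pop(entry["key"], None)
    return cache

cache = {"node/n-3": {"x": 0}}
patch = [
    {"op": "put", "key": "node/n-7", "value": {"x": 120}},
    {"op": "del", "key": "node/n-3"},
]
apply_patch(cache, patch)
# cache is now {"node/n-7": {"x": 120}}
```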
Key Points:

- Every synchronized row carries a `version` field
- Incremental pulls return only rows with `version > from_version`
- Patch entries use the shape `{"op": "put" | "del", "key": "...", "value": {...}}`

When one instance processes a mutation, how do we notify the other instances?
PokeBroadcaster Design:
```python
import asyncio
from collections import defaultdict
from typing import Any

import asyncpg


class PokeBroadcaster:
    """PG LISTEN/NOTIFY -> asyncio.Queue broadcaster"""

    def __init__(self):
        self._listeners: dict[str, set[asyncio.Queue]] = defaultdict(set)
        self._pg_conn: asyncpg.Connection | None = None

    def subscribe(self, canvas_id: str) -> asyncio.Queue:
        """Called by the SSE endpoint to subscribe to poke signals for a canvas"""
        queue: asyncio.Queue = asyncio.Queue(maxsize=1)
        self._listeners[canvas_id].add(queue)
        return queue

    def unsubscribe(self, canvas_id: str, queue: asyncio.Queue):
        """Called when the SSE endpoint disconnects"""
        self._listeners[canvas_id].discard(queue)
        if not self._listeners[canvas_id]:
            del self._listeners[canvas_id]

    async def start_listener(self, pg_dsn: str):
        """Called once at application startup; listens on the canvas_poke channel"""
        try:
            self._pg_conn = await asyncpg.connect(pg_dsn)
            await self._pg_conn.add_listener("canvas_poke", self._on_pg_notify)
            logger.info("PokeBroadcaster: listening on canvas_poke channel")
        except Exception:
            logger.exception("PokeBroadcaster: failed to start PG listener")

    def _on_pg_notify(self, conn: Any, pid: int, channel: str, canvas_id: str):
        """PG notification callback -> forward to local subscribers"""
        self._poke_local(canvas_id)

    def _poke_local(self, canvas_id: str):
        """Send a poke to every SSE queue for this canvas in the current process"""
        for queue in self._listeners.get(canvas_id, set()):
            try:
                queue.put_nowait(True)
            except asyncio.QueueFull:
                pass  # maxsize=1: merge rapid successive pokes

    async def poke(self, canvas_id: str, db_session: Any):
        """Called inside the write transaction: PG NOTIFY broadcast"""
        await db_session.execute(
            text("SELECT pg_notify('canvas_poke', :cid)"),
            {"cid": canvas_id},
        )
```
Workflow:

1. At startup, each instance calls `start_listener()` once and listens on the `canvas_poke` channel
2. Each SSE connection calls `subscribe(canvas_id)` and receives an `asyncio.Queue`
3. A write operation calls `poke(canvas_id, db_session)` inside its transaction
4. `pg_notify` broadcasts to all listeners after the transaction commits
5. The `_on_pg_notify` callback fires and pushes a signal into the local subscription queues

Key Design:
- `maxsize=1`: merges rapid successive pokes, avoiding SSE push storms
- `put_nowait` + ignoring `QueueFull`: if the queue is full, a poke is already pending
- Transactional `pg_notify`: ensures atomicity of the notification and the data change

Each Node may be associated with a DataFrame (a query result). Loading them one by one creates an N+1 query problem.
Optimization Solution:
```python
async def _load_dataframe_headers(
    db_session: AsyncSession, nodes: Sequence[NodeModel]
) -> dict[str, list[dict]]:
    """Batch-load the dataframe headers associated with a set of nodes"""
    df_ids = [n.dataframe_id for n in nodes if n.dataframe_id]
    if not df_ids:
        return {}
    rows = (
        await db_session.execute(
            select(DataframeModel.id, DataframeModel.header).filter(
                DataframeModel.id.in_(df_ids)
            )
        )
    ).all()
    return {r.id: r.header for r in rows}
```
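The difference is easiest to see with a query counter. An in-memory analogy (not the actual SQLAlchemy code) comparing the N+1 pattern with one batched `IN (...)` lookup:

```python
class CountingStore:
    """In-memory stand-in for the dataframe table, counting queries issued."""

    def __init__(self, rows: dict):
        self.rows = rows
        self.queries = 0

    def get_one(self, df_id: str):
        self.queries += 1  # one query per lookup
        return self.rows[df_id]

    def get_many(self, df_ids: list[str]) -> dict:
        self.queries += 1  # one IN (...) query regardless of len(df_ids)
        return {i: self.rows[i] for i in df_ids}


store = CountingStore({f"df-{i}": ["col"] for i in range(100)})

# N+1 pattern: one query per node's dataframe
for i in range(100):
    store.get_one(f"df-{i}")
n_plus_one = store.queries  # 100 queries

store.queries = 0
store.get_many([f"df-{i}" for i in range(100)])
batched = store.queries  # 1 query
```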
Effect:
If there are 100 mutations in 1 second, does it need to push 100 pokes?
Optimization Solution:
- `maxsize=1`: at most one poke is buffered per subscriber

Effect:
Current global version number increments serially, which may become a bottleneck.
Optimization Direction:
Challenges:
| Solution | Real-time | Horizontal Scaling | Breakpoint Resume | Deployment Cost | Applicable Scenario |
|---|---|---|---|---|---|
| Replicache + PG | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Small-medium collaborative |
| WebSocket + Redis | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | Large-scale real-time apps |
| Polling | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Low real-time requirements |
| CRDT (Yjs) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Text collaborative editing |
Why AskTable Chose Replicache + PG:
Problem: NOTIFY's payload is maximum 8000 bytes.
Solution:
- Send only the `canvas_id` (a few dozen bytes); clients fetch the actual changes via Pull

Problem: After a client disconnects, how does the server clean up its subscription?
Solution:
- FastAPI's `StreamingResponse` generator supports a `finally` block
- Call `unsubscribe()` in `finally`

```python
async def sse_endpoint(canvas_id: str):
    queue = poke_broadcaster.subscribe(canvas_id)
    try:
        while True:
            await queue.get()  # asyncio.Queue is not async-iterable
            yield "data: poke\n\n"
    finally:
        poke_broadcaster.unsubscribe(canvas_id, queue)
```
Problem: When a client Pulls, the server's version number may already be far ahead of the client's.
Solution:
- The incremental Pull already covers this: the server returns every change with `version > from_version`

AskTable's real-time collaborative editing architecture achieves:
✅ Millisecond-level Real-time Performance: < 100ms end-to-end latency
✅ High Reliability: Idempotency design + breakpoint resume
✅ Horizontal Scaling: Stateless application layer + PostgreSQL broadcast
✅ Low-cost Deployment: Zero additional dependencies, reusing existing infrastructure
We plan to open source PokeBroadcaster and Replicache server implementation to help more teams quickly build real-time collaborative applications. Stay tuned!