
Real-Time Canvas Collaboration with Replicache and PostgreSQL LISTEN/NOTIFY

AskTable Team · 2026-03-05

In modern data analysis, team collaboration is a necessity. Multiple analysts often work on the same Canvas simultaneously: one adjusting SQL queries, another tuning chart styles, a third adding Python analysis nodes. How do we synchronize these operations in real time while preserving data consistency and system performance?

AskTable's Canvas real-time collaborative editing architecture provides an elegant answer: Replicache + PostgreSQL LISTEN/NOTIFY + SSE. This combination achieves millisecond-level real-time synchronization while also offering breakpoint resume (reconnect and catch up), cross-instance scaling, and low-cost deployment.

This article takes a deep dive into the core design and implementation details of this architecture.


1. Why Do We Need Real-time Collaboration? Canvas's Unique Challenges

1.1 Canvas Is Not a Traditional Document

Traditional collaborative editing tools (like Google Docs) mainly merge concurrent edits to text, using techniques such as OT or CRDTs (Conflict-free Replicated Data Types). But a Canvas is far more complex than text:

  • Node Dependency Graph: There are data flow dependencies between nodes (e.g., Chart nodes depend on Data node query results)
  • Asynchronous Execution: SQL queries and Python code execution may take seconds or even minutes
  • Large Data Volume: A single node may produce tens of thousands of rows of DataFrame, requiring efficient incremental synchronization
  • Multi-end Collaboration: Web, mobile, and API calls need unified state management

1.2 Limitations of Traditional Solutions

WebSocket Full-Duplex Solution:

  • ✅ Good real-time performance
  • ❌ Need to maintain long connection state, difficult to scale horizontally
  • ❌ Reconnection after disconnection requires complex state recovery logic

Polling Solution:

  • ✅ Simple to implement
  • ❌ High latency (typically 3-5 seconds)
  • ❌ High server pressure (lots of invalid requests)

What We Need:

  • Millisecond-level real-time performance
  • Breakpoint resume capability
  • Horizontal scaling friendly
  • Low-cost deployment (no dependency on Redis Pub/Sub or other additional components)

2. Technology Selection: Why Replicache + PostgreSQL?

2.1 Replicache: A State Synchronization Framework Built for Collaboration

Replicache is a client-side state management framework designed specifically for real-time collaboration. Its core concept is Optimistic UI + Server Reconciliation.

Core Advantages:

  • Optimistic Updates: Client operations take effect immediately without waiting for server response
  • Incremental Synchronization: Only transmit changed data (patches), not full state
  • Version Management: Conflict detection and resolution based on global version numbers
  • Offline Support: Client caches state, automatically reconnects and synchronizes after going offline

Replicache's Push/Pull Protocol:

  • Push: Client pushes local operations (mutations) to the server
  • Pull: Client pulls the latest state from the server (incremental or full)
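Concretely, the two endpoints exchange JSON bodies roughly like the following. This is a simplified sketch of the Replicache protocol shapes (some protocol fields are omitted, and the values are invented for illustration):

```python
# Simplified Replicache wire shapes, written as Python dicts.

push_request = {
    "clientGroupID": "cg-123",
    "mutations": [
        {
            "id": 7,                      # per-client, monotonically increasing
            "clientID": "client-abc",
            "name": "updateNode",         # mutator name registered on client and server
            "args": {"nodeId": "n1", "x": 120, "y": 80},
        }
    ],
}

pull_request = {
    "clientGroupID": "cg-123",
    "cookie": 42,                         # last server version this client has seen
}

pull_response = {
    "cookie": 43,                         # new high-water mark
    "lastMutationIDChanges": {"client-abc": 7},
    "patch": [
        {"op": "put", "key": "node/n1", "value": {"x": 120, "y": 80}},
    ],
}

# The server only needs to send changes past the client's cookie:
assert pull_response["cookie"] > pull_request["cookie"]
```

The cookie in the pull response becomes the `from_version` of the next pull, which is what makes incremental synchronization possible.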

2.2 PostgreSQL LISTEN/NOTIFY: Lightweight Cross-Instance Broadcast

Why not use Redis Pub/Sub?

Advantages of PostgreSQL LISTEN/NOTIFY:

  • Zero Additional Dependencies: AskTable already uses PostgreSQL as the main database
  • Transactional Notifications: pg_notify broadcasts after transaction commit, ensuring consistency
  • Low Latency: Typically < 10ms
  • Horizontal Scaling: Each application instance listens independently, no centralized message queue needed

Applicable Scenarios:

  • Message volume is not large (< 1000 msg/s)
  • Needs strong binding with database transactions
  • Want simplified deployment architecture

3. Architecture Design: Three-Layer Collaboration Mechanism

AskTable's real-time collaboration architecture is divided into three layers:


3.1 Client Layer: Replicache

  • Maintains local state cache (IndexedDB)
  • Optimistically updates UI
  • Periodically pulls server state
  • Listens for SSE pushes (Poke signals)

3.2 Application Layer: FastAPI + PokeBroadcaster

  • Handles Push/Pull requests
  • Maintains SSE connections (one subscription queue per Canvas)
  • Listens for PostgreSQL NOTIFY and forwards to local subscribers

3.3 Data Layer: PostgreSQL

  • Stores Canvas and Node data
  • Maintains global version number (replicache_server table)
  • Broadcasts changes through LISTEN/NOTIFY

4. Core Implementation: Deep Dive into Source Code

4.1 Version Management: FOR UPDATE Lock Ensures Concurrent Safety

Each mutation needs to increment the global version number. How to ensure uniqueness of version numbers under high concurrency?

async def get_next_version(db_session: AsyncSession) -> int:
    """Atomically increment global version number, FOR UPDATE lock ensures concurrent safety"""
    row = (
        await db_session.execute(
            text(
                "SELECT version FROM atserver.replicache_server WHERE id = 'global' FOR UPDATE"
            )
        )
    ).scalar_one()

    next_version = row + 1

    await db_session.execute(
        text("UPDATE atserver.replicache_server SET version = :v WHERE id = 'global'"),
        {"v": next_version},
    )

    return next_version

Key Points:

  • FOR UPDATE locks the global row of the replicache_server table
  • Other concurrent transactions will block and wait until the current transaction commits
  • Ensures version numbers are strictly incremented with no duplicates

Performance Considerations:

  • Version number increment is a serial operation and may become a bottleneck; in our measurements, a single instance sustains ~500 mutations/s
  • Optimization direction: sharded version numbers (sharded by Canvas ID)
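The guarantee the FOR UPDATE lock provides can be illustrated in plain Python: concurrent read-then-write increments serialized by a lock never skip or duplicate a version. This is a toy model, not the database code; the `asyncio.Lock` stands in for the row lock:

```python
import asyncio

class VersionCounter:
    """Toy stand-in for the replicache_server row; the asyncio.Lock
    plays the role of the FOR UPDATE row lock."""

    def __init__(self) -> None:
        self.version = 0
        self._lock = asyncio.Lock()

    async def next(self) -> int:
        async with self._lock:
            current = self.version
            await asyncio.sleep(0)      # yield point: without the lock this read/write races
            self.version = current + 1
            return self.version

async def main() -> list[int]:
    counter = VersionCounter()
    # 100 "transactions" incrementing concurrently
    results = await asyncio.gather(*(counter.next() for _ in range(100)))
    return sorted(results)

versions = asyncio.run(main())
assert versions == list(range(1, 101))  # strictly incremented: no gaps, no duplicates
```

Remove the lock and the same test fails intermittently, which is exactly the anomaly FOR UPDATE prevents in the database.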

4.2 Idempotency Design: Avoid Duplicate Processing

Networks are unreliable, clients may repeatedly send the same mutation. How to ensure idempotency?

async def process_mutation(
    db_session: AsyncSession,
    client_group_id: str,
    client_id: str,
    mutation: dict,
    canvas_id: str,
    next_version: int,  # allocated by get_next_version() for this push
) -> None:
    """Process single mutation: idempotency check + dispatch + fault tolerance"""
    # Get or create client record
    client = (
        await db_session.execute(select(ReplicacheClientModel).filter_by(id=client_id))
    ).scalar_one_or_none()
    
    if client is None:
        client = ReplicacheClientModel(
            id=client_id,
            client_group_id=client_group_id,
            last_mutation_id=0,
            version=0,
        )
        db_session.add(client)

    mutation_id = mutation["id"]

    # Idempotent: skip mutation already processed
    if mutation_id <= client.last_mutation_id:
        return

    # Gap tolerance: log warning but continue processing
    if mutation_id != client.last_mutation_id + 1:
        logger.warning(
            "Mutation gap: client=%s expected=%d got=%d",
            client_id,
            client.last_mutation_id + 1,
            mutation_id,
        )

    # Dispatch mutation, don't update last_mutation_id on failure for client retry
    try:
        await _dispatch_mutation(db_session, canvas_id, mutation, next_version)
        # Update client state on success
        client.last_mutation_id = mutation_id
        client.version = next_version
    except Exception:
        logger.exception("Mutation failed: client=%s mutation=%s", client_id, mutation)
        raise

Key Points:

  • Each client maintains last_mutation_id
  • Server records the maximum mutation ID processed for each client
  • Duplicate mutations are skipped directly
  • Gap tolerance: Allow mutation IDs to be non-consecutive (network packet loss scenarios)
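The branching above boils down to a small decision function over `last_mutation_id`. A pure-Python sketch (the function name is illustrative, not from the AskTable codebase):

```python
import logging

logger = logging.getLogger(__name__)

def classify_mutation(last_mutation_id: int, mutation_id: int) -> str:
    """Mirror the server's idempotency rules for a single client."""
    if mutation_id <= last_mutation_id:
        return "skip"                     # duplicate delivery: already processed
    if mutation_id != last_mutation_id + 1:
        logger.warning("gap: expected %d, got %d", last_mutation_id + 1, mutation_id)
        return "process-with-gap"         # tolerate lost intermediates, keep going
    return "process"

assert classify_mutation(5, 3) == "skip"              # retry of an old mutation
assert classify_mutation(5, 6) == "process"           # the expected next one
assert classify_mutation(5, 9) == "process-with-gap"  # non-consecutive, still applied
```

Because retries land in the "skip" branch, a client can safely resend its whole unacknowledged queue after a reconnect.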

4.3 Incremental Synchronization: Only Transmit Changed Data

Full synchronization wastes a lot of bandwidth. Replicache's Pull protocol supports incremental synchronization:

async def build_pull_response(
    db_session: AsyncSession,
    client_group_id: str,
    canvas_id: str,
    from_version: int,
) -> dict:
    """Build pull response: full (from_version=0) or incremental"""
    # Current server version (not incremented)
    current_version = (
        await db_session.execute(
            text("SELECT version FROM atserver.replicache_server WHERE id = 'global'")
        )
    ).scalar_one()

    if from_version == 0:
        patch = await _build_initial_patch(db_session, canvas_id)
    else:
        patch = await _build_incremental_patch(db_session, canvas_id, from_version)

    # Query changed clients
    clients = (
        (
            await db_session.execute(
                select(ReplicacheClientModel).filter(
                    ReplicacheClientModel.client_group_id == client_group_id,
                    ReplicacheClientModel.version > from_version,
                )
            )
        )
        .scalars()
        .all()
    )

    return {
        "cookie": max(current_version, 1),
        "lastMutationIDChanges": {c.id: c.last_mutation_id for c in clients},
        "patch": patch,
    }

Implementation of Incremental Synchronization:

async def _build_incremental_patch(
    db_session: AsyncSession, canvas_id: str, from_version: int
) -> list[dict]:
    """Incremental pull: return changes where version > from_version"""
    patch = []

    # Canvas changes
    canvases = (
        (
            await db_session.execute(
                select(CanvasModel).filter(
                    CanvasModel.id == canvas_id, CanvasModel.version > from_version
                )
            )
        )
        .scalars()
        .all()
    )
    for canvas in canvases:
        if canvas.deleted:
            patch.append({"op": "del", "key": f"canvas/{canvas.id}"})
        else:
            patch.append(
                {
                    "op": "put",
                    "key": f"canvas/{canvas.id}",
                    "value": _serialize_canvas(canvas),
                }
            )

    # Node changes (similar)
    nodes = (
        (
            await db_session.execute(
                select(NodeModel).filter(
                    NodeModel.canvas_id == canvas_id, NodeModel.version > from_version
                )
            )
        )
        .scalars()
        .all()
    )
    # ... serialize nodes
    
    return patch

Key Points:

  • Each Canvas and Node has a version field
  • Incremental Pull only queries records where version > from_version
  • Patch format: {"op": "put/del", "key": "...", "value": {...}}
  • Client updates local state based on patch
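On the client, applying a patch is a straightforward fold over the local key-value cache. A minimal sketch (Replicache does this against IndexedDB; here a plain dict stands in):

```python
def apply_patch(store: dict, patch: list[dict]) -> dict:
    """Apply Replicache-style put/del operations to a local key-value cache."""
    for op in patch:
        if op["op"] == "put":
            store[op["key"]] = op["value"]
        elif op["op"] == "del":
            store.pop(op["key"], None)   # deleting an already-missing key is a no-op
    return store

store = {"canvas/c1": {"title": "Q3 revenue"}}
patch = [
    {"op": "put", "key": "node/n1", "value": {"type": "chart"}},
    {"op": "del", "key": "canvas/c1"},
]
apply_patch(store, patch)
assert store == {"node/n1": {"type": "chart"}}
```

Patch application is idempotent, so replaying the same patch after a dropped connection converges to the same state.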

4.4 Cross-Instance Broadcast: PostgreSQL LISTEN/NOTIFY

When one instance processes a mutation, how to notify other instances?

PokeBroadcaster Design:

class PokeBroadcaster:
    """PG LISTEN/NOTIFY -> asyncio.Queue broadcaster"""

    def __init__(self):
        self._listeners: dict[str, set[asyncio.Queue]] = defaultdict(set)
        self._pg_conn: asyncpg.Connection | None = None

    def subscribe(self, canvas_id: str) -> asyncio.Queue:
        """SSE endpoint call, subscribe to poke signals for a canvas"""
        queue: asyncio.Queue = asyncio.Queue(maxsize=1)
        self._listeners[canvas_id].add(queue)
        return queue

    def unsubscribe(self, canvas_id: str, queue: asyncio.Queue):
        """Called when SSE endpoint disconnects"""
        self._listeners[canvas_id].discard(queue)
        if not self._listeners[canvas_id]:
            del self._listeners[canvas_id]

    async def start_listener(self, pg_dsn: str):
        """Called once at application startup, listen on canvas_poke channel"""
        import asyncpg as _asyncpg

        try:
            self._pg_conn = await _asyncpg.connect(pg_dsn)
            await self._pg_conn.add_listener("canvas_poke", self._on_pg_notify)
            logger.info("PokeBroadcaster: listening on canvas_poke channel")
        except Exception:
            logger.exception("PokeBroadcaster: failed to start PG listener")

    def _on_pg_notify(self, conn: Any, pid: int, channel: str, canvas_id: str):
        """PG notification callback -> forward to local subscribers"""
        self._poke_local(canvas_id)

    def _poke_local(self, canvas_id: str):
        """Send poke to all SSE queues for this canvas in the current process"""
        for queue in self._listeners.get(canvas_id, set()):
            try:
                queue.put_nowait(True)
            except asyncio.QueueFull:
                pass  # maxsize=1, merge rapid successive pokes

    async def poke(self, canvas_id: str, db_session: Any):
        """Called during write operation in transaction: PG NOTIFY broadcast"""
        await db_session.execute(
            text("SELECT pg_notify('canvas_poke', :cid)"),
            {"cid": canvas_id},
        )

Workflow:

  1. Application Startup: Each instance calls start_listener(), listening on canvas_poke channel
  2. Client Subscription: SSE endpoint calls subscribe(canvas_id), gets an asyncio.Queue
  3. Write Operation: After mutation processing completes, call poke(canvas_id, db_session)
  4. PostgreSQL Broadcast: pg_notify broadcasts to all listeners after transaction commit
  5. Local Forward: _on_pg_notify callback triggers, sends signal to local subscription queues
  6. SSE Push: SSE endpoint reads signal from queue, pushes to client

Key Design:

  • Queue with maxsize=1: Merge rapid successive pokes, avoid SSE push storms
  • put_nowait + QueueFull ignore: If queue is full, there's already a pending poke to push
  • Transactional pg_notify: Ensures atomicity of notification and data changes
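The merge behavior of the `maxsize=1` queue is easy to verify in isolation. The sketch below uses the same `put_nowait` + `QueueFull` pattern as `_poke_local`:

```python
import asyncio

def poke(queue: asyncio.Queue) -> None:
    """Same pattern as PokeBroadcaster._poke_local: drop the poke if one is pending."""
    try:
        queue.put_nowait(True)
    except asyncio.QueueFull:
        pass  # a poke is already queued; the next Pull picks up all changes anyway

async def main() -> int:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    for _ in range(100):          # burst of 100 mutations in quick succession
        poke(queue)
    delivered = 0
    while not queue.empty():      # the SSE loop would drain this queue
        queue.get_nowait()
        delivered += 1
    return delivered

assert asyncio.run(main()) == 1   # 100 pokes collapse into a single SSE push
```

This works because a poke carries no data of its own; it only tells the client "something changed, Pull now", so one pending poke is always enough.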

5. Performance Optimization: From Theory to Practice

5.1 Batch Load DataFrame Headers

Each Node may be associated with a DataFrame (query result). Loading one by one creates N+1 query problems.

Optimization Solution:

async def _load_dataframe_headers(
    db_session: AsyncSession, nodes: Sequence[NodeModel]
) -> dict[str, list[dict]]:
    """Batch get dataframe headers associated with nodes"""
    df_ids = [n.dataframe_id for n in nodes if n.dataframe_id]
    if not df_ids:
        return {}
    rows = (
        await db_session.execute(
            select(DataframeModel.id, DataframeModel.header).filter(
                DataframeModel.id.in_(df_ids)
            )
        )
    ).all()
    return {r.id: r.header for r in rows}

Effect:

  • Original: 100 nodes = 100 queries
  • After optimization: 100 nodes = 1 query
  • Pull response time reduced from ~500ms to ~50ms

5.2 SSE Queue Merging

If there are 100 mutations in 1 second, does it need to push 100 pokes?

Optimization Solution:

  • Queue maxsize=1: Cache at most 1 poke
  • After client receives poke, immediately Pull to get all accumulated changes
  • Avoid SSE push storms

Effect:

  • Original: 100 mutations = 100 SSE pushes + 100 Pulls
  • After optimization: 100 mutations = 1 SSE push + 1 Pull (contains all changes)

5.3 Version Number Sharding (Future Optimization)

Current global version number increments serially, which may become a bottleneck.

Optimization Direction:

  • Shard version number by Canvas ID
  • Each Canvas independently maintains version number
  • Avoid cross-Canvas lock competition

Challenges:

  • Need to modify Replicache protocol (support multiple version numbers)
  • Increases client complexity
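The sharded scheme can be sketched with per-canvas counters. This is illustrative only; the real change would live in the replicache_server table and the Pull protocol, with the same FOR UPDATE locking per shard:

```python
from collections import defaultdict

class ShardedVersions:
    """One independent version counter per canvas, so unrelated canvases never contend."""

    def __init__(self) -> None:
        self._versions: dict[str, int] = defaultdict(int)

    def next(self, canvas_id: str) -> int:
        self._versions[canvas_id] += 1
        return self._versions[canvas_id]

v = ShardedVersions()
assert v.next("canvas-a") == 1
assert v.next("canvas-a") == 2
assert v.next("canvas-b") == 1   # an unrelated canvas starts its own sequence
```

The trade-off named above follows directly: the Pull cookie would have to become a per-canvas value rather than a single global integer.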

6. Comparison with Other Solutions

| Solution | Real-time | Horizontal Scaling | Breakpoint Resume | Deployment Cost | Applicable Scenario |
|---|---|---|---|---|---|
| Replicache + PG | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Small-to-medium collaborative apps |
| WebSocket + Redis | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | Large-scale real-time apps |
| Polling | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Low real-time requirements |
| CRDT (Yjs) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Text collaborative editing |

Why AskTable Chose Replicache + PG:

  • Real-time performance meets requirements (< 100ms)
  • Zero additional dependencies (PostgreSQL already available)
  • Breakpoint resume works out of the box
  • Horizontal scaling is simple (stateless application layer)

7. Practical Experience and Pitfalls

7.1 Limitations of PostgreSQL NOTIFY

Problem: A NOTIFY payload must be shorter than 8000 bytes.

Solution:

  • Only pass canvas_id (a few dozen bytes)
  • After client receives poke, Pull to get complete data
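Since only the canvas_id crosses the NOTIFY channel, the limit never bites in practice; a defensive sketch (the helper name and constant are illustrative, not from the AskTable codebase):

```python
PG_NOTIFY_MAX_BYTES = 8000  # NOTIFY payloads must be shorter than 8000 bytes

def poke_payload(canvas_id: str) -> str:
    """Keep the payload tiny: just the id; receivers Pull for the actual data."""
    if len(canvas_id.encode("utf-8")) >= PG_NOTIFY_MAX_BYTES:
        raise ValueError("NOTIFY payload too large")
    return canvas_id

assert poke_payload("canvas_01HXYZ") == "canvas_01HXYZ"
```

This "notify small, pull big" split is the same discipline that keeps pokes mergeable: the notification carries no state, only a hint to fetch.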

7.2 SSE Connection Lifecycle Management

Problem: After client disconnects, how does the server clean up subscriptions?

Solution:

  • Implement the SSE endpoint as an async generator; when the client disconnects, FastAPI's StreamingResponse closes the generator and its finally block runs
  • Call unsubscribe() in finally

async def sse_endpoint(canvas_id: str):
    queue = poke_broadcaster.subscribe(canvas_id)
    try:
        while True:
            await queue.get()          # wait for the next poke signal
            yield "data: poke\n\n"
    finally:
        poke_broadcaster.unsubscribe(canvas_id, queue)

7.3 Handling Version Number Conflicts

Problem: When client Pulls, server version number is already far ahead of client.

Solution:

  • Incremental Pull returns all changes where version > from_version
  • Client applies patches one by one, eventually converges to latest state

8. Summary and Outlook

AskTable's real-time collaborative editing architecture achieves:

  • Millisecond-level Real-time Performance: < 100ms end-to-end latency
  • High Reliability: Idempotency design + breakpoint resume
  • Horizontal Scaling: Stateless application layer + PostgreSQL broadcast
  • Low-cost Deployment: Zero additional dependencies, reusing existing infrastructure

Future Optimization Directions

  1. Version Number Sharding: Shard by Canvas ID to improve concurrency performance
  2. Conflict Resolution: Support more complex conflict resolution strategies (like CRDT)
  3. Offline Editing: Local editing and synchronization when client is completely offline
  4. Cross-region Synchronization: Consistency guarantees for multi-datacenter deployment

Open Source Plan

We plan to open source PokeBroadcaster and Replicache server implementation to help more teams quickly build real-time collaborative applications. Stay tuned!

