
Canvas Streaming and Resume: Redis Streams as the Event Bus

AskTable Team · 2026-03-05

In AI data analysis scenarios, a SQL query may take seconds and a Python analysis script may run for minutes. How do we let users see execution progress in real time? How do we resume pushes after a network disconnection? How do we guarantee that no events are lost in a multi-instance deployment?

AskTable's Canvas streaming execution architecture builds a high-performance, highly reliable real-time push system by combining Redis Streams, SSE, and arq.

This article is a deep dive into the design and implementation of that architecture.


1. Why Do We Need Streaming Execution?

1.1 Pain Points of Traditional Polling Solutions

Short Polling:

// Client polls once per second
setInterval(async () => {
    const status = await fetch(`/api/node/${nodeId}/status`);
    updateUI(status);
}, 1000);

Problems:

  • ❌ High latency (1-3 seconds)
  • ❌ High server pressure (lots of invalid requests)
  • ❌ Cannot push in real-time (like AI-generated text streams)

Long Polling:

# Server blocks waiting until there are new events
async def get_status(node_id):
    while True:
        if has_new_event(node_id):
            return get_event(node_id)
        await asyncio.sleep(0.1)

Problems:

  • ❌ Limited by connection count (each client occupies one connection)
  • ❌ Cannot recover after disconnection (need to repoll)
  • ❌ Difficult to scale horizontally (connections bound to specific instances)

1.2 What Do We Need?

  • ✅ Real-time: millisecond-level push latency
  • ✅ Breakpoint resume: automatic recovery after network disconnection, with no event loss
  • ✅ Horizontal scaling: multi-instance deployment with cross-instance event broadcast
  • ✅ Low cost: reuse the existing Redis, no additional components needed
  • ✅ Reliability: event persistence with support for historical replay


2. Technology Selection: Why Redis Streams?

2.1 Redis Streams: Born for Event Streams

Redis Streams is a data structure introduced in Redis 5.0, designed specifically for message queues and event streams.

Core Advantages:

  • Persistence: Events are written to Redis and persisted, support historical replay
  • Consumer Groups: Support parallel consumption by multiple consumers (we haven't used this yet)
  • Blocking Read: XREAD BLOCK supports long connections waiting for new events
  • Range Query: XRANGE supports querying historical events by ID range
  • Auto ID: Redis automatically generates incrementing Stream IDs (timestamp + sequence number)
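The auto-ID and range-query semantics above can be illustrated with a toy in-memory model. This is a sketch for intuition only, not the real Redis client; `MiniStream` and `xrange_after` are invented names:

```python
import time

class MiniStream:
    """Toy in-memory model of Redis Streams auto-ID generation.

    Real Redis assigns each entry an ID of the form "<ms-timestamp>-<seq>",
    where the sequence number disambiguates entries added within the same
    millisecond. Only that ID scheme and an exclusive range query are
    modeled here.
    """

    def __init__(self):
        self.entries = []      # list of (stream_id, fields)
        self._last = (0, 0)    # (ms, seq) of the most recent entry

    def xadd(self, fields):
        ms = int(time.time() * 1000)
        last_ms, last_seq = self._last
        # New millisecond: sequence restarts at 0; otherwise increment it
        new_id = (ms, 0) if ms > last_ms else (last_ms, last_seq + 1)
        self._last = new_id
        stream_id = f"{new_id[0]}-{new_id[1]}"
        self.entries.append((stream_id, fields))
        return stream_id

    def xrange_after(self, min_id=(0, 0)):
        """Entries strictly after min_id -- the "({from_id}" exclusive range."""
        return [
            (sid, f) for sid, f in self.entries
            if tuple(map(int, sid.split("-"))) > min_id
        ]
```

Because IDs are (timestamp, sequence) pairs, they are strictly increasing even under bursts of writes within one millisecond, which is exactly what makes "resume after ID X" well-defined.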

Why Not Redis Pub/Sub?

  • Pub/Sub doesn't persist, messages are lost when subscribers go offline
  • Pub/Sub cannot replay historical messages
  • Pub/Sub cannot implement breakpoint resume

Why Not Kafka/RabbitMQ?

  • High deployment cost (need additional components)
  • Too heavyweight for small-medium scale scenarios
  • AskTable already uses Redis, reuse existing infrastructure

2.2 SSE (Server-Sent Events): The Best Choice for One-Way Push

SSE is an HTML5 standard designed specifically for server-to-client one-way push.

Core Advantages:

  • Native Support: Browser native EventSource API
  • Breakpoint Resume: Automatic reconnection through Last-Event-ID
  • Simple and Efficient: Based on HTTP, no WebSocket handshake needed
  • Cross-origin Support: Supports CORS

SSE vs WebSocket:

| Feature | SSE | WebSocket |
| --- | --- | --- |
| Direction | One-way (server → client) | Bidirectional |
| Protocol | HTTP | WebSocket |
| Breakpoint Resume | Native support (Last-Event-ID) | Need to implement yourself |
| Browser Support | All modern browsers | All modern browsers |
| Complexity | Low | Medium |

AskTable's Scenario:

  • Canvas node execution is one-way push (server → client)
  • No need for client to send data in real-time
  • SSE is simpler and more reliable

3. Architecture Design: Three-Layer Event Stream

AskTable's streaming execution architecture is divided into three layers:


Workflow

  1. User Trigger: User clicks "Run" button in Canvas
  2. Create Stream: FastAPI creates Redis Stream (at:canvas:stream:{canvas_id}:{node_id})
  3. Submit Task: FastAPI submits task to arq queue
  4. SSE Subscription: Client subscribes to event stream via SSE
  5. Async Execution: arq worker executes Agent, writes events to Redis Streams
  6. Real-time Push: SSE endpoint reads events from Redis Streams, pushes to client
  7. Persistence: After task completes, result is persisted to PostgreSQL
  8. TTL Cleanup: Stream has 1 hour TTL after completion, auto-expires

4. Core Implementation: Deep Dive into Source Code

4.1 Redis Streams Event Storage

import redis.asyncio as redis

STREAM_MAXLEN = 10_000  # Upper bound on entries kept per stream

class NodeStreamStore:
    STREAM_TTL_SECONDS = 3600  # Stream retained for 1 hour after completion
    BLOCK_MS = 2000  # XREAD blocking wait time (milliseconds)

    def __init__(self, redis_client: redis.Redis):
        self._redis = redis_client

    def _stream_key(self, canvas_id: str, node_id: str) -> str:
        """Generate Stream Key"""
        return f"at:canvas:stream:{canvas_id}:{node_id}"

    async def start_stream(self, canvas_id: str, node_id: str):
        """Start a new stream: clear old data and create an empty stream"""
        key = self._stream_key(canvas_id, node_id)
        async with self._redis.pipeline(transaction=True) as pipe:
            pipe.delete(key)  # Clear old data
            # Seed message so the key exists immediately; subscribe skips the _init event
            pipe.xadd(key, {"event": StreamEvent.INIT, "data": ""})
            await pipe.execute()

    async def append(self, canvas_id: str, node_id: str, event: str, data: str) -> str:
        """Append an event to the stream, returning its stream ID"""
        key = self._stream_key(canvas_id, node_id)
        stream_id = await self._redis.xadd(
            key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
        )
        return stream_id

    async def complete(self, canvas_id: str, node_id: str):
        """Mark the stream as complete by setting its TTL"""
        key = self._stream_key(canvas_id, node_id)
        await self._redis.expire(key, self.STREAM_TTL_SECONDS)

Key Points:

  • Stream Key Design: at:canvas:stream:{canvas_id}:{node_id}, each node has independent Stream
  • Seed Message: _init event makes Stream exist immediately, avoiding key not found when subscribing
  • MAXLEN Limit: Limit maximum stream length (10000), avoid memory overflow
  • TTL Cleanup: Set 1 hour TTL after completion, auto-expires

4.2 Event Type Definition

from enum import StrEnum

class StreamEvent(StrEnum):
    INIT = "_init"  # Seed event (not pushed to client)
    START = "start"  # Task started
    DELTA = "delta"  # Incremental data (like AI-generated text)
    DONE = "done"  # Task completed
    ERROR = "error"  # Task failed

_TERMINAL_EVENTS = frozenset((StreamEvent.DONE, StreamEvent.ERROR))

Event Lifecycle:

INIT → START → DELTA × N → DONE/ERROR
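The subscribe code in the next section relies on a helper, `_iter_entries`, that is not shown. A plausible shape is sketched below; the bytes-vs-str normalization is an assumption about the Redis client's return types, not AskTable's actual implementation:

```python
# Terminal events end a subscription
_TERMINAL_EVENTS = frozenset(("done", "error"))

def iter_entries(raw_entries):
    """Decode raw XRANGE/XREAD entries into (stream_id, event, data, terminal).

    Skips the "_init" seed event, which exists only so the key is created
    before the first real event. Redis clients may return bytes, so IDs
    and field names/values are normalized to str first.
    """
    for stream_id, fields in raw_entries:
        if isinstance(stream_id, bytes):
            stream_id = stream_id.decode()
        decoded = {
            (k.decode() if isinstance(k, bytes) else k):
            (v.decode() if isinstance(v, bytes) else v)
            for k, v in fields.items()
        }
        event = decoded.get("event", "")
        if event == "_init":
            continue  # seed event is never pushed to clients
        yield stream_id, event, decoded.get("data", ""), event in _TERMINAL_EVENTS
```

The fourth element of each tuple is what lets subscribe terminate as soon as a done/error event is replayed or received.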

4.3 Breakpoint Resume: XRANGE + XREAD

async def subscribe(
    self, canvas_id: str, node_id: str, from_id: str = "0-0"
) -> AsyncGenerator[tuple[str, str, str], None]:
    """Subscribe to stream events, support breakpoint resume.

    1. XRANGE replay historical messages after from_id
    2. If no terminal event (done/error) encountered, switch to XREAD BLOCK to wait for real-time messages
    3. Terminate when done/error encountered

    from_id: "0-0" to read from beginning, or pass Last-Event-ID to implement breakpoint resume
    """
    key = self._stream_key(canvas_id, node_id)

    # Phase 1: Replay history (XRANGE)
    range_start = f"({from_id}" if from_id != "0-0" else "-"
    raw_entries = await self._redis.xrange(key, min=range_start)

    last_id = from_id
    for stream_id, event, data, terminal in self._iter_entries(raw_entries):
        last_id = stream_id
        yield (stream_id, event, data)
        if terminal:
            return

    # Phase 2: Real-time wait (XREAD BLOCK)
    while True:
        results = await self._redis.xread(
            {key: last_id}, block=self.BLOCK_MS, count=10
        )
        if not results:
            # Timeout, check if stream still exists
            if not await self._redis.exists(key):
                return
            continue

        for _stream_name, entries in results:
            for stream_id, event, data, terminal in self._iter_entries(entries):
                last_id = stream_id
                yield (stream_id, event, data)
                if terminal:
                    return

Key Points:

  • Two-phase Reading:
    1. XRANGE: Replay historical messages (support breakpoint resume)
    2. XREAD BLOCK: Block and wait for new messages (real-time push)
  • Exclusive Range: ({from_id} is an exclusive range start (supported since Redis 6.2), so from_id itself is skipped and nothing is pushed twice
  • Terminal Detection: Stop subscription after encountering DONE/ERROR event
  • Timeout Handling: After XREAD timeout, check if Stream still exists

Breakpoint Resume Example:

# Client connects for the first time
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
    print(f"{stream_id}: {event} - {data}")
    # Assume disconnection at stream_id="1234567890-0"

# Client reconnects (pass Last-Event-ID)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="1234567890-0"):
    print(f"{stream_id}: {event} - {data}")
    # Continue pushing after "1234567890-0", no omissions

4.4 SSE Endpoint Implementation

import json

from fastapi import APIRouter, Header
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get("/canvas/{canvas_id}/node/{node_id}/stream")
async def stream_node_events(
    canvas_id: str,
    node_id: str,
    # Browsers resend Last-Event-ID automatically on reconnect;
    # FastAPI matches last_event_id to the header (names are case-insensitive)
    last_event_id: str = Header(default="0-0"),
):
    """SSE endpoint: push node execution events"""

    async def event_generator():
        try:
            async for stream_id, event, data in stream_store.subscribe(
                canvas_id, node_id, from_id=last_event_id
            ):
                # SSE format: id + event + data + blank line
                yield f"id: {stream_id}\n"
                yield f"event: {event}\n"
                yield f"data: {data}\n\n"
        except Exception as e:
            # Surface the failure to the client as an error event
            yield "event: error\n"
            yield f"data: {json.dumps({'message': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering
        },
    )

Key Points:

  • SSE Format: id + event + data + blank line
  • Last-Event-ID: Automatically passed when client reconnects after disconnection
  • Streaming Response: StreamingResponse supports async generators
  • Nginx Compatible: X-Accel-Buffering: no disables Nginx buffering
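The id/event/data framing above can be factored into a tiny helper; this is a sketch (`sse_frame` is an invented name, and the generator above inlines the same format):

```python
def sse_frame(stream_id: str, event: str, data: str) -> str:
    """Serialize one event in SSE wire format.

    An SSE frame is a group of "field: value" lines terminated by a blank
    line; the id line is what the browser echoes back as Last-Event-ID on
    reconnect. Note that multi-line payloads would need one "data:" line
    per line of payload, so JSON-encoded (single-line) data is safest.
    """
    return f"id: {stream_id}\nevent: {event}\ndata: {data}\n\n"
```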

Client Code:

const eventSource = new EventSource(
    `/api/canvas/${canvasId}/node/${nodeId}/stream`
);

eventSource.addEventListener("start", (e) => {
    console.log("Task started");
});

eventSource.addEventListener("delta", (e) => {
    const data = JSON.parse(e.data);
    updateUI(data);  // Update UI in real-time
});

eventSource.addEventListener("done", (e) => {
    console.log("Task completed");
    eventSource.close();
});

eventSource.addEventListener("error", (e) => {
    if (e.data !== undefined) {
        // Server-sent "error" event: the task itself failed
        console.error("Task failed", e.data);
        eventSource.close();
    }
    // Otherwise this is a connection error: the browser reconnects
    // automatically and resends Last-Event-ID (breakpoint resume),
    // so nothing needs to be done here
});

4.5 arq Async Task Execution

async def run_agent_task(canvas_id: str, node_id: str, message: str):
    """Execute Agent in background, write events to Redis Stream"""
    try:
        async with node_status_handler(canvas_id, node_id):
            # 1. Write START event
            await stream_store.append(canvas_id, node_id, StreamEvent.START, "null")

            # 2. Create Agent session
            async with unit_of_work() as db:
                fresh_node = await service.get_node(db, canvas_id, node_id)
                handler = get_handler(fresh_node.type)
                agent = await _create_agent_session(db, fresh_node, handler, canvas_id)

                # 3. Stream the Agent's output
                async for event in agent.run(message):
                    await stream_store.append(
                        canvas_id, node_id, StreamEvent.DELTA, json.dumps(event)
                    )

                # 4. Persist result
                agent_result = agent.get_result()
                streaming_result = handler.format_result(fresh_node, agent_result)

                if streaming_result is not None:
                    merged_data = {**fresh_node.data, **streaming_result.data}
                    df_id = await persist_node_dataframe(merged_data, fresh_node.title, db)
                    await service.update_node(
                        db_session=db,
                        canvas_id=canvas_id,
                        node_id=node_id,
                        data=merged_data,
                        dataframe_id=df_id,
                    )

        # 5. Write DONE event
        await stream_store.append(canvas_id, node_id, StreamEvent.DONE, "null")
    except Exception as e:
        # 6. Write ERROR event
        await stream_store.append(
            canvas_id, node_id, StreamEvent.ERROR, json.dumps({"message": str(e)[:200]})
        )
        raise
    finally:
        # 7. Set TTL
        await stream_store.complete(canvas_id, node_id)

Key Points:

  • Async Execution: arq worker executes in background, doesn't block API request
  • Streaming Write: Agent writes to Stream immediately when generating each piece of content
  • Result Persistence: After task completes, result is written to PostgreSQL
  • Error Handling: Write ERROR event on exception, client can perceive it
  • TTL Cleanup: Set TTL regardless of success or failure
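The event protocol of run_agent_task (START, DELTA × N, then DONE or ERROR, with the TTL set either way) can be exercised against an in-memory stand-in for the stream store. This is a test sketch, not AskTable's code; `FakeStreamStore` and `run_task` are illustrative names:

```python
import json

class FakeStreamStore:
    """In-memory stand-in that records appends instead of calling Redis."""
    def __init__(self):
        self.events = []       # (event, data) pairs in append order
        self.completed = False

    async def append(self, canvas_id, node_id, event, data):
        self.events.append((event, data))

    async def complete(self, canvas_id, node_id):
        self.completed = True  # the real store sets a 1-hour TTL here

async def run_task(store, agent_chunks, canvas_id="c1", node_id="n1"):
    """Mirrors the event protocol of run_agent_task above."""
    try:
        await store.append(canvas_id, node_id, "start", "null")
        for chunk in agent_chunks:
            await store.append(canvas_id, node_id, "delta", json.dumps(chunk))
        await store.append(canvas_id, node_id, "done", "null")
    except Exception as e:
        await store.append(
            canvas_id, node_id, "error", json.dumps({"message": str(e)[:200]})
        )
        raise
    finally:
        await store.complete(canvas_id, node_id)  # TTL on success or failure
```

Putting the protocol behind a store interface like this keeps the invariant testable: every run ends with exactly one terminal event, and the TTL is always set.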

5. Performance Optimization and Reliability Assurance

5.1 Performance Optimization

XREAD Batch Read:

results = await self._redis.xread(
    {key: last_id}, block=self.BLOCK_MS, count=10
)
  • count=10: Read up to 10 messages each time
  • Reduces network round trips

MAXLEN Limit:

stream_id = await self._redis.xadd(
    key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
)
  • maxlen=STREAM_MAXLEN (10,000): caps the stream length
  • approximate=True: trims with ~ (near-exact), which is much cheaper for Redis

TTL Auto Cleanup:

await self._redis.expire(key, self.STREAM_TTL_SECONDS)
  • Auto-expires after 1 hour of completion
  • Avoids Redis memory leaks

5.2 Reliability Assurance

Event Persistence:

  • Redis Streams persist to disk (AOF/RDB)
  • Events are not lost even if Redis restarts

Breakpoint Resume:

  • After client disconnection, recover through Last-Event-ID
  • XRANGE replays historical messages, no omissions

Cross-Instance Broadcast:

  • Redis Streams are global, shared by all instances
  • Any instance's SSE endpoint can read events

Idempotency:

  • Stream ID is unique, client can deduplicate
  • Duplicate pushes won't cause data errors

6. Complex Scenario Handling

6.1 Long-Running Tasks

# Task may run for minutes
async for event in agent.run(message):
    await stream_store.append(canvas_id, node_id, StreamEvent.DELTA, json.dumps(event))
    # Push immediately when each piece of content is generated

Advantages:

  • User sees progress in real-time
  • No need to wait for task completion

6.2 Multi-Client Subscription

# Client A subscribes
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
    print(f"Client A: {event}")

# Client B subscribes (simultaneously)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
    print(f"Client B: {event}")

Advantages:

  • Multiple clients can subscribe to the same Stream simultaneously
  • Each client reads independently without interference

6.3 Historical Replay

# After task completes, new client connects
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
    print(f"{event}: {data}")
    # Complete replay of all historical events

Advantages:

  • Supports "replay" functionality
  • User can view the complete execution process of the task

7. Comparison with Other Solutions

| Solution | Real-time | Breakpoint Resume | Horizontal Scaling | Deployment Cost | Reliability |
| --- | --- | --- | --- | --- | --- |
| Redis Streams + SSE | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| WebSocket + Redis Pub/Sub | ⭐⭐⭐⭐⭐ | ⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Short Polling | ⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Long Polling | ⭐⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Kafka + WebSocket | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |

Why AskTable Chose Redis Streams + SSE:

  • Real-time performance meets requirements (< 100ms)
  • Native breakpoint resume support
  • Simple horizontal scaling (stateless SSE endpoints)
  • Zero additional dependencies (reuse Redis)
  • High reliability (event persistence)

8. Practical Experience and Pitfalls

8.1 Nginx Buffering Problem

Problem: Nginx buffers responses by default, causing SSE delays.

Solution:

headers={
    "X-Accel-Buffering": "no",  # Disable Nginx buffering
}

8.2 XREAD Timeout Handling

Problem: After XREAD BLOCK times out, how to determine if Stream still exists?

Solution:

if not results:
    # Timeout, check if stream still exists
    if not await self._redis.exists(key):
        return  # Stream deleted, stop subscription
    continue  # Stream still exists, continue waiting

8.3 Stream ID Format

Problem: Redis Stream ID format is {timestamp}-{sequence}, how to parse correctly?

Solution:

  • No parsing needed: treat IDs as opaque strings and pass them back to Redis
  • Redis guarantees IDs are strictly increasing (millisecond timestamp, then sequence number)
  • XRANGE and XREAD accept IDs as strings and compare them numerically server-side; avoid ordering them lexicographically on the client
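If client-side code ever does need to order stream IDs (for example, picking the newest Last-Event-ID from a local cache), the safe way is numeric, not lexicographic, comparison. A sketch (`stream_id_key` is an invented helper name):

```python
def stream_id_key(stream_id: str) -> tuple[int, int]:
    """Split a "<ms>-<seq>" stream ID into an integer pair for ordering.

    Plain string comparison mis-orders IDs once the numbers differ in
    length: lexicographically "10-0" < "9-5", but as (ms, seq) tuples
    "10-0" correctly sorts after "9-5".
    """
    ms, _, seq = stream_id.partition("-")
    return (int(ms), int(seq))
```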

8.4 Event Deduplication

Problem: Client may receive duplicate events (like network jitter).

Solution:

  • Client records already processed stream_id
  • Use Set for deduplication
const processedIds = new Set();

eventSource.addEventListener("delta", (e) => {
    if (processedIds.has(e.lastEventId)) {
        return;  // Already processed, skip
    }
    processedIds.add(e.lastEventId);
    updateUI(JSON.parse(e.data));
});

9. Summary and Outlook

AskTable's Canvas streaming execution architecture achieves:

  • ✅ Real-time: millisecond-level push latency
  • ✅ Breakpoint resume: automatic recovery after network disconnection, with no omissions
  • ✅ Horizontal scaling: multi-instance deployment with cross-instance event broadcast
  • ✅ Low cost: reuse Redis, no additional components needed
  • ✅ High reliability: event persistence with support for historical replay

Future Optimization Directions

  1. Consumer Groups: Use Redis Streams' consumer group functionality for load balancing
  2. Event Compression: Compress large-volume events to reduce network transmission
  3. Multi-tenant Isolation: Shard Streams by tenant to improve performance
  4. Monitoring Alerts: Monitor Stream length, latency and other metrics

Open Source Plan

We plan to open source NodeStreamStore and SSE endpoint implementation to help more teams build real-time push systems. Stay tuned!

