
In AI data analysis scenarios, a SQL query may take seconds, and a Python analysis script may run for minutes. How do we let users see execution progress in real-time? How do we recover pushes after network disconnection? How do we ensure no events are lost under multi-instance deployment?
AskTable's Canvas streaming execution architecture combines Redis Streams, SSE, and arq into a high-performance, highly reliable real-time push system. This article analyzes the design and implementation of this architecture in depth.
Short Polling:

```javascript
// Client polls once per second
setInterval(async () => {
  const res = await fetch(`/api/node/${nodeId}/status`);
  const status = await res.json();
  updateUI(status);
}, 1000);
```
Problems:

- Latency is bounded by the polling interval (up to 1 second here)
- Most requests return no new data, wasting bandwidth
- Server load grows linearly with the number of connected clients
Long Polling:

```python
# Server blocks waiting until there are new events
async def get_status(node_id):
    while True:
        if has_new_event(node_id):
            return get_event(node_id)
        await asyncio.sleep(0.1)
```
Problems:

- Each waiting client holds a server connection open
- Events emitted between two requests can be missed, so resume after disconnection is unreliable
- Awkward under multi-instance deployment: the instance holding the connection may not be the one producing events
✅ Real-time: millisecond-level push latency
✅ Breakpoint resume: automatic recovery after network disconnection, no event loss
✅ Horizontal scaling: multi-instance deployment with cross-instance event broadcast
✅ Low cost: reuses the existing Redis, no additional components needed
✅ Reliability: events are persisted and can be replayed
Redis Streams is a data structure introduced in Redis 5.0, designed specifically for message queues and event streams.
Core Advantages:
- `XREAD BLOCK` supports long connections waiting for new events
- `XRANGE` supports querying historical events by ID range

Why Not Redis Pub/Sub?

Pub/Sub is fire-and-forget: messages are not persisted, so any message published while a client is disconnected is lost permanently, and there is no way to replay history. That rules out breakpoint resume.
Why Not Kafka/RabbitMQ?

Both would work, but each adds a component to deploy, monitor, and operate. Redis is already part of the AskTable stack, and Redis Streams covers the persistence and replay semantics we need, so the extra operational cost is not justified here.
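The Pub/Sub limitation can be made concrete with a toy model (our illustration, not Redis internals): a fire-and-forget channel drops messages published while a client is away, while an append-only log retains them for later reads.

```python
class ToyPubSub:
    """Fire-and-forget: only currently-connected subscribers see a message."""
    def __init__(self):
        self.subscribers = []

    def publish(self, msg):
        for inbox in self.subscribers:
            inbox.append(msg)

class ToyStream:
    """Append-only log: entries persist and can be read at any later time."""
    def __init__(self):
        self.entries = []

    def publish(self, msg):
        self.entries.append(msg)

    def read_from(self, index):
        return self.entries[index:]

pubsub, stream = ToyPubSub(), ToyStream()
pubsub.publish("event-1")          # published before anyone subscribes
stream.publish("event-1")

late_inbox = []
pubsub.subscribers.append(late_inbox)

print(late_inbox)                  # [] -- event-1 is gone forever
print(stream.read_from(0))         # ['event-1'] -- history survives
```

Redis Streams gives the log behavior on the right, which is exactly what breakpoint resume needs.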
SSE is an HTML5 standard designed specifically for server-to-client one-way push.
Core Advantages:
- Plain HTTP: works through existing proxies and load balancers
- Browser-native `EventSource` API, no client library needed
- Automatic reconnection with `Last-Event-ID` for resuming

SSE vs WebSocket:
| Feature | SSE | WebSocket |
|---|---|---|
| Direction | One-way (server → client) | Bidirectional |
| Protocol | HTTP | WebSocket |
| Breakpoint Resume | Native support (Last-Event-ID) | Need to implement yourself |
| Browser Support | All modern browsers | All modern browsers |
| Complexity | Low | Medium |
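The SSE wire format behind the "native breakpoint resume" row is simple: each frame is a few `name: value` lines terminated by a blank line, and the `id` line is what the browser echoes back as `Last-Event-ID`. A minimal serializer sketch (the function name `sse_frame` is ours, for illustration):

```python
def sse_frame(stream_id: str, event: str, data: str) -> str:
    """Serialize one Server-Sent Events frame.

    Per the SSE format, each field is a 'name: value' line, and a frame
    is terminated by a blank line. The 'id' line is what the browser
    sends back as Last-Event-ID when it reconnects.
    """
    # Multi-line data must be split into one 'data:' line per line
    data_lines = "".join(f"data: {line}\n" for line in data.split("\n"))
    return f"id: {stream_id}\nevent: {event}\n{data_lines}\n"

frame = sse_frame("1700000000000-1", "delta", '{"text": "hi"}')
print(frame)
# id: 1700000000000-1
# event: delta
# data: {"text": "hi"}
# (blank line terminates the frame)
```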
AskTable's Scenario: node execution events only flow from server to client, so one-way push is all we need; SSE's native breakpoint resume and lower complexity make it a better fit than WebSocket.
AskTable's streaming execution architecture is divided into three layers:

1. Execution layer: an arq background task runs the node and produces events
2. Storage layer: Redis Streams persist the events, one Stream per node (key pattern `at:canvas:stream:{canvas_id}:{node_id}`)
3. Push layer: an SSE endpoint subscribes to the Stream and forwards events to the browser

```python
class NodeStreamStore:
    STREAM_MAXLEN = 10000        # Maximum stream length (approximate)
    STREAM_TTL_SECONDS = 3600    # Stream retained for 1 hour after completion
    BLOCK_MS = 2000              # XREAD blocking wait time

    def _stream_key(self, canvas_id: str, node_id: str) -> str:
        """Generate Stream Key"""
        return f"at:canvas:stream:{canvas_id}:{node_id}"

    async def start_stream(self, canvas_id: str, node_id: str):
        """Start new stream: clear old data and create an empty stream"""
        key = self._stream_key(canvas_id, node_id)
        async with self._redis.pipeline(transaction=True) as pipe:
            pipe.delete(key)  # Clear old data
            # Seed message so the key exists immediately; subscribe skips the _init event
            pipe.xadd(key, {"event": StreamEvent.INIT, "data": ""})
            await pipe.execute()

    async def append(self, canvas_id: str, node_id: str, event: str, data: str) -> str:
        """Append event to stream, return stream ID"""
        key = self._stream_key(canvas_id, node_id)
        stream_id = await self._redis.xadd(
            key, {"event": event, "data": data},
            maxlen=self.STREAM_MAXLEN, approximate=True,
        )
        return stream_id

    async def complete(self, canvas_id: str, node_id: str):
        """Mark stream as complete by setting a TTL"""
        key = self._stream_key(canvas_id, node_id)
        await self._redis.expire(key, self.STREAM_TTL_SECONDS)
```
Key Points:
- Key naming: `at:canvas:stream:{canvas_id}:{node_id}` — each node gets an independent Stream
- The `_init` seed event makes the Stream exist immediately, avoiding "key not found" when subscribing

```python
from enum import StrEnum

class StreamEvent(StrEnum):
    INIT = "_init"    # Seed event (not pushed to client)
    START = "start"   # Task started
    DELTA = "delta"   # Incremental data (e.g. AI-generated text)
    DONE = "done"     # Task completed
    ERROR = "error"   # Task failed

_TERMINAL_EVENTS = frozenset((StreamEvent.DONE, StreamEvent.ERROR))
```
Event Lifecycle:
INIT → START → DELTA × N → DONE/ERROR
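The lifecycle above is easy to check mechanically. A small validator sketch (the helper `is_valid_lifecycle` is ours, not AskTable code):

```python
TERMINAL = {"done", "error"}

def is_valid_lifecycle(events: list[str]) -> bool:
    """Check that an event sequence follows _init -> start -> delta* -> done/error."""
    if len(events) < 3:
        return False
    head, second, *middle, tail = events
    return (
        head == "_init"
        and second == "start"
        and all(e == "delta" for e in middle)
        and tail in TERMINAL
    )

print(is_valid_lifecycle(["_init", "start", "delta", "delta", "done"]))  # True
print(is_valid_lifecycle(["_init", "delta", "done"]))                    # False (no start)
```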
```python
async def subscribe(
    self, canvas_id: str, node_id: str, from_id: str = "0-0"
) -> AsyncGenerator[tuple[str, str, str], None]:
    """Subscribe to stream events, with breakpoint resume support.

    1. XRANGE replays historical messages after from_id
    2. If no terminal event (done/error) is seen, switch to XREAD BLOCK for real-time messages
    3. Terminate when done/error is encountered

    from_id: "0-0" reads from the beginning; pass Last-Event-ID to resume.
    """
    key = self._stream_key(canvas_id, node_id)

    # Phase 1: Replay history (XRANGE)
    range_start = f"({from_id}" if from_id != "0-0" else "-"
    raw_entries = await self._redis.xrange(key, min=range_start)
    last_id = from_id
    for stream_id, event, data, terminal in self._iter_entries(raw_entries):
        last_id = stream_id
        yield (stream_id, event, data)
        if terminal:
            return

    # Phase 2: Real-time wait (XREAD BLOCK)
    while True:
        results = await self._redis.xread(
            {key: last_id}, block=self.BLOCK_MS, count=10
        )
        if not results:
            # Timeout: check whether the stream still exists
            if not await self._redis.exists(key):
                return
            continue
        for _stream_name, entries in results:
            for stream_id, event, data, terminal in self._iter_entries(entries):
                last_id = stream_id
                yield (stream_id, event, data)
                if terminal:
                    return
```
Key Points:
- `XRANGE`: replays historical messages (enables breakpoint resume)
- `XREAD BLOCK`: blocks waiting for new messages (real-time push)
- `({from_id}` is an exclusive range start: it skips from_id itself, avoiding a duplicate push
- The subscription terminates on a `DONE`/`ERROR` event
- On `XREAD` timeout, check whether the Stream still exists

Breakpoint Resume Example:
```python
# Client connects for the first time
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
    print(f"{stream_id}: {event} - {data}")

# Assume disconnection at stream_id="1234567890-0"

# Client reconnects (passing Last-Event-ID)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="1234567890-0"):
    print(f"{stream_id}: {event} - {data}")
# Pushing continues after "1234567890-0", with no omissions
```
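The contract behind this resume behavior can be stated precisely: events delivered before the disconnect, concatenated with events replayed strictly after `from_id`, must equal the full log, with no duplicates and no gaps. A pure-Python check of that property (our illustration of the exclusive-start semantics, not AskTable code):

```python
def resume_after(log: list[tuple[str, str]], from_id: str) -> list[tuple[str, str]]:
    """Return the suffix of the log strictly after from_id
    (exclusive start, mirroring the '(' prefix used with XRANGE)."""
    def key(sid: str) -> tuple[int, int]:
        ms, seq = sid.split("-")
        return (int(ms), int(seq))
    return [(sid, ev) for sid, ev in log if key(sid) > key(from_id)]

log = [("1-1", "start"), ("2-1", "delta"), ("3-1", "delta"), ("4-1", "done")]

# Client saw the first two events, then disconnected at "2-1"
seen = log[:2]
resumed = resume_after(log, "2-1")

# No duplicates, no gaps: the two halves reassemble the full log
assert seen + resumed == log
```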
```python
@router.get("/canvas/{canvas_id}/node/{node_id}/stream")
async def stream_node_events(canvas_id: str, node_id: str, last_event_id: str = "0-0"):
    """SSE endpoint: push node execution events"""

    async def event_generator():
        try:
            async for stream_id, event, data in stream_store.subscribe(
                canvas_id, node_id, from_id=last_event_id
            ):
                # SSE format: id + event + data
                yield f"id: {stream_id}\n"
                yield f"event: {event}\n"
                yield f"data: {data}\n\n"
        except Exception as e:
            # Error event
            yield "event: error\n"
            yield f"data: {json.dumps({'message': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering
        },
    )
```
Key Points:

- SSE frame format: `id` + `event` + `data` lines, terminated by a blank line
- `StreamingResponse` supports async generators
- `X-Accel-Buffering: no` disables Nginx buffering

Client Code:
```javascript
const eventSource = new EventSource(
  `/api/canvas/${canvasId}/node/${nodeId}/stream`
);

eventSource.addEventListener("start", (e) => {
  console.log("Task started");
});

eventSource.addEventListener("delta", (e) => {
  const data = JSON.parse(e.data);
  updateUI(data); // Update UI in real-time
});

eventSource.addEventListener("done", (e) => {
  console.log("Task completed");
  eventSource.close();
});

eventSource.addEventListener("error", (e) => {
  console.error("Task failed", e.data);
  eventSource.close();
});

// Auto-reconnect on disconnection (native browser support)
eventSource.onerror = () => {
  console.log("Connection lost, reconnecting...");
  // EventSource will automatically reconnect and send Last-Event-ID
};
```
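Outside the browser there is no `EventSource`, so a non-browser client has to parse the `text/event-stream` payload itself. A minimal parser for the id/event/data framing used above (the function name `parse_sse` is ours; a full implementation would also handle `retry:` fields and comment lines):

```python
def parse_sse(text: str) -> list[dict]:
    """Parse a text/event-stream payload into a list of event dicts.

    Frames are separated by blank lines; each frame contributes
    'id', 'event', and (possibly multi-line) 'data' fields.
    """
    events = []
    for block in text.split("\n\n"):
        if not block.strip():
            continue
        event = {"id": None, "event": "message", "data": []}
        for line in block.split("\n"):
            field, _, value = line.partition(": ")
            if field in ("id", "event"):
                event[field] = value
            elif field == "data":
                event["data"].append(value)
        event["data"] = "\n".join(event["data"])
        events.append(event)
    return events

raw = "id: 1-1\nevent: start\ndata: null\n\nid: 2-1\nevent: delta\ndata: hello\n\n"
for ev in parse_sse(raw):
    print(ev["id"], ev["event"], ev["data"])
```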
```python
async def run_agent_task(canvas_id: str, node_id: str, message: str):
    """Execute the Agent in the background, writing events to the Redis Stream"""
    try:
        async with node_status_handler(canvas_id, node_id):
            # 1. Write START event
            await stream_store.append(canvas_id, node_id, StreamEvent.START, "null")

            # 2. Create Agent session
            async with unit_of_work() as db:
                fresh_node = await service.get_node(db, canvas_id, node_id)
                handler = get_handler(fresh_node.type)
                agent = await _create_agent_session(db, fresh_node, handler, canvas_id)

                # 3. Execute the Agent, streaming its output
                async for event in agent.run(message):
                    await stream_store.append(
                        canvas_id, node_id, StreamEvent.DELTA, json.dumps(event)
                    )

                # 4. Persist the result
                agent_result = agent.get_result()
                streaming_result = handler.format_result(fresh_node, agent_result)
                if streaming_result is not None:
                    merged_data = {**fresh_node.data, **streaming_result.data}
                    df_id = await persist_node_dataframe(merged_data, fresh_node.title, db)
                    await service.update_node(
                        db_session=db,
                        canvas_id=canvas_id,
                        node_id=node_id,
                        data=merged_data,
                        dataframe_id=df_id,
                    )

            # 5. Write DONE event
            await stream_store.append(canvas_id, node_id, StreamEvent.DONE, "null")
    except Exception as e:
        # 6. Write ERROR event
        await stream_store.append(
            canvas_id, node_id, StreamEvent.ERROR, json.dumps({"message": str(e)[:200]})
        )
        raise
    finally:
        # 7. Set TTL
        await stream_store.complete(canvas_id, node_id)
```
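The try/except shape of the worker guarantees that every run ends with exactly one terminal event. That invariant can be distilled into a few synchronous lines (our illustration; unlike the real worker, this sketch records the error instead of re-raising):

```python
def run_with_terminal_event(task, emit):
    """Run task(); guarantee exactly one terminal event is emitted,
    mirroring the try/except structure of run_agent_task."""
    emit("start")
    try:
        for _chunk in task():
            emit("delta")
        emit("done")
    except Exception as e:
        # Truncate the error message, as the worker does with str(e)[:200]
        emit(f"error:{str(e)[:200]}")

events = []
def ok():
    yield "a"
    yield "b"
run_with_terminal_event(ok, events.append)
print(events)  # ['start', 'delta', 'delta', 'done']

events2 = []
def boom():
    yield "a"
    raise RuntimeError("db unavailable")
run_with_terminal_event(boom, events2.append)
print(events2)  # ['start', 'delta', 'error:db unavailable']
```

Whatever happens inside the task, subscribers always observe a `done` or `error` event and can release their connections.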
Key Points:

- Every stage of execution is mirrored into the Stream: START → DELTA × N → DONE/ERROR
- The error payload is truncated to 200 characters before being written
- `finally` always sets the TTL, so the Stream is cleaned up even on failure

XREAD Batch Read:
```python
results = await self._redis.xread(
    {key: last_id}, block=self.BLOCK_MS, count=10
)
```
- `count=10`: read up to 10 messages per call

MAXLEN Limit:
```python
stream_id = await self._redis.xadd(
    key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
)
```
- `maxlen=10000`: caps the maximum stream length
- `approximate=True`: allows approximate trimming for better performance

TTL Auto Cleanup:
```python
await self._redis.expire(key, self.STREAM_TTL_SECONDS)
```
Event Persistence:

- Events are written to the Stream before being pushed, so a dropped consumer never loses data

Breakpoint Resume:

- The client sends `Last-Event-ID` when it reconnects
- `XRANGE` replays historical messages, so nothing is omitted

Cross-Instance Broadcast:

- Any API instance can `XREAD` the same Stream key, so the SSE endpoint and the worker need not run on the same machine

Idempotency:

- Each event carries a unique, monotonically increasing stream ID, which clients can use to deduplicate
```python
# The task may run for minutes
async for event in agent.run(message):
    await stream_store.append(canvas_id, node_id, StreamEvent.DELTA, json.dumps(event))
    # Each piece of content is pushed as soon as it is generated
```
Advantages: the user sees partial output immediately instead of waiting minutes for the complete result.
```python
# Client A subscribes
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
    print(f"Client A: {event}")

# Client B subscribes (simultaneously)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
    print(f"Client B: {event}")
```
Advantages: every subscriber independently reads the full event sequence; reads are not destructive, so multiple clients can watch the same node at once.
```python
# A new client connects after the task completes
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
    print(f"{event}: {data}")
# All historical events are replayed in full
```
Advantages: any client connecting within the 1-hour TTL can replay the node's complete execution history.
| Solution | Real-time | Breakpoint Resume | Horizontal Scaling | Deployment Cost | Reliability |
|---|---|---|---|---|---|
| Redis Streams + SSE | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| WebSocket + Redis Pub/Sub | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Short Polling | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| Long Polling | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Kafka + WebSocket | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
Why AskTable Chose Redis Streams + SSE:
Problem: Nginx buffers responses by default, causing SSE delays.
Solution:
```python
headers={
    "X-Accel-Buffering": "no",  # Disable Nginx buffering
}
```
Problem: After XREAD BLOCK times out, how to determine if Stream still exists?
Solution:
```python
if not results:
    # Timeout: check whether the stream still exists
    if not await self._redis.exists(key):
        return  # Stream deleted, stop the subscription
    continue  # Stream still exists, keep waiting
```
Problem: Redis Stream ID format is {timestamp}-{sequence}, how to parse correctly?
Solution:
- Stream IDs are passed and compared as strings; `XRANGE` and `XREAD` interpret them server-side, so the client never needs to parse them

Problem: Client may receive duplicate events (e.g. network jitter).
Solution:
- Track processed `stream_id`s in a Set for deduplication

```javascript
const processedIds = new Set();

eventSource.addEventListener("delta", (e) => {
  if (processedIds.has(e.lastEventId)) {
    return; // Already processed, skip
  }
  processedIds.add(e.lastEventId);
  updateUI(JSON.parse(e.data));
});
```
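A related subtlety when deduplicating by keeping only the highest ID seen: stream IDs are `{timestamp}-{sequence}` pairs, so the two components must be compared numerically, not as strings (string comparison would put `"9-1"` after `"10-1"`). A sketch (the helpers `parse_stream_id` and `is_new` are ours, for illustration):

```python
def parse_stream_id(stream_id: str) -> tuple[int, int]:
    """Split a Redis stream ID '{ms}-{seq}' into comparable integers."""
    ms, seq = stream_id.split("-")
    return (int(ms), int(seq))

def is_new(stream_id: str, last_seen: str) -> bool:
    """True if stream_id comes strictly after last_seen."""
    return parse_stream_id(stream_id) > parse_stream_id(last_seen)

# Numeric comparison gets the order right where string comparison would not
assert is_new("10-1", "9-1")     # 10 > 9
assert "10-1" < "9-1"            # ...but "10-1" sorts before "9-1" as a string
assert not is_new("9-1", "9-1")  # equal IDs are not new
```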
AskTable's Canvas streaming execution architecture achieves:

✅ Real-time: millisecond-level push latency
✅ Breakpoint resume: automatic recovery after network disconnection, no omissions
✅ Horizontal scaling: multi-instance deployment with cross-instance event broadcast
✅ Low cost: reuses Redis, no additional components needed
✅ High reliability: events are persisted and can be replayed
We plan to open source NodeStreamStore and SSE endpoint implementation to help more teams build real-time push systems. Stay tuned!