
In Canvas, nodes form complex dependency relationships: a Chart node depends on a Data node's query results, and a Python node may depend on DataFrames from multiple parent nodes. When one node is updated, how do you intelligently refresh the child nodes that depend on it? How do you avoid duplicate computation? How do you guarantee the correct execution order?
AskTable's Canvas dependency graph engine combines topological sorting, a state machine, and incremental computation to manage node execution efficiently and reliably.
This article analyzes the design and implementation of this engine in depth.
Nodes in Canvas form a Directed Acyclic Graph (DAG):
Dependencies:
Manual Refresh:
Full Refresh:
✅ Smart Refresh: Automatically identify nodes that need to be refreshed
✅ Incremental Computation: Only refresh affected nodes
✅ Correct Order: Execute in dependency order
✅ Concurrent Execution: Independent nodes can execute in parallel
✅ Cache Reuse: Unchanged nodes reuse cached results
Topological sort is a linear ordering of a DAG such that for every directed edge u → v, node u comes before node v.
Example:
Dependencies: A → C, A → D, B → D, D → E
Topological sort result: A, B, D, C, E (or B, A, D, C, E)
```python
from collections import defaultdict, deque

def topological_sort(nodes: list[NodeModel]) -> list[NodeModel]:
    """
    Topologically sort a node list (Kahn's algorithm).

    Only dependencies within the node list are considered;
    external dependencies do not take part in the sort.
    """
    if not nodes:
        return []

    # 1. Build the graph and the in-degree table
    node_ids = {n.id for n in nodes}
    in_degree: dict[str, int] = {n.id: 0 for n in nodes}
    graph: dict[str, list[str]] = defaultdict(list)
    for node in nodes:
        for dep_id in node.dependencies or []:
            if dep_id in node_ids:  # Only consider internal dependencies
                graph[dep_id].append(node.id)
                in_degree[node.id] += 1

    # 2. Seed the queue with nodes of in-degree 0
    queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
    sorted_ids: list[str] = []

    # 3. BFS traversal
    while queue:
        nid = queue.popleft()
        sorted_ids.append(nid)
        # Removing this node lowers the in-degree of its children
        for child in graph[nid]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)

    # 4. Return the sorted node list
    id_to_node = {n.id: n for n in nodes}
    return [id_to_node[nid] for nid in sorted_ids if nid in id_to_node]
```
Key Points:
- Only dependencies inside the given node list are counted; external dependency IDs are ignored.
- Kahn's algorithm runs in O(V + E) time.
- If the graph contains a cycle, the nodes on the cycle never reach in-degree 0 and are omitted from the result.
Example:
```python
# Node dependencies
nodes = [
    Node(id="A", dependencies=[]),
    Node(id="B", dependencies=[]),
    Node(id="C", dependencies=["A"]),
    Node(id="D", dependencies=["A", "B"]),
    Node(id="E", dependencies=["D"]),
]

# Topological sort result
sorted_nodes = topological_sort(nodes)
# Result: [A, B, C, D, E] or [B, A, C, D, E]
```
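As a quick sanity check, the sort can be exercised end-to-end with a minimal stand-in for `NodeModel` (the `Node` dataclass below is illustrative, not AskTable's actual model):

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field

@dataclass
class Node:
    """Illustrative stand-in for NodeModel."""
    id: str
    dependencies: list[str] = field(default_factory=list)

def topological_sort(nodes: list[Node]) -> list[Node]:
    # Kahn's algorithm, same structure as the engine version above
    node_ids = {n.id for n in nodes}
    in_degree = {n.id: 0 for n in nodes}
    graph: dict[str, list[str]] = defaultdict(list)
    for node in nodes:
        for dep_id in node.dependencies:
            if dep_id in node_ids:
                graph[dep_id].append(node.id)
                in_degree[node.id] += 1
    queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
    order: list[str] = []
    while queue:
        nid = queue.popleft()
        order.append(nid)
        for child in graph[nid]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)
    by_id = {n.id: n for n in nodes}
    return [by_id[nid] for nid in order]

nodes = [
    Node("A"), Node("B"),
    Node("C", ["A"]), Node("D", ["A", "B"]), Node("E", ["D"]),
]
order = [n.id for n in topological_sort(nodes)]

# The defining property: every node appears after all of its dependencies
for i, node in enumerate(topological_sort(nodes)):
    assert all(order.index(dep) < i for dep in node.dependencies)
```

On CPython, where dicts preserve insertion order, this produces `[A, B, C, D, E]` for the example graph; any order satisfying the assertion above is equally valid.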
```python
class NodeStatus:
    PENDING = "pending"      # Pending execution
    STREAMING = "streaming"  # Executing
    SUCCESS = "success"      # Executed successfully
    ERROR = "error"          # Execution failed
```
State Transition: pending / success / error → streaming → success / error. A node in any non-streaming state can be locked into streaming; execution then finishes in either success or error.
```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def node_status_handler(canvas_id: str, node_id: str):
    """
    Concurrency-safe node state management.
    State transitions use independent transactions and commit immediately.
    """
    # 1. Acquire the node lock (atomic operation)
    await db.acquire_node_lock(
        canvas_id,
        node_id,
        expected_statuses=[NodeStatus.PENDING, NodeStatus.SUCCESS, NodeStatus.ERROR],
        processing_status=NodeStatus.STREAMING,
    )
    log.info(f"Node {node_id}: -> streaming")
    try:
        yield  # Execute the node task
    except Exception as e:
        # 2. Execution failed: release the lock and set ERROR
        await db.release_node_lock(canvas_id, node_id, NodeStatus.ERROR, str(e)[:200])
        log.error(f"Node {node_id} failed: {e}")
        raise
    else:
        # 3. Execution succeeded: release the lock and set SUCCESS
        await db.release_node_lock(canvas_id, node_id, NodeStatus.SUCCESS)
        log.info(f"Node {node_id}: streaming -> success")
```
Key Points:
- acquire_node_lock takes a database row lock (FOR UPDATE), so concurrent callers cannot lock the same node twice.

acquire_node_lock Implementation:
```python
async def acquire_node_lock(
    canvas_id: str,
    node_id: str,
    expected_statuses: list[str],
    processing_status: str,
):
    """Atomically acquire the node lock."""
    async with unit_of_work() as db:
        # Lock the node row using FOR UPDATE
        node = (
            await db.execute(
                select(NodeModel)
                .filter(NodeModel.canvas_id == canvas_id, NodeModel.id == node_id)
                .with_for_update()
            )
        ).scalar_one()
        # Check that the current status matches expectations
        if node.status not in expected_statuses:
            raise errors.ConflictError(f"Node {node_id} is {node.status}, cannot acquire lock")
        # Move the status to processing
        node.status = processing_status
        await db.commit()
```
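The compare-and-set semantics of the lock can be sketched without a database. The `InMemoryNode` and `ConflictError` names below are illustrative stand-ins, not part of AskTable's API; in production the check-and-set runs inside a single `SELECT ... FOR UPDATE` transaction:

```python
class ConflictError(Exception):
    """Raised when a node is already locked by another executor."""

class InMemoryNode:
    """Illustrative stand-in for a node row with a status column."""
    def __init__(self, node_id: str, status: str = "pending"):
        self.id = node_id
        self.status = status

def acquire_node_lock(node: InMemoryNode, expected_statuses: list[str],
                      processing_status: str = "streaming") -> None:
    # Plain compare-and-set: only proceed if the current status is expected
    if node.status not in expected_statuses:
        raise ConflictError(f"Node {node.id} is {node.status}, cannot acquire lock")
    node.status = processing_status

node = InMemoryNode("A", status="success")
acquire_node_lock(node, ["pending", "success", "error"])
assert node.status == "streaming"

# A second caller now observes "streaming" and is rejected
try:
    acquire_node_lock(node, ["pending", "success", "error"])
    assert False, "expected ConflictError"
except ConflictError:
    pass
```

The row lock makes this check-and-set atomic across processes, which is exactly what the in-memory sketch cannot provide.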
When does a node need to be refreshed?
- The node itself changed: for example, its SQL query or Python code was edited.
- A node it depends on changed: an upstream node produced a new result.
- Manual trigger: the user explicitly refreshes the node or a selection of nodes.
```python
from collections import defaultdict, deque

def find_affected_nodes(
    all_nodes: list[NodeModel], changed_node_id: str
) -> list[NodeModel]:
    """Find all descendant nodes affected by a node change."""
    # 1. Build the dependency graph (parent -> children)
    graph: dict[str, list[str]] = defaultdict(list)
    for node in all_nodes:
        for dep_id in node.dependencies or []:
            graph[dep_id].append(node.id)

    # 2. BFS over all descendant nodes
    affected_ids = set()
    queue = deque([changed_node_id])
    while queue:
        nid = queue.popleft()
        for child_id in graph[nid]:
            if child_id not in affected_ids:
                affected_ids.add(child_id)
                queue.append(child_id)

    # 3. Return the affected nodes in topological order
    affected_nodes = [n for n in all_nodes if n.id in affected_ids]
    return topological_sort(affected_nodes)
```
Example:
```python
# Node dependencies: A → C, A → D, B → D, D → E

# When node A changes
affected = find_affected_nodes(all_nodes, "A")
# Result: [C, D, E] (topologically sorted)

# When node D changes
affected = find_affected_nodes(all_nodes, "D")
# Result: [E]
```
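The pruning effect is easy to verify with a plain dependency map. The `deps` dict and `find_affected` helper below are illustrative, mirroring the example graph:

```python
from collections import defaultdict, deque

# node -> its dependencies, mirroring A → C, A → D, B → D, D → E
deps = {"A": [], "B": [], "C": ["A"], "D": ["A", "B"], "E": ["D"]}

def find_affected(deps: dict[str, list[str]], changed: str) -> set[str]:
    # Invert the map: parent -> children
    children: dict[str, list[str]] = defaultdict(list)
    for node, parents in deps.items():
        for parent in parents:
            children[parent].append(node)
    # BFS over all descendants of the changed node
    affected: set[str] = set()
    queue = deque([changed])
    while queue:
        nid = queue.popleft()
        for child in children[nid]:
            if child not in affected:
                affected.add(child)
                queue.append(child)
    return affected

assert find_affected(deps, "A") == {"C", "D", "E"}  # everything downstream of A
assert find_affected(deps, "D") == {"E"}            # C is untouched
assert find_affected(deps, "E") == set()            # leaf: nothing to refresh
```

Note that the changed node itself is not in the result: it is refreshed by the triggering call, and only its descendants need cascading refreshes.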
```python
async def load_parent_context(
    db: AsyncSession,
    canvas_id: str,
    dependency_ids: list[str],
    sample_rows: int = 5,
) -> list[dict]:
    """Load parent node context (reusing cached DataFrames)."""
    parent_contexts = []
    for dep_id in dependency_ids:
        parent = await service.get_node(db, canvas_id, dep_id)

        # Check the parent node's status
        if parent.status != "success":
            continue  # Skip parents that did not finish successfully

        # Check for a cached DataFrame
        if not parent.dataframe_id:
            continue  # Skip parents without data

        # Read the DataFrame from its Parquet file
        project_id = project_id_var.get()
        try:
            raw_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")
        except FileNotFoundError:
            continue  # Parquet file missing

        # Build the context
        records = raw_df.to_dict(orient="records")
        context = {
            "id": parent.id,
            "description": parent.description,
            "dataframe": {
                "columns": raw_df.columns.tolist(),
                "data": records[:sample_rows],  # Sampled data
                "sample_data": records[:sample_rows],
            },
        }
        parent_contexts.append(context)
    return parent_contexts
```
Key Points:
- Parents that are not in the success state, or that have no cached DataFrame, are skipped rather than re-executed.
- A parent's result is read back from its Parquet cache, so the parent never runs twice.
- Only the first sample_rows rows are included, keeping the AI context small.
```python
import asyncio

async def batch_refresh_nodes(canvas_id: str, node_ids: list[str]):
    """Batch-refresh nodes (executed concurrently, level by level)."""
    async with unit_of_work() as db:
        # 1. Fetch all nodes
        all_nodes = await service.get_nodes(db, canvas_id)
        # 2. Pick out the nodes to refresh
        nodes_to_refresh = [n for n in all_nodes if n.id in node_ids]
        # 3. Topological sort
        sorted_nodes = topological_sort(nodes_to_refresh)
        # 4. Group by level (nodes in the same level may run concurrently)
        levels = group_by_level(sorted_nodes, all_nodes)
        # 5. Execute level by level
        for level_nodes in levels:
            # Run the nodes of one level concurrently
            tasks = [
                execute_node(canvas_id, node.id)
                for node in level_nodes
            ]
            await asyncio.gather(*tasks, return_exceptions=True)
```
```python
from collections import defaultdict

def group_by_level(
    sorted_nodes: list[NodeModel], all_nodes: list[NodeModel]
) -> list[list[NodeModel]]:
    """Group nodes by dependency level."""
    # 1. Compute each node's level; walking in topological order
    #    guarantees parents are assigned before their children
    levels: dict[str, int] = {}
    for node in sorted_nodes:
        # Node level = max(parent level) + 1
        max_parent_level = -1
        for dep_id in node.dependencies or []:
            if dep_id in levels:
                max_parent_level = max(max_parent_level, levels[dep_id])
        levels[node.id] = max_parent_level + 1

    # 2. Group nodes by level
    level_groups: dict[int, list[NodeModel]] = defaultdict(list)
    for node in sorted_nodes:
        level_groups[levels[node.id]].append(node)

    # 3. Return the groups sorted by level
    return [level_groups[i] for i in sorted(level_groups.keys())]
```
Example:
```python
# Node dependencies: A → C, A → D, B → D, D → E

# Level grouping result
levels = group_by_level(sorted_nodes, all_nodes)
# Result:
#   Level 0: [A, B]  # No dependencies, run concurrently
#   Level 1: [C, D]  # Depend on Level 0, run concurrently
#   Level 2: [E]     # Depends on Level 1
```
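The same level assignment can be computed recursively from a plain dependency map. This sketch assumes the example graph above; `group_levels` is an illustrative helper, not the engine's API:

```python
from collections import defaultdict

# node -> its dependencies: A → C, A → D, B → D, D → E
deps = {"A": [], "B": [], "C": ["A"], "D": ["A", "B"], "E": ["D"]}

def group_levels(deps: dict[str, list[str]]) -> list[list[str]]:
    levels: dict[str, int] = {}

    def level(node: str) -> int:
        # Level = max(parent levels) + 1; roots land on level 0
        if node not in levels:
            levels[node] = 1 + max((level(p) for p in deps[node]), default=-1)
        return levels[node]

    groups: dict[int, list[str]] = defaultdict(list)
    for node in sorted(deps):
        groups[level(node)].append(node)
    return [groups[i] for i in sorted(groups)]

assert group_levels(deps) == [["A", "B"], ["C", "D"], ["E"]]
```

The engine's iterative version relies on topological order instead of recursion, but both assign each node the length of its longest dependency chain.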
```python
# The user edits Data node A's SQL query
await refresh_node(canvas_id="canvas-123", node_id="A")

# The system automatically identifies the affected nodes
affected_nodes = find_affected_nodes(all_nodes, "A")
# Result: [C, D, E]

# Execute in topological order
for node in affected_nodes:
    await execute_node(canvas_id, node.id)
```
```python
# The user selects several nodes and clicks "Batch Refresh"
await batch_refresh_nodes(canvas_id="canvas-123", node_ids=["A", "B", "D"])
# The system executes level by level, concurrently:
#   Level 0: [A, B] run concurrently
#   Level 1: [D] runs after A and B complete
```
```python
# Node A finished successfully; its DataFrame is cached as Parquet
await persist_node_dataframe(node_a, df_a)

# Node C executes and reads node A's cache directly
parent_contexts = await load_parent_context(db, canvas_id, ["A"])
# Node A is not re-executed
```
Before optimization:
```python
# Execute every node serially
for node in sorted_nodes:
    await execute_node(canvas_id, node.id)
# Total time = sum of every node's execution time
```
After optimization:
```python
# Execute level by level, concurrently
for level_nodes in levels:
    await asyncio.gather(*[execute_node(canvas_id, n.id) for n in level_nodes])
# Total time = sum of each level's slowest node
```
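With hypothetical per-node timings (the numbers below are illustrative only), the difference between the two strategies reduces to simple arithmetic:

```python
# Hypothetical execution times in seconds, for illustration only
duration = {"A": 3, "B": 2, "C": 1, "D": 4, "E": 2}
levels = [["A", "B"], ["C", "D"], ["E"]]  # level grouping of the example graph

# Serial: every node waits for the previous one
serial_time = sum(duration.values())

# Level-parallel: each level takes as long as its slowest node
parallel_time = sum(max(duration[n] for n in level) for level in levels)

assert serial_time == 12
assert parallel_time == 9  # 3 + 4 + 2
```

The wider the graph (more independent branches per level), the larger the gap between the two totals.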
Effect: total wall-clock time drops from the sum of all node times to the sum of each level's slowest node; independent branches no longer wait for each other.
Before optimization:
```python
# Re-execute the parent node every time a child runs
parent_df = await execute_parent_node(parent_id)
```
After optimization:
```python
# Read the parent's cache directly
parent_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")
```
Effect: a parent node executes at most once per change; every child reads its Parquet cache instead of recomputing it.
Before optimization:
```python
# Pass the complete DataFrame to the AI (possibly tens of thousands of rows)
context = {"dataframe": {"data": raw_df.to_dict(orient="records")}}
```
After optimization:
```python
# Pass only sampled data (5 rows)
context = {"dataframe": {"sample_data": records[:5]}}
```
Effect: the AI context shrinks from the full result set to a 5-row sample, sharply reducing token usage while preserving the schema and representative values.
AskTable's Canvas dependency graph engine implements:
✅ Smart Refresh: Automatically identify affected nodes
✅ Incremental Computation: Only refresh necessary nodes, reuse cache
✅ Correct Order: Topological sort ensures execution order
✅ Concurrent Execution: Execute by level to improve performance
✅ Concurrent Safety: State machine + row locks ensure consistency