
Canvas Dependency Graph and Incremental Compute: Topological Refresh

AskTable Team 2026-03-05

In Canvas, nodes depend on each other in non-trivial ways: Chart nodes depend on the query results of Data nodes, and Python nodes depend on DataFrames from multiple parent nodes. When a node is updated, how should the child nodes that depend on it be refreshed? How can duplicate computation be avoided? How is the correct execution order guaranteed?

AskTable's Canvas dependency graph engine keeps node execution efficient and reliable by combining three mechanisms: topological sorting, a state machine, and incremental computation.

This article walks through the design and implementation of this engine.


1. Why Do We Need Dependency Graph Management?

1.1 Dependencies in Canvas

Nodes in Canvas form a Directed Acyclic Graph (DAG):

[Figure: example Canvas dependency graph]

Dependencies:

  • Chart nodes depend on Data nodes (need query results)
  • Python nodes can depend on multiple parent nodes (need multiple DataFrames)
  • Nodes cannot form circular dependencies
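
The no-cycle rule can be checked at the moment a dependency is added. The following is a minimal sketch, assuming dependencies are kept as a mapping from node ID to parent IDs; the `would_create_cycle` helper and the mapping layout are illustrative and not part of AskTable's API.

```python
from collections import deque


def would_create_cycle(
    dependencies: dict[str, list[str]], child_id: str, new_parent_id: str
) -> bool:
    """Return True if making `child_id` depend on `new_parent_id` closes a cycle.

    `dependencies` maps each node ID to the IDs of its parents (hypothetical layout).
    A cycle appears exactly when `child_id` is already an ancestor of
    `new_parent_id`, or when both IDs name the same node.
    """
    if child_id == new_parent_id:
        return True

    # Walk upward from the proposed parent through all of its ancestors.
    seen = {new_parent_id}
    queue = deque([new_parent_id])
    while queue:
        current = queue.popleft()
        for parent_id in dependencies.get(current, []):
            if parent_id == child_id:
                return True
            if parent_id not in seen:
                seen.add(parent_id)
                queue.append(parent_id)
    return False


deps = {"A": [], "B": [], "C": ["A"], "D": ["A", "B"], "E": ["D"]}
print(would_create_cycle(deps, "A", "E"))  # True: E already depends on A via D
print(would_create_cycle(deps, "C", "B"))  # False: B has no path back to C
```

Rejecting the edge up front keeps the stored graph a DAG, so later sorting code does not need to recover from cycles.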

1.2 Pain Points of Traditional Solutions

Manual Refresh:

  • Users need to manually click the "refresh" button on each node
  • Easy to miss dependent nodes
  • Wrong refresh order leads to data inconsistency

Full Refresh:

  • When refreshing one node, re-execute all nodes
  • Wastes computing resources
  • Poor user experience (long wait times)

1.3 What Do We Need?

✅ Smart Refresh: Automatically identify nodes that need to be refreshed
✅ Incremental Computation: Only refresh affected nodes
✅ Correct Order: Execute in dependency order
✅ Concurrent Execution: Independent nodes can execute in parallel
✅ Cache Reuse: Unchanged nodes reuse cached results


2. Core Algorithm: Topological Sort (Kahn's Algorithm)

2.1 What is Topological Sort?

Topological sort is a linear ordering of a DAG such that for every directed edge u → v, node u comes before node v.

Example:

Dependencies: A → C, A → D, B → D, D → E

One valid topological order: A, B, D, C, E. The order is not unique; B, A, D, C, E is equally valid.
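
The definition can be turned into a small checker. This sketch (the `is_topological_order` name is illustrative) verifies that every edge points forward in a proposed ordering, using the example edges above.

```python
def is_topological_order(order: list[str], edges: list[tuple[str, str]]) -> bool:
    """Check that every edge (u, v) has u placed before v in `order`."""
    position = {node_id: index for index, node_id in enumerate(order)}
    return all(position[u] < position[v] for u, v in edges)


edges = [("A", "C"), ("A", "D"), ("B", "D"), ("D", "E")]
print(is_topological_order(["A", "B", "D", "C", "E"], edges))  # True
print(is_topological_order(["B", "A", "D", "C", "E"], edges))  # True
print(is_topological_order(["A", "D", "B", "C", "E"], edges))  # False: D precedes B
```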

2.2 Kahn's Algorithm Implementation

from collections import defaultdict, deque


def topological_sort(nodes: list[NodeModel]) -> list[NodeModel]:
    """
    Topologically sort node list (Kahn's algorithm)

    Only considers dependencies within the node list, external dependencies not involved in sorting.
    """
    if not nodes:
        return []

    # 1. Build graph and in-degree table
    node_ids = {n.id for n in nodes}
    in_degree: dict[str, int] = {n.id: 0 for n in nodes}
    graph: dict[str, list[str]] = defaultdict(list)

    for node in nodes:
        for dep_id in node.dependencies or []:
            if dep_id in node_ids:  # Only consider internal dependencies
                graph[dep_id].append(node.id)
                in_degree[node.id] += 1

    # 2. Add nodes with in-degree 0 to queue
    queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
    sorted_ids: list[str] = []

    # 3. BFS traversal
    while queue:
        nid = queue.popleft()
        sorted_ids.append(nid)

        # After removing this node, update in-degree of child nodes
        for child in graph[nid]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)

    # 4. Return sorted node list
    id_to_node = {n.id: n for n in nodes}
    return [id_to_node[nid] for nid in sorted_ids if nid in id_to_node]

Key Points:

  • In-Degree: Number of edges pointing to the node
  • Queue: Stores nodes with in-degree 0 (no dependencies or dependencies satisfied)
  • BFS Traversal: Each time a node is taken out, update in-degree of its child nodes
  • Time Complexity: O(V + E), V is number of nodes, E is number of edges
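
Kahn's algorithm also gives cycle detection almost for free: a node on a cycle never reaches in-degree 0, so it never enters the queue. The sketch below is a variant that works on plain IDs and raises when nodes are left over. The `kahn_order` name and the dict-based input are assumptions for illustration, not the engine's actual function.

```python
from collections import defaultdict, deque


def kahn_order(dependencies: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm over plain IDs; raises ValueError if a cycle is present.

    `dependencies` maps node ID -> parent IDs (hypothetical layout). Parents that
    are not keys of the mapping are treated as external and ignored.
    """
    in_degree = {node_id: 0 for node_id in dependencies}
    children: dict[str, list[str]] = defaultdict(list)
    for node_id, parents in dependencies.items():
        for parent_id in parents:
            if parent_id in in_degree:
                children[parent_id].append(node_id)
                in_degree[node_id] += 1

    queue = deque(node_id for node_id, degree in in_degree.items() if degree == 0)
    order: list[str] = []
    while queue:
        node_id = queue.popleft()
        order.append(node_id)
        for child_id in children[node_id]:
            in_degree[child_id] -= 1
            if in_degree[child_id] == 0:
                queue.append(child_id)

    # Nodes on a cycle, and anything downstream of one, never reach in-degree 0.
    if len(order) != len(dependencies):
        unsorted_ids = sorted(set(dependencies) - set(order))
        raise ValueError(f"Dependency cycle detected; unsorted nodes: {unsorted_ids}")
    return order


print(kahn_order({"A": [], "B": [], "C": ["A"], "D": ["A", "B"], "E": ["D"]}))
# ['A', 'B', 'C', 'D', 'E']
```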

Example:

# Node dependencies
nodes = [
    Node(id="A", dependencies=[]),
    Node(id="B", dependencies=[]),
    Node(id="C", dependencies=["A"]),
    Node(id="D", dependencies=["A", "B"]),
    Node(id="E", dependencies=["D"]),
]

# Topological sort result
sorted_nodes = topological_sort(nodes)
# Result: [A, B, C, D, E] (deterministic for this input order; [B, A, C, D, E] is another valid ordering)

3. State Machine Management: Concurrent-Safe Node Execution

3.1 Node Status Definition

class NodeStatus:
    PENDING = "pending"      # Pending execution
    STREAMING = "streaming"  # Executing
    SUCCESS = "success"      # Executed successfully
    ERROR = "error"          # Execution failed

State Transition:

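A node in PENDING, SUCCESS, or ERROR moves to STREAMING when execution starts, and from STREAMING to SUCCESS or ERROR when execution finishes.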
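
The transitions can be written down as a lookup table. This is a sketch read off the `expected_statuses` and release calls in section 3.2; the `ALLOWED_TRANSITIONS` table and `can_transition` helper are illustrative names, not part of the engine.

```python
PENDING, STREAMING, SUCCESS, ERROR = "pending", "streaming", "success", "error"

# Any settled state may enter STREAMING; STREAMING ends in SUCCESS or ERROR.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    PENDING: {STREAMING},
    SUCCESS: {STREAMING},
    ERROR: {STREAMING},
    STREAMING: {SUCCESS, ERROR},
}


def can_transition(current: str, target: str) -> bool:
    """Return True if a node may move from `current` to `target`."""
    return target in ALLOWED_TRANSITIONS.get(current, set())


print(can_transition(PENDING, STREAMING))    # True
print(can_transition(STREAMING, STREAMING))  # False: a running node cannot be started again
print(can_transition(PENDING, SUCCESS))      # False: results only come from a STREAMING node
```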

3.2 Concurrent-Safe State Management

from contextlib import asynccontextmanager


@asynccontextmanager
async def node_status_handler(canvas_id: str, node_id: str):
    """
    Concurrent-safe node state management.
    State transitions use independent transactions and commit immediately.
    """
    # 1. Acquire node lock (atomic operation)
    await db.acquire_node_lock(
        canvas_id,
        node_id,
        expected_statuses=[NodeStatus.PENDING, NodeStatus.SUCCESS, NodeStatus.ERROR],
        processing_status=NodeStatus.STREAMING,
    )
    log.info(f"Node {node_id}: -> streaming")

    try:
        yield  # Execute node task
    except Exception as e:
        # 2. Execution failed, release lock and set ERROR status
        await db.release_node_lock(canvas_id, node_id, NodeStatus.ERROR, str(e)[:200])
        log.error(f"Node {node_id} failed: {e}")
        raise
    else:
        # 3. Execution successful, release lock and set SUCCESS status
        await db.release_node_lock(canvas_id, node_id, NodeStatus.SUCCESS)
        log.info(f"Node {node_id}: streaming -> success")

Key Points:

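  • Atomic Lock: acquire_node_lock takes a database row lock (SELECT ... FOR UPDATE), so the status check and the status update cannot interleave with another worker
  • Independent Transaction: The state transition commits immediately and does not depend on the business transaction
  • Exception Handling: The lock is released on both the success path and the failure path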
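
The behaviour of the context manager can be tried without a database. The sketch below swaps the row lock for an in-memory dict, which is only safe inside a single event loop, and it catches `BaseException` so that task cancellation also resets the status; both choices are assumptions of this example rather than the engine's implementation.

```python
import asyncio
from contextlib import asynccontextmanager

# In-memory stand-in for the node table (hypothetical; the article uses a database).
statuses: dict[str, str] = {"A": "pending"}


class ConflictError(Exception):
    pass


@asynccontextmanager
async def node_status_handler(node_id: str):
    # Check-and-set with no await in between, so it is atomic within one event loop.
    if statuses[node_id] == "streaming":
        raise ConflictError(f"Node {node_id} is already streaming")
    statuses[node_id] = "streaming"
    try:
        yield
    except BaseException:
        # BaseException also covers task cancellation (asyncio.CancelledError).
        statuses[node_id] = "error"
        raise
    else:
        statuses[node_id] = "success"


async def run_ok() -> None:
    async with node_status_handler("A"):
        await asyncio.sleep(0)


async def run_failing() -> None:
    async with node_status_handler("A"):
        raise RuntimeError("query failed")


async def run_conflict() -> str:
    async with node_status_handler("A"):
        try:
            async with node_status_handler("A"):  # second holder is rejected
                return "acquired"
        except ConflictError:
            return "conflict"


async def main() -> list[str]:
    seen = []
    await run_ok()
    seen.append(statuses["A"])
    try:
        await run_failing()
    except RuntimeError:
        pass
    seen.append(statuses["A"])
    seen.append(await run_conflict())
    seen.append(statuses["A"])
    return seen


print(asyncio.run(main()))  # ['success', 'error', 'conflict', 'success']
```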

acquire_node_lock Implementation:

from sqlalchemy import select


async def acquire_node_lock(
    canvas_id: str,
    node_id: str,
    expected_statuses: list[str],
    processing_status: str,
):
    """Atomically acquire node lock"""
    async with unit_of_work() as db:
        # Lock the node row using FOR UPDATE
        node = (
            await db.execute(
                select(NodeModel)
                .filter(NodeModel.canvas_id == canvas_id, NodeModel.id == node_id)
                .with_for_update()
            )
        ).scalar_one()

        # Check if status matches expectation
        if node.status not in expected_statuses:
            raise errors.ConflictError(f"Node {node_id} is {node.status}, cannot acquire lock")

        # Update status to processing
        node.status = processing_status
        await db.commit()
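
An alternative to SELECT ... FOR UPDATE is a single conditional UPDATE (compare-and-set), where the database checks the current status and writes the new one in one statement. The sketch below uses Python's built-in `sqlite3` module and a hypothetical `nodes` table purely so it can run standalone; it is a different technique from the row lock shown above, not the engine's implementation.

```python
import sqlite3


def try_acquire(conn: sqlite3.Connection, node_id: str, expected: list[str]) -> bool:
    """Move a node to 'streaming' only if its current status is in `expected`."""
    placeholders = ", ".join("?" for _ in expected)
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute(
            f"UPDATE nodes SET status = 'streaming' "
            f"WHERE id = ? AND status IN ({placeholders})",
            [node_id, *expected],
        )
    # rowcount is 1 when the guard matched, 0 when the node was not in an expected state.
    return cursor.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nodes (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.execute("INSERT INTO nodes VALUES ('A', 'pending')")
conn.commit()

settled = ["pending", "success", "error"]
print(try_acquire(conn, "A", settled))  # True
print(try_acquire(conn, "A", settled))  # False: the node is already streaming
```

The row-lock version can report the conflicting status in its error message, while the conditional UPDATE needs only one round trip; which trade-off fits depends on the deployment.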

4. Incremental Computation: Smart Refresh Strategy

4.1 Refresh Trigger Conditions

When does a node need to be refreshed?

  1. Node itself changed:

    • Data node: SQL query changed
    • Chart node: Chart configuration changed
    • Python node: Code changed
  2. Dependent node changed:

    • Parent node's DataFrame updated
    • Parent node's status became SUCCESS
  3. Manual trigger:

    • User clicks "Refresh" button
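
One possible way to detect the first two conditions is to fingerprint a node's inputs and compare against the fingerprint stored at its last successful run. This is a hypothetical sketch: the `node_fingerprint` and `needs_refresh` helpers are not from the engine, and it assumes a parent receives a new `dataframe_id` whenever its output is rewritten.

```python
import hashlib
import json
from typing import Optional


def node_fingerprint(definition: dict, parent_dataframe_ids: list[str]) -> str:
    """Hash a node's own definition together with the cached outputs it reads."""
    payload = json.dumps(
        {"definition": definition, "parents": sorted(parent_dataframe_ids)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def needs_refresh(
    last_fingerprint: Optional[str], current_fingerprint: str, manual: bool = False
) -> bool:
    """Refresh on a manual trigger, on first run, or when any input changed."""
    return manual or last_fingerprint != current_fingerprint


before = node_fingerprint({"sql": "SELECT * FROM orders"}, ["df-1"])
same = node_fingerprint({"sql": "SELECT * FROM orders"}, ["df-1"])
edited = node_fingerprint({"sql": "SELECT id FROM orders"}, ["df-1"])

print(needs_refresh(before, same))               # False: nothing changed
print(needs_refresh(before, edited))             # True: the SQL changed
print(needs_refresh(before, same, manual=True))  # True: user clicked "Refresh"
```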

4.2 Affected Node Identification

def find_affected_nodes(
    all_nodes: list[NodeModel], changed_node_id: str
) -> list[NodeModel]:
    """Find all descendant nodes affected by a node change"""
    # 1. Build dependency graph
    graph: dict[str, list[str]] = defaultdict(list)
    for node in all_nodes:
        for dep_id in node.dependencies or []:
            graph[dep_id].append(node.id)

    # 2. BFS traverse all descendant nodes
    affected_ids = set()
    queue = deque([changed_node_id])

    while queue:
        nid = queue.popleft()
        for child_id in graph[nid]:
            if child_id not in affected_ids:
                affected_ids.add(child_id)
                queue.append(child_id)

    # 3. Return affected nodes (topologically sorted)
    affected_nodes = [n for n in all_nodes if n.id in affected_ids]
    return topological_sort(affected_nodes)

Example:

# Node dependencies: A → C, A → D, B → D, D → E

# When node A changes
affected = find_affected_nodes(all_nodes, "A")
# Result: [C, D, E] (topologically sorted)

# When node D changes
affected = find_affected_nodes(all_nodes, "D")
# Result: [E]
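
The descendant search can be reproduced on plain IDs to check the results above. This standalone sketch uses a dict of parent IDs in place of `NodeModel` (an assumption for illustration) and returns the affected set without sorting it.

```python
from collections import defaultdict, deque


def descendants(dependencies: dict[str, list[str]], changed_id: str) -> set[str]:
    """Collect every node reachable from `changed_id` by following child edges."""
    children: dict[str, list[str]] = defaultdict(list)
    for node_id, parents in dependencies.items():
        for parent_id in parents:
            children[parent_id].append(node_id)

    found: set[str] = set()
    queue = deque([changed_id])
    while queue:
        for child_id in children[queue.popleft()]:
            if child_id not in found:
                found.add(child_id)
                queue.append(child_id)
    return found


deps = {"A": [], "B": [], "C": ["A"], "D": ["A", "B"], "E": ["D"]}
print(sorted(descendants(deps, "A")))  # ['C', 'D', 'E']
print(sorted(descendants(deps, "D")))  # ['E']
print(sorted(descendants(deps, "B")))  # ['D', 'E']
```

Note that the changed node itself is not part of the result, so the caller has to execute it before refreshing its descendants.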

4.3 DataFrame Cache Reuse

async def load_parent_context(
    db: AsyncSession,
    canvas_id: str,
    dependency_ids: list[str],
    sample_rows: int = 5,
) -> list[dict]:
    """Load parent node context (reuse cached DataFrame)"""
    parent_contexts = []

    for dep_id in dependency_ids:
        parent = await service.get_node(db, canvas_id, dep_id)

        # Check parent node status
        if parent.status != NodeStatus.SUCCESS:
            continue  # Skip parents that didn't succeed

        # Check if there's a cached DataFrame
        if not parent.dataframe_id:
            continue  # Skip parents without data

        # Read DataFrame from Parquet file
        project_id = project_id_var.get()
        try:
            raw_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")
        except FileNotFoundError:
            continue  # Parquet file missing

        # Build context from the first sample_rows rows only, so the full
        # DataFrame is never converted to Python dicts
        sample = raw_df.head(sample_rows).to_dict(orient="records")
        context = {
            "id": parent.id,
            "description": parent.description,
            "dataframe": {
                "columns": raw_df.columns.tolist(),
                "data": sample,  # Sampled rows, not the full DataFrame
                "sample_data": sample,
            },
        }
        parent_contexts.append(context)

    return parent_contexts

Key Points:

  • DataFrame Persistence: After successful execution, DataFrame is written to Parquet file
  • dataframe_id: Each node records its DataFrame's ID
  • Cache Reuse: When child nodes execute, they directly read parent's Parquet file
  • Sampled Data: Only pass sampled data to AI to reduce token consumption
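
The persist-then-read pattern can be sketched with the standard library alone. In the example below JSON files in a temporary directory stand in for the Parquet store, and `persist_rows` and `read_sample` are hypothetical helpers; only the flow (write once after success, read a sample later, tolerate a missing file) mirrors the points above.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional

CACHE_DIR = Path(tempfile.mkdtemp())  # stand-in for the Parquet store


def persist_rows(dataframe_id: str, rows: list[dict]) -> None:
    """Write a node's result once, after it executes successfully."""
    (CACHE_DIR / f"{dataframe_id}.json").write_text(json.dumps(rows))


def read_sample(dataframe_id: Optional[str], sample_rows: int = 5) -> Optional[list[dict]]:
    """Return the first `sample_rows` cached rows, or None when nothing is cached."""
    if not dataframe_id:
        return None  # the parent never produced data
    try:
        rows = json.loads((CACHE_DIR / f"{dataframe_id}.json").read_text())
    except FileNotFoundError:
        return None  # cache file missing: the caller skips this parent
    return rows[:sample_rows]


persist_rows("df-a", [{"n": i} for i in range(100)])
print(len(read_sample("df-a")))   # 5
print(read_sample("df-missing"))  # None
print(read_sample(None))          # None
```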

5. Batch Refresh: Concurrent Execution Optimization

5.1 Batch Refresh Flow

async def batch_refresh_nodes(canvas_id: str, node_ids: list[str]):
    """Batch refresh nodes (execute concurrently by topological sort)"""
    async with unit_of_work() as db:
        # 1. Get all nodes
        all_nodes = await service.get_nodes(db, canvas_id)

        # 2. Find nodes that need to be refreshed
        nodes_to_refresh = [n for n in all_nodes if n.id in node_ids]

        # 3. Topological sort
        sorted_nodes = topological_sort(nodes_to_refresh)

        # 4. Group by level (same level can execute concurrently)
        levels = group_by_level(sorted_nodes, all_nodes)

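        # 5. Execute level by level
        for level_nodes in levels:
            # Execute nodes at same level concurrently
            tasks = [
                execute_node(canvas_id, node.id)
                for node in level_nodes
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # return_exceptions=True returns failures as values instead of
            # raising, so log them explicitly rather than discarding them
            for node, result in zip(level_nodes, results):
                if isinstance(result, Exception):
                    log.error(f"Node {node.id} failed: {result}")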
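
Because `return_exceptions=True` turns failures into return values, a later level will still run even when one of its parents failed. One way to handle this is to skip nodes whose parents did not succeed. The sketch below is an assumption-laden illustration: `run_levels`, the `execute` callback, and the "skipped" outcome are hypothetical and not part of the engine.

```python
import asyncio
from typing import Awaitable, Callable


async def run_levels(
    levels: list[list[str]],
    dependencies: dict[str, list[str]],
    execute: Callable[[str], Awaitable[None]],
) -> dict[str, str]:
    """Run levels in order; skip any node whose parent failed or was skipped."""
    outcome: dict[str, str] = {}
    for level in levels:
        # Parents outside this run have no recorded outcome and count as successful.
        runnable = [
            node_id
            for node_id in level
            if all(outcome.get(p, "success") == "success" for p in dependencies[node_id])
        ]
        results = await asyncio.gather(
            *(execute(node_id) for node_id in runnable), return_exceptions=True
        )
        for node_id, result in zip(runnable, results):
            outcome[node_id] = "error" if isinstance(result, BaseException) else "success"
        for node_id in level:
            if node_id not in runnable:
                outcome[node_id] = "skipped"
    return outcome


async def fake_execute(node_id: str) -> None:
    await asyncio.sleep(0)
    if node_id == "B":
        raise RuntimeError("query failed")


deps = {"A": [], "B": [], "C": ["A"], "D": ["A", "B"], "E": ["D"]}
levels = [["A", "B"], ["C", "D"], ["E"]]
print(asyncio.run(run_levels(levels, deps, fake_execute)))
# {'A': 'success', 'B': 'error', 'C': 'success', 'D': 'skipped', 'E': 'skipped'}
```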

5.2 Level Grouping Algorithm

def group_by_level(
    sorted_nodes: list[NodeModel], all_nodes: list[NodeModel]
) -> list[list[NodeModel]]:
    """Group nodes by dependency level.

    sorted_nodes must already be in topological order. all_nodes is accepted
    by the signature but is not needed to compute levels.
    """
    # 1. Calculate level for each node
    levels: dict[str, int] = {}

    for node in sorted_nodes:
        # Node level = max(parent node level) + 1; parents outside
        # sorted_nodes are not part of this run and do not affect the level
        max_parent_level = -1
        for dep_id in node.dependencies or []:
            if dep_id in levels:
                max_parent_level = max(max_parent_level, levels[dep_id])
        levels[node.id] = max_parent_level + 1

    # 2. Group by level
    level_groups: dict[int, list[NodeModel]] = defaultdict(list)
    for node in sorted_nodes:
        level_groups[levels[node.id]].append(node)

    # 3. Return groups sorted by level
    return [level_groups[i] for i in sorted(level_groups.keys())]

Example:

# Node dependencies: A → C, A → D, B → D, D → E

# Level grouping result
levels = group_by_level(sorted_nodes, all_nodes)
# Result:
# Level 0: [A, B]  # No dependencies, can execute concurrently
# Level 1: [C, D]  # Depends on Level 0, can execute concurrently
# Level 2: [E]     # Depends on Level 1
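
The grouping can be reproduced on plain IDs to confirm the levels above. This standalone sketch assumes a dict of parent IDs instead of `NodeModel`, and `group_ids_by_level` is an illustrative name.

```python
from collections import defaultdict


def group_ids_by_level(
    sorted_ids: list[str], dependencies: dict[str, list[str]]
) -> list[list[str]]:
    """Level = 1 + the highest level among parents that are part of this run."""
    level_of: dict[str, int] = {}
    for node_id in sorted_ids:  # parents come first because the input is sorted
        parent_levels = [level_of[p] for p in dependencies[node_id] if p in level_of]
        level_of[node_id] = max(parent_levels, default=-1) + 1

    groups: dict[int, list[str]] = defaultdict(list)
    for node_id in sorted_ids:
        groups[level_of[node_id]].append(node_id)
    return [groups[level] for level in sorted(groups)]


deps = {"A": [], "B": [], "C": ["A"], "D": ["A", "B"], "E": ["D"]}
print(group_ids_by_level(["A", "B", "C", "D", "E"], deps))
# [['A', 'B'], ['C', 'D'], ['E']]
print(group_ids_by_level(["C", "D", "E"], deps))
# [['C', 'D'], ['E']]
```

The second call shows the partial-refresh case: when only the descendants of A are refreshed, C and D have no parents inside the run and therefore start at level 0.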

6. Practical Cases

Case 1: Single Node Refresh

# User modified Data node A's SQL query
await refresh_node(canvas_id="canvas-123", node_id="A")

# System automatically identifies affected nodes
affected_nodes = find_affected_nodes(all_nodes, "A")
# Result: [C, D, E]

# Execute in topological order
for node in affected_nodes:
    await execute_node(canvas_id, node.id)

Case 2: Batch Refresh

# User selects multiple nodes and clicks "Batch Refresh"
await batch_refresh_nodes(canvas_id="canvas-123", node_ids=["A", "B", "D"])

# System executes by level concurrently
# Level 0: [A, B] execute concurrently
# Level 1: [D] executes after A and B complete

Case 3: Incremental Computation

# Node A executed successfully, DataFrame cached to Parquet
await persist_node_dataframe(node_a, df_a)

# Node C executes, directly reads node A's cache
parent_contexts = await load_parent_context(db, canvas_id, ["A"])
# No need to re-execute node A

7. Performance Optimization

7.1 Concurrent Execution

Before optimization:

# Execute all nodes serially
for node in sorted_nodes:
    await execute_node(canvas_id, node.id)
# Total time = sum of each node's execution time

After optimization:

# Execute by level concurrently
for level_nodes in levels:
    await asyncio.gather(*[execute_node(canvas_id, n.id) for n in level_nodes])
# Total time = sum of maximum time for each level

Effect:

  • Original: 5 nodes, 10 seconds each = 50 seconds
  • Optimized: 3 levels, max 10 seconds each = 30 seconds
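
The arithmetic behind these figures is a sum for serial execution and a sum of per-level maxima for level-wise execution. The sketch below computes both for the five-node example; the helper names and the second, uneven set of durations are illustrative.

```python
def serial_seconds(durations: dict[str, float]) -> float:
    """Total time when every node runs one after another."""
    return sum(durations.values())


def leveled_seconds(levels: list[list[str]], durations: dict[str, float]) -> float:
    """Total time when each level runs concurrently and waits for its slowest node."""
    return sum(max(durations[node_id] for node_id in level) for level in levels)


levels = [["A", "B"], ["C", "D"], ["E"]]

even = {node_id: 10.0 for node_id in "ABCDE"}
print(serial_seconds(even))           # 50.0
print(leveled_seconds(levels, even))  # 30.0

uneven = {**even, "B": 30.0}
print(serial_seconds(uneven))           # 70.0
print(leveled_seconds(levels, uneven))  # 50.0
```

With uneven durations the level barrier becomes visible: C depends only on A, yet it waits for the slower B because both belong to level 0.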

7.2 DataFrame Cache

Before optimization:

# Re-execute parent node every time child node executes
parent_df = await execute_parent_node(parent_id)

After optimization:

# Directly read parent's cache
parent_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")

Effect:

  • Original: Every child node execution = parent execution time + child execution time
  • Optimized: Every child node execution = cache read time (< 100ms) + child execution time

7.3 Sampled Data

Before optimization:

# Pass complete DataFrame to AI (possibly tens of thousands of rows)
context = {"dataframe": {"data": raw_df.to_dict(orient="records")}}

After optimization:

# Only pass sampled data (5 rows)
context = {"dataframe": {"sample_data": records[:5]}}

Effect:

  • Original: 10000 rows = ~1M tokens
  • Optimized: 5 rows = ~500 tokens

8. Summary and Outlook

AskTable's Canvas dependency graph engine implements:

✅ Smart Refresh: Automatically identify affected nodes
✅ Incremental Computation: Only refresh necessary nodes, reuse cache
✅ Correct Order: Topological sort ensures execution order
✅ Concurrent Execution: Execute by level to improve performance
✅ Concurrent Safety: State machine + row locks ensure consistency

Future Optimization Directions

  1. Smart Prediction: Predict user's next operation, refresh nodes in advance
  2. Incremental Update: Only update changed parts of DataFrame
  3. Distributed Execution: Execute nodes concurrently across instances
  4. Visual Debugging: Display dependency graph and execution flow
