AskTable

AskTable Canvas 节点依赖图与增量计算引擎:基于拓扑排序的智能刷新机制

AskTable 团队
AskTable 团队 2026年3月5日

在 Canvas 画布中,节点之间存在复杂的依赖关系:Chart 节点依赖 Data 节点的查询结果,Python 节点依赖多个父节点的 DataFrame。当一个节点更新时,如何智能地刷新依赖它的子节点?如何避免重复计算?如何保证执行顺序的正确性?

AskTable 的 Canvas 依赖图引擎,通过 拓扑排序 + 状态机 + 增量计算 的组合,实现了高效、可靠的节点执行管理。

本文将深入剖析这套引擎的设计与实现。


一、为什么需要依赖图管理?

1. Canvas 的依赖关系

Canvas 中的节点形成一个 有向无环图(DAG)

加载图表中...

依赖关系

2. 传统方案的痛点

手动刷新

全量刷新

3. 我们需要的是什么?

智能刷新:自动识别需要刷新的节点 ✅ 增量计算:只刷新受影响的节点 ✅ 正确顺序:按依赖关系顺序执行 ✅ 并发执行:无依赖的节点可以并行执行 ✅ 缓存复用:未变更的节点复用缓存结果


二、核心算法:拓扑排序(Kahn 算法)

1. 什么是拓扑排序?

拓扑排序是对 DAG 的一种线性排序,使得对于每条有向边

u → v
,节点
u
都排在节点
v
之前。

示例

依赖关系:A → C, A → D, B → D, D → E

拓扑排序结果:A, B, D, C, E(或 B, A, D, C, E)

2. Kahn 算法实现

def topological_sort(nodes: list[NodeModel]) -> list[NodeModel]:
    """
    拓扑排序节点列表(Kahn 算法)

    仅考虑节点列表内部的依赖关系,外部依赖不参与排序。
    """
    if not nodes:
        return []

    # 1. 构建图和入度表
    node_ids = {n.id for n in nodes}
    in_degree: dict[str, int] = {n.id: 0 for n in nodes}
    graph: dict[str, list[str]] = defaultdict(list)

    for node in nodes:
        for dep_id in node.dependencies or []:
            if dep_id in node_ids:  # 只考虑内部依赖
                graph[dep_id].append(node.id)
                in_degree[node.id] += 1

    # 2. 将入度为 0 的节点加入队列
    queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
    sorted_ids: list[str] = []

    # 3. BFS 遍历
    while queue:
        nid = queue.popleft()
        sorted_ids.append(nid)

        # 移除该节点后,更新子节点的入度
        for child in graph[nid]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)

    # 4. 返回排序后的节点列表
    id_to_node = {n.id: n for n in nodes}
    return [id_to_node[nid] for nid in sorted_ids if nid in id_to_node]

关键点

示例

# 节点依赖关系
nodes = [
    Node(id="A", dependencies=[]),
    Node(id="B", dependencies=[]),
    Node(id="C", dependencies=["A"]),
    Node(id="D", dependencies=["A", "B"]),
    Node(id="E", dependencies=["D"]),
]

# 拓扑排序结果
sorted_nodes = topological_sort(nodes)
# 结果:[A, B, C, D, E] 或 [B, A, C, D, E]

三、状态机管理:并发安全的节点执行

1. 节点状态定义

class NodeStatus:
    PENDING = "pending"      # 待执行
    STREAMING = "streaming"  # 执行中
    SUCCESS = "success"      # 执行成功
    ERROR = "error"          # 执行失败

状态转移

加载图表中...

2. 并发安全的状态管理

@asynccontextmanager
async def node_status_handler(canvas_id: str, node_id: str):
    """
    并发安全的节点状态管理。
    状态转移使用独立事务立即 commit。
    """
    # 1. 获取节点锁(原子操作)
    await db.acquire_node_lock(
        canvas_id,
        node_id,
        expected_statuses=[NodeStatus.PENDING, NodeStatus.SUCCESS, NodeStatus.ERROR],
        processing_status=NodeStatus.STREAMING,
    )
    log.info(f"Node {node_id}: -> streaming")

    try:
        yield  # 执行节点任务
    except Exception as e:
        # 2. 执行失败,释放锁并设置 ERROR 状态
        await db.release_node_lock(canvas_id, node_id, NodeStatus.ERROR, str(e)[:200])
        log.error(f"Node {node_id} failed: {e}")
        raise
    else:
        # 3. 执行成功,释放锁并设置 SUCCESS 状态
        await db.release_node_lock(canvas_id, node_id, NodeStatus.SUCCESS)
        log.info(f"Node {node_id}: streaming -> success")

关键点

acquire_node_lock 实现

async def acquire_node_lock(
    canvas_id: str,
    node_id: str,
    expected_statuses: list[str],
    processing_status: str,
):
    """原子获取节点锁"""
    async with unit_of_work() as db:
        # 使用 FOR UPDATE 锁住节点行
        node = (
            await db.execute(
                select(NodeModel)
                .filter(NodeModel.canvas_id == canvas_id, NodeModel.id == node_id)
                .with_for_update()
            )
        ).scalar_one()

        # 检查状态是否符合预期
        if node.status not in expected_statuses:
            raise errors.ConflictError(f"Node {node_id} is {node.status}, cannot acquire lock")

        # 更新状态为 processing
        node.status = processing_status
        await db.commit()

四、增量计算:智能刷新策略

1. 刷新触发条件

何时需要刷新节点?

  1. 节点自身变更

    • Data 节点:SQL 查询变更
    • Chart 节点:图表配置变更
    • Python 节点:代码变更
  2. 依赖节点变更

    • 父节点的 DataFrame 更新
    • 父节点的状态变为 SUCCESS
  3. 手动触发

    • 用户点击"刷新"按钮

2. 受影响节点识别

def find_affected_nodes(
    all_nodes: list[NodeModel], changed_node_id: str
) -> list[NodeModel]:
    """找出受某个节点变更影响的所有子孙节点"""
    # 1. 构建依赖图
    graph: dict[str, list[str]] = defaultdict(list)
    for node in all_nodes:
        for dep_id in node.dependencies or []:
            graph[dep_id].append(node.id)

    # 2. BFS 遍历所有子孙节点
    affected_ids = set()
    queue = deque([changed_node_id])

    while queue:
        nid = queue.popleft()
        for child_id in graph[nid]:
            if child_id not in affected_ids:
                affected_ids.add(child_id)
                queue.append(child_id)

    # 3. 返回受影响的节点(按拓扑排序)
    affected_nodes = [n for n in all_nodes if n.id in affected_ids]
    return topological_sort(affected_nodes)

示例

# 节点依赖关系:A → C, A → D, B → D, D → E

# 当节点 A 变更时
affected = find_affected_nodes(all_nodes, "A")
# 结果:[C, D, E](按拓扑排序)

# 当节点 D 变更时
affected = find_affected_nodes(all_nodes, "D")
# 结果:[E]

3. DataFrame 缓存复用

async def load_parent_context(
    db: AsyncSession,
    canvas_id: str,
    dependency_ids: list[str],
    sample_rows: int = 5,
) -> list[dict]:
    """加载父节点上下文(复用缓存的 DataFrame)"""
    parent_contexts = []

    for dep_id in dependency_ids:
        parent = await service.get_node(db, canvas_id, dep_id)

        # 检查父节点状态
        if parent.status != "success":
            continue  # 跳过未成功的父节点

        # 检查是否有缓存的 DataFrame
        if not parent.dataframe_id:
            continue  # 跳过无数据的父节点

        # 从 Parquet 文件读取 DataFrame
        project_id = project_id_var.get()
        try:
            raw_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")
        except FileNotFoundError:
            continue  # Parquet 文件缺失

        # 构建上下文
        records = raw_df.to_dict(orient="records")
        context = {
            "id": parent.id,
            "description": parent.description,
            "dataframe": {
                "columns": raw_df.columns.tolist(),
                "data": records[:sample_rows],  # 采样数据
                "sample_data": records[:sample_rows],
            },
        }
        parent_contexts.append(context)

    return parent_contexts

关键点


五、批量刷新:并发执行优化

1. 批量刷新流程

async def batch_refresh_nodes(canvas_id: str, node_ids: list[str]):
    """批量刷新节点(按拓扑排序并发执行)"""
    async with unit_of_work() as db:
        # 1. 获取所有节点
        all_nodes = await service.get_nodes(db, canvas_id)

        # 2. 找出需要刷新的节点
        nodes_to_refresh = [n for n in all_nodes if n.id in node_ids]

        # 3. 拓扑排序
        sorted_nodes = topological_sort(nodes_to_refresh)

        # 4. 按层级分组(同一层级可以并发执行)
        levels = group_by_level(sorted_nodes, all_nodes)

        # 5. 逐层执行
        for level_nodes in levels:
            # 并发执行同一层级的节点
            tasks = [
                execute_node(canvas_id, node.id)
                for node in level_nodes
            ]
            await asyncio.gather(*tasks, return_exceptions=True)

2. 层级分组算法

def group_by_level(
    sorted_nodes: list[NodeModel], all_nodes: list[NodeModel]
) -> list[list[NodeModel]]:
    """将节点按依赖层级分组"""
    # 1. 构建依赖图
    node_map = {n.id: n for n in all_nodes}

    # 2. 计算每个节点的层级
    levels: dict[str, int] = {}

    for node in sorted_nodes:
        # 节点的层级 = max(父节点层级) + 1
        max_parent_level = -1
        for dep_id in node.dependencies or []:
            if dep_id in levels:
                max_parent_level = max(max_parent_level, levels[dep_id])
        levels[node.id] = max_parent_level + 1

    # 3. 按层级分组
    level_groups: dict[int, list[NodeModel]] = defaultdict(list)
    for node in sorted_nodes:
        level_groups[levels[node.id]].append(node)

    # 4. 返回按层级排序的分组
    return [level_groups[i] for i in sorted(level_groups.keys())]

示例

# 节点依赖关系:A → C, A → D, B → D, D → E

# 层级分组结果
levels = group_by_level(sorted_nodes, all_nodes)
# 结果:
# Level 0: [A, B]  # 无依赖,可以并发执行
# Level 1: [C, D]  # 依赖 Level 0,可以并发执行
# Level 2: [E]     # 依赖 Level 1

六、实战案例

案例 1:单节点刷新

# 用户修改了 Data 节点 A 的 SQL 查询
await refresh_node(canvas_id="canvas-123", node_id="A")

# 系统自动识别受影响的节点
affected_nodes = find_affected_nodes(all_nodes, "A")
# 结果:[C, D, E]

# 按拓扑排序执行
for node in affected_nodes:
    await execute_node(canvas_id, node.id)

案例 2:批量刷新

# 用户选中多个节点并点击"批量刷新"
await batch_refresh_nodes(canvas_id="canvas-123", node_ids=["A", "B", "D"])

# 系统按层级并发执行
# Level 0: [A, B] 并发执行
# Level 1: [D] 等待 A 和 B 完成后执行

案例 3:增量计算

# 节点 A 执行成功,DataFrame 缓存到 Parquet
await persist_node_dataframe(node_a, df_a)

# 节点 C 执行时,直接读取节点 A 的缓存
parent_contexts = await load_parent_context(db, canvas_id, ["A"])
# 无需重新执行节点 A

七、性能优化

1. 并发执行

优化前

# 串行执行所有节点
for node in sorted_nodes:
    await execute_node(canvas_id, node.id)
# 总耗时 = sum(每个节点的耗时)

优化后

# 按层级并发执行
for level_nodes in levels:
    await asyncio.gather(*[execute_node(canvas_id, n.id) for n in level_nodes])
# 总耗时 = sum(每层的最大耗时)

效果

2. DataFrame 缓存

优化前

# 每次执行子节点时,重新执行父节点
parent_df = await execute_parent_node(parent_id)

优化后

# 直接读取父节点的缓存
parent_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")

效果

3. 采样数据

优化前

# 传递完整 DataFrame 给 AI(可能数万行)
context = {"dataframe": {"data": raw_df.to_dict(orient="records")}}

优化后

# 只传递采样数据(5 行)
context = {"dataframe": {"sample_data": records[:5]}}

效果


八、总结与展望

AskTable 的 Canvas 依赖图引擎,通过 拓扑排序 + 状态机 + 增量计算 的组合,实现了:

智能刷新:自动识别受影响的节点 ✅ 增量计算:只刷新必要的节点,复用缓存 ✅ 正确顺序:拓扑排序保证执行顺序 ✅ 并发执行:按层级并发,提升性能 ✅ 并发安全:状态机 + 行锁保证一致性

未来优化方向

  1. 智能预测:预测用户下一步操作,提前刷新节点
  2. 增量更新:只更新 DataFrame 的变更部分
  3. 分布式执行:跨实例并发执行节点
  4. 可视化调试:展示依赖图和执行流程

相关阅读

技术交流