
sidebar.wechat

sidebar.feishu
sidebar.chooseYourWayToJoin

sidebar.scanToAddConsultant
在 Canvas 画布中,节点之间存在复杂的依赖关系:Chart 节点依赖 Data 节点的查询结果,Python 节点依赖多个父节点的 DataFrame。当一个节点更新时,如何智能地刷新依赖它的子节点?如何避免重复计算?如何保证执行顺序的正确性?
AskTable 的 Canvas 依赖图引擎,通过 拓扑排序 + 状态机 + 增量计算 的组合,实现了高效、可靠的节点执行管理。
本文将深入剖析这套引擎的设计与实现。
Canvas 中的节点形成一个 有向无环图(DAG):
依赖关系:
手动刷新:
全量刷新:
✅ 智能刷新:自动识别需要刷新的节点 ✅ 增量计算:只刷新受影响的节点 ✅ 正确顺序:按依赖关系顺序执行 ✅ 并发执行:无依赖的节点可以并行执行 ✅ 缓存复用:未变更的节点复用缓存结果
拓扑排序是对 DAG 的一种线性排序,使得对于每条有向边 u → v,节点 u 都排在节点 v 之前。
示例:
依赖关系:A → C, A → D, B → D, D → E
拓扑排序结果:A, B, D, C, E(或 B, A, D, C, E)
def topological_sort(nodes: list[NodeModel]) -> list[NodeModel]:
"""
拓扑排序节点列表(Kahn 算法)
仅考虑节点列表内部的依赖关系,外部依赖不参与排序。
"""
if not nodes:
return []
# 1. 构建图和入度表
node_ids = {n.id for n in nodes}
in_degree: dict[str, int] = {n.id: 0 for n in nodes}
graph: dict[str, list[str]] = defaultdict(list)
for node in nodes:
for dep_id in node.dependencies or []:
if dep_id in node_ids: # 只考虑内部依赖
graph[dep_id].append(node.id)
in_degree[node.id] += 1
# 2. 将入度为 0 的节点加入队列
queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
sorted_ids: list[str] = []
# 3. BFS 遍历
while queue:
nid = queue.popleft()
sorted_ids.append(nid)
# 移除该节点后,更新子节点的入度
for child in graph[nid]:
in_degree[child] -= 1
if in_degree[child] == 0:
queue.append(child)
# 4. 返回排序后的节点列表
id_to_node = {n.id: n for n in nodes}
return [id_to_node[nid] for nid in sorted_ids if nid in id_to_node]
关键点:
示例:
# 节点依赖关系
nodes = [
Node(id="A", dependencies=[]),
Node(id="B", dependencies=[]),
Node(id="C", dependencies=["A"]),
Node(id="D", dependencies=["A", "B"]),
Node(id="E", dependencies=["D"]),
]
# 拓扑排序结果
sorted_nodes = topological_sort(nodes)
# 结果:[A, B, C, D, E] 或 [B, A, C, D, E]
class NodeStatus:
PENDING = "pending" # 待执行
STREAMING = "streaming" # 执行中
SUCCESS = "success" # 执行成功
ERROR = "error" # 执行失败
状态转移:
@asynccontextmanager
async def node_status_handler(canvas_id: str, node_id: str):
"""
并发安全的节点状态管理。
状态转移使用独立事务立即 commit。
"""
# 1. 获取节点锁(原子操作)
await db.acquire_node_lock(
canvas_id,
node_id,
expected_statuses=[NodeStatus.PENDING, NodeStatus.SUCCESS, NodeStatus.ERROR],
processing_status=NodeStatus.STREAMING,
)
log.info(f"Node {node_id}: -> streaming")
try:
yield # 执行节点任务
except Exception as e:
# 2. 执行失败,释放锁并设置 ERROR 状态
await db.release_node_lock(canvas_id, node_id, NodeStatus.ERROR, str(e)[:200])
log.error(f"Node {node_id} failed: {e}")
raise
else:
# 3. 执行成功,释放锁并设置 SUCCESS 状态
await db.release_node_lock(canvas_id, node_id, NodeStatus.SUCCESS)
log.info(f"Node {node_id}: streaming -> success")
关键点:
acquire_node_lock 使用数据库行锁(FOR UPDATE)acquire_node_lock 实现:
async def acquire_node_lock(
canvas_id: str,
node_id: str,
expected_statuses: list[str],
processing_status: str,
):
"""原子获取节点锁"""
async with unit_of_work() as db:
# 使用 FOR UPDATE 锁住节点行
node = (
await db.execute(
select(NodeModel)
.filter(NodeModel.canvas_id == canvas_id, NodeModel.id == node_id)
.with_for_update()
)
).scalar_one()
# 检查状态是否符合预期
if node.status not in expected_statuses:
raise errors.ConflictError(f"Node {node_id} is {node.status}, cannot acquire lock")
# 更新状态为 processing
node.status = processing_status
await db.commit()
何时需要刷新节点?
节点自身变更:
依赖节点变更:
手动触发:
def find_affected_nodes(
all_nodes: list[NodeModel], changed_node_id: str
) -> list[NodeModel]:
"""找出受某个节点变更影响的所有子孙节点"""
# 1. 构建依赖图
graph: dict[str, list[str]] = defaultdict(list)
for node in all_nodes:
for dep_id in node.dependencies or []:
graph[dep_id].append(node.id)
# 2. BFS 遍历所有子孙节点
affected_ids = set()
queue = deque([changed_node_id])
while queue:
nid = queue.popleft()
for child_id in graph[nid]:
if child_id not in affected_ids:
affected_ids.add(child_id)
queue.append(child_id)
# 3. 返回受影响的节点(按拓扑排序)
affected_nodes = [n for n in all_nodes if n.id in affected_ids]
return topological_sort(affected_nodes)
示例:
# 节点依赖关系:A → C, A → D, B → D, D → E
# 当节点 A 变更时
affected = find_affected_nodes(all_nodes, "A")
# 结果:[C, D, E](按拓扑排序)
# 当节点 D 变更时
affected = find_affected_nodes(all_nodes, "D")
# 结果:[E]
async def load_parent_context(
db: AsyncSession,
canvas_id: str,
dependency_ids: list[str],
sample_rows: int = 5,
) -> list[dict]:
"""加载父节点上下文(复用缓存的 DataFrame)"""
parent_contexts = []
for dep_id in dependency_ids:
parent = await service.get_node(db, canvas_id, dep_id)
# 检查父节点状态
if parent.status != "success":
continue # 跳过未成功的父节点
# 检查是否有缓存的 DataFrame
if not parent.dataframe_id:
continue # 跳过无数据的父节点
# 从 Parquet 文件读取 DataFrame
project_id = project_id_var.get()
try:
raw_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")
except FileNotFoundError:
continue # Parquet 文件缺失
# 构建上下文
records = raw_df.to_dict(orient="records")
context = {
"id": parent.id,
"description": parent.description,
"dataframe": {
"columns": raw_df.columns.tolist(),
"data": records[:sample_rows], # 采样数据
"sample_data": records[:sample_rows],
},
}
parent_contexts.append(context)
return parent_contexts
关键点:
async def batch_refresh_nodes(canvas_id: str, node_ids: list[str]):
"""批量刷新节点(按拓扑排序并发执行)"""
async with unit_of_work() as db:
# 1. 获取所有节点
all_nodes = await service.get_nodes(db, canvas_id)
# 2. 找出需要刷新的节点
nodes_to_refresh = [n for n in all_nodes if n.id in node_ids]
# 3. 拓扑排序
sorted_nodes = topological_sort(nodes_to_refresh)
# 4. 按层级分组(同一层级可以并发执行)
levels = group_by_level(sorted_nodes, all_nodes)
# 5. 逐层执行
for level_nodes in levels:
# 并发执行同一层级的节点
tasks = [
execute_node(canvas_id, node.id)
for node in level_nodes
]
await asyncio.gather(*tasks, return_exceptions=True)
def group_by_level(
sorted_nodes: list[NodeModel], all_nodes: list[NodeModel]
) -> list[list[NodeModel]]:
"""将节点按依赖层级分组"""
# 1. 构建依赖图
node_map = {n.id: n for n in all_nodes}
# 2. 计算每个节点的层级
levels: dict[str, int] = {}
for node in sorted_nodes:
# 节点的层级 = max(父节点层级) + 1
max_parent_level = -1
for dep_id in node.dependencies or []:
if dep_id in levels:
max_parent_level = max(max_parent_level, levels[dep_id])
levels[node.id] = max_parent_level + 1
# 3. 按层级分组
level_groups: dict[int, list[NodeModel]] = defaultdict(list)
for node in sorted_nodes:
level_groups[levels[node.id]].append(node)
# 4. 返回按层级排序的分组
return [level_groups[i] for i in sorted(level_groups.keys())]
示例:
# 节点依赖关系:A → C, A → D, B → D, D → E
# 层级分组结果
levels = group_by_level(sorted_nodes, all_nodes)
# 结果:
# Level 0: [A, B] # 无依赖,可以并发执行
# Level 1: [C, D] # 依赖 Level 0,可以并发执行
# Level 2: [E] # 依赖 Level 1
# 用户修改了 Data 节点 A 的 SQL 查询
await refresh_node(canvas_id="canvas-123", node_id="A")
# 系统自动识别受影响的节点
affected_nodes = find_affected_nodes(all_nodes, "A")
# 结果:[C, D, E]
# 按拓扑排序执行
for node in affected_nodes:
await execute_node(canvas_id, node.id)
# 用户选中多个节点并点击"批量刷新"
await batch_refresh_nodes(canvas_id="canvas-123", node_ids=["A", "B", "D"])
# 系统按层级并发执行
# Level 0: [A, B] 并发执行
# Level 1: [D] 等待 A 和 B 完成后执行
# 节点 A 执行成功,DataFrame 缓存到 Parquet
await persist_node_dataframe(node_a, df_a)
# 节点 C 执行时,直接读取节点 A 的缓存
parent_contexts = await load_parent_context(db, canvas_id, ["A"])
# 无需重新执行节点 A
优化前:
# 串行执行所有节点
for node in sorted_nodes:
await execute_node(canvas_id, node.id)
# 总耗时 = sum(每个节点的耗时)
优化后:
# 按层级并发执行
for level_nodes in levels:
await asyncio.gather(*[execute_node(canvas_id, n.id) for n in level_nodes])
# 总耗时 = sum(每层的最大耗时)
效果:
优化前:
# 每次执行子节点时,重新执行父节点
parent_df = await execute_parent_node(parent_id)
优化后:
# 直接读取父节点的缓存
parent_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")
效果:
优化前:
# 传递完整 DataFrame 给 AI(可能数万行)
context = {"dataframe": {"data": raw_df.to_dict(orient="records")}}
优化后:
# 只传递采样数据(5 行)
context = {"dataframe": {"sample_data": records[:5]}}
效果:
AskTable 的 Canvas 依赖图引擎,通过 拓扑排序 + 状态机 + 增量计算 的组合,实现了:
✅ 智能刷新:自动识别受影响的节点 ✅ 增量计算:只刷新必要的节点,复用缓存 ✅ 正确顺序:拓扑排序保证执行顺序 ✅ 并发执行:按层级并发,提升性能 ✅ 并发安全:状态机 + 行锁保证一致性
相关阅读:
技术交流:
sidebar.noProgrammingNeeded
sidebar.startFreeTrial