AskTable

基于 AST 的 SQL 权限控制系统 - SQLGlot Scope 实践

AskTable 团队
AskTable 团队 2026年3月4日

基于 AST 的 SQL 权限控制系统 - SQLGlot Scope 实践

在多租户数据分析系统中,SQL 权限控制是保障数据安全的核心机制。传统的权限控制往往依赖数据库层面的视图或行级安全策略,但这种方式存在灵活性不足、难以动态调整等问题。AskTable 采用了一种创新的方法:基于 AST(抽象语法树)的 SQL 权限控制系统,通过 SQLGlot 库的 Scope 分析能力,在 SQL 执行前动态注入权限条件。

核心设计理念

SQL Guard 的设计遵循"Clean Architecture"原则,将权限控制逻辑与业务逻辑完全分离。其核心思想是:

  1. 规则即代码:权限规则以 SQL 表达式形式定义,支持变量模板
  2. AST 级别注入:在语法树层面注入权限条件,确保无法绕过
  3. Scope 递归处理:处理子查询、CTE、UNION 等复杂结构
  4. 通配符支持:支持
    *
    通配符匹配任意 schema 或 table

权限规则定义

权限规则采用类 SQL 的表达式语法,支持 Jinja2 变量模板:

# 示例规则
rules = [
    "users.orders.user_id = {{ current_user_id }}",  # 用户只能查看自己的订单
    "*.*.region IN ('CN', 'US')",                     # 限制地区范围
    "sales.*.created_at >= '2024-01-01'",            # 时间范围限制
]

技术架构

加载图表中...

核心实现流程

1. 规则解析与匹配

规则解析器首先将规则字符串转换为 AST,并识别其中的 schema、table、field 信息:

def parse_rule(rule: str, variables: dict, sql: str, dialect: str) -> list[Rule]:
    # 1. 渲染 Jinja2 变量
    template = Template(rule)
    rule_str = template.render(**variables)

    # 2. 替换通配符 * 为 ALL
    rule_str = rule_str.replace("*", "ALL")

    # 3. 解析规则和 SQL 为 AST
    rule_ast = sqlglot.parse_one(rule_str, read=dialect)
    sql_ast = sqlglot.parse_one(sql, read=dialect)

    # 4. 提取规则中的列信息
    rule_column = extract_column_from_rule(rule_ast)

    # 5. 遍历 SQL 中的所有表,匹配规则
    rules = []
    for table in sql_ast.find_all(exp.Table):
        if matches(rule_column, table):
            qualified_ast = qualify_rule_ast(rule_ast, table)
            rules.append(Rule(
                schema=table.db,
                table=table.name,
                field=rule_column.name,
                ast=qualified_ast
            ))

    return rules

2. Scope 递归处理

SQLGlot 的 Scope 提供了强大的作用域分析能力,可以准确识别表的别名、子查询、CTE 等复杂结构:

def _process_scope(self, scope: Scope, rules: list[Rule]):
    # 1. 为当前 scope 构建权限条件
    conditions = self._build_conditions_for_scope(scope, rules)
    if conditions:
        self._inject_conditions(scope, conditions)

    # 2. 递归处理所有子 scope
    for subscope in self._get_all_subscopes(scope):
        self._process_scope(subscope, rules)

def _get_all_subscopes(self, scope: Scope) -> list[Scope]:
    return (
        scope.derived_table_scopes +  # FROM 子查询
        scope.cte_scopes +             # WITH CTE
        scope.subquery_scopes +        # WHERE/SELECT 子查询
        scope.union_scopes +           # UNION
        scope.udtf_scopes              # 表函数
    )

3. 条件注入

条件注入需要处理表别名、引号、以及与现有 WHERE 子句的合并:

def _inject_conditions(self, scope: Scope, conditions: list[exp.Expression]):
    # 1. 找到 SELECT 语句
    select = scope.expression if isinstance(scope.expression, exp.Select) \
             else scope.expression.find(exp.Select)

    # 2. 合并所有条件(去重)
    combined = self._combine_conditions(conditions)

    # 3. 注入到 WHERE 子句
    where = select.find(exp.Where)
    if where:
        # 与现有条件用 AND 连接
        where.set("this", exp.And(this=where.this, expression=combined))
    else:
        # 创建新的 WHERE 子句
        select.set("where", exp.Where(this=combined))

实际应用示例

示例 1: 多租户数据隔离

# 原始 SQL
sql = """
SELECT o.id, o.amount, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
"""

# 权限规则
rules = ["*.orders.tenant_id = '{{ tenant_id }}'"]
variables = {"tenant_id": "tenant_123"}

# 应用权限后的 SQL
protected_sql = guard(sql, "postgres", rules, variables)

生成的 SQL:

SELECT o.id, o.amount, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
  AND o.tenant_id = 'tenant_123'  -- 自动注入

示例 2: 复杂子查询处理

sql = """
SELECT *
FROM (
    SELECT user_id, SUM(amount) as total
    FROM orders
    GROUP BY user_id
) subq
WHERE total > 1000
"""

rules = ["*.orders.region = 'CN'"]
protected_sql = guard(sql, "mysql", rules)

生成的 SQL:

SELECT *
FROM (
    SELECT user_id, SUM(amount) as total
    FROM orders
    WHERE region = 'CN'  -- 注入到子查询内部
    GROUP BY user_id
) subq
WHERE total > 1000

示例 3: CTE 和 UNION 处理

sql = """
WITH recent_orders AS (
    SELECT * FROM orders WHERE created_at > '2024-01-01'
)
SELECT * FROM recent_orders
UNION ALL
SELECT * FROM orders WHERE status = 'pending'
"""

rules = ["*.orders.user_id = {{ user_id }}"]
variables = {"user_id": "123"}
protected_sql = guard(sql, "postgres", rules, variables)

生成的 SQL:

WITH recent_orders AS (
    SELECT * FROM orders
    WHERE created_at > '2024-01-01'
      AND user_id = '123'  -- CTE 内部注入
)
SELECT * FROM recent_orders
UNION ALL
SELECT * FROM orders
WHERE status = 'pending'
  AND user_id = '123'  -- UNION 分支注入

技术优势

1. 安全性保障

2. 灵活性

3. 可维护性

性能考虑

SQL Guard 的性能开销主要来自 AST 解析和 Scope 构建:

总结

基于 AST 的 SQL 权限控制系统是 AskTable 安全架构的核心组件。通过 SQLGlot 的强大能力,我们实现了:

  1. 细粒度控制:字段级别的权限管理
  2. 动态灵活:支持变量模板和通配符
  3. 结构完整:自动处理所有 SQL 结构
  4. 安全可靠:AST 级别注入,无法绕过

这种设计不仅保障了数据安全,还为多租户、行级安全等复杂场景提供了优雅的解决方案。

相关资源