AskTable

AskTable SQL 权限守卫:基于 SQLGlot AST 改写的行级权限控制引擎

AskTable 团队
AskTable 团队 2026年3月5日

在企业级数据分析场景中,数据安全是头等大事。不同角色的用户应该只能看到属于自己权限范围内的数据:销售只能看自己区域的订单,财务只能看已审核的报表,管理层可以看全局数据。

传统的权限控制方案往往需要在应用层手动拼接 WHERE 条件,不仅容易出错,还难以应对复杂的 SQL 查询(子查询、CTE、UNION 等)。AskTable 的 SQL 权限守卫通过 SQLGlot AST 改写技术,实现了透明、安全、高性能的行级权限控制。

本文将深入剖析这套权限引擎的设计与实现。


一、为什么需要 SQL 权限守卫?

1. 传统方案的痛点

应用层拼接 WHERE 条件

# 传统做法:手动拼接权限条件
if user.role == "sales":
    sql = f"SELECT * FROM orders WHERE region = '{user.region}'"
else:
    sql = "SELECT * FROM orders"

问题

数据库视图方案

-- 为每个角色创建视图
CREATE VIEW sales_orders AS
SELECT * FROM orders WHERE region = 'East';

问题

2. 我们需要的是什么?

透明性:权限控制对 AI 和用户透明,无需手动处理 ✅ 安全性:自动注入权限条件,无遗漏、无注入风险 ✅ 完整性:支持所有 SQL 语法(子查询、CTE、UNION、JOIN) ✅ 灵活性:支持动态变量(如

{{user_id}}
)和通配符规则 ✅ 高性能:AST 改写开销小(< 10ms)


二、技术选型:为什么是 SQLGlot?

1. SQLGlot:SQL 的瑞士军刀

SQLGlot 是一个强大的 SQL 解析器和转换器,支持 20+ 种 SQL 方言。

核心优势

为什么不用正则表达式?

2. Scope-based 权限注入

SQLGlot 的 Scope 是权限注入的关键。Scope 表示 SQL 中的一个查询范围(SELECT、子查询、CTE 等),包含:

通过递归遍历 Scope 树,我们可以在每个查询层级注入权限条件。


三、架构设计:Clean Architecture

AskTable 的 SQL 权限守卫采用清晰的分层架构:

加载图表中...

核心组件

  1. Rule Parser:解析权限规则,支持 Jinja2 变量和通配符
  2. Scope Processor:递归处理 Scope 树,注入权限条件
  3. Condition Builder:构建权限条件,处理别名和表名
  4. SQL Generator:将修改后的 AST 重新生成 SQL

四、核心实现:深入源码

1. 权限规则的定义

权限规则是一个简单的 SQL 条件表达式:

# 示例规则
rules = [
    "orders.region = 'East'",  # 固定值
    "orders.user_id = {{user_id}}",  # Jinja2 变量
    "*.*.status IN ('approved', 'completed')",  # 通配符(所有表的 status 字段)
]

规则格式

2. 规则解析:Jinja2 + 通配符

def parse_rule(
    rule: str, variables: dict[str, str], sql: str, dialect: str
) -> list[Rule]:
    # 1. 渲染 Jinja2 变量
    template = Template(rule)
    rule_str = template.render(**variables)

    # 2. 替换通配符(* -> ALL)
    rule_str = rule_str.replace("*", "ALL")

    # 3. 解析规则 AST
    rule_ast = sqlglot.parse_one(rule_str, read=dialect)
    sql_ast = sqlglot.parse_one(sql, read=dialect)

    # 4. 提取规则中的列信息
    rule_column = extract_column_from_rule(rule_ast)

    # 5. 遍历 SQL 中的所有表,匹配规则
    rules: list[Rule] = []
    for table in sql_ast.find_all(exp.Table):
        # 检查 schema 和 table 是否匹配
        schema_matches = rule_column.db == "ALL" or rule_column.db == table.db
        table_matches = rule_column.table == "ALL" or rule_column.table == table.name

        if schema_matches and table_matches:
            # 生成针对该表的规则
            qualified_ast = qualify_rule_for_table(rule_ast, table)
            rules.append(Rule(
                schema=table.db,
                table=table.name,
                field=rule_column.name,
                ast=qualified_ast,
            ))

    return rules

关键点

示例

# 输入规则
rule = "*.*.status = 'approved'"

# 输入 SQL
sql = "SELECT * FROM orders JOIN products ON orders.product_id = products.id"

# 解析后的规则
[
    Rule(schema="public", table="orders", field="status", ast="orders.status = 'approved'"),
    Rule(schema="public", table="products", field="status", ast="products.status = 'approved'"),
]

3. Scope 递归处理:注入权限条件

class SqlPermissionGuard:
    def apply_rules(
        self, sql: str, rules: list[str], variables: dict[str, str] | None = None
    ) -> str:
        # 1. 解析 SQL 并构建 Scope 树
        ast = sqlglot.parse_one(sql, read=self.dialect).copy()
        root_scope = build_scope(ast)

        # 2. 解析所有规则
        parsed_rules = list(
            itertools.chain.from_iterable(
                parse_rule(r, variables, sql, self.dialect) for r in rules
            )
        )

        # 3. 递归处理 Scope 树
        self._process_scope(root_scope, parsed_rules)

        # 4. 生成新的 SQL
        return ast.sql(dialect=self.dialect)

    def _process_scope(self, scope: Scope, rules: list[Rule]):
        """递归处理 Scope 及其所有子 Scope"""
        # 1. 为当前 Scope 构建权限条件
        conditions = self._build_conditions_for_scope(scope, rules)
        if conditions:
            self._inject_conditions(scope, conditions)

        # 2. 递归处理所有子 Scope
        for subscope in self._get_all_subscopes(scope):
            self._process_scope(subscope, rules)

    def _get_all_subscopes(self, scope: Scope) -> list[Scope]:
        """获取所有子 Scope(子查询、CTE、UNION 等)"""
        return (
            scope.derived_table_scopes  # 派生表(子查询)
            + scope.cte_scopes  # CTE
            + scope.subquery_scopes  # WHERE/HAVING 中的子查询
            + scope.union_scopes  # UNION
            + scope.udtf_scopes  # 用户定义表函数
        )

关键点

4. 条件注入:WHERE 子句改写

def _inject_conditions(self, scope: Scope, conditions: list[exp.Expression]):
    """将权限条件注入到 WHERE 子句"""
    # 1. 找到 SELECT 语句
    select = (
        scope.expression
        if isinstance(scope.expression, exp.Select)
        else scope.expression.find(exp.Select)
    )
    if not select:
        return

    # 2. 合并所有条件(去重)
    combined = self._combine_conditions(conditions)
    if not combined:
        return

    # 3. 注入到 WHERE 子句
    where = select.find(exp.Where)
    if where:
        # 已有 WHERE:用 AND 连接
        where.set("this", exp.And(this=where.this, expression=combined))
    else:
        # 无 WHERE:创建新的 WHERE 子句
        select.set("where", exp.Where(this=combined))

def _combine_conditions(self, conditions: list[exp.Expression]) -> exp.Expression | None:
    """合并多个条件,去重并用 AND 连接"""
    if not conditions:
        return None

    # 去重(基于 SQL 字符串)
    unique_conditions = []
    seen = set()
    for cond in conditions:
        cond_str = cond.sql(dialect=self.dialect)
        if cond_str not in seen:
            seen.add(cond_str)
            unique_conditions.append(cond)

    # 用 AND 连接
    if len(unique_conditions) == 1:
        return unique_conditions[0]

    result = unique_conditions[0]
    for cond in unique_conditions[1:]:
        result = exp.And(this=result, expression=cond)

    return result

关键点

示例

# 原始 SQL
sql = "SELECT * FROM orders WHERE status = 'pending'"

# 权限规则
rules = ["orders.region = 'East'"]

# 改写后的 SQL
"SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'"

5. 别名处理:正确引用表名

SQL 中的表可能有别名,权限条件需要使用正确的引用:

def build_conditions(
    self, rule: Rule, tables: list[tuple[str | None, exp.Table]]
) -> list[exp.Expression]:
    """为匹配的表构建权限条件"""
    conditions = []

    for alias, table in tables:
        condition = rule.ast.copy()
        column = condition.find(exp.Column)

        if column and column.table:
            if alias:
                # 有别名:使用别名
                column.set("table", exp.to_identifier(alias))
                column.set("db", None)
            else:
                # 无别名:使用完整表名
                column.set("table", exp.to_identifier(table.this.this))
                column.set("db", exp.to_identifier(table.db))

            column.set("catalog", None)

        conditions.append(condition)

    return conditions

示例

# 原始 SQL(有别名)
sql = "SELECT * FROM orders o WHERE o.status = 'pending'"

# 权限规则
rules = ["orders.region = 'East'"]

# 改写后的 SQL(使用别名 o)
"SELECT * FROM orders o WHERE o.status = 'pending' AND o.region = 'East'"

五、复杂场景处理

1. 子查询

# 原始 SQL
sql = """
SELECT * FROM (
    SELECT * FROM orders WHERE status = 'pending'
) AS pending_orders
"""

# 权限规则
rules = ["orders.region = 'East'"]

# 改写后的 SQL(子查询内部也被注入)
"""
SELECT * FROM (
    SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
) AS pending_orders
"""

关键点

2. CTE (Common Table Expression)

# 原始 SQL
sql = """
WITH pending_orders AS (
    SELECT * FROM orders WHERE status = 'pending'
)
SELECT * FROM pending_orders
"""

# 权限规则
rules = ["orders.region = 'East'"]

# 改写后的 SQL
"""
WITH pending_orders AS (
    SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
)
SELECT * FROM pending_orders
"""

3. UNION

# 原始 SQL
sql = """
SELECT * FROM orders WHERE status = 'pending'
UNION
SELECT * FROM orders WHERE status = 'approved'
"""

# 权限规则
rules = ["orders.region = 'East'"]

# 改写后的 SQL(两个 SELECT 都被注入)
"""
SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
UNION
SELECT * FROM orders WHERE status = 'approved' AND orders.region = 'East'
"""

4. JOIN

# 原始 SQL
sql = """
SELECT o.*, p.name
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.status = 'pending'
"""

# 权限规则
rules = [
    "orders.region = 'East'",
    "products.category = 'Electronics'"
]

# 改写后的 SQL(两个表都被注入)
"""
SELECT o.*, p.name
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.status = 'pending' AND o.region = 'East' AND p.category = 'Electronics'
"""

六、性能优化与安全保障

1. 性能优化

AST 改写开销

优化策略

2. 安全保障

SQL 注入防护

权限遗漏防护

兼容性保障


七、实战案例

案例 1:销售区域权限

# 用户信息
user = {"role": "sales", "region": "East"}

# 权限规则
rules = ["orders.region = '{{region}}'"]

# 原始 SQL(AI 生成)
sql = "SELECT SUM(amount) FROM orders WHERE status = 'completed'"

# 应用权限
filtered_sql = guard(
    sql=sql,
    sql_dialect="mysql",
    rules=rules,
    variables={"region": user["region"]}
)

# 结果
"SELECT SUM(amount) FROM orders WHERE status = 'completed' AND orders.region = 'East'"

案例 2:多表通配符规则

# 权限规则(所有表的 deleted 字段必须为 0)
rules = ["*.*.deleted = 0"]

# 原始 SQL
sql = """
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'pending'
"""

# 应用权限
filtered_sql = guard(sql=sql, sql_dialect="postgres", rules=rules)

# 结果(两个表都被注入)
"""
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'pending' AND o.deleted = 0 AND c.deleted = 0
"""

案例 3:复杂子查询

# 权限规则
rules = ["orders.user_id = {{user_id}}"]

# 原始 SQL(包含子查询和 CTE)
sql = """
WITH monthly_sales AS (
    SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS total
    FROM orders
    WHERE status = 'completed'
    GROUP BY month
)
SELECT * FROM monthly_sales
WHERE total > (
    SELECT AVG(total) FROM monthly_sales
)
"""

# 应用权限
filtered_sql = guard(
    sql=sql,
    sql_dialect="postgres",
    rules=rules,
    variables={"user_id": "12345"}
)

# 结果(CTE 内部被注入)
"""
WITH monthly_sales AS (
    SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS total
    FROM orders
    WHERE status = 'completed' AND orders.user_id = '12345'
    GROUP BY month
)
SELECT * FROM monthly_sales
WHERE total > (
    SELECT AVG(total) FROM monthly_sales
)
"""

八、对比其他方案

方案透明性完整性安全性性能维护成本
AST 改写⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
应用层拼接⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
数据库视图⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
ORM 过滤器⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
Row-Level Security⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

AskTable 选择 AST 改写的原因


九、实战经验与踩坑

1. 方言差异处理

问题:不同数据库的标识符引号规则不同。

解决方案

# 根据方言决定是否加引号
should_quote = self.dialect != "oracle"
column.set("table", exp.to_identifier(name=table_name, quoted=should_quote))

2. 别名识别

问题:如何区分真实别名和表名?

-- 真实别名
SELECT * FROM orders o

-- 表名(无别名)
SELECT * FROM orders

解决方案

has_real_alias = source.alias and source.alias != source.name

3. 条件去重

问题:同一个表在 SQL 中出现多次,会重复注入条件。

解决方案

4. 降级方案

问题:AST 改写失败时如何处理?

解决方案

try:
    filtered_sql = guard(sql, dialect, rules, variables)
except Exception:
    # 降级到 LLM 改写
    filtered_sql = await llm_rewrite_sql(sql, rules)

十、总结与展望

AskTable 的 SQL 权限守卫,通过 SQLGlot AST 改写技术,实现了:

透明性:AI 和用户无需感知权限控制 ✅ 完整性:支持所有 SQL 语法(子查询、CTE、UNION、JOIN) ✅ 安全性:无 SQL 注入风险,无权限遗漏 ✅ 灵活性:支持动态变量和通配符规则 ✅ 高性能:AST 改写开销 < 10ms

未来优化方向

  1. 列级权限:支持字段级别的权限控制(如脱敏、隐藏)
  2. 权限缓存:缓存规则 AST,提升性能
  3. 权限审计:记录所有权限注入操作,便于审计
  4. 可视化调试:提供 AST 可视化工具,便于调试规则

开源计划

我们计划将 SQL 权限守卫 开源,帮助更多团队构建安全的数据分析系统。敬请期待!


相关阅读

技术交流