
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在企业级数据分析场景中,数据安全是头等大事。不同角色的用户应该只能看到属于自己权限范围内的数据:销售只能看自己区域的订单,财务只能看已审核的报表,管理层可以看全局数据。
传统的权限控制方案往往需要在应用层手动拼接 WHERE 条件,不仅容易出错,还难以应对复杂的 SQL 查询(子查询、CTE、UNION 等)。AskTable 的 SQL 权限守卫通过 SQLGlot AST 改写技术,实现了透明、安全、高性能的行级权限控制。
本文将深入剖析这套权限引擎的设计与实现。
应用层拼接 WHERE 条件:
# 传统做法:手动拼接权限条件 if user.role == "sales": sql = f"SELECT * FROM orders WHERE region = '{user.region}'" else: sql = "SELECT * FROM orders"
问题:
数据库视图方案:
-- 为每个角色创建视图 CREATE VIEW sales_orders AS SELECT * FROM orders WHERE region = 'East';
问题:
user_id)✅ 透明性:权限控制对 AI 和用户透明,无需手动处理 ✅ 安全性:自动注入权限条件,无遗漏、无注入风险 ✅ 完整性:支持所有 SQL 语法(子查询、CTE、UNION、JOIN) ✅ 灵活性:支持动态变量(如
{{user_id}})和通配符规则
✅ 高性能:AST 改写开销小(< 10ms)
SQLGlot 是一个强大的 SQL 解析器和转换器,支持 20+ 种 SQL 方言。
核心优势:
为什么不用正则表达式?
SQLGlot 的 Scope 是权限注入的关键。Scope 表示 SQL 中的一个查询范围(SELECT、子查询、CTE 等),包含:
sources:当前 Scope 中的表和别名derived_table_scopes:派生表(子查询)的 Scopecte_scopes:CTE 的 Scopesubquery_scopes:WHERE/HAVING 中的子查询 Scopeunion_scopes:UNION 的 Scope通过递归遍历 Scope 树,我们可以在每个查询层级注入权限条件。
AskTable 的 SQL 权限守卫采用清晰的分层架构:
加载图表中...
权限规则是一个简单的 SQL 条件表达式:
# 示例规则 rules = [ "orders.region = 'East'", # 固定值 "orders.user_id = {{user_id}}", # Jinja2 变量 "*.*.status IN ('approved', 'completed')", # 通配符(所有表的 status 字段) ]
规则格式:
schema.table.field <operator> <value>=, !=, >, <, >=, <=, IN, LIKE, IS, NOT IN, NOT LIKE, IS NOT* 表示匹配所有 schema 或 tabledef parse_rule( rule: str, variables: dict[str, str], sql: str, dialect: str ) -> list[Rule]: # 1. 渲染 Jinja2 变量 template = Template(rule) rule_str = template.render(**variables) # 2. 替换通配符(* -> ALL) rule_str = rule_str.replace("*", "ALL") # 3. 解析规则 AST rule_ast = sqlglot.parse_one(rule_str, read=dialect) sql_ast = sqlglot.parse_one(sql, read=dialect) # 4. 提取规则中的列信息 rule_column = extract_column_from_rule(rule_ast) # 5. 遍历 SQL 中的所有表,匹配规则 rules: list[Rule] = [] for table in sql_ast.find_all(exp.Table): # 检查 schema 和 table 是否匹配 schema_matches = rule_column.db == "ALL" or rule_column.db == table.db table_matches = rule_column.table == "ALL" or rule_column.table == table.name if schema_matches and table_matches: # 生成针对该表的规则 qualified_ast = qualify_rule_for_table(rule_ast, table) rules.append(Rule( schema=table.db, table=table.name, field=rule_column.name, ast=qualified_ast, )) return rules
关键点:
{{user_id}})*.*.* 可以匹配所有表的指定字段示例:
# 输入规则 rule = "*.*.status = 'approved'" # 输入 SQL sql = "SELECT * FROM orders JOIN products ON orders.product_id = products.id" # 解析后的规则 [ Rule(schema="public", table="orders", field="status", ast="orders.status = 'approved'"), Rule(schema="public", table="products", field="status", ast="products.status = 'approved'"), ]
class SqlPermissionGuard: def apply_rules( self, sql: str, rules: list[str], variables: dict[str, str] | None = None ) -> str: # 1. 解析 SQL 并构建 Scope 树 ast = sqlglot.parse_one(sql, read=self.dialect).copy() root_scope = build_scope(ast) # 2. 解析所有规则 parsed_rules = list( itertools.chain.from_iterable( parse_rule(r, variables, sql, self.dialect) for r in rules ) ) # 3. 递归处理 Scope 树 self._process_scope(root_scope, parsed_rules) # 4. 生成新的 SQL return ast.sql(dialect=self.dialect) def _process_scope(self, scope: Scope, rules: list[Rule]): """递归处理 Scope 及其所有子 Scope""" # 1. 为当前 Scope 构建权限条件 conditions = self._build_conditions_for_scope(scope, rules) if conditions: self._inject_conditions(scope, conditions) # 2. 递归处理所有子 Scope for subscope in self._get_all_subscopes(scope): self._process_scope(subscope, rules) def _get_all_subscopes(self, scope: Scope) -> list[Scope]: """获取所有子 Scope(子查询、CTE、UNION 等)""" return ( scope.derived_table_scopes # 派生表(子查询) + scope.cte_scopes # CTE + scope.subquery_scopes # WHERE/HAVING 中的子查询 + scope.union_scopes # UNION + scope.udtf_scopes # 用户定义表函数 )
关键点:
def _inject_conditions(self, scope: Scope, conditions: list[exp.Expression]): """将权限条件注入到 WHERE 子句""" # 1. 找到 SELECT 语句 select = ( scope.expression if isinstance(scope.expression, exp.Select) else scope.expression.find(exp.Select) ) if not select: return # 2. 合并所有条件(去重) combined = self._combine_conditions(conditions) if not combined: return # 3. 注入到 WHERE 子句 where = select.find(exp.Where) if where: # 已有 WHERE:用 AND 连接 where.set("this", exp.And(this=where.this, expression=combined)) else: # 无 WHERE:创建新的 WHERE 子句 select.set("where", exp.Where(this=combined)) def _combine_conditions(self, conditions: list[exp.Expression]) -> exp.Expression | None: """合并多个条件,去重并用 AND 连接""" if not conditions: return None # 去重(基于 SQL 字符串) unique_conditions = [] seen = set() for cond in conditions: cond_str = cond.sql(dialect=self.dialect) if cond_str not in seen: seen.add(cond_str) unique_conditions.append(cond) # 用 AND 连接 if len(unique_conditions) == 1: return unique_conditions[0] result = unique_conditions[0] for cond in unique_conditions[1:]: result = exp.And(this=result, expression=cond) return result
关键点:
示例:
# 原始 SQL sql = "SELECT * FROM orders WHERE status = 'pending'" # 权限规则 rules = ["orders.region = 'East'"] # 改写后的 SQL "SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'"
SQL 中的表可能有别名,权限条件需要使用正确的引用:
def build_conditions( self, rule: Rule, tables: list[tuple[str | None, exp.Table]] ) -> list[exp.Expression]: """为匹配的表构建权限条件""" conditions = [] for alias, table in tables: condition = rule.ast.copy() column = condition.find(exp.Column) if column and column.table: if alias: # 有别名:使用别名 column.set("table", exp.to_identifier(alias)) column.set("db", None) else: # 无别名:使用完整表名 column.set("table", exp.to_identifier(table.this.this)) column.set("db", exp.to_identifier(table.db)) column.set("catalog", None) conditions.append(condition) return conditions
示例:
# 原始 SQL(有别名) sql = "SELECT * FROM orders o WHERE o.status = 'pending'" # 权限规则 rules = ["orders.region = 'East'"] # 改写后的 SQL(使用别名 o) "SELECT * FROM orders o WHERE o.status = 'pending' AND o.region = 'East'"
# 原始 SQL sql = """ SELECT * FROM ( SELECT * FROM orders WHERE status = 'pending' ) AS pending_orders """ # 权限规则 rules = ["orders.region = 'East'"] # 改写后的 SQL(子查询内部也被注入) """ SELECT * FROM ( SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East' ) AS pending_orders """
关键点:
# 原始 SQL sql = """ WITH pending_orders AS ( SELECT * FROM orders WHERE status = 'pending' ) SELECT * FROM pending_orders """ # 权限规则 rules = ["orders.region = 'East'"] # 改写后的 SQL """ WITH pending_orders AS ( SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East' ) SELECT * FROM pending_orders """
# 原始 SQL sql = """ SELECT * FROM orders WHERE status = 'pending' UNION SELECT * FROM orders WHERE status = 'approved' """ # 权限规则 rules = ["orders.region = 'East'"] # 改写后的 SQL(两个 SELECT 都被注入) """ SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East' UNION SELECT * FROM orders WHERE status = 'approved' AND orders.region = 'East' """
# 原始 SQL sql = """ SELECT o.*, p.name FROM orders o JOIN products p ON o.product_id = p.id WHERE o.status = 'pending' """ # 权限规则 rules = [ "orders.region = 'East'", "products.category = 'Electronics'" ] # 改写后的 SQL(两个表都被注入) """ SELECT o.*, p.name FROM orders o JOIN products p ON o.product_id = p.id WHERE o.status = 'pending' AND o.region = 'East' AND p.category = 'Electronics' """
AST 改写开销:
优化策略:
SQL 注入防护:
权限遗漏防护:
兼容性保障:
# 用户信息 user = {"role": "sales", "region": "East"} # 权限规则 rules = ["orders.region = '{{region}}'"] # 原始 SQL(AI 生成) sql = "SELECT SUM(amount) FROM orders WHERE status = 'completed'" # 应用权限 filtered_sql = guard( sql=sql, sql_dialect="mysql", rules=rules, variables={"region": user["region"]} ) # 结果 "SELECT SUM(amount) FROM orders WHERE status = 'completed' AND orders.region = 'East'"
# 权限规则(所有表的 deleted 字段必须为 0) rules = ["*.*.deleted = 0"] # 原始 SQL sql = """ SELECT o.*, c.name FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.status = 'pending' """ # 应用权限 filtered_sql = guard(sql=sql, sql_dialect="postgres", rules=rules) # 结果(两个表都被注入) """ SELECT o.*, c.name FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.status = 'pending' AND o.deleted = 0 AND c.deleted = 0 """
# 权限规则 rules = ["orders.user_id = {{user_id}}"] # 原始 SQL(包含子查询和 CTE) sql = """ WITH monthly_sales AS ( SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS total FROM orders WHERE status = 'completed' GROUP BY month ) SELECT * FROM monthly_sales WHERE total > ( SELECT AVG(total) FROM monthly_sales ) """ # 应用权限 filtered_sql = guard( sql=sql, sql_dialect="postgres", rules=rules, variables={"user_id": "12345"} ) # 结果(CTE 内部被注入) """ WITH monthly_sales AS ( SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS total FROM orders WHERE status = 'completed' AND orders.user_id = '12345' GROUP BY month ) SELECT * FROM monthly_sales WHERE total > ( SELECT AVG(total) FROM monthly_sales ) """
| 方案 | 透明性 | 完整性 | 安全性 | 性能 | 维护成本 |
|---|---|---|---|---|---|
| AST 改写 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 应用层拼接 | ⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ |
| 数据库视图 | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ |
| ORM 过滤器 | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Row-Level Security | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ |
AskTable 选择 AST 改写的原因:
问题:不同数据库的标识符引号规则不同。
解决方案:
`table`"table""TABLE"(大写)# 根据方言决定是否加引号 should_quote = self.dialect != "oracle" column.set("table", exp.to_identifier(name=table_name, quoted=should_quote))
问题:如何区分真实别名和表名?
-- 真实别名 SELECT * FROM orders o -- 表名(无别名) SELECT * FROM orders
解决方案:
has_real_alias = source.alias and source.alias != source.name
问题:同一个表在 SQL 中出现多次,会重复注入条件。
解决方案:
set 记录已注入的条件问题:AST 改写失败时如何处理?
解决方案:
try: filtered_sql = guard(sql, dialect, rules, variables) except Exception: # 降级到 LLM 改写 filtered_sql = await llm_rewrite_sql(sql, rules)
AskTable 的 SQL 权限守卫,通过 SQLGlot AST 改写技术,实现了:
✅ 透明性:AI 和用户无需感知权限控制 ✅ 完整性:支持所有 SQL 语法(子查询、CTE、UNION、JOIN) ✅ 安全性:无 SQL 注入风险,无权限遗漏 ✅ 灵活性:支持动态变量和通配符规则 ✅ 高性能:AST 改写开销 < 10ms
我们计划将 SQL 权限守卫 开源,帮助更多团队构建安全的数据分析系统。敬请期待!
相关阅读:
技术交流: