
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在多租户数据分析系统中,SQL 权限控制是保障数据安全的核心机制。传统的权限控制往往依赖数据库层面的视图或行级安全策略,但这种方式存在灵活性不足、难以动态调整等问题。AskTable 采用了一种创新的方法:基于 AST(抽象语法树)的 SQL 权限控制系统,通过 SQLGlot 库的 Scope 分析能力,在 SQL 执行前动态注入权限条件。
SQL Guard 的设计遵循"Clean Architecture"原则,将权限控制逻辑与业务逻辑完全分离。其核心思想是:
* 通配符匹配任意 schema 或 table权限规则采用类 SQL 的表达式语法,支持 Jinja2 变量模板:
# 示例规则 rules = [ "users.orders.user_id = {{ current_user_id }}", # 用户只能查看自己的订单 "*.*.region IN ('CN', 'US')", # 限制地区范围 "sales.*.created_at >= '2024-01-01'", # 时间范围限制 ]
加载图表中...
规则解析器首先将规则字符串转换为 AST,并识别其中的 schema、table、field 信息:
def parse_rule(rule: str, variables: dict, sql: str, dialect: str) -> list[Rule]: # 1. 渲染 Jinja2 变量 template = Template(rule) rule_str = template.render(**variables) # 2. 替换通配符 * 为 ALL rule_str = rule_str.replace("*", "ALL") # 3. 解析规则和 SQL 为 AST rule_ast = sqlglot.parse_one(rule_str, read=dialect) sql_ast = sqlglot.parse_one(sql, read=dialect) # 4. 提取规则中的列信息 rule_column = extract_column_from_rule(rule_ast) # 5. 遍历 SQL 中的所有表,匹配规则 rules = [] for table in sql_ast.find_all(exp.Table): if matches(rule_column, table): qualified_ast = qualify_rule_ast(rule_ast, table) rules.append(Rule( schema=table.db, table=table.name, field=rule_column.name, ast=qualified_ast )) return rules
SQLGlot 的 Scope 提供了强大的作用域分析能力,可以准确识别表的别名、子查询、CTE 等复杂结构:
def _process_scope(self, scope: Scope, rules: list[Rule]): # 1. 为当前 scope 构建权限条件 conditions = self._build_conditions_for_scope(scope, rules) if conditions: self._inject_conditions(scope, conditions) # 2. 递归处理所有子 scope for subscope in self._get_all_subscopes(scope): self._process_scope(subscope, rules) def _get_all_subscopes(self, scope: Scope) -> list[Scope]: return ( scope.derived_table_scopes + # FROM 子查询 scope.cte_scopes + # WITH CTE scope.subquery_scopes + # WHERE/SELECT 子查询 scope.union_scopes + # UNION scope.udtf_scopes # 表函数 )
条件注入需要处理表别名、引号、以及与现有 WHERE 子句的合并:
def _inject_conditions(self, scope: Scope, conditions: list[exp.Expression]): # 1. 找到 SELECT 语句 select = scope.expression if isinstance(scope.expression, exp.Select) \ else scope.expression.find(exp.Select) # 2. 合并所有条件(去重) combined = self._combine_conditions(conditions) # 3. 注入到 WHERE 子句 where = select.find(exp.Where) if where: # 与现有条件用 AND 连接 where.set("this", exp.And(this=where.this, expression=combined)) else: # 创建新的 WHERE 子句 select.set("where", exp.Where(this=combined))
# 原始 SQL sql = """ SELECT o.id, o.amount, c.name FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.status = 'completed' """ # 权限规则 rules = ["*.orders.tenant_id = '{{ tenant_id }}'"] variables = {"tenant_id": "tenant_123"} # 应用权限后的 SQL protected_sql = guard(sql, "postgres", rules, variables)
生成的 SQL:
SELECT o.id, o.amount, c.name FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.status = 'completed' AND o.tenant_id = 'tenant_123' -- 自动注入
sql = """ SELECT * FROM ( SELECT user_id, SUM(amount) as total FROM orders GROUP BY user_id ) subq WHERE total > 1000 """ rules = ["*.orders.region = 'CN'"] protected_sql = guard(sql, "mysql", rules)
生成的 SQL:
SELECT * FROM ( SELECT user_id, SUM(amount) as total FROM orders WHERE region = 'CN' -- 注入到子查询内部 GROUP BY user_id ) subq WHERE total > 1000
sql = """ WITH recent_orders AS ( SELECT * FROM orders WHERE created_at > '2024-01-01' ) SELECT * FROM recent_orders UNION ALL SELECT * FROM orders WHERE status = 'pending' """ rules = ["*.orders.user_id = {{ user_id }}"] variables = {"user_id": "123"} protected_sql = guard(sql, "postgres", rules, variables)
生成的 SQL:
WITH recent_orders AS ( SELECT * FROM orders WHERE created_at > '2024-01-01' AND user_id = '123' -- CTE 内部注入 ) SELECT * FROM recent_orders UNION ALL SELECT * FROM orders WHERE status = 'pending' AND user_id = '123' -- UNION 分支注入
* 可匹配任意 schema 或 tableSQL Guard 的性能开销主要来自 AST 解析和 Scope 构建:
基于 AST 的 SQL 权限控制系统是 AskTable 安全架构的核心组件。通过 SQLGlot 的强大能力,我们实现了:
这种设计不仅保障了数据安全,还为多租户、行级安全等复杂场景提供了优雅的解决方案。