
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在企业数据分析场景中,不同用户应该看到不同的数据。销售人员只能查询自己负责地区的数据,财务人员只能查询本部门的数据。传统的表级权限控制过于粗糙,而 AskTable 通过 Policy 系统实现了灵活的行级安全控制。
表级权限:
-- 授予用户查询订单表的权限 GRANT SELECT ON orders TO user_sales;
问题:
视图方案:
-- 为每个地区创建视图 CREATE VIEW orders_beijing AS SELECT * FROM orders WHERE region = '北京'; CREATE VIEW orders_shanghai AS SELECT * FROM orders WHERE region = '上海'; -- 授予不同用户不同视图的权限 GRANT SELECT ON orders_beijing TO user_beijing; GRANT SELECT ON orders_shanghai TO user_shanghai;
问题:
通过 Policy 系统,定义灵活的行级过滤规则:
# 定义策略:销售人员只能查询自己负责地区的数据 policy = RowFilter( condition="region_filter", db_regex=".*", # 匹配所有 schema table_regex="orders|sales", # 匹配订单和销售表 field_regex="region|area", # 匹配地区字段 operator_expression="= '{{ user_region }}'", # 过滤条件 variables=["user_region"] # 需要的变量 ) # 应用策略 # 原始 SQL:SELECT * FROM orders # 注入后:SELECT * FROM orders WHERE region = '北京'
优势:
加载图表中...
上图展示了 Policy 从定义到应用的完整流程:通过正则表达式匹配相关字段,使用 Jinja2 渲染过滤条件,最后将条件注入到用户 SQL 中,实现透明的行级过滤。
class RowFilter: condition: str # 策略名称 db_regex: str # Schema 正则表达式 table_regex: str # 表名正则表达式 field_regex: str # 字段名正则表达式 operator_expression: str # 过滤表达式 variables: list[str] # 需要的变量 def __init__( self, condition: str, db_regex: str, table_regex: str, field_regex: str, operator_expression: str, variables: list[str], ): self.condition = condition self.db_regex = db_regex self.table_regex = table_regex self.field_regex = field_regex self.operator_expression = operator_expression self.variables = variables def __repr__(self): return f"<Filter {self.condition}>"
设计亮点:
db_regex: str # 匹配 Schema table_regex: str # 匹配表名 field_regex: str # 匹配字段名
为什么需要三层?
场景 1:跨表统一过滤
# 所有包含 user_id 字段的表都按用户过滤 RowFilter( condition="user_filter", db_regex=".*", # 所有 schema table_regex=".*", # 所有表 field_regex="user_id", # 只匹配 user_id 字段 operator_expression="= '{{ current_user_id }}'", variables=["current_user_id"] ) # 应用到: # - orders 表:WHERE user_id = '123' # - payments 表:WHERE user_id = '123' # - reviews 表:WHERE user_id = '123'
场景 2:特定表特定字段
# 只对订单表的地区字段过滤 RowFilter( condition="region_filter", db_regex="public", # 只匹配 public schema table_regex="orders|order_items", # 只匹配订单相关表 field_regex="region|area|province", # 匹配地区相关字段 operator_expression="IN ({{ allowed_regions }})", variables=["allowed_regions"] )
场景 3:排除特定表
# 除了管理员表,其他表都按部门过滤 RowFilter( condition="department_filter", db_regex=".*", table_regex="^(?!admin_).*", # 排除 admin_ 开头的表 field_regex="dept_id|department_id", operator_expression="= '{{ user_dept_id }}'", variables=["user_dept_id"] )
operator_expression: str # 支持 Jinja2 模板 variables: list[str] # 需要的变量
简单变量替换:
operator_expression = "= '{{ user_id }}'" variables = {"user_id": "123"} # 渲染结果:= '123'
复杂逻辑:
operator_expression = """ {% if is_admin %} IS NOT NULL {% else %} IN ({{ allowed_values | join(', ') }}) {% endif %} """ variables = { "is_admin": False, "allowed_values": ["'北京'", "'上海'", "'广州'"] } # 渲染结果:IN ('北京', '上海', '广州')
日期计算:
operator_expression = ">= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)" variables = {"days": 30} # 渲染结果:>= DATE_SUB(NOW(), INTERVAL 30 DAY)
def get_real_conditions_by_table( self, meta: MetaAdmin, table_full_names: list[str], variables: dict[str, str] | None = None, ): """ 返回对应的所有表的真实条件 返回结果: { "schema1.table1": [ "schema1.table1.user_id = '123'", "schema1.table1.region = '北京'" ], "schema1.table2": [ "schema1.table2.user_id = '123'" ] } """ result = {} log.debug( f"Filter {self} by {meta} in {table_full_names} with variables {variables}" ) # 1. 按正则表达式过滤元数据 filtered_meta = meta.filter_by_regex( self.db_regex, self.table_regex, self.field_regex ) log.debug(f"Filtered meta: {filtered_meta}") # 2. 渲染表达式 if variables: expr = render_jinja2_template(self.operator_expression, variables) log.debug( f"Rendered expr: {expr}, operator_expression: {self.operator_expression}, variables: {variables}" ) else: expr = self.operator_expression log.debug( f"Rendered expr: {expr}, operator_expression: {self.operator_expression}" ) # 3. 为每个匹配的字段生成条件 for schema in filtered_meta.schemas.values(): for table in schema.tables.values(): if table.full_name in table_full_names: result[table.full_name] = [ f"{field.full_name} {expr}" for field in table.fields.values() ] log.debug(f"Conditions: {result}") total_conds = sum([len(conds) for conds in result.values()]) log.info( f"Get {total_conds} conditions by {self} in {len(table_full_names)} tables." ) return result
执行流程:
示例:
# 策略定义 policy = RowFilter( condition="user_region_filter", db_regex="public", table_regex="orders|sales", field_regex="region", operator_expression="= '{{ user_region }}'", variables=["user_region"] ) # 应用策略 conditions = policy.get_real_conditions_by_table( meta=meta, table_full_names=["public.orders", "public.sales"], variables={"user_region": "北京"} ) # 结果 { "public.orders": [ "public.orders.region = '北京'" ], "public.sales": [ "public.sales.region = '北京'" ] }
将策略条件注入到用户的 SQL 查询中:
def inject_row_filters( original_sql: str, conditions: dict[str, list[str]], dialect: str = "mysql" ) -> str: """ 将行级过滤条件注入到 SQL 中 """ import sqlglot # 解析 SQL parsed = sqlglot.parse_one(original_sql, dialect=dialect) # 遍历所有表引用 for table_node in parsed.find_all(sqlglot.exp.Table): table_name = table_node.name schema_name = table_node.db or "public" full_name = f"{schema_name}.{table_name}" # 获取该表的过滤条件 if full_name in conditions: table_conditions = conditions[full_name] # 构建 WHERE 条件 for condition_str in table_conditions: condition_expr = sqlglot.parse_one(condition_str, dialect=dialect) # 注入到 WHERE 子句 if parsed.args.get("where"): # 已有 WHERE,用 AND 连接 parsed.args["where"] = sqlglot.exp.And( this=parsed.args["where"], expression=condition_expr ) else: # 没有 WHERE,直接添加 parsed.args["where"] = condition_expr # 生成新的 SQL return parsed.sql(dialect=dialect)
示例:
# 原始 SQL original_sql = """ SELECT o.order_id, o.amount, c.customer_name FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.status = 'completed' """ # 策略条件 conditions = { "public.orders": ["public.orders.region = '北京'"], "public.customers": ["public.customers.region = '北京'"] } # 注入后的 SQL injected_sql = inject_row_filters(original_sql, conditions) # 结果 """ SELECT o.order_id, o.amount, c.customer_name FROM orders o JOIN customers c ON o.customer_id = c.id WHERE o.status = 'completed' AND o.region = '北京' AND c.region = '北京' """
需求:SaaS 平台,每个租户只能查询自己的数据
策略定义:
tenant_filter = RowFilter( condition="tenant_isolation", db_regex=".*", # 所有 schema table_regex=".*", # 所有表 field_regex="tenant_id|org_id|company_id", # 租户 ID 字段 operator_expression="= '{{ tenant_id }}'", variables=["tenant_id"] )
应用:
# 用户登录时获取租户 ID user = authenticate(username, password) tenant_id = user.tenant_id # 查询时自动注入租户过滤 # 原始:SELECT * FROM orders # 注入:SELECT * FROM orders WHERE tenant_id = 'tenant_001'
效果:
需求:销售人员只能查询自己负责地区的数据
策略定义:
region_filter = RowFilter( condition="region_access", db_regex="public", table_regex="orders|sales|customers", field_regex="region|area|province|city", operator_expression="IN ({{ allowed_regions }})", variables=["allowed_regions"] )
应用:
# 用户登录时获取负责地区 user = authenticate(username, password) allowed_regions = user.get_allowed_regions() # ['北京', '天津', '河北'] # 渲染表达式 variables = { "allowed_regions": ", ".join([f"'{r}'" for r in allowed_regions]) } # 结果:IN ('北京', '天津', '河北') # 查询时自动注入 # 原始:SELECT * FROM orders # 注入:SELECT * FROM orders WHERE region IN ('北京', '天津', '河北')
需求:普通用户只能查询最近 90 天的数据,管理员无限制
策略定义:
time_filter = RowFilter( condition="time_range", db_regex=".*", table_regex=".*", field_regex="created_at|updated_at|order_date|sale_date", operator_expression=""" {% if is_admin %} IS NOT NULL {% else %} >= DATE_SUB(NOW(), INTERVAL {{ days }} DAY) {% endif %} """, variables=["is_admin", "days"] )
应用:
# 普通用户 variables = {"is_admin": False, "days": 90} # 渲染:>= DATE_SUB(NOW(), INTERVAL 90 DAY) # 管理员 variables = {"is_admin": True, "days": 90} # 渲染:IS NOT NULL(相当于无限制)
需求:用户只能查询本部门及下级部门的数据
策略定义:
dept_filter = RowFilter( condition="department_hierarchy", db_regex=".*", table_regex=".*", field_regex="dept_id|department_id", operator_expression="IN ({{ dept_ids }})", variables=["dept_ids"] )
应用:
# 获取用户部门及下级部门 user = authenticate(username, password) dept_tree = get_department_tree(user.dept_id) dept_ids = [dept.id for dept in dept_tree] # ['D001', 'D001-01', 'D001-02'] # 渲染表达式 variables = { "dept_ids": ", ".join([f"'{d}'" for d in dept_ids]) } # 结果:IN ('D001', 'D001-01', 'D001-02')
需求:客服人员可以查询用户信息,但敏感字段需要脱敏
策略定义:
# 方案 1:在 SQL 层面脱敏 mask_filter = RowFilter( condition="phone_mask", db_regex="public", table_regex="users|customers", field_regex="phone|mobile", operator_expression="", # 不过滤行,而是修改字段 variables=[] ) # 方案 2:在结果层面脱敏 def apply_masking(result: pd.DataFrame, meta: MetaAdmin) -> pd.DataFrame: for field in meta.all_fields(): if field.identifiable_type == IdentifiableType.phone: result[field.name] = result[field.name].apply(mask_phone) return result
class PolicyCache: _cache: dict[str, dict] = {} @classmethod def get_conditions(cls, policy_id: str, variables: dict) -> dict | None: cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}" return cls._cache.get(cache_key) @classmethod def set_conditions(cls, policy_id: str, variables: dict, conditions: dict): cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}" cls._cache[cache_key] = conditions
# 确保过滤字段有索引 CREATE INDEX idx_orders_region ON orders(region); CREATE INDEX idx_orders_tenant_id ON orders(tenant_id); CREATE INDEX idx_orders_created_at ON orders(created_at); # 复合索引 CREATE INDEX idx_orders_tenant_region ON orders(tenant_id, region);
# 优化前:多个 AND 条件 WHERE tenant_id = 'T001' AND region = '北京' AND created_at >= '2024-01-01' # 优化后:使用复合索引 # 索引:(tenant_id, region, created_at) # 查询计划:使用索引扫描,而不是全表扫描
最小权限原则:
# ✅ 默认拒绝,显式授权 default_filter = RowFilter( condition="default_deny", db_regex=".*", table_regex=".*", field_regex="tenant_id", operator_expression="= '{{ tenant_id }}'", variables=["tenant_id"] ) # ❌ 默认允许,显式拒绝 # 容易遗漏,不安全
分层策略:
# 第一层:租户隔离(所有用户) tenant_filter = RowFilter(...) # 第二层:部门隔离(非管理员) dept_filter = RowFilter(...) # 第三层:地区隔离(销售人员) region_filter = RowFilter(...)
# ✅ 从用户会话获取变量 variables = { "tenant_id": session.get("tenant_id"), "user_id": session.get("user_id"), "dept_id": session.get("dept_id"), "allowed_regions": session.get("allowed_regions"), } # ❌ 从用户输入获取变量 # 容易被篡改,不安全 variables = { "tenant_id": request.args.get("tenant_id") # 危险! }
def apply_policy(sql: str, policy: RowFilter, variables: dict) -> str: # 记录策略应用 log.info(f"Applying policy {policy.condition}") log.info(f"Original SQL: {sql}") log.info(f"Variables: {variables}") # 应用策略 injected_sql = inject_row_filters(sql, policy, variables) # 记录结果 log.info(f"Injected SQL: {injected_sql}") # 审计日志 audit_log.record({ "user_id": current_user.id, "policy": policy.condition, "original_sql": sql, "injected_sql": injected_sql, "variables": variables, "timestamp": datetime.now() }) return injected_sql
AskTable 的 Policy 系统通过行级安全控制,实现了细粒度的数据权限管理:
行级安全是企业级数据分析平台的必备能力,通过 Policy 系统,AskTable 在保证数据安全的同时,提供了灵活的权限管理能力。