AskTable

行级安全:基于策略的细粒度权限控制

AskTable 团队
AskTable 团队 2026年3月4日

在企业数据分析场景中,不同用户应该看到不同的数据。销售人员只能查询自己负责地区的数据,财务人员只能查询本部门的数据。传统的表级权限控制过于粗糙,而 AskTable 通过 Policy 系统实现了灵活的行级安全控制。

为什么需要行级安全?

传统权限控制的局限

表级权限

-- 授予用户查询订单表的权限
GRANT SELECT ON orders TO user_sales;

问题

视图方案

-- 为每个地区创建视图
CREATE VIEW orders_beijing AS
SELECT * FROM orders WHERE region = '北京';

CREATE VIEW orders_shanghai AS
SELECT * FROM orders WHERE region = '上海';

-- 授予不同用户不同视图的权限
GRANT SELECT ON orders_beijing TO user_beijing;
GRANT SELECT ON orders_shanghai TO user_shanghai;

问题

AskTable 的解决方案

通过 Policy 系统,定义灵活的行级过滤规则:

# 定义策略:销售人员只能查询自己负责地区的数据
policy = RowFilter(
    condition="region_filter",
    db_regex=".*",  # 匹配所有 schema
    table_regex="orders|sales",  # 匹配订单和销售表
    field_regex="region|area",  # 匹配地区字段
    operator_expression="= '{{ user_region }}'",  # 过滤条件
    variables=["user_region"]  # 需要的变量
)

# 应用策略
# 原始 SQL:SELECT * FROM orders
# 注入后:SELECT * FROM orders WHERE region = '北京'

优势

Policy 应用流程

加载图表中...

上图展示了 Policy 从定义到应用的完整流程:通过正则表达式匹配相关字段,使用 Jinja2 渲染过滤条件,最后将条件注入到用户 SQL 中,实现透明的行级过滤。

核心架构设计

1. RowFilter 策略定义

class RowFilter:
    condition: str  # 策略名称
    db_regex: str  # Schema 正则表达式
    table_regex: str  # 表名正则表达式
    field_regex: str  # 字段名正则表达式
    operator_expression: str  # 过滤表达式
    variables: list[str]  # 需要的变量

    def __init__(
        self,
        condition: str,
        db_regex: str,
        table_regex: str,
        field_regex: str,
        operator_expression: str,
        variables: list[str],
    ):
        self.condition = condition
        self.db_regex = db_regex
        self.table_regex = table_regex
        self.field_regex = field_regex
        self.operator_expression = operator_expression
        self.variables = variables

    def __repr__(self):
        return f"<Filter {self.condition}>"

设计亮点

1.1 三层正则匹配

db_regex: str  # 匹配 Schema
table_regex: str  # 匹配表名
field_regex: str  # 匹配字段名

为什么需要三层?

场景 1:跨表统一过滤

# 所有包含 user_id 字段的表都按用户过滤
RowFilter(
    condition="user_filter",
    db_regex=".*",  # 所有 schema
    table_regex=".*",  # 所有表
    field_regex="user_id",  # 只匹配 user_id 字段
    operator_expression="= '{{ current_user_id }}'",
    variables=["current_user_id"]
)

# 应用到:
# - orders 表:WHERE user_id = '123'
# - payments 表:WHERE user_id = '123'
# - reviews 表:WHERE user_id = '123'

场景 2:特定表特定字段

# 只对订单表的地区字段过滤
RowFilter(
    condition="region_filter",
    db_regex="public",  # 只匹配 public schema
    table_regex="orders|order_items",  # 只匹配订单相关表
    field_regex="region|area|province",  # 匹配地区相关字段
    operator_expression="IN ({{ allowed_regions }})",
    variables=["allowed_regions"]
)

场景 3:排除特定表

# 除了管理员表,其他表都按部门过滤
RowFilter(
    condition="department_filter",
    db_regex=".*",
    table_regex="^(?!admin_).*",  # 排除 admin_ 开头的表
    field_regex="dept_id|department_id",
    operator_expression="= '{{ user_dept_id }}'",
    variables=["user_dept_id"]
)

1.2 Jinja2 模板渲染

operator_expression: str  # 支持 Jinja2 模板
variables: list[str]  # 需要的变量

简单变量替换

operator_expression = "= '{{ user_id }}'"
variables = {"user_id": "123"}
# 渲染结果:= '123'

复杂逻辑

operator_expression = """
{% if is_admin %}
    IS NOT NULL
{% else %}
    IN ({{ allowed_values | join(', ') }})
{% endif %}
"""
variables = {
    "is_admin": False,
    "allowed_values": ["'北京'", "'上海'", "'广州'"]
}
# 渲染结果:IN ('北京', '上海', '广州')

日期计算

operator_expression = ">= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)"
variables = {"days": 30}
# 渲染结果:>= DATE_SUB(NOW(), INTERVAL 30 DAY)

2. 策略应用流程

def get_real_conditions_by_table(
    self,
    meta: MetaAdmin,
    table_full_names: list[str],
    variables: dict[str, str] | None = None,
):
    """
    返回对应的所有表的真实条件
    返回结果:
    {
      "schema1.table1": [
          "schema1.table1.user_id = '123'",
          "schema1.table1.region = '北京'"
      ],
      "schema1.table2": [
          "schema1.table2.user_id = '123'"
      ]
    }
    """
    result = {}
    log.debug(
        f"Filter {self} by {meta} in {table_full_names} with variables {variables}"
    )

    # 1. 按正则表达式过滤元数据
    filtered_meta = meta.filter_by_regex(
        self.db_regex, self.table_regex, self.field_regex
    )
    log.debug(f"Filtered meta: {filtered_meta}")

    # 2. 渲染表达式
    if variables:
        expr = render_jinja2_template(self.operator_expression, variables)
        log.debug(
            f"Rendered expr: {expr}, operator_expression: {self.operator_expression}, variables: {variables}"
        )
    else:
        expr = self.operator_expression
        log.debug(
            f"Rendered expr: {expr}, operator_expression: {self.operator_expression}"
        )

    # 3. 为每个匹配的字段生成条件
    for schema in filtered_meta.schemas.values():
        for table in schema.tables.values():
            if table.full_name in table_full_names:
                result[table.full_name] = [
                    f"{field.full_name} {expr}" for field in table.fields.values()
                ]

    log.debug(f"Conditions: {result}")
    total_conds = sum([len(conds) for conds in result.values()])
    log.info(
        f"Get {total_conds} conditions by {self} in {len(table_full_names)} tables."
    )
    return result

执行流程

  1. 正则匹配:找到所有匹配的 schema、table、field
  2. 模板渲染:将变量注入到表达式中
  3. 生成条件:为每个匹配的字段生成 WHERE 条件
  4. 返回结果:按表组织条件列表

示例

# 策略定义
policy = RowFilter(
    condition="user_region_filter",
    db_regex="public",
    table_regex="orders|sales",
    field_regex="region",
    operator_expression="= '{{ user_region }}'",
    variables=["user_region"]
)

# 应用策略
conditions = policy.get_real_conditions_by_table(
    meta=meta,
    table_full_names=["public.orders", "public.sales"],
    variables={"user_region": "北京"}
)

# 结果
{
    "public.orders": [
        "public.orders.region = '北京'"
    ],
    "public.sales": [
        "public.sales.region = '北京'"
    ]
}

3. SQL 注入实现

将策略条件注入到用户的 SQL 查询中:

def inject_row_filters(
    original_sql: str,
    conditions: dict[str, list[str]],
    dialect: str = "mysql"
) -> str:
    """
    将行级过滤条件注入到 SQL 中
    """
    import sqlglot

    # 解析 SQL
    parsed = sqlglot.parse_one(original_sql, dialect=dialect)

    # 遍历所有表引用
    for table_node in parsed.find_all(sqlglot.exp.Table):
        table_name = table_node.name
        schema_name = table_node.db or "public"
        full_name = f"{schema_name}.{table_name}"

        # 获取该表的过滤条件
        if full_name in conditions:
            table_conditions = conditions[full_name]

            # 构建 WHERE 条件
            for condition_str in table_conditions:
                condition_expr = sqlglot.parse_one(condition_str, dialect=dialect)

                # 注入到 WHERE 子句
                if parsed.args.get("where"):
                    # 已有 WHERE,用 AND 连接
                    parsed.args["where"] = sqlglot.exp.And(
                        this=parsed.args["where"],
                        expression=condition_expr
                    )
                else:
                    # 没有 WHERE,直接添加
                    parsed.args["where"] = condition_expr

    # 生成新的 SQL
    return parsed.sql(dialect=dialect)

示例

# 原始 SQL
original_sql = """
SELECT
    o.order_id,
    o.amount,
    c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
"""

# 策略条件
conditions = {
    "public.orders": ["public.orders.region = '北京'"],
    "public.customers": ["public.customers.region = '北京'"]
}

# 注入后的 SQL
injected_sql = inject_row_filters(original_sql, conditions)

# 结果
"""
SELECT
    o.order_id,
    o.amount,
    c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
  AND o.region = '北京'
  AND c.region = '北京'
"""

实际应用场景

场景 1:多租户数据隔离

需求:SaaS 平台,每个租户只能查询自己的数据

策略定义

tenant_filter = RowFilter(
    condition="tenant_isolation",
    db_regex=".*",  # 所有 schema
    table_regex=".*",  # 所有表
    field_regex="tenant_id|org_id|company_id",  # 租户 ID 字段
    operator_expression="= '{{ tenant_id }}'",
    variables=["tenant_id"]
)

应用

# 用户登录时获取租户 ID
user = authenticate(username, password)
tenant_id = user.tenant_id

# 查询时自动注入租户过滤
# 原始:SELECT * FROM orders
# 注入:SELECT * FROM orders WHERE tenant_id = 'tenant_001'

效果

场景 2:地区权限控制

需求:销售人员只能查询自己负责地区的数据

策略定义

region_filter = RowFilter(
    condition="region_access",
    db_regex="public",
    table_regex="orders|sales|customers",
    field_regex="region|area|province|city",
    operator_expression="IN ({{ allowed_regions }})",
    variables=["allowed_regions"]
)

应用

# 用户登录时获取负责地区
user = authenticate(username, password)
allowed_regions = user.get_allowed_regions()  # ['北京', '天津', '河北']

# 渲染表达式
variables = {
    "allowed_regions": ", ".join([f"'{r}'" for r in allowed_regions])
}
# 结果:IN ('北京', '天津', '河北')

# 查询时自动注入
# 原始:SELECT * FROM orders
# 注入:SELECT * FROM orders WHERE region IN ('北京', '天津', '河北')

场景 3:时间范围限制

需求:普通用户只能查询最近 90 天的数据,管理员无限制

策略定义

time_filter = RowFilter(
    condition="time_range",
    db_regex=".*",
    table_regex=".*",
    field_regex="created_at|updated_at|order_date|sale_date",
    operator_expression="""
    {% if is_admin %}
        IS NOT NULL
    {% else %}
        >= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)
    {% endif %}
    """,
    variables=["is_admin", "days"]
)

应用

# 普通用户
variables = {"is_admin": False, "days": 90}
# 渲染:>= DATE_SUB(NOW(), INTERVAL 90 DAY)

# 管理员
variables = {"is_admin": True, "days": 90}
# 渲染:IS NOT NULL(相当于无限制)

场景 4:部门数据隔离

需求:用户只能查询本部门及下级部门的数据

策略定义

dept_filter = RowFilter(
    condition="department_hierarchy",
    db_regex=".*",
    table_regex=".*",
    field_regex="dept_id|department_id",
    operator_expression="IN ({{ dept_ids }})",
    variables=["dept_ids"]
)

应用

# 获取用户部门及下级部门
user = authenticate(username, password)
dept_tree = get_department_tree(user.dept_id)
dept_ids = [dept.id for dept in dept_tree]  # ['D001', 'D001-01', 'D001-02']

# 渲染表达式
variables = {
    "dept_ids": ", ".join([f"'{d}'" for d in dept_ids])
}
# 结果:IN ('D001', 'D001-01', 'D001-02')

场景 5:数据脱敏

需求:客服人员可以查询用户信息,但敏感字段需要脱敏

策略定义

# 方案 1:在 SQL 层面脱敏
mask_filter = RowFilter(
    condition="phone_mask",
    db_regex="public",
    table_regex="users|customers",
    field_regex="phone|mobile",
    operator_expression="",  # 不过滤行,而是修改字段
    variables=[]
)

# 方案 2:在结果层面脱敏
def apply_masking(result: pd.DataFrame, meta: MetaAdmin) -> pd.DataFrame:
    for field in meta.all_fields():
        if field.identifiable_type == IdentifiableType.phone:
            result[field.name] = result[field.name].apply(mask_phone)
    return result

性能优化实践

1. 条件缓存

class PolicyCache:
    _cache: dict[str, dict] = {}

    @classmethod
    def get_conditions(cls, policy_id: str, variables: dict) -> dict | None:
        cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}"
        return cls._cache.get(cache_key)

    @classmethod
    def set_conditions(cls, policy_id: str, variables: dict, conditions: dict):
        cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}"
        cls._cache[cache_key] = conditions

2. 索引优化

# 确保过滤字段有索引
CREATE INDEX idx_orders_region ON orders(region);
CREATE INDEX idx_orders_tenant_id ON orders(tenant_id);
CREATE INDEX idx_orders_created_at ON orders(created_at);

# 复合索引
CREATE INDEX idx_orders_tenant_region ON orders(tenant_id, region);

3. 查询优化

# 优化前:多个 AND 条件
WHERE tenant_id = 'T001'
  AND region = '北京'
  AND created_at >= '2024-01-01'

# 优化后:使用复合索引
# 索引:(tenant_id, region, created_at)
# 查询计划:使用索引扫描,而不是全表扫描

最佳实践建议

1. 策略设计原则

最小权限原则

# ✅ 默认拒绝,显式授权
default_filter = RowFilter(
    condition="default_deny",
    db_regex=".*",
    table_regex=".*",
    field_regex="tenant_id",
    operator_expression="= '{{ tenant_id }}'",
    variables=["tenant_id"]
)

# ❌ 默认允许,显式拒绝
# 容易遗漏,不安全

分层策略

# 第一层:租户隔离(所有用户)
tenant_filter = RowFilter(...)

# 第二层:部门隔离(非管理员)
dept_filter = RowFilter(...)

# 第三层:地区隔离(销售人员)
region_filter = RowFilter(...)

2. 变量管理

# ✅ 从用户会话获取变量
variables = {
    "tenant_id": session.get("tenant_id"),
    "user_id": session.get("user_id"),
    "dept_id": session.get("dept_id"),
    "allowed_regions": session.get("allowed_regions"),
}

# ❌ 从用户输入获取变量
# 容易被篡改,不安全
variables = {
    "tenant_id": request.args.get("tenant_id")  # 危险!
}

3. 审计日志

def apply_policy(sql: str, policy: RowFilter, variables: dict) -> str:
    # 记录策略应用
    log.info(f"Applying policy {policy.condition}")
    log.info(f"Original SQL: {sql}")
    log.info(f"Variables: {variables}")

    # 应用策略
    injected_sql = inject_row_filters(sql, policy, variables)

    # 记录结果
    log.info(f"Injected SQL: {injected_sql}")

    # 审计日志
    audit_log.record({
        "user_id": current_user.id,
        "policy": policy.condition,
        "original_sql": sql,
        "injected_sql": injected_sql,
        "variables": variables,
        "timestamp": datetime.now()
    })

    return injected_sql

总结

AskTable 的 Policy 系统通过行级安全控制,实现了细粒度的数据权限管理:

  1. 灵活匹配:支持正则表达式匹配 schema、table、field
  2. 动态渲染:支持 Jinja2 模板,根据用户属性动态生成条件
  3. 自动注入:透明地将过滤条件注入到用户 SQL
  4. 多层策略:支持租户隔离、部门隔离、地区隔离等多层策略
  5. 性能优化:条件缓存、索引优化、查询优化

行级安全是企业级数据分析平台的必备能力,通过 Policy 系统,AskTable 在保证数据安全的同时,提供了灵活的权限管理能力。