AskTable
sidebar.freeTrial

Row-Level Security: Policy-Based Fine-Grained Permission Control

AskTable Team
AskTable Team 2026-03-04

In enterprise data analysis scenarios, different users should see different data. Sales personnel can only query data for their responsible regions, and finance personnel can only query their department's data. Traditional table-level permission control is too coarse, and AskTable implements flexible row-level security control through the Policy system.

Why Do We Need Row-Level Security?

Limitations of Traditional Permission Control

Table-Level Permission:

-- Grant user permission to query orders table
GRANT SELECT ON orders TO user_sales;

Problem:

  • User can query all order data
  • Cannot restrict to specific region data
  • Cannot dynamically filter based on user attributes

View Solution:

-- Create view for each region
CREATE VIEW orders_beijing AS
SELECT * FROM orders WHERE region = 'Beijing';

CREATE VIEW orders_shanghai AS
SELECT * FROM orders WHERE region = 'Shanghai';

-- Grant different users different view permissions
GRANT SELECT ON orders_beijing TO user_beijing;
GRANT SELECT ON orders_shanghai TO user_shanghai;

Problem:

  • Need to create a view for each permission combination
  • High maintenance cost, difficult to scale
  • Cannot support complex dynamic filtering logic

AskTable's Solution

Through the Policy system, define flexible row-level filtering rules:

# Define policy: Sales personnel can only query data for their responsible regions
policy = RowFilter(
    condition="region_filter",
    db_regex=".*",  # Match all schemas
    table_regex="orders|sales",  # Match orders and sales tables
    field_regex="region|area",  # Match region fields
    operator_expression="= '{{ user_region }}'",  # Filtering condition
    variables=["user_region"]  # Required variables
)

# Apply policy
# Original SQL: SELECT * FROM orders
# After injection: SELECT * FROM orders WHERE region = 'Beijing'

Advantages:

  • One policy supports multiple tables
  • Supports regex matching for fields
  • Supports Jinja2 template dynamic rendering
  • No need to modify database structure

Policy Application Process

加载图表中...

The diagram shows the complete process from policy definition to application: matching related fields through regex, rendering filtering conditions using Jinja2, and finally injecting conditions into user SQL for transparent row-level filtering.

Core Architecture Design

1. RowFilter Policy Definition

class RowFilter:
    condition: str  # Policy name
    db_regex: str  # Schema regex
    table_regex: str  # Table name regex
    field_regex: str  # Field name regex
    operator_expression: str  # Filtering expression
    variables: list[str]  # Required variables

    def __init__(
        self,
        condition: str,
        db_regex: str,
        table_regex: str,
        field_regex: str,
        operator_expression: str,
        variables: list[str],
    ):
        self.condition = condition
        self.db_regex = db_regex
        self.table_regex = table_regex
        self.field_regex = field_regex
        self.operator_expression = operator_expression
        self.variables = variables

    def __repr__(self):
        return f"<Filter {self.condition}>"

Design Highlights:

1.1 Three-Layer Regex Matching

db_regex: str  # Match Schema
table_regex: str  # Match table name
field_regex: str  # Match field name

Why three layers?

Scenario 1: Cross-table Unified Filtering

# All tables with user_id field filter by user
RowFilter(
    condition="user_filter",
    db_regex=".*",  # All schemas
    table_regex=".*",  # All tables
    field_regex="user_id",  # Only match user_id field
    operator_expression="= '{{ current_user_id }}'",
    variables=["current_user_id"]
)

# Applied to:
# - orders table: WHERE user_id = '123'
# - payments table: WHERE user_id = '123'
# - reviews table: WHERE user_id = '123'

Scenario 2: Specific Table Specific Field

# Only filter region fields in orders table
RowFilter(
    condition="region_filter",
    db_regex="public",  # Only match public schema
    table_regex="orders|order_items",  # Only match orders-related tables
    field_regex="region|area|province",  # Match region-related fields
    operator_expression="IN ({{ allowed_regions }})",
    variables=["allowed_regions"]
)

Scenario 3: Exclude Specific Tables

# Except admin tables, all other tables filter by department
RowFilter(
    condition="department_filter",
    db_regex=".*",
    table_regex="^(?!admin_).*",  # Exclude tables starting with admin_
    field_regex="dept_id|department_id",
    operator_expression="= '{{ user_dept_id }}'",
    variables=["user_dept_id"]
)

1.2 Jinja2 Template Rendering

operator_expression: str  # Supports Jinja2 template
variables: list[str]  # Required variables

Simple Variable Replacement:

operator_expression = "= '{{ user_id }}'"
variables = {"user_id": "123"}
# Rendered result: = '123'

Complex Logic:

operator_expression = """
{% if is_admin %}
    IS NOT NULL
{% else %}
    IN ({{ allowed_values | join(', ') }})
{% endif %}
"""
variables = {
    "is_admin": False,
    "allowed_values": ["'Beijing'", "'Shanghai'", "'Guangzhou'"]
}
# Rendered result: IN ('Beijing', 'Shanghai', 'Guangzhou')

Date Calculation:

operator_expression = ">= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)"
variables = {"days": 30}
# Rendered result: >= DATE_SUB(NOW(), INTERVAL 30 DAY)

2. Policy Application Process

def get_real_conditions_by_table(
    self,
    meta: MetaAdmin,
    table_full_names: list[str],
    variables: dict[str, str] | None = None,
):
    """
    Return real conditions for all corresponding tables
    Return result:
    {
      "schema1.table1": [
          "schema1.table1.user_id = '123'",
          "schema1.table1.region = 'Beijing'"
      ],
      "schema1.table2": [
          "schema1.table2.user_id = '123'"
      ]
    }
    """
    result = {}
    log.debug(
        f"Filter {self} by {meta} in {table_full_names} with variables {variables}"
    )

    # 1. Filter metadata by regex
    filtered_meta = meta.filter_by_regex(
        self.db_regex, self.table_regex, self.field_regex
    )
    log.debug(f"Filtered meta: {filtered_meta}")

    # 2. Render expression
    if variables:
        expr = render_jinja2_template(self.operator_expression, variables)
        log.debug(
            f"Rendered expr: {expr}, operator_expression: {self.operator_expression}, variables: {variables}"
        )
    else:
        expr = self.operator_expression
        log.debug(
            f"Rendered expr: {expr}, operator_expression: {self.operator_expression}"
        )

    # 3. Generate conditions for each matched field
    for schema in filtered_meta.schemas.values():
        for table in schema.tables.values():
            if table.full_name in table_full_names:
                result[table.full_name] = [
                    f"{field.full_name} {expr}" for field in table.fields.values()
                ]

    log.debug(f"Conditions: {result}")
    total_conds = sum([len(conds) for conds in result.values()])
    log.info(
        f"Get {total_conds} conditions by {self} in {len(table_full_names)} tables."
    )
    return result

Execution Process:

  1. Regex Match: Find all matching schemas, tables, fields
  2. Template Rendering: Inject variables into expressions
  3. Generate Conditions: Generate WHERE conditions for each matched field
  4. Return Results: Organize conditions by table

Example:

# Policy definition
policy = RowFilter(
    condition="user_region_filter",
    db_regex="public",
    table_regex="orders|sales",
    field_regex="region",
    operator_expression="= '{{ user_region }}'",
    variables=["user_region"]
)

# Apply policy
conditions = policy.get_real_conditions_by_table(
    meta=meta,
    table_full_names=["public.orders", "public.sales"],
    variables={"user_region": "Beijing"}
)

# Result
{
    "public.orders": [
        "public.orders.region = 'Beijing'"
    ],
    "public.sales": [
        "public.sales.region = 'Beijing'"
    ]
}

3. SQL Injection Implementation

Inject policy conditions into user's SQL query:

def inject_row_filters(
    original_sql: str,
    conditions: dict[str, list[str]],
    dialect: str = "mysql"
) -> str:
    """
    Inject row-level filtering conditions into SQL
    """
    import sqlglot

    # Parse SQL
    parsed = sqlglot.parse_one(original_sql, dialect=dialect)

    # Traverse all table references
    for table_node in parsed.find_all(sqlglot.exp.Table):
        table_name = table_node.name
        schema_name = table_node.db or "public"
        full_name = f"{schema_name}.{table_name}"

        # Get filtering conditions for this table
        if full_name in conditions:
            table_conditions = conditions[full_name]

            # Build WHERE condition
            for condition_str in table_conditions:
                condition_expr = sqlglot.parse_one(condition_str, dialect=dialect)

                # Inject into WHERE clause
                if parsed.args.get("where"):
                    # Has WHERE, connect with AND
                    parsed.args["where"] = sqlglot.exp.And(
                        this=parsed.args["where"],
                        expression=condition_expr
                    )
                else:
                    # No WHERE, add directly
                    parsed.args["where"] = condition_expr

    # Generate new SQL
    return parsed.sql(dialect=dialect)

Example:

# Original SQL
original_sql = """
SELECT
    o.order_id,
    o.amount,
    c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
"""

# Policy conditions
conditions = {
    "public.orders": ["public.orders.region = 'Beijing'"],
    "public.customers": ["public.customers.region = 'Beijing'"]
}

# Injected SQL
injected_sql = inject_row_filters(original_sql, conditions)

# Result
"""
SELECT
    o.order_id,
    o.amount,
    c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
  AND o.region = 'Beijing'
  AND c.region = 'Beijing'
"""

Practical Application Scenarios

Scenario 1: Multi-Tenant Data Isolation

Requirement: SaaS platform, each tenant can only query their own data

Policy Definition:

tenant_filter = RowFilter(
    condition="tenant_isolation",
    db_regex=".*",  # All schemas
    table_regex=".*",  # All tables
    field_regex="tenant_id|org_id|company_id",  # Tenant ID fields
    operator_expression="= '{{ tenant_id }}'",
    variables=["tenant_id"]
)

Application:

# Get tenant ID when user logs in
user = authenticate(username, password)
tenant_id = user.tenant_id

# Automatically inject tenant filtering during query
# Original: SELECT * FROM orders
# Injected: SELECT * FROM orders WHERE tenant_id = 'tenant_001'

Effect:

  • Tenant A cannot query Tenant B's data
  • Cannot bypass even if knowing other tenant IDs
  • All queries automatically apply filtering

Scenario 2: Region Permission Control

Requirement: Sales personnel can only query data for their responsible regions

Policy Definition:

region_filter = RowFilter(
    condition="region_access",
    db_regex="public",
    table_regex="orders|sales|customers",
    field_regex="region|area|province|city",
    operator_expression="IN ({{ allowed_regions }})",
    variables=["allowed_regions"]
)

Application:

# Get responsible regions when user logs in
user = authenticate(username, password)
allowed_regions = user.get_allowed_regions()  # ['Beijing', 'Tianjin', 'Hebei']

# Render expression
variables = {
    "allowed_regions": ", ".join([f"'{r}'" for r in allowed_regions])
}
# Result: IN ('Beijing', 'Tianjin', 'Hebei')

# Automatically inject during query
# Original: SELECT * FROM orders
# Injected: SELECT * FROM orders WHERE region IN ('Beijing', 'Tianjin', 'Hebei')

Scenario 3: Time Range Restriction

Requirement: Regular users can only query data from the last 90 days, admins have no restriction

Policy Definition:

time_filter = RowFilter(
    condition="time_range",
    db_regex=".*",
    table_regex=".*",
    field_regex="created_at|updated_at|order_date|sale_date",
    operator_expression="""
    {% if is_admin %}
        IS NOT NULL
    {% else %}
        >= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)
    {% endif %}
    """,
    variables=["is_admin", "days"]
)

Application:

# Regular user
variables = {"is_admin": False, "days": 90}
# Rendered: >= DATE_SUB(NOW(), INTERVAL 90 DAY)

# Admin
variables = {"is_admin": True, "days": 90}
# Rendered: IS NOT NULL (equivalent to no restriction)

Scenario 4: Department Data Isolation

Requirement: Users can only query data for their department and subordinate departments

Policy Definition:

dept_filter = RowFilter(
    condition="department_hierarchy",
    db_regex=".*",
    table_regex=".*",
    field_regex="dept_id|department_id",
    operator_expression="IN ({{ dept_ids }})",
    variables=["dept_ids"]
)

Application:

# Get user department and subordinate departments
user = authenticate(username, password)
dept_tree = get_department_tree(user.dept_id)
dept_ids = [dept.id for dept in dept_tree]  # ['D001', 'D001-01', 'D001-02']

# Render expression
variables = {
    "dept_ids": ", ".join([f"'{d}'" for d in dept_ids])
}
# Result: IN ('D001', 'D001-01', 'D001-02')

Scenario 5: Data Masking

Requirement: Customer service personnel can query user information, but sensitive fields need masking

Policy Definition:

# Approach 1: Masking at SQL level
mask_filter = RowFilter(
    condition="phone_mask",
    db_regex="public",
    table_regex="users|customers",
    field_regex="phone|mobile",
    operator_expression="",  # Don't filter rows, but modify fields
    variables=[]
)

# Approach 2: Masking at result level
def apply_masking(result: pd.DataFrame, meta: MetaAdmin) -> pd.DataFrame:
    for field in meta.all_fields():
        if field.identifiable_type == IdentifiableType.phone:
            result[field.name] = result[field.name].apply(mask_phone)
    return result

Performance Optimization Practices

1. Condition Caching

class PolicyCache:
    _cache: dict[str, dict] = {}

    @classmethod
    def get_conditions(cls, policy_id: str, variables: dict) -> dict | None:
        cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}"
        return cls._cache.get(cache_key)

    @classmethod
    def set_conditions(cls, policy_id: str, variables: dict, conditions: dict):
        cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}"
        cls._cache[cache_key] = conditions

2. Index Optimization

# Ensure filtering fields have indexes
CREATE INDEX idx_orders_region ON orders(region);
CREATE INDEX idx_orders_tenant_id ON orders(tenant_id);
CREATE INDEX idx_orders_created_at ON orders(created_at);

# Composite index
CREATE INDEX idx_orders_tenant_region ON orders(tenant_id, region);

3. Query Optimization

# Before optimization: multiple AND conditions
WHERE tenant_id = 'T001'
  AND region = 'Beijing'
  AND created_at >= '2024-01-01'

# After optimization: use composite index
# Index: (tenant_id, region, created_at)
# Query plan: use index scan instead of full table scan

Best Practice Recommendations

1. Policy Design Principles

Principle of Least Privilege:

# ✅ Default deny, explicit grant
default_filter = RowFilter(
    condition="default_deny",
    db_regex=".*",
    table_regex=".*",
    field_regex="tenant_id",
    operator_expression="= '{{ tenant_id }}'",
    variables=["tenant_id"]
)

# ❌ Default allow, explicit deny
# Easy to miss, not secure

Layered Policies:

# First layer: Tenant isolation (all users)
tenant_filter = RowFilter(...)

# Second layer: Department isolation (non-admins)
dept_filter = RowFilter(...)

# Third layer: Region isolation (sales personnel)
region_filter = RowFilter(...)

2. Variable Management

# ✅ Get variables from user session
variables = {
    "tenant_id": session.get("tenant_id"),
    "user_id": session.get("user_id"),
    "dept_id": session.get("dept_id"),
    "allowed_regions": session.get("allowed_regions"),
}

# ❌ Get variables from user input
# Easy to tamper with, not secure
variables = {
    "tenant_id": request.args.get("tenant_id")  # Dangerous!
}

3. Audit Logging

def apply_policy(sql: str, policy: RowFilter, variables: dict) -> str:
    # Record policy application
    log.info(f"Applying policy {policy.condition}")
    log.info(f"Original SQL: {sql}")
    log.info(f"Variables: {variables}")

    # Apply policy
    injected_sql = inject_row_filters(sql, policy, variables)

    # Record result
    log.info(f"Injected SQL: {injected_sql}")

    # Audit log
    audit_log.record({
        "user_id": current_user.id,
        "policy": policy.condition,
        "original_sql": sql,
        "injected_sql": injected_sql,
        "variables": variables,
        "timestamp": datetime.now()
    })

    return injected_sql

Summary

AskTable's Policy system implements fine-grained data permission management through row-level security control:

  1. Flexible Matching: Supports regex matching for schemas, tables, fields
  2. Dynamic Rendering: Supports Jinja2 templates, dynamically generate conditions based on user attributes
  3. Automatic Injection: Transparently inject filtering conditions into user SQL
  4. Multi-Layer Policies: Supports tenant isolation, department isolation, region isolation and more
  5. Performance Optimization: Condition caching, index optimization, query optimization

Row-level security is an essential capability for enterprise data analysis platforms. Through the Policy system, AskTable provides flexible permission management while ensuring data security.

cta.readyToSimplify

sidebar.noProgrammingNeededsidebar.startFreeTrial

cta.noCreditCard
cta.quickStart
cta.dbSupport