
In enterprise data analysis scenarios, different users should see different data: sales personnel should only be able to query data for the regions they are responsible for, and finance personnel only their own department's data. Traditional table-level permission control is too coarse for this, so AskTable implements flexible row-level security through its Policy system.
Table-Level Permission:
-- Grant user permission to query orders table
GRANT SELECT ON orders TO user_sales;
Problem: the grant is all-or-nothing. A sales user with SELECT on orders can read rows for every region, not just their own.
View Solution:
-- Create view for each region
CREATE VIEW orders_beijing AS
SELECT * FROM orders WHERE region = 'Beijing';
CREATE VIEW orders_shanghai AS
SELECT * FROM orders WHERE region = 'Shanghai';
-- Grant different users different view permissions
GRANT SELECT ON orders_beijing TO user_beijing;
GRANT SELECT ON orders_shanghai TO user_shanghai;
Problem: one view (and one grant) per region does not scale. Every new region means new views and new grants, and rules that combine dimensions (region, department, time range) cause a combinatorial explosion of views to maintain.
Through the Policy system, define flexible row-level filtering rules:
# Define policy: Sales personnel can only query data for their responsible regions
policy = RowFilter(
    condition="region_filter",
    db_regex=".*",                                # Match all schemas
    table_regex="orders|sales",                   # Match orders and sales tables
    field_regex="region|area",                    # Match region fields
    operator_expression="= '{{ user_region }}'",  # Filtering condition
    variables=["user_region"],                    # Required variables
)
# Apply policy
# Original SQL: SELECT * FROM orders
# After injection: SELECT * FROM orders WHERE region = 'Beijing'
Advantages: the rule is defined once and applied transparently to every matching table and field, with no per-region views or per-user grants to maintain.
The diagram shows the complete process from policy definition to application: matching related fields through regex, rendering filtering conditions using Jinja2, and finally injecting conditions into user SQL for transparent row-level filtering.
class RowFilter:
    condition: str            # Policy name
    db_regex: str             # Schema regex
    table_regex: str          # Table name regex
    field_regex: str          # Field name regex
    operator_expression: str  # Filtering expression
    variables: list[str]      # Required variables

    def __init__(
        self,
        condition: str,
        db_regex: str,
        table_regex: str,
        field_regex: str,
        operator_expression: str,
        variables: list[str],
    ):
        self.condition = condition
        self.db_regex = db_regex
        self.table_regex = table_regex
        self.field_regex = field_regex
        self.operator_expression = operator_expression
        self.variables = variables

    def __repr__(self):
        return f"<Filter {self.condition}>"
Design Highlights:
db_regex: str # Match Schema
table_regex: str # Match table name
field_regex: str # Match field name
Why three layers? A single pattern could not distinguish "filter user_id wherever it appears" from "filter region only in the orders table". Three independent regexes let one policy target any combination of schema, table, and field.
Scenario 1: Cross-table Unified Filtering
# All tables with a user_id field filter by user
RowFilter(
    condition="user_filter",
    db_regex=".*",          # All schemas
    table_regex=".*",       # All tables
    field_regex="user_id",  # Only match the user_id field
    operator_expression="= '{{ current_user_id }}'",
    variables=["current_user_id"],
)
# Applied to:
# - orders table: WHERE user_id = '123'
# - payments table: WHERE user_id = '123'
# - reviews table: WHERE user_id = '123'
Scenario 2: Specific Table Specific Field
# Only filter region fields in orders-related tables
RowFilter(
    condition="region_filter",
    db_regex="public",                   # Only match the public schema
    table_regex="orders|order_items",    # Only match orders-related tables
    field_regex="region|area|province",  # Match region-related fields
    operator_expression="IN ({{ allowed_regions }})",
    variables=["allowed_regions"],
)
Scenario 3: Exclude Specific Tables
# Except admin tables, all other tables filter by department
RowFilter(
    condition="department_filter",
    db_regex=".*",
    table_regex="^(?!admin_).*",  # Exclude tables starting with admin_
    field_regex="dept_id|department_id",
    operator_expression="= '{{ user_dept_id }}'",
    variables=["user_dept_id"],
)
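The negative lookahead in Scenario 3 is easy to verify in isolation (a quick check, assuming the patterns are applied with Python's `re.fullmatch` semantics):

```python
import re

# "^(?!admin_).*" matches any table name that does NOT start with "admin_"
pattern = re.compile(r"^(?!admin_).*")

print(bool(pattern.fullmatch("orders")))       # True
print(bool(pattern.fullmatch("admin_users")))  # False
```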
operator_expression: str # Supports Jinja2 template
variables: list[str] # Required variables
Simple Variable Replacement:
operator_expression = "= '{{ user_id }}'"
variables = {"user_id": "123"}
# Rendered result: = '123'
Complex Logic:
operator_expression = """
{% if is_admin %}
    IS NOT NULL
{% else %}
    IN ({{ allowed_values | join(', ') }})
{% endif %}
"""
variables = {
    "is_admin": False,
    "allowed_values": ["'Beijing'", "'Shanghai'", "'Guangzhou'"],
}
# Rendered result: IN ('Beijing', 'Shanghai', 'Guangzhou')
Date Calculation:
operator_expression = ">= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)"
variables = {"days": 30}
# Rendered result: >= DATE_SUB(NOW(), INTERVAL 30 DAY)
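The `render_jinja2_template` helper used later in `get_real_conditions_by_table` is not shown in this document. A minimal stdlib stand-in that covers only the simple `{{ var }}` case (not `{% if %}` blocks; the function name and behavior here are assumptions, real code should use `jinja2.Template`) might look like this:

```python
import re

def render_template(expression: str, variables: dict) -> str:
    """Minimal stand-in for a Jinja2 render: substitutes {{ var }}
    placeholders only; conditional blocks are not supported."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(variables[m.group(1)]),
        expression,
    )

print(render_template("= '{{ user_id }}'", {"user_id": "123"}))
# = '123'
```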
def get_real_conditions_by_table(
    self,
    meta: MetaAdmin,
    table_full_names: list[str],
    variables: dict[str, str] | None = None,
):
    """
    Return the real conditions for all corresponding tables.

    Return result:
        {
            "schema1.table1": [
                "schema1.table1.user_id = '123'",
                "schema1.table1.region = 'Beijing'"
            ],
            "schema1.table2": [
                "schema1.table2.user_id = '123'"
            ]
        }
    """
    result = {}
    log.debug(
        f"Filter {self} by {meta} in {table_full_names} with variables {variables}"
    )

    # 1. Filter metadata by regex
    filtered_meta = meta.filter_by_regex(
        self.db_regex, self.table_regex, self.field_regex
    )
    log.debug(f"Filtered meta: {filtered_meta}")

    # 2. Render expression
    if variables:
        expr = render_jinja2_template(self.operator_expression, variables)
        log.debug(
            f"Rendered expr: {expr}, operator_expression: {self.operator_expression}, variables: {variables}"
        )
    else:
        expr = self.operator_expression
        log.debug(
            f"Rendered expr: {expr}, operator_expression: {self.operator_expression}"
        )

    # 3. Generate conditions for each matched field
    for schema in filtered_meta.schemas.values():
        for table in schema.tables.values():
            if table.full_name in table_full_names:
                result[table.full_name] = [
                    f"{field.full_name} {expr}" for field in table.fields.values()
                ]

    log.debug(f"Conditions: {result}")
    total_conds = sum(len(conds) for conds in result.values())
    log.info(
        f"Got {total_conds} conditions by {self} in {len(table_full_names)} tables."
    )
    return result
Execution Process: filter the metadata with the three regexes, render the Jinja2 expression with the supplied variables, then emit one condition per matched field.
Example:
# Policy definition
policy = RowFilter(
    condition="user_region_filter",
    db_regex="public",
    table_regex="orders|sales",
    field_regex="region",
    operator_expression="= '{{ user_region }}'",
    variables=["user_region"],
)

# Apply policy
conditions = policy.get_real_conditions_by_table(
    meta=meta,
    table_full_names=["public.orders", "public.sales"],
    variables={"user_region": "Beijing"},
)

# Result
{
    "public.orders": [
        "public.orders.region = 'Beijing'"
    ],
    "public.sales": [
        "public.sales.region = 'Beijing'"
    ]
}
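The matching step can be reproduced with a toy, self-contained version (the flat `{'schema.table': [field, ...]}` metadata shape here is an assumption, standing in for `MetaAdmin`):

```python
import re

def match_conditions(tables, db_regex, table_regex, field_regex, expr):
    """Toy version of the regex-matching step in
    get_real_conditions_by_table, over a flat metadata dict."""
    result = {}
    for full_name, fields in tables.items():
        schema, table = full_name.split(".")
        if not (re.fullmatch(db_regex, schema) and re.fullmatch(table_regex, table)):
            continue
        conds = [
            f"{full_name}.{f} {expr}"
            for f in fields
            if re.fullmatch(field_regex, f)
        ]
        if conds:
            result[full_name] = conds
    return result

tables = {
    "public.orders": ["order_id", "region", "amount"],
    "public.sales": ["sale_id", "region"],
    "public.admin_users": ["user_id"],
}
# Only orders and sales match; only their region fields get conditions
print(match_conditions(tables, "public", "orders|sales", "region", "= 'Beijing'"))
```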
Inject policy conditions into user's SQL query:
def inject_row_filters(
    original_sql: str,
    conditions: dict[str, list[str]],
    dialect: str = "mysql",
) -> str:
    """
    Inject row-level filtering conditions into SQL
    """
    import sqlglot

    # Parse SQL
    parsed = sqlglot.parse_one(original_sql, dialect=dialect)

    # Traverse all table references
    for table_node in parsed.find_all(sqlglot.exp.Table):
        table_name = table_node.name
        schema_name = table_node.db or "public"
        full_name = f"{schema_name}.{table_name}"

        # Get the filtering conditions for this table
        if full_name in conditions:
            for condition_str in conditions[full_name]:
                condition_expr = sqlglot.parse_one(condition_str, dialect=dialect)
                # Select.where() ANDs the condition into an existing WHERE
                # clause, or creates the clause if there is none
                parsed = parsed.where(condition_expr)

    # Generate the new SQL
    # (rewriting qualified column names to table aliases is omitted here)
    return parsed.sql(dialect=dialect)
Example:
# Original SQL
original_sql = """
SELECT
    o.order_id,
    o.amount,
    c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
"""

# Policy conditions
conditions = {
    "public.orders": ["public.orders.region = 'Beijing'"],
    "public.customers": ["public.customers.region = 'Beijing'"]
}

# Injected SQL
injected_sql = inject_row_filters(original_sql, conditions)

# Result
"""
SELECT
    o.order_id,
    o.amount,
    c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
    AND o.region = 'Beijing'
    AND c.region = 'Beijing'
"""
Requirement: SaaS platform, each tenant can only query their own data
Policy Definition:
tenant_filter = RowFilter(
    condition="tenant_isolation",
    db_regex=".*",     # All schemas
    table_regex=".*",  # All tables
    field_regex="tenant_id|org_id|company_id",  # Tenant ID fields
    operator_expression="= '{{ tenant_id }}'",
    variables=["tenant_id"],
)
Application:
# Get tenant ID when user logs in
user = authenticate(username, password)
tenant_id = user.tenant_id
# Automatically inject tenant filtering during query
# Original: SELECT * FROM orders
# Injected: SELECT * FROM orders WHERE tenant_id = 'tenant_001'
Effect: every query is automatically scoped to the logged-in tenant; even a plain SELECT * FROM orders cannot cross tenant boundaries.
Requirement: Sales personnel can only query data for their responsible regions
Policy Definition:
region_filter = RowFilter(
    condition="region_access",
    db_regex="public",
    table_regex="orders|sales|customers",
    field_regex="region|area|province|city",
    operator_expression="IN ({{ allowed_regions }})",
    variables=["allowed_regions"],
)
Application:
# Get responsible regions when user logs in
user = authenticate(username, password)
allowed_regions = user.get_allowed_regions() # ['Beijing', 'Tianjin', 'Hebei']
# Render expression
variables = {
    "allowed_regions": ", ".join([f"'{r}'" for r in allowed_regions])
}
# Result: IN ('Beijing', 'Tianjin', 'Hebei')
# Automatically inject during query
# Original: SELECT * FROM orders
# Injected: SELECT * FROM orders WHERE region IN ('Beijing', 'Tianjin', 'Hebei')
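The `", ".join(...)` step above is where injection risk concentrates. A slightly safer hypothetical helper escapes embedded quotes (naive SQL-style quote doubling, for illustration only; production code should rely on the database driver's escaping):

```python
def quote_in_list(values: list[str]) -> str:
    """Render a list of strings as a quoted SQL IN list,
    doubling any embedded single quotes."""
    return ", ".join("'" + v.replace("'", "''") + "'" for v in values)

print(quote_in_list(["Beijing", "Tianjin", "Hebei"]))
# 'Beijing', 'Tianjin', 'Hebei'
```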
Requirement: Regular users can only query data from the last 90 days, admins have no restriction
Policy Definition:
time_filter = RowFilter(
    condition="time_range",
    db_regex=".*",
    table_regex=".*",
    field_regex="created_at|updated_at|order_date|sale_date",
    operator_expression="""
    {% if is_admin %}
        IS NOT NULL
    {% else %}
        >= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)
    {% endif %}
    """,
    variables=["is_admin", "days"],
)
Application:
# Regular user
variables = {"is_admin": False, "days": 90}
# Rendered: >= DATE_SUB(NOW(), INTERVAL 90 DAY)
# Admin
variables = {"is_admin": True, "days": 90}
# Rendered: IS NOT NULL (equivalent to no restriction)
Requirement: Users can only query data for their department and subordinate departments
Policy Definition:
dept_filter = RowFilter(
    condition="department_hierarchy",
    db_regex=".*",
    table_regex=".*",
    field_regex="dept_id|department_id",
    operator_expression="IN ({{ dept_ids }})",
    variables=["dept_ids"],
)
Application:
# Get user department and subordinate departments
user = authenticate(username, password)
dept_tree = get_department_tree(user.dept_id)
dept_ids = [dept.id for dept in dept_tree] # ['D001', 'D001-01', 'D001-02']
# Render expression
variables = {
    "dept_ids": ", ".join([f"'{d}'" for d in dept_ids])
}
# Result: IN ('D001', 'D001-01', 'D001-02')
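`get_department_tree` is not defined here. Under the assumption that department relations are available as a parent-to-children mapping, the subtree walk could be sketched as:

```python
def collect_subtree(children_of: dict[str, list[str]], root: str) -> list[str]:
    """Hypothetical stand-in for get_department_tree: returns the root
    department id followed by all descendant ids, depth-first."""
    ids = [root]
    for child in children_of.get(root, []):
        ids.extend(collect_subtree(children_of, child))
    return ids

children_of = {"D001": ["D001-01", "D001-02"]}
print(collect_subtree(children_of, "D001"))
# ['D001', 'D001-01', 'D001-02']
```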
Requirement: Customer service personnel can query user information, but sensitive fields need masking
Policy Definition:
# Approach 1: Masking at SQL level
mask_filter = RowFilter(
    condition="phone_mask",
    db_regex="public",
    table_regex="users|customers",
    field_regex="phone|mobile",
    operator_expression="",  # Don't filter rows, but rewrite the fields
    variables=[],
)
# Approach 2: Masking at result level
import pandas as pd

def apply_masking(result: pd.DataFrame, meta: MetaAdmin) -> pd.DataFrame:
    for field in meta.all_fields():
        if field.identifiable_type == IdentifiableType.phone:
            result[field.name] = result[field.name].apply(mask_phone)
    return result
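`mask_phone` is referenced but not shown; one plausible rule (an assumption: keep the first three and last four digits, mask the rest) is:

```python
def mask_phone(phone: str) -> str:
    """Mask the middle of a phone number, keeping the first 3
    and last 4 characters (assumed masking rule)."""
    if len(phone) <= 7:
        return "*" * len(phone)
    return phone[:3] + "*" * (len(phone) - 7) + phone[-4:]

print(mask_phone("13812345678"))
# 138****5678
```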
class PolicyCache:
    _cache: dict[str, dict] = {}

    @classmethod
    def get_conditions(cls, policy_id: str, variables: dict) -> dict | None:
        cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}"
        return cls._cache.get(cache_key)

    @classmethod
    def set_conditions(cls, policy_id: str, variables: dict, conditions: dict):
        cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}"
        cls._cache[cache_key] = conditions
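One caveat with the key above: `hash(frozenset(...))` varies between interpreter runs (Python salts string hashes) and fails on unhashable values, so it only suits an in-process cache. A deterministic alternative, usable with an external cache such as Redis, serializes the variables instead:

```python
import json

def cache_key(policy_id: str, variables: dict) -> str:
    """Deterministic cache key: sorted-key JSON is stable across
    processes, unlike hash(frozenset(variables.items()))."""
    return f"{policy_id}:{json.dumps(variables, sort_keys=True)}"

print(cache_key("region_filter", {"user_region": "Beijing"}))
# region_filter:{"user_region": "Beijing"}
```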
-- Ensure filtering fields have indexes
CREATE INDEX idx_orders_region ON orders(region);
CREATE INDEX idx_orders_tenant_id ON orders(tenant_id);
CREATE INDEX idx_orders_created_at ON orders(created_at);

-- Composite index
CREATE INDEX idx_orders_tenant_region ON orders(tenant_id, region);
-- Before optimization: multiple AND conditions
WHERE tenant_id = 'T001'
    AND region = 'Beijing'
    AND created_at >= '2024-01-01'

-- After optimization: with a composite index (tenant_id, region, created_at)
-- the query plan uses an index scan instead of a full table scan
Principle of Least Privilege:
# ✅ Default deny, explicit grant
default_filter = RowFilter(
    condition="default_deny",
    db_regex=".*",
    table_regex=".*",
    field_regex="tenant_id",
    operator_expression="= '{{ tenant_id }}'",
    variables=["tenant_id"],
)
# ❌ Default allow, explicit deny
# Easy to miss, not secure
Layered Policies:
# First layer: Tenant isolation (all users)
tenant_filter = RowFilter(...)
# Second layer: Department isolation (non-admins)
dept_filter = RowFilter(...)
# Third layer: Region isolation (sales personnel)
region_filter = RowFilter(...)
# ✅ Get variables from the user session
variables = {
    "tenant_id": session.get("tenant_id"),
    "user_id": session.get("user_id"),
    "dept_id": session.get("dept_id"),
    "allowed_regions": session.get("allowed_regions"),
}

# ❌ Get variables from user input
# Easy to tamper with, not secure
variables = {
    "tenant_id": request.args.get("tenant_id")  # Dangerous!
}
def apply_policy(sql: str, policy: RowFilter, variables: dict) -> str:
    # Record the policy application
    log.info(f"Applying policy {policy.condition}")
    log.info(f"Original SQL: {sql}")
    log.info(f"Variables: {variables}")

    # Apply the policy: resolve the concrete conditions first, then inject
    # them (meta and table_full_names come from the query context)
    conditions = policy.get_real_conditions_by_table(
        meta=meta, table_full_names=table_full_names, variables=variables
    )
    injected_sql = inject_row_filters(sql, conditions)

    # Record the result
    log.info(f"Injected SQL: {injected_sql}")

    # Audit log
    audit_log.record({
        "user_id": current_user.id,
        "policy": policy.condition,
        "original_sql": sql,
        "injected_sql": injected_sql,
        "variables": variables,
        "timestamp": datetime.now(),
    })

    return injected_sql
AskTable's Policy system implements fine-grained data permission management through row-level security control.
Row-level security is an essential capability for enterprise data analysis platforms. Through the Policy system, AskTable provides flexible permission management while ensuring data security.