
In enterprise data analysis, data security is paramount. Users in different roles should see only the data within their permission scope: sales staff see only orders in their own region, finance sees only approved reports, and management sees global data.
Traditional permission control often means manually concatenating WHERE conditions in the application layer, which is error-prone and struggles with complex SQL queries (subqueries, CTE, UNION, etc.). AskTable's SQL Permission Guard instead rewrites the SQLGlot AST to provide row-level permission control that is transparent, secure, and fast.
This article walks through the design and implementation of this permission engine.
Application Layer WHERE Concatenation:
# Traditional approach: manually concatenate permission conditions
if user.role == "sales":
    sql = f"SELECT * FROM orders WHERE region = '{user.region}'"
else:
    sql = "SELECT * FROM orders"
Problems:
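One concrete failure mode of the concatenation approach above is SQL injection through the interpolated value. The following is a minimal sketch using Python's built-in sqlite3 module. The table contents and the helper names `orders_concat` and `orders_bound` are illustrative assumptions, not AskTable code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, region TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "East"), (2, "West"), (3, "North")],
)


def orders_concat(region: str) -> list[tuple]:
    # Mirrors the f-string approach: the value becomes part of the SQL text.
    sql = f"SELECT id FROM orders WHERE region = '{region}' ORDER BY id"
    return conn.execute(sql).fetchall()


def orders_bound(region: str) -> list[tuple]:
    # Bound parameter: the value is never parsed as SQL.
    sql = "SELECT id FROM orders WHERE region = ? ORDER BY id"
    return conn.execute(sql, (region,)).fetchall()


malicious = "East' OR '1'='1"
leaked = orders_concat(malicious)  # the OR clause matches every row
safe = orders_bound(malicious)     # no region has this literal name

print(leaked)
print(safe)
```

With the concatenated query, a crafted region value widens the filter to every row, while the bound-parameter version treats the same value as an ordinary string.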
Database View Solution:
-- Create view for each role
CREATE VIEW sales_orders AS
SELECT * FROM orders WHERE region = 'East';
Problems:
✅ Transparency: Permission control is transparent to the AI and to users; no manual handling is needed
✅ Security: Permission conditions are injected automatically, with no omissions and no injection risk
✅ Completeness: Supports all SQL syntax (subqueries, CTE, UNION, JOIN)
✅ Flexibility: Supports dynamic variables (like {{user_id}}) and wildcard rules
✅ High Performance: Low AST rewriting overhead (< 10ms)
SQLGlot is a powerful SQL parser and transformer supporting 20+ SQL dialects.
Core Advantages:
Why Not Regular Expressions?
SQLGlot's Scope is the key to permission injection. A Scope represents one query scope in SQL (SELECT, subquery, CTE, etc.) and contains:
- sources: Tables and aliases in the current scope
- derived_table_scopes: Scopes of derived tables (subqueries)
- cte_scopes: Scopes of CTEs
- subquery_scopes: Scopes of subqueries in WHERE/HAVING
- union_scopes: Scopes of UNION branches
By recursively traversing the Scope tree, we can inject permission conditions at each query level.
AskTable's SQL Permission Guard uses a clear layered architecture:
A permission rule is a simple SQL conditional expression:
# Example rules
rules = [
    "orders.region = 'East'",  # Fixed value
    "orders.user_id = {{user_id}}",  # Jinja2 variable
    "*.*.status IN ('approved', 'completed')",  # Wildcard (all tables' status field)
]
Rule Format:
- Format: schema.table.field <operator> <value>
- Supported operators: =, !=, >, <, >=, <=, IN, LIKE, IS, NOT IN, NOT LIKE, IS NOT
- Wildcard: * matches all schemas or tables

import sqlglot
from jinja2 import Template
from sqlglot import exp


def parse_rule(
    rule: str, variables: dict[str, str] | None, sql: str, dialect: str
) -> list[Rule]:
    # 1. Render Jinja2 variables (callers may pass variables=None)
    template = Template(rule)
    rule_str = template.render(**(variables or {}))
    # 2. Replace wildcards (* -> ALL)
    rule_str = rule_str.replace("*", "ALL")
    # 3. Parse rule AST
    rule_ast = sqlglot.parse_one(rule_str, read=dialect)
    sql_ast = sqlglot.parse_one(sql, read=dialect)
    # 4. Extract column info from rule
    rule_column = extract_column_from_rule(rule_ast)
    # 5. Traverse all tables in SQL, match rules
    rules: list[Rule] = []
    for table in sql_ast.find_all(exp.Table):
        # Check if schema and table match
        schema_matches = rule_column.db == "ALL" or rule_column.db == table.db
        table_matches = rule_column.table == "ALL" or rule_column.table == table.name
        if schema_matches and table_matches:
            # Generate rule for this table
            qualified_ast = qualify_rule_for_table(rule_ast, table)
            rules.append(Rule(
                schema=table.db,
                table=table.name,
                field=rule_column.name,
                ast=qualified_ast,
            ))
    return rules
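The rendering step in `parse_rule` substitutes variable values into the rule as raw text, so those values need to be trusted or escaped before the rule is parsed. As a sketch of one possible alternative (a stand-in that uses only the standard library, not Jinja2 and not AskTable's implementation), the hypothetical `render_rule` helper below converts each value to a SQL literal before substitution.

```python
import re

# Matches {{ name }} placeholders, with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def sql_literal(value) -> str:
    """Render a Python value as a SQL literal."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return str(value)
    # Strings: wrap in single quotes and double any embedded quote.
    return "'" + str(value).replace("'", "''") + "'"


def render_rule(rule: str, variables: dict) -> str:
    """Replace each {{name}} with the escaped literal for variables[name]."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"missing rule variable: {name}")
        return sql_literal(variables[name])

    return _PLACEHOLDER.sub(substitute, rule)


print(render_rule("orders.user_id = {{user_id}}", {"user_id": 12345}))
print(render_rule("orders.region = {{ region }}", {"region": "O'Brien"}))
```

Under this scheme the rule author writes the placeholder without surrounding quotes, and the helper decides how to quote based on the value's type.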
Key Points:
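- Rules are rendered with Jinja2 first, so they can reference dynamic variables (like {{user_id}})
- Wildcard rules such as *.*.status match the named field across all schemas and tables
Example: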
# Input rule
rule = "*.*.status = 'approved'"
# Input SQL
sql = "SELECT * FROM orders JOIN products ON orders.product_id = products.id"
# Parsed rules
[
    Rule(schema="public", table="orders", field="status", ast="orders.status = 'approved'"),
    Rule(schema="public", table="products", field="status", ast="products.status = 'approved'"),
]
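The schema and table matching step can be looked at in isolation from SQLGlot. The sketch below uses plain dataclasses in place of AST nodes. `TableRef`, `rule_target`, and `matching_tables` are hypothetical names, and treating a missing schema as an empty string is an assumption modeled on the comparison in `parse_rule`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableRef:
    schema: str  # empty string when the query does not qualify the table
    name: str


def rule_target(rule: str) -> tuple[str, str, str]:
    """Return (schema, table, field) from the left-hand side of a rule."""
    parts = rule.split()[0].split(".")
    if len(parts) == 2:  # "table.field": no schema given
        parts = ["", *parts]
    if len(parts) != 3:
        raise ValueError(f"expected [schema.]table.field, got {rule!r}")
    schema, table, field = parts
    return schema, table, field


def matching_tables(rule: str, tables: list[TableRef]) -> list[TableRef]:
    """Select the tables a rule applies to; '*' matches any schema or table."""
    schema, table, _field = rule_target(rule)
    return [
        t for t in tables
        if schema in ("*", t.schema) and table in ("*", t.name)
    ]


tables = [TableRef("", "orders"), TableRef("", "products"), TableRef("crm", "orders")]
print(matching_tables("*.*.status = 'approved'", tables))
print(matching_tables("orders.region = 'East'", tables))
```

A fully wildcarded rule selects every table in the query, while an unqualified `orders.region` rule selects only the unqualified `orders` reference.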
import itertools

import sqlglot
from sqlglot.optimizer.scope import Scope, build_scope


class SqlPermissionGuard:
    # self.dialect is assumed to be set in __init__ (not shown)
    def apply_rules(
        self, sql: str, rules: list[str], variables: dict[str, str] | None = None
    ) -> str:
        # 1. Parse SQL and build Scope tree
        ast = sqlglot.parse_one(sql, read=self.dialect).copy()
        root_scope = build_scope(ast)
        # 2. Parse all rules (treat missing variables as an empty mapping)
        parsed_rules = list(
            itertools.chain.from_iterable(
                parse_rule(r, variables or {}, sql, self.dialect) for r in rules
            )
        )
        # 3. Recursively process Scope tree
        self._process_scope(root_scope, parsed_rules)
        # 4. Generate new SQL
        return ast.sql(dialect=self.dialect)

    def _process_scope(self, scope: Scope, rules: list[Rule]):
        """Recursively process Scope and all its sub-Scopes"""
        # 1. Build permission conditions for current Scope
        conditions = self._build_conditions_for_scope(scope, rules)
        if conditions:
            self._inject_conditions(scope, conditions)
        # 2. Recursively process all sub-Scopes
        for subscope in self._get_all_subscopes(scope):
            self._process_scope(subscope, rules)

    def _get_all_subscopes(self, scope: Scope) -> list[Scope]:
        """Get all sub-Scopes (subqueries, CTE, UNION, etc.)"""
        return (
            scope.derived_table_scopes  # Derived tables (subqueries)
            + scope.cte_scopes  # CTE
            + scope.subquery_scopes  # Subqueries in WHERE/HAVING
            + scope.union_scopes  # UNION
            + scope.udtf_scopes  # User-defined table functions
        )
Key Points:
def _inject_conditions(self, scope: Scope, conditions: list[exp.Expression]):
    """Inject permission conditions into WHERE clause"""
    # 1. Find SELECT statement
    select = (
        scope.expression
        if isinstance(scope.expression, exp.Select)
        else scope.expression.find(exp.Select)
    )
    if not select:
        return
    # 2. Combine all conditions (deduplicate)
    combined = self._combine_conditions(conditions)
    if not combined:
        return
    # 3. Inject into WHERE clause. Read this SELECT's own WHERE from its args:
    #    select.find(exp.Where) could also match a WHERE nested in a subquery.
    where = select.args.get("where")
    if where:
        # Already has WHERE: connect with AND. Parenthesize a top-level OR so
        # the permission condition applies to the whole original predicate.
        existing = where.this
        if isinstance(existing, exp.Or):
            existing = exp.Paren(this=existing)
        where.set("this", exp.And(this=existing, expression=combined))
    else:
        # No WHERE: create new WHERE clause
        select.set("where", exp.Where(this=combined))

def _combine_conditions(self, conditions: list[exp.Expression]) -> exp.Expression | None:
    """Combine multiple conditions, deduplicate and connect with AND"""
    if not conditions:
        return None
    # Deduplicate (based on SQL string)
    unique_conditions = []
    seen = set()
    for cond in conditions:
        cond_str = cond.sql(dialect=self.dialect)
        if cond_str not in seen:
            seen.add(cond_str)
            unique_conditions.append(cond)
    # Connect with AND
    if len(unique_conditions) == 1:
        return unique_conditions[0]
    result = unique_conditions[0]
    for cond in unique_conditions[1:]:
        result = exp.And(this=result, expression=cond)
    return result
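When a permission condition is appended to an existing WHERE clause with AND, operator precedence matters. AND binds more tightly than OR, so an existing top-level OR has to be parenthesized, otherwise the permission condition constrains only part of the original predicate. The sketch below uses sqlite3 with illustrative data to show the difference in results. It builds the two WHERE strings by hand and is not the article's rewriting code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, region TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [
        (1, "pending", "East"),
        (2, "pending", "West"),
        (3, "approved", "East"),
        (4, "approved", "West"),
    ],
)

user_where = "status = 'pending' OR status = 'approved'"
rule = "orders.region = 'East'"


def ids(where: str) -> list[int]:
    """Return the order ids selected by the given WHERE text."""
    sql = f"SELECT id FROM orders WHERE {where} ORDER BY id"
    return [row[0] for row in conn.execute(sql)]


# Parsed as: pending OR (approved AND East), so a West row leaks through.
naive = ids(f"{user_where} AND {rule}")
# Parsed as: (pending OR approved) AND East.
wrapped = ids(f"({user_where}) AND ({rule})")

print(naive)
print(wrapped)
```

Only the parenthesized form restricts every returned row to the permitted region.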
Key Points:
Example:
# Original SQL
sql = "SELECT * FROM orders WHERE status = 'pending'"
# Permission rules
rules = ["orders.region = 'East'"]
# Rewritten SQL
"SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'"
Tables in SQL may have aliases, so permission conditions must reference each table the same way the query does:
def build_conditions(
    self, rule: Rule, tables: list[tuple[str | None, exp.Table]]
) -> list[exp.Expression]:
    """Build permission conditions for matched tables"""
    conditions = []
    for alias, table in tables:
        condition = rule.ast.copy()
        column = condition.find(exp.Column)
        if column and column.table:
            if alias:
                # Has alias: use alias
                column.set("table", exp.to_identifier(alias))
                column.set("db", None)
            else:
                # No alias: use full table name; omit the schema when the
                # query did not qualify the table (table.db is then "")
                column.set("table", exp.to_identifier(table.name))
                column.set("db", exp.to_identifier(table.db) if table.db else None)
            column.set("catalog", None)
        conditions.append(condition)
    return conditions
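The reason the alias has to be used can be checked against a real engine. In the sqlite3 sketch below (illustrative data, with SQLite standing in for whichever database is actually targeted), a condition qualified with the original table name is rejected once the table has been aliased, while the alias-qualified condition runs normally.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, region TEXT)")
conn.execute("INSERT INTO orders VALUES (1, 'pending', 'East')")
conn.execute("INSERT INTO orders VALUES (2, 'pending', 'West')")


def try_query(sql: str):
    """Return the selected ids, or an error string if the query is rejected."""
    try:
        return [row[0] for row in conn.execute(sql)]
    except sqlite3.OperationalError as error:
        return f"error: {error}"


base = "SELECT o.id FROM orders o WHERE o.status = 'pending'"
by_table_name = try_query(base + " AND orders.region = 'East'")
by_alias = try_query(base + " AND o.region = 'East'")

print(by_table_name)
print(by_alias)
```

Other engines report the problem with different messages, but the underlying constraint is the same: inside an aliased scope, columns must be qualified with the alias.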
Example:
# Original SQL (has alias)
sql = "SELECT * FROM orders o WHERE o.status = 'pending'"
# Permission rules
rules = ["orders.region = 'East'"]
# Rewritten SQL (uses alias o)
"SELECT * FROM orders o WHERE o.status = 'pending' AND o.region = 'East'"
# Original SQL
sql = """
SELECT * FROM (
SELECT * FROM orders WHERE status = 'pending'
) AS pending_orders
"""
# Permission rules
rules = ["orders.region = 'East'"]
# Rewritten SQL (subquery internals also injected)
"""
SELECT * FROM (
SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
) AS pending_orders
"""
Key Points:
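Injecting inside the subquery, rather than only on the outer SELECT, matters because a derived table may not expose the restricted column at all. In the sqlite3 sketch below (illustrative data and queries), the inner query aggregates the rows, so the outer query has no region column left to filter on.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, region TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "pending", "East"), (2, "pending", "West"), (3, "pending", "West")],
)

# No permission condition: counts pending orders from every region.
unfiltered = """
SELECT n FROM (
    SELECT COUNT(*) AS n FROM orders WHERE status = 'pending'
) AS pending_orders
"""

# Permission condition placed in the scope that actually reads `orders`.
injected_inside = """
SELECT n FROM (
    SELECT COUNT(*) AS n FROM orders
    WHERE status = 'pending' AND orders.region = 'East'
) AS pending_orders
"""

count_unfiltered = conn.execute(unfiltered).fetchone()[0]
count_injected = conn.execute(injected_inside).fetchone()[0]

print(count_unfiltered, count_injected)
```

The condition therefore has to be attached at the query level where the protected table is referenced.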
# Original SQL
sql = """
WITH pending_orders AS (
SELECT * FROM orders WHERE status = 'pending'
)
SELECT * FROM pending_orders
"""
# Permission rules
rules = ["orders.region = 'East'"]
# Rewritten SQL
"""
WITH pending_orders AS (
SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
)
SELECT * FROM pending_orders
"""
# Original SQL
sql = """
SELECT * FROM orders WHERE status = 'pending'
UNION
SELECT * FROM orders WHERE status = 'approved'
"""
# Permission rules
rules = ["orders.region = 'East'"]
# Rewritten SQL (both SELECTs injected)
"""
SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
UNION
SELECT * FROM orders WHERE status = 'approved' AND orders.region = 'East'
"""
# Original SQL
sql = """
SELECT o.*, p.name
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.status = 'pending'
"""
# Permission rules
rules = [
"orders.region = 'East'",
"products.category = 'Electronics'"
]
# Rewritten SQL (both tables injected)
"""
SELECT o.*, p.name
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.status = 'pending' AND o.region = 'East' AND p.category = 'Electronics'
"""
AST Rewriting Overhead:
Optimization Strategies:
SQL Injection Prevention:
Permission Omission Prevention:
Compatibility Assurance:
# User info
user = {"role": "sales", "region": "East"}
# Permission rules
rules = ["orders.region = '{{region}}'"]
# Original SQL (generated by AI)
sql = "SELECT SUM(amount) FROM orders WHERE status = 'completed'"
# Apply permissions
filtered_sql = guard(
sql=sql,
sql_dialect="mysql",
rules=rules,
variables={"region": user["region"]}
)
# Result
"SELECT SUM(amount) FROM orders WHERE status = 'completed' AND orders.region = 'East'"
# Permission rules (all tables' deleted field must be 0)
rules = ["*.*.deleted = 0"]
# Original SQL
sql = """
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'pending'
"""
# Apply permissions
filtered_sql = guard(sql=sql, sql_dialect="postgres", rules=rules)
# Result (both tables injected)
"""
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'pending' AND o.deleted = 0 AND c.deleted = 0
"""
# Permission rules
rules = ["orders.user_id = {{user_id}}"]
# Original SQL (contains subquery and CTE)
sql = """
WITH monthly_sales AS (
SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY month
)
SELECT * FROM monthly_sales
WHERE total > (
SELECT AVG(total) FROM monthly_sales
)
"""
# Apply permissions
filtered_sql = guard(
sql=sql,
sql_dialect="postgres",
rules=rules,
variables={"user_id": "12345"}
)
# Result (CTE internals injected)
"""
WITH monthly_sales AS (
SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS total
FROM orders
WHERE status = 'completed' AND orders.user_id = 12345
GROUP BY month
)
SELECT * FROM monthly_sales
WHERE total > (
SELECT AVG(total) FROM monthly_sales
)
"""
| Solution | Transparency | Completeness | Security | Performance | Maintenance Cost |
|---|---|---|---|---|---|
| AST Rewriting | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Application Layer Concatenation | ⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ |
| Database Views | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ |
| ORM Filters | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Row-Level Security | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ |
Why AskTable chose AST rewriting:
Problem: Different databases have different identifier quoting rules.
Solution:
- MySQL: `table`
- PostgreSQL: "table"
- Oracle: "TABLE" (uppercase)
# Decide whether to quote based on dialect
should_quote = self.dialect != "oracle"
column.set("table", exp.to_identifier(name=table_name, quoted=should_quote))
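For readers who want to see the quoting rules without SQLGlot, here is a standalone sketch. The `quote_identifier` helper and the `QUOTE_CHARS` mapping are hypothetical. Leaving Oracle identifiers unquoted mirrors the `should_quote` check above, and the fallback to double quotes for unlisted dialects is an assumption.

```python
# Quote character per dialect; dialects not listed fall back to double quotes.
QUOTE_CHARS = {"mysql": "`", "postgres": '"'}


def quote_identifier(name: str, dialect: str) -> str:
    """Quote an identifier for the given dialect, escaping embedded quotes."""
    if dialect == "oracle":
        # Left unquoted, mirroring the should_quote check in the article.
        return name
    quote = QUOTE_CHARS.get(dialect, '"')
    # An embedded quote character is escaped by doubling it.
    return quote + name.replace(quote, quote * 2) + quote


print(quote_identifier("orders", "mysql"))
print(quote_identifier("orders", "postgres"))
print(quote_identifier("orders", "oracle"))
```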
Problem: How to distinguish real aliases from table names?
-- Real alias
SELECT * FROM orders o
-- Table name (no alias)
SELECT * FROM orders
Solution:
has_real_alias = source.alias and source.alias != source.name
Problem: When the same table appears multiple times in the SQL, the same condition can be injected repeatedly.
Solution:
- Use a set to record already-injected conditions
Problem: What to do when AST rewriting fails?
Solution:
try:
    filtered_sql = guard(sql, dialect, rules, variables)
except Exception:
    # Fallback to LLM rewriting
    filtered_sql = await llm_rewrite_sql(sql, rules)
AskTable's SQL Permission Guard implements:
✅ Transparency: AI and users don't need to perceive permission control
✅ Completeness: Support all SQL syntax (subqueries, CTE, UNION, JOIN)
✅ Security: No SQL injection risk, no permission omissions
✅ Flexibility: Support dynamic variables and wildcard rules
✅ High Performance: AST rewriting overhead < 10ms
We plan to open source the SQL Permission Guard to help more teams build secure data analysis systems. Stay tuned!