AskTable

隔离的 Python 执行器 - 远程代码安全执行

AskTable 团队
2026年3月4日

隔离的 Python 执行器 - 远程代码安全执行

在 AI 驱动的数据分析系统中,让 LLM 生成的 Python 代码安全执行是一个核心挑战。直接在主进程中执行用户代码存在巨大的安全风险。AskTable 的 PythonExecutor 采用了远程执行架构,实现了代码的安全隔离执行。

架构设计

(架构示意图:客户端 PythonExecutor 通过 REST API 调用远程隔离执行服务,代码在独立环境中执行,结果序列化后返回)

核心设计原则

1. 执行器隔离

每个执行器实例运行在独立的环境中:

class PythonExecutor:
    """Client for a remote, isolated Python execution service."""

    def __init__(self, packages: list[str] | None = None):
        """Create a remote executor restricted to a package whitelist.

        Args:
            packages: Packages the remote environment may use. Defaults to
                DEFAULT_PACKAGES. A ``None`` sentinel is used instead of the
                list itself so all instances do not share one mutable
                default object.
        """
        if packages is None:
            packages = list(DEFAULT_PACKAGES)
        self._base_url = BASE_URL
        self._packages = packages
        # Provision the isolated executor instance on the remote service
        self._executor_id = self._create_executor(packages)

隔离特性

2. 包白名单

只允许使用创建时指定的包:

# Default package whitelist applied to newly created executors
DEFAULT_PACKAGES = ["pandas", "numpy", "scipy"]

# Specify the allowed packages when creating the executor
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

安全优势

3. 远程执行

代码在远程服务中执行,与主进程完全隔离:

def _execute(self, code: str) -> CodeExecutionResponse:
    """Run *code* inside the remote executor and return the parsed response.

    Args:
        code: Python source to execute remotely.

    Returns:
        The service's JSON payload (stdout, error, dataframes, ...).

    Raises:
        requests.HTTPError: If the service answers with an error status.
    """
    response = requests.post(
        f"{self._base_url}/python/executor/{self._executor_id}/code",
        json={"code": code},
        timeout=60,  # bound the wait; requests has no default timeout
    )
    # Fail loudly on transport-level errors instead of parsing an error body
    response.raise_for_status()
    return response.json()

执行器生命周期

1. 创建执行器

def _create_executor(self, packages: list[str]) -> str:
    """Create a remote executor via ``POST /python/executor``.

    Args:
        packages: Whitelisted packages to install into the isolated env.

    Returns:
        The identifier of the newly created executor.

    Raises:
        RuntimeError: If the service does not answer with 201 Created.
    """
    response = requests.post(
        f"{self._base_url}/python/executor",
        json={"packages": packages},
        timeout=60,  # creation installs packages; still bound the wait
    )
    if response.status_code != 201:
        # RuntimeError is an Exception subclass, so existing callers
        # catching Exception keep working.
        raise RuntimeError(f"Failed to create executor: {response.text}")

    data = response.json()
    return data["executor_id"]

服务端实现(伪代码):

@app.post("/python/executor")
def create_executor(request: CreateExecutorRequest):
    """Server-side (pseudo-code): provision an isolated execution environment."""
    # 1. Create the isolated environment
    executor_id = generate_unique_id()
    env = create_isolated_environment(executor_id)

    # 2. Install the requested packages (server-side whitelist enforced)
    for package in request.packages:
        if package not in ALLOWED_PACKAGES:
            raise ValueError(f"Package {package} is not allowed")
        env.install_package(package)

    # 3. Initialize the globals the sandbox exposes to user code
    env.globals["_saved_dataframes"] = {}
    env.globals["load_dataframe"] = load_dataframe_func
    env.globals["save_dataframe"] = save_dataframe_func

    # 4. Register the executor so later code-execution calls can find it
    executors[executor_id] = env

    return {"executor_id": executor_id}

2. 执行代码

def execute(
    self,
    code: str,
    variables: dict[str, Any] | None = None,
    dataframes: dict[str, pd.DataFrame] | None = None,
) -> CodeExecutionResponse:
    """Execute *code* remotely, optionally injecting variables and DataFrames.

    Args:
        code: User code to run in the isolated environment.
        variables: Plain Python values made available to the code.
            ``None`` (not ``{}``) avoids the shared-mutable-default pitfall.
        dataframes: DataFrames registered under ``_saved_dataframes``.

    Returns:
        The execution response (stdout, error, returned dataframes).
    """
    # 1. Inject plain variables (pickled client-side, unpickled remotely)
    if variables:
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        # The bootstrap must import pickle/base64 itself: the sandbox does
        # not pre-import them (send_dataframes imports them the same way).
        # NOTE(review): locals().update() relies on the service exec'ing at
        # module scope, where locals() is writable — confirm server-side.
        code_inject = f"""
import base64
import pickle
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
locals().update(vars_dict)
"""
        self._execute(code=code_inject)

    # 2. Inject DataFrames via the shared _saved_dataframes store
    if dataframes:
        self.send_dataframes(dataframes)

    # 3. Execute the user code itself
    return self._execute(code=code)

3. 销毁执行器

def close(self):
    """Tear down the remote executor and release its resources."""
    url = f"{self._base_url}/python/executor/{self._executor_id}"
    response = requests.delete(url)
    if response.status_code == 204:
        return
    raise Exception(f"Failed to close executor: {response.text}")

DataFrame 序列化

1. 发送 DataFrame

使用 Pickle + Base64 序列化:

def send_dataframes(self, dataframes: dict[str, pd.DataFrame]) -> None:
    """Upload DataFrames into the remote executor's ``_saved_dataframes`` store.

    Each frame is pickled and base64-encoded locally, then re-materialized
    by a small bootstrap script executed remotely.
    """
    for df_name, df in dataframes.items():
        # Serialize the DataFrame (pickle + base64 survives JSON transport)
        pickled_df = base64.b64encode(pickle.dumps(df)).decode()

        # Inject it into the execution environment
        code = dedent(f"""
            import pickle
            import base64
            df = pickle.loads(base64.b64decode('{pickled_df}'))
            _saved_dataframes['{df_name}'] = {{
                "df": df,
                "description": "initial dataframe"
            }}
        """)
        self._execute(code=code)

为什么使用 Pickle + Base64?

  1. Pickle:Python 原生序列化,支持复杂对象
  2. Base64:文本编码,便于 JSON 传输
  3. 高效:比 JSON 更快,支持更多数据类型

需要注意:Pickle 反序列化只能用于可信来源(此处为自有的执行服务);切勿对不可信数据调用 pickle.loads。

2. 接收 DataFrame

def _execute(self, code: str) -> CodeExecutionResponse:
    """Execute code remotely and deserialize any DataFrames it returned."""
    response = requests.post(
        f"{self._base_url}/python/executor/{self._executor_id}/code",
        json={"code": code},
    )
    response_json = response.json()

    # Deserialize returned DataFrames.
    # NOTE(review): pickle.loads on the response is only acceptable because
    # the execution service is trusted infrastructure — never unpickle
    # untrusted data.
    dataframes = {}
    for df_name, df_data in response_json.get("dataframes", {}).items():
        raw_df = pickle.loads(base64.b64decode(df_data))
        if raw_df:
            dataframes[df_name] = ResponseDataframe(
                df=raw_df.get("df"),
                description=raw_df.get("description"),
                sql=None,  # code-produced frames carry no SQL provenance
            )

    return CodeExecutionResponse(
        stdout=response_json.get("stdout"),
        error=response_json.get("error"),
        executor_id=response_json.get("executor_id"),
        dataframes=dataframes,
    )

内置函数

执行器提供两个内置函数用于 DataFrame 管理:

1. load_dataframe

def load_dataframe(df_id: str) -> pd.DataFrame:
    """Fetch a previously saved DataFrame by its identifier."""
    if df_id in _saved_dataframes:
        return _saved_dataframes[df_id]["df"]
    raise ValueError(f"DataFrame {df_id} not found")

2. save_dataframe

def save_dataframe(df: pd.DataFrame, description: str) -> str:
    """Store *df* under a freshly generated ID and return that ID."""
    new_id = f"df_{generate_id()}"
    _saved_dataframes[new_id] = {"df": df, "description": description}
    return new_id

实际应用示例

示例 1: 数据清洗

executor = PythonExecutor(packages=["pandas", "numpy"])

# 发送原始数据
executor.send_dataframes({"df_raw": raw_df})

# 执行清洗代码
code = """
import pandas as pd
import numpy as np

# 加载数据
df = load_dataframe('df_raw')

# 数据清洗
df_cleaned = df.copy()
df_cleaned = df_cleaned.dropna()
df_cleaned['date'] = pd.to_datetime(df_cleaned['date'])
df_cleaned = df_cleaned[df_cleaned['amount'] > 0]

# 保存结果
df_id = save_dataframe(df_cleaned, "清洗后的数据")
print(f"清洗完成,共 {len(df_cleaned)} 条记录")
print(f"DataFrame ID: {df_id}")
"""

result = executor.execute(code)
print(result["stdout"])
# 输出: 清洗完成,共 1234 条记录
#      DataFrame ID: df_abc123

# 获取清洗后的数据
cleaned_df = result["dataframes"]["df_abc123"]["df"]

示例 2: 统计分析

executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

code = """
import pandas as pd
import numpy as np
from scipy import stats

# 加载数据
df = load_dataframe('df_sales')

# 基础统计
print("=== 基础统计 ===")
print(f"平均值: {df['amount'].mean():.2f}")
print(f"中位数: {df['amount'].median():.2f}")
print(f"标准差: {df['amount'].std():.2f}")

# 相关性分析
print("\\n=== 相关性分析 ===")
corr = df[['amount', 'quantity', 'price']].corr()
print(corr)

# 假设检验
print("\\n=== 假设检验 ===")
group_a = df[df['region'] == 'A']['amount']
group_b = df[df['region'] == 'B']['amount']
t_stat, p_value = stats.ttest_ind(group_a, group_b)
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.4f}")
"""

result = executor.execute(code)
print(result["stdout"])

示例 3: 数据转换

code = """
import pandas as pd

# 加载数据
df = load_dataframe('df_orders')

# 时间序列聚合
df['date'] = pd.to_datetime(df['date'])
df['month'] = df['date'].dt.to_period('M')

monthly = df.groupby('month').agg({
    'order_id': 'count',
    'amount': 'sum',
    'customer_id': 'nunique'
}).reset_index()

monthly.columns = ['月份', '订单数', '销售额', '客户数']

# 保存结果
df_id = save_dataframe(monthly, "月度汇总")
print(f"汇总完成,共 {len(monthly)} 个月")
"""

result = executor.execute(code)
monthly_df = result["dataframes"][list(result["dataframes"].keys())[0]]["df"]

示例 4: 变量注入

# 注入变量
variables = {
    "threshold": 1000,
    "start_date": "2024-01-01",
    "end_date": "2024-12-31"
}

code = """
import pandas as pd

df = load_dataframe('df_sales')

# 使用注入的变量
df_filtered = df[
    (df['amount'] > threshold) &
    (df['date'] >= start_date) &
    (df['date'] <= end_date)
]

print(f"筛选条件: amount > {threshold}, date in [{start_date}, {end_date}]")
print(f"筛选后记录数: {len(df_filtered)}")

df_id = save_dataframe(df_filtered, "筛选后的数据")
"""

result = executor.execute(code, variables=variables)

安全机制

1. 包白名单

# Server-side whitelist: only safe data-analysis packages may be installed
ALLOWED_PACKAGES = [
    "pandas", "numpy", "scipy",
    "scikit-learn", "statsmodels",
    # Forbidden: os, subprocess, requests, socket, etc.
]

2. 超时控制

# 服务端配置
EXECUTION_TIMEOUT = 30  # 30 秒超时

@app.post("/python/executor/{executor_id}/code")
async def execute_code(executor_id: str, request: ExecuteCodeRequest):
    """Server-side: run code in the executor's env under a wall-clock timeout."""
    try:
        result = await asyncio.wait_for(
            execute_in_env(executor_id, request.code),
            timeout=EXECUTION_TIMEOUT
        )
        return result
    except asyncio.TimeoutError:
        # Report the timeout as a normal error payload, not a 5xx crash
        return {"error": "Execution timeout"}

3. 资源限制

# Docker 容器资源限制
docker run \
    --memory="512m" \
    --cpus="1.0" \
    --network="none" \
    python-executor

4. 文件系统隔离

# 只读文件系统
docker run \
    --read-only \
    --tmpfs /tmp:rw,noexec,nosuid,size=100m \
    python-executor

性能优化

1. 执行器复用

class DataAnalysisAgent:
    def __init__(self):
        # Create once, reuse many times — executor creation is expensive
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

    def analyze(self, df: pd.DataFrame):
        # Reuse the long-lived executor instead of provisioning a new one
        self.executor.send_dataframes({"df": df})
        result = self.executor.execute(analysis_code)
        return result

2. DataFrame 缓存

# 第一次发送
executor.send_dataframes({"df_1": df1, "df_2": df2})

# 后续代码可直接使用,无需重复发送
code1 = "df = load_dataframe('df_1'); ..."
code2 = "df = load_dataframe('df_2'); ..."

3. 批量操作

# 合并多个操作为一次执行
code = """
df1 = load_dataframe('df_1')
df2 = load_dataframe('df_2')

# 操作 1
result1 = df1.groupby('region')['sales'].sum()

# 操作 2
result2 = df2.merge(df1, on='id')

# 操作 3
final = result2[result2['amount'] > 1000]

save_dataframe(final, "最终结果")
"""

4. 连接池

# 使用连接池复用 HTTP 连接
session = requests.Session()
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
session.mount('http://', adapter)

错误处理

1. 执行错误

result = executor.execute(code)

if result["error"]:
    print(f"执行错误: {result['error']}")
else:
    print(f"执行成功: {result['stdout']}")

2. 序列化错误

try:
    executor.send_dataframes({"df": df})
except Exception as e:
    print(f"DataFrame 序列化失败: {e}")

3. 超时错误

try:
    result = executor.execute(code, timeout=30)
except TimeoutError:
    print("代码执行超时")

技术优势

1. 安全性

2. 灵活性

3. 性能

总结

AskTable 的 PythonExecutor 通过远程执行架构,实现了安全、高效的 Python 代码执行:

  1. 隔离执行:独立的执行环境,完全隔离
  2. 包白名单:只允许安全的数据分析包
  3. DataFrame 序列化:Pickle + Base64 高效传输
  4. 资源控制:超时、内存、CPU 限制
  5. 易于使用:简洁的 API,内置 DataFrame 管理

这种架构不仅保证了安全性,还为 LLM 生成的数据分析代码提供了可靠的执行环境。

相关资源