
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 AI 驱动的数据分析系统中,让 LLM 生成的 Python 代码安全执行是一个核心挑战。直接在主进程中执行用户代码存在巨大的安全风险。AskTable 的 PythonExecutor 采用了远程执行架构,实现了代码的安全隔离执行。
加载图表中...
每个执行器实例运行在独立的环境中:
class PythonExecutor:
    """Client handle for one isolated remote Python executor."""

    def __init__(self, packages: list[str] = DEFAULT_PACKAGES):
        self._base_url = BASE_URL
        self._packages = packages
        # Provision an isolated executor instance on the remote service;
        # its id scopes every subsequent code-execution call.
        self._executor_id = self._create_executor(packages)
隔离特性:
只允许使用创建时指定的包:
# Packages made available inside every executor by default.
DEFAULT_PACKAGES = ["pandas", "numpy", "scipy"]

# Specify the allowed packages explicitly when creating an executor.
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
安全优势:
禁止使用 os、subprocess 等危险包,requests、urllib 等网络包,以及 pickle(内部序列化除外)。代码在远程服务中执行,与主进程完全隔离:
def _execute(self, code: str) -> CodeExecutionResponse:
    """Ship *code* to the remote executor endpoint and return its JSON reply."""
    endpoint = f"{self._base_url}/python/executor/{self._executor_id}/code"
    response = requests.post(endpoint, json={"code": code})
    return response.json()
def _create_executor(self, packages: list[str]) -> str:
    """Create a remote executor via POST /python/executor.

    Args:
        packages: whitelist of packages the server should install.

    Returns:
        The server-assigned executor id.

    Raises:
        RuntimeError: if the service does not answer 201 Created.
    """
    response = requests.post(
        f"{self._base_url}/python/executor",
        json={"packages": packages},
    )
    # 201 Created is the only success status for this endpoint.
    if response.status_code != 201:
        # RuntimeError is more precise than bare Exception while staying
        # catchable by any existing `except Exception` caller.
        raise RuntimeError(f"Failed to create executor: {response.text}")
    data = response.json()
    return data["executor_id"]
服务端实现(伪代码):
@app.post("/python/executor")
def create_executor(request: CreateExecutorRequest):
    """Server side (pseudo-code): provision one isolated executor."""
    # 1. Create an isolated environment keyed by a fresh id.
    executor_id = generate_unique_id()
    env = create_isolated_environment(executor_id)

    # 2. Install only whitelisted packages.
    for package in request.packages:
        if package not in ALLOWED_PACKAGES:
            raise ValueError(f"Package {package} is not allowed")
        env.install_package(package)

    # 3. Seed the sandbox globals with the DataFrame helpers.
    env.globals["_saved_dataframes"] = {}
    env.globals["load_dataframe"] = load_dataframe_func
    env.globals["save_dataframe"] = save_dataframe_func

    # 4. Register the executor for later /code calls.
    executors[executor_id] = env
    return {"executor_id": executor_id}
def execute(
    self,
    code: str,
    variables: dict[str, Any] | None = None,
    dataframes: dict[str, pd.DataFrame] | None = None,
) -> CodeExecutionResponse:
    """Run user code remotely, optionally injecting variables and DataFrames.

    Args:
        code: Python source to execute in the sandbox.
        variables: plain values made available as locals in the sandbox.
        dataframes: DataFrames registered under their dict keys.

    Returns:
        The execution response (stdout, error, resulting DataFrames).
    """
    # NOTE: defaults are None rather than `{}` -- a shared mutable dict
    # default would be reused across every call of every instance.
    # 1. Inject scalar variables via pickle + base64.
    if variables:
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        # The injected snippet runs in the REMOTE interpreter, so it must
        # import its own dependencies (mirrors send_dataframes).
        code_inject = dedent(f"""
            import pickle
            import base64
            vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
            locals().update(vars_dict)
        """)
        self._execute(code=code_inject)
    # 2. Inject DataFrames.
    if dataframes:
        self.send_dataframes(dataframes)
    # 3. Execute the user code itself.
    return self._execute(code=code)
def close(self):
    """Destroy the remote executor and release its resources.

    Raises:
        RuntimeError: if the service does not answer 204 No Content.
    """
    response = requests.delete(
        f"{self._base_url}/python/executor/{self._executor_id}"
    )
    # 204 No Content is the only success status for DELETE here.
    if response.status_code != 204:
        # RuntimeError is more precise than bare Exception while staying
        # catchable by any existing `except Exception` caller.
        raise RuntimeError(f"Failed to close executor: {response.text}")
使用 Pickle + Base64 序列化:
def send_dataframes(self, dataframes: dict[str, pd.DataFrame]) -> None:
    """Serialize each DataFrame and register it inside the sandbox."""
    for name, frame in dataframes.items():
        # pickle -> base64 so the binary frame survives JSON transport.
        payload = base64.b64encode(pickle.dumps(frame)).decode()
        # Bootstrap code executed remotely to rebuild and store the frame.
        bootstrap = dedent(f"""
            import pickle
            import base64
            df = pickle.loads(base64.b64decode('{payload}'))
            _saved_dataframes['{name}'] = {{
                "df": df,
                "description": "initial dataframe"
            }}
        """)
        self._execute(code=bootstrap)
为什么使用 Pickle + Base64?
def _execute(self, code: str) -> CodeExecutionResponse:
    """Run *code* remotely and decode any DataFrames carried in the reply."""
    response = requests.post(
        f"{self._base_url}/python/executor/{self._executor_id}/code",
        json={"code": code},
    )
    payload = response.json()

    # Rebuild DataFrames from their pickled, base64-encoded wire form.
    dataframes = {}
    for name, encoded in payload.get("dataframes", {}).items():
        entry = pickle.loads(base64.b64decode(encoded))
        if entry:
            dataframes[name] = ResponseDataframe(
                df=entry.get("df"),
                description=entry.get("description"),
                sql=None,
            )

    return CodeExecutionResponse(
        stdout=payload.get("stdout"),
        error=payload.get("error"),
        executor_id=payload.get("executor_id"),
        dataframes=dataframes,
    )
执行器提供两个内置函数用于 DataFrame 管理:
def load_dataframe(df_id: str) -> pd.DataFrame:
    """Return the DataFrame previously stored under *df_id*."""
    entry = _saved_dataframes.get(df_id)
    if entry is None:
        raise ValueError(f"DataFrame {df_id} not found")
    return entry["df"]
def save_dataframe(df: pd.DataFrame, description: str) -> str:
    """Store *df* under a freshly generated id and return that id."""
    df_id = f"df_{generate_id()}"
    entry = {"df": df, "description": description}
    _saved_dataframes[df_id] = entry
    return df_id
executor = PythonExecutor(packages=["pandas", "numpy"])

# Ship the raw data to the sandbox.
executor.send_dataframes({"df_raw": raw_df})

# Run the cleaning code remotely.
code = """
import pandas as pd
import numpy as np

# 加载数据
df = load_dataframe('df_raw')

# 数据清洗
df_cleaned = df.copy()
df_cleaned = df_cleaned.dropna()
df_cleaned['date'] = pd.to_datetime(df_cleaned['date'])
df_cleaned = df_cleaned[df_cleaned['amount'] > 0]

# 保存结果
df_id = save_dataframe(df_cleaned, "清洗后的数据")
print(f"清洗完成,共 {len(df_cleaned)} 条记录")
print(f"DataFrame ID: {df_id}")
"""
result = executor.execute(code)
print(result["stdout"])
# Output: 清洗完成,共 1234 条记录
#         DataFrame ID: df_abc123

# Fetch the cleaned data back from the response.
cleaned_df = result["dataframes"]["df_abc123"]["df"]
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

# Remote statistical analysis: descriptive stats, correlation, t-test.
code = """
import pandas as pd
import numpy as np
from scipy import stats

# 加载数据
df = load_dataframe('df_sales')

# 基础统计
print("=== 基础统计 ===")
print(f"平均值: {df['amount'].mean():.2f}")
print(f"中位数: {df['amount'].median():.2f}")
print(f"标准差: {df['amount'].std():.2f}")

# 相关性分析
print("\\n=== 相关性分析 ===")
corr = df[['amount', 'quantity', 'price']].corr()
print(corr)

# 假设检验
print("\\n=== 假设检验 ===")
group_a = df[df['region'] == 'A']['amount']
group_b = df[df['region'] == 'B']['amount']
t_stat, p_value = stats.ttest_ind(group_a, group_b)
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.4f}")
"""
result = executor.execute(code)
print(result["stdout"])
# Remote monthly aggregation over the orders table.
code = """
import pandas as pd

# 加载数据
df = load_dataframe('df_orders')

# 时间序列聚合
df['date'] = pd.to_datetime(df['date'])
df['month'] = df['date'].dt.to_period('M')
monthly = df.groupby('month').agg({
    'order_id': 'count',
    'amount': 'sum',
    'customer_id': 'nunique'
}).reset_index()
monthly.columns = ['月份', '订单数', '销售额', '客户数']

# 保存结果
df_id = save_dataframe(monthly, "月度汇总")
print(f"汇总完成,共 {len(monthly)} 个月")
"""
result = executor.execute(code)

# Grab the single returned DataFrame (its id is server-generated).
monthly_df = next(iter(result["dataframes"].values()))["df"]
# Variables to inject into the sandbox before the code runs.
variables = {
    "threshold": 1000,
    "start_date": "2024-01-01",
    "end_date": "2024-12-31",
}

code = """
import pandas as pd

df = load_dataframe('df_sales')

# 使用注入的变量
df_filtered = df[
    (df['amount'] > threshold) &
    (df['date'] >= start_date) &
    (df['date'] <= end_date)
]
print(f"筛选条件: amount > {threshold}, date in [{start_date}, {end_date}]")
print(f"筛选后记录数: {len(df_filtered)}")
df_id = save_dataframe(df_filtered, "筛选后的数据")
"""
result = executor.execute(code, variables=variables)
# Server-side whitelist of installable analysis packages. Anything that
# can touch the OS, spawn processes, or open network connections is
# deliberately absent.
ALLOWED_PACKAGES = [
    "pandas",
    "numpy",
    "scipy",
    "scikit-learn",
    "statsmodels",
    # Forbidden: os, subprocess, requests, socket, etc.
]
# Server-side configuration.
EXECUTION_TIMEOUT = 30  # hard cap, in seconds, per code submission

@app.post("/python/executor/{executor_id}/code")
async def execute_code(executor_id: str, request: ExecuteCodeRequest):
    """Run submitted code, aborting it after EXECUTION_TIMEOUT seconds."""
    try:
        return await asyncio.wait_for(
            execute_in_env(executor_id, request.code),
            timeout=EXECUTION_TIMEOUT,
        )
    except asyncio.TimeoutError:
        return {"error": "Execution timeout"}
# Resource limits for the executor container: capped memory/CPU,
# and no network access at all.
docker run \
  --memory="512m" \
  --cpus="1.0" \
  --network="none" \
  python-executor
# Read-only root filesystem; /tmp stays writable but is small,
# non-executable and nosuid.
docker run \
  --read-only \
  --tmpfs /tmp:rw,noexec,nosuid,size=100m \
  python-executor
class DataAnalysisAgent:
    """Owns one long-lived executor and reuses it for every analysis."""

    def __init__(self):
        # Created once, reused across analyze() calls -- avoids paying
        # the executor-provisioning cost per request.
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

    def analyze(self, df: pd.DataFrame):
        # Reuse the existing executor instead of creating a new one.
        self.executor.send_dataframes({"df": df})
        return self.executor.execute(analysis_code)
# Upload the frames once...
executor.send_dataframes({"df_1": df1, "df_2": df2})

# ...later snippets can load them directly, no re-upload needed.
code1 = "df = load_dataframe('df_1'); ..."
code2 = "df = load_dataframe('df_2'); ..."
# Batch several operations into a single remote round-trip.
code = """
df1 = load_dataframe('df_1')
df2 = load_dataframe('df_2')

# 操作 1
result1 = df1.groupby('region')['sales'].sum()

# 操作 2
result2 = df2.merge(df1, on='id')

# 操作 3
final = result2[result2['amount'] > 1000]
save_dataframe(final, "最终结果")
"""
# Reuse HTTP connections through a pooled session.
session = requests.Session()
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
session.mount('http://', adapter)
result = executor.execute(code)
# Success and failure are distinguished by the `error` field.
if not result["error"]:
    print(f"执行成功: {result['stdout']}")
else:
    print(f"执行错误: {result['error']}")
try:
    executor.send_dataframes({"df": df})
except Exception as e:
    # Broad catch is deliberate here: any serialization/transport
    # failure is reported the same way in this example.
    print(f"DataFrame 序列化失败: {e}")
try:
    # Bound the remote execution time explicitly.
    result = executor.execute(code, timeout=30)
except TimeoutError:
    print("代码执行超时")
AskTable 的 PythonExecutor 通过远程执行架构,实现了安全、高效的 Python 代码执行:
这种架构不仅保证了安全性,还为 LLM 生成的数据分析代码提供了可靠的执行环境。