
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 AI 驱动的数据分析系统中,如何让 LLM 生成的代码安全、高效地执行,是一个核心挑战。AskTable 的 ReportAgent 采用了一种创新的架构:JSX 编译 + Python 沙箱执行,实现了数据报告的动态生成。
加载图表中...
ReportAgent 负责生成数据报告组件,其核心流程包括:
load_dataframe() 引用,验证数据源class ReportAgent(DBAgent): def __init__(self, datasource: DataSourceAdmin, assumed_role: RoleAdmin | None = None): super().__init__( prompt_name="agent/report_generator", datasource=datasource, assumed_role=assumed_role, ) self.add_tool(self.show_table) self.add_tool(self.search_metadata) self.add_tool(self.execute_sql) self.set_output_parser(self.output_parser) self.compiled_code: str | None = None self.source_code: str | None = None self.referenced_dataframes: list[str] = []
LLM 生成的代码需要包裹在 <code>...</code> 标签中:
def output_parser(self, output: str) -> None:
    """Parse the LLM output: extract the <code> block, collect DataFrame
    references, compile the JSX, and validate the references.

    Raises:
        ValueError: if the output has no <code> block, references no
            DataFrame, or references an id missing from the workspace.
    """
    # 1. Extract the code block wrapped in <code>...</code>.
    pattern = r"<code>(.*?)</code>"
    match = re.search(pattern, output, re.DOTALL)
    if not match:
        raise ValueError("Invalid output format. Expected: <code>...</code>")
    code = match.group(1).strip()

    # 2. Collect DataFrame references.
    # BUG FIX: the original capture group began with a literal space
    # ("( df_...)"), so load_dataframe('df_xxx') could never match and
    # every valid output was rejected.
    load_df_pattern = r"load_dataframe\(\s*['\"](df_[A-Za-z0-9]+)['\"]\s*\)"
    referenced_dataframes = re.findall(load_df_pattern, code)
    if not referenced_dataframes:
        raise ValueError("No load_dataframe('df_id') pattern found in code.")

    # 3. Compile the JSX source via the remote compile service.
    self.compiled_code = compile_jsx(code)
    self.source_code = code

    # 4. Every referenced DataFrame must already exist in the workspace.
    missing_ids = set(referenced_dataframes) - set(self.data_workspace.keys())
    if missing_ids:
        raise ValueError(f"Referenced dataframes {missing_ids} are not in the data workspace")
    self.referenced_dataframes = referenced_dataframes
JSX 编译通过远程服务实现,避免在 Python 环境中引入 Node.js 依赖:
BASE_URL = "http://localhost:5300/jsx"


def compile_jsx(code: str, timeout: float = 30.0) -> str:
    """Compile JSX source to plain JS via the remote Babel service.

    Args:
        code: JSX source to compile.
        timeout: request timeout in seconds (new keyword with a default,
            so existing callers are unaffected).

    Returns:
        The compiled JavaScript source.

    Raises:
        Exception: with the service's error message on a non-200 response.
    """
    # FIX: without a timeout, a hung compile service would block the agent forever.
    response = requests.post(BASE_URL, json={"code": code}, timeout=timeout)
    if response.status_code != 200:
        # FIX: the service may return a non-JSON body (proxy error, crash page);
        # fall back to the raw text instead of raising a secondary error.
        try:
            message = response.json().get("error", response.text)
        except ValueError:
            message = response.text
        raise Exception(message)
    return response.json()["compiledCode"]
编译服务使用 Babel 进行 JSX 转译:
// Node.js compile service: transpiles JSX via Babel.
const express = require('express');
const babel = require('@babel/core');

// BUG FIX: `app` was used without ever being created, and without the
// express.json() middleware req.body is undefined, so `code` could never
// be read. Also added the listen() call so the service actually starts.
const app = express();
app.use(express.json());

app.post('/jsx', (req, res) => {
  const { code } = req.body;
  try {
    const result = babel.transformSync(code, {
      presets: ['@babel/preset-react'],
      plugins: ['@babel/plugin-transform-modules-commonjs']
    });
    res.json({ compiledCode: result.code });
  } catch (error) {
    // Babel syntax errors come back as 400 with the message for the agent.
    res.status(400).json({ error: error.message });
  }
});

// Port matches the Python client's BASE_URL (localhost:5300).
app.listen(5300);
<code> import { BarChart } from '@/components/charts'; function SalesReport() { const data = load_dataframe('df_abc123'); return ( <div className="report"> <h2>月度销售报告</h2> <BarChart data={data} xField="month" yField="sales" title="2024 年销售趋势" /> <p>总销售额: {data.reduce((sum, row) => sum + row.sales, 0)}</p> </div> ); } </code>
对于数据处理和分析任务,ReportAgent 支持执行 Python 代码:
class CorrAnalyzerAgent(DBAgent):
    """Agent that runs statistical analysis through a sandboxed Python executor."""

    def __init__(self, datasource, assumed_role=None, preference=None, user_profile=None):
        super().__init__(
            prompt_name="agent/analysis_report_generator",
            datasource=datasource,
            assumed_role=assumed_role,
        )
        # The only tool exposed: all analysis happens inside the sandbox.
        self.add_tool(self.execute_python)
        # NOTE(review): preference / user_profile are accepted but not used in
        # this snippet — presumably consumed elsewhere; confirm against callers.
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
class PythonExecutor:
    """Client for the remote sandboxed Python execution service."""

    def __init__(self, packages: list[str] | None = None):
        # BUG FIX: the original signature was `packages: list[str] = DEFAULT_PACKAGES`,
        # which shares one mutable module-level list across every instance;
        # mutating self._packages would silently change the default for all
        # future executors. Copy defensively instead.
        self._base_url = BASE_URL
        self._packages = list(DEFAULT_PACKAGES) if packages is None else list(packages)
        self._executor_id = self._create_executor(self._packages)

    def _create_executor(self, packages: list[str]) -> str:
        """Create an isolated executor instance on the service; return its id.

        Raises:
            Exception: if the service does not answer 201 Created.
        """
        response = requests.post(
            f"{self._base_url}/python/executor",
            json={"packages": packages},
        )
        if response.status_code != 201:
            raise Exception(f"Failed to create executor: {response.text}")
        return response.json()["executor_id"]
使用 Pickle + Base64 进行 DataFrame 传输:
def send_dataframes(self, dataframes: dict[str, pd.DataFrame]) -> None:
    """Ship local DataFrames into the remote executor's workspace.

    Each frame is pickled and base64-encoded, then rebuilt inside the
    sandbox under `_saved_dataframes[name]`.
    """
    for name, frame in dataframes.items():
        # Serialize the DataFrame so it can travel inside a code string.
        payload = base64.b64encode(pickle.dumps(frame)).decode()
        # Inject it into the remote execution environment.
        bootstrap = dedent(f"""
            import pickle
            import base64
            df = pickle.loads(base64.b64decode('{payload}'))
            _saved_dataframes['{name}'] = {{
                "df": df,
                "description": "initial dataframe"
            }}
        """)
        self._execute(code=bootstrap)
def execute(self, code: str, variables: dict | None = None, dataframes: dict | None = None) -> CodeExecutionResponse:
    """Run user code in the sandbox, optionally injecting variables and DataFrames.

    BUG FIX: the original declared `variables: dict = {}` and
    `dataframes: dict = {}` — mutable default arguments shared across calls.
    `None` defaults are behaviorally identical here because both values were
    only ever truth-tested.

    Args:
        code: the user code to execute remotely.
        variables: plain Python values to expose to the code, by name.
        dataframes: DataFrames to install in the executor's workspace.
    """
    # 1. Inject plain variables (pickled, then unpacked remotely).
    if variables:
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        # NOTE(review): locals().update() only reliably rebinds names at
        # module/exec scope — presumably the service exec()s this at top
        # level; confirm on the server side.
        code_inject = f"""
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
locals().update(vars_dict)
"""
        self._execute(code=code_inject)
    # 2. Inject DataFrames.
    if dataframes:
        self.send_dataframes(dataframes)
    # 3. Execute the user code.
    return self._execute(code=code)
def _execute(self, code: str) -> CodeExecutionResponse:
    """POST code to the executor endpoint and deserialize the response."""
    url = f"{self._base_url}/python/executor/{self._executor_id}/code"
    response = requests.post(url, json={"code": code})
    payload = response.json()

    # Rebuild DataFrames from the base64-encoded pickles in the response.
    # NOTE(review): pickle.loads on service output is only acceptable because
    # the executor service is internal/trusted — confirm that assumption.
    dataframes = {}
    for name, blob in payload.get("dataframes", {}).items():
        raw = pickle.loads(base64.b64decode(blob))
        if raw:
            dataframes[name] = ResponseDataframe(
                df=raw.get("df"),
                description=raw.get("description"),
                sql=None,
            )

    return CodeExecutionResponse(
        stdout=payload.get("stdout"),
        error=payload.get("error"),
        executor_id=payload.get("executor_id"),
        dataframes=dataframes,
    )
# Example: data-cleaning job executed inside the sandbox.
code = """
import pandas as pd

# 加载数据
df = load_dataframe('df_raw_sales')

# 数据清洗
df_cleaned = df.dropna()
df_cleaned['date'] = pd.to_datetime(df_cleaned['date'])
df_cleaned = df_cleaned[df_cleaned['amount'] > 0]

# 保存结果
df_id = save_dataframe(df_cleaned, "清洗后的销售数据")
print(f"清洗完成,共 {len(df_cleaned)} 条记录")
"""
result = executor.execute(code)
print(result["stdout"])  # -> "清洗完成,共 1234 条记录"
# Example: descriptive statistics and correlation analysis in the sandbox.
code = """
import pandas as pd
import numpy as np
from scipy import stats

# 加载数据
df = load_dataframe('df_sales')

# 计算统计指标
mean_sales = df['amount'].mean()
median_sales = df['amount'].median()
std_sales = df['amount'].std()

# 相关性分析
corr = df[['amount', 'quantity']].corr()

print(f"平均销售额: {mean_sales:.2f}")
print(f"中位数: {median_sales:.2f}")
print(f"标准差: {std_sales:.2f}")
print(f"相关系数:\\n{corr}")
"""
result = executor.execute(code)
# Example: monthly aggregation executed in the sandbox.
code = """
import pandas as pd

# 加载数据
df = load_dataframe('df_orders')

# 按月汇总
df['month'] = pd.to_datetime(df['date']).dt.to_period('M')
monthly_sales = df.groupby('month').agg({
    'amount': 'sum',
    'order_id': 'count'
}).reset_index()
monthly_sales.columns = ['月份', '销售额', '订单数']

# 保存结果
df_id = save_dataframe(monthly_sales, "月度销售汇总")
"""
result = executor.execute(code)
每个执行器实例只能使用创建时指定的包:
# Each executor may only import the packages declared at creation time.
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"]) # dangerous packages such as requests or os are unavailable
每个执行器实例独立运行,互不干扰:
# Each executor instance runs independently and shares no state.
executor1 = PythonExecutor(packages=["pandas"])
executor2 = PythonExecutor(packages=["numpy"])
# executor1 and executor2 are completely isolated.
执行器服务端设置超时限制,防止无限循环:
# Server-side configuration.
# FIX: extraction had collapsed the assignment into the comment above it;
# reconstructed so the constant is actual code again.
EXECUTION_TIMEOUT = 30  # per-execution timeout in seconds, prevents infinite loops
限制内存和 CPU 使用:
# Docker container resource limits for the executor.
# FIX: extraction had collapsed the command into the comment; reconstructed
# so the command is runnable.
docker run --memory="512m" --cpus="1.0" python-executor
同一会话中复用执行器实例,避免重复创建:
class CorrAnalyzerAgent:
    """Reuses a single PythonExecutor for the whole session."""

    def __init__(self):
        # Created once, then reused for every analysis request — avoids the
        # cost of provisioning a new remote executor per call.
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
执行器内部缓存 DataFrame,避免重复传输:
# First transfer: push the DataFrame into the executor once.
# FIX: extraction had collapsed both statements into a single comment line;
# reconstructed so the example is runnable.
executor.send_dataframes({"df_1": df})
# Subsequent code can read it straight from the executor-side cache.
code = "df = load_dataframe('df_1')"
合并多个小操作为一次执行:
# Batch several small operations into one round-trip to the executor.
code = """
df1 = load_dataframe('df_1')
df2 = load_dataframe('df_2')
result = pd.merge(df1, df2, on='id')
save_dataframe(result, "合并结果")
"""
AskTable 的 JSX 编译与 Python 沙箱执行系统,通过以下技术实现了安全、高效的代码执行:
这种架构不仅保证了安全性,还为 LLM 生成的代码提供了可靠的执行环境,是 AI 驱动数据分析系统的关键基础设施。