
sidebar.wechat

sidebar.feishu
sidebar.chooseYourWayToJoin

sidebar.scanToAddConsultant
In AI-driven data analysis systems, securely executing Python code generated by LLMs is a core challenge. Directly executing user code in the main process poses significant security risks. AskTable's PythonExecutor adopts a remote execution architecture to achieve secure, isolated code execution.
Each executor instance runs in an isolated environment:
class PythonExecutor:
    """Client for a remote, isolated Python code-execution service.

    Each instance provisions its own executor on the remote service, so
    state (installed packages, injected DataFrames) is isolated per
    instance. Remaining methods are defined in the snippets that follow.
    """

    def __init__(self, packages: list[str] | None = None):
        """Create an isolated executor on the remote service.

        Args:
            packages: Packages to install in the remote environment;
                defaults to DEFAULT_PACKAGES. FIX: the original used the
                module-level DEFAULT_PACKAGES list directly as the default
                and stored the caller's list object — a shared-mutable
                hazard. We use a None sentinel and store a defensive copy.
        """
        if packages is None:
            packages = DEFAULT_PACKAGES
        self._base_url = BASE_URL
        self._packages = list(packages)
        # Create an isolated executor instance (remote HTTP call).
        self._executor_id = self._create_executor(self._packages)
Isolation Features:
Only packages specified at creation time are allowed:
# Packages installed into every executor unless the caller overrides them.
DEFAULT_PACKAGES = ["pandas", "numpy", "scipy"]
# Specify allowed packages when creating the executor
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
Security Advantages:
Blocked modules include os, subprocess, requests, urllib, and pickle (except for internal serialization). Code executes in a remote service, completely isolated from the main process:
def _execute(self, code: str) -> CodeExecutionResponse:
    """POST the given source to this executor and return the raw JSON reply."""
    url = f"{self._base_url}/python/executor/{self._executor_id}/code"
    payload = {"code": code}
    response = requests.post(url, json=payload)
    return response.json()
def _create_executor(self, packages: list[str]) -> str:
    """Call POST /python/executor to create an executor"""
    response = requests.post(
        f"{self._base_url}/python/executor",
        json={"packages": packages},
    )
    # 201 Created is the only success status for this endpoint.
    if response.status_code == 201:
        return response.json()["executor_id"]
    raise Exception(f"Failed to create executor: {response.text}")
Server-Side Implementation (pseudocode):
@app.post("/python/executor")
def create_executor(request: CreateExecutorRequest):
    """Server-side (pseudocode): provision an isolated environment and register it."""
    # 1. Create isolated environment keyed by a fresh unique id.
    executor_id = generate_unique_id()
    env = create_isolated_environment(executor_id)

    # 2. Install only whitelisted packages; reject anything else outright.
    for pkg in request.packages:
        if pkg not in ALLOWED_PACKAGES:
            raise ValueError(f"Package {pkg} is not allowed")
        env.install_package(pkg)

    # 3. Seed the sandbox globals with the DataFrame store and its accessors.
    env.globals["_saved_dataframes"] = {}
    env.globals["load_dataframe"] = load_dataframe_func
    env.globals["save_dataframe"] = save_dataframe_func

    # 4. Register the environment so later /code calls can find it.
    executors[executor_id] = env
    return {"executor_id": executor_id}
def execute(
    self,
    code: str,
    variables: dict[str, Any] | None = None,
    dataframes: dict[str, pd.DataFrame] | None = None,
) -> CodeExecutionResponse:
    """Run user code remotely, optionally injecting variables and DataFrames first.

    Args:
        code: Python source to execute in the remote environment.
        variables: Plain picklable values exposed as names in the remote
            scope. FIX: the original used mutable `{}` defaults — a shared
            default-object hazard — replaced with None sentinels (falsy
            either way, so behavior for callers is unchanged).
        dataframes: Named DataFrames uploaded via send_dataframes().
    Returns:
        The CodeExecutionResponse from executing `code`.
    """
    # 1. Inject variables (pickle -> base64 so they survive JSON transport).
    if variables:
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        # FIX: the injected snippet must import pickle/base64 itself —
        # send_dataframes imports them explicitly, which shows the remote
        # environment does not pre-import them; the original snippet would
        # fail with NameError.
        code_inject = f"""
import pickle
import base64
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
locals().update(vars_dict)
"""
        self._execute(code=code_inject)
    # 2. Inject DataFrames
    if dataframes:
        self.send_dataframes(dataframes)
    # 3. Execute user code
    return self._execute(code=code)
def close(self):
    """Destroy the executor and release resources"""
    url = f"{self._base_url}/python/executor/{self._executor_id}"
    response = requests.delete(url)
    # 204 No Content is the expected success status for DELETE.
    if response.status_code == 204:
        return
    raise Exception(f"Failed to close executor: {response.text}")
Using Pickle + Base64 serialization:
def send_dataframes(self, dataframes: dict[str, pd.DataFrame]) -> None:
    """Upload each DataFrame into the remote `_saved_dataframes` store."""
    for name, frame in dataframes.items():
        # Pickle + base64 keeps the binary payload JSON-safe for transport.
        encoded = base64.b64encode(pickle.dumps(frame)).decode()
        # Injected snippet rebuilds the DataFrame remotely and registers it
        # under its name. Note the explicit imports: the remote environment
        # does not pre-import pickle/base64.
        injection = dedent(f"""
            import pickle
            import base64
            df = pickle.loads(base64.b64decode('{encoded}'))
            _saved_dataframes['{name}'] = {{
                "df": df,
                "description": "initial dataframe"
            }}
        """)
        self._execute(code=injection)
Why Use Pickle + Base64?
def _execute(self, code: str) -> CodeExecutionResponse:
    """POST code to the executor and deserialize the structured response."""
    response = requests.post(
        f"{self._base_url}/python/executor/{self._executor_id}/code",
        json={"code": code},
    )
    body = response.json()

    # Rebuild DataFrame payloads. Each entry is a base64-encoded pickle of
    # a {"df": ..., "description": ...} dict.
    # NOTE(review): pickle.loads on service output is only safe because the
    # execution service is trusted infrastructure — confirm that assumption.
    frames = {}
    for name, encoded in body.get("dataframes", {}).items():
        entry = pickle.loads(base64.b64decode(encoded))
        if not entry:
            continue
        frames[name] = ResponseDataframe(
            df=entry.get("df"),
            description=entry.get("description"),
            sql=None,
        )

    return CodeExecutionResponse(
        stdout=body.get("stdout"),
        error=body.get("error"),
        executor_id=body.get("executor_id"),
        dataframes=frames,
    )
The executor provides two built-in functions for DataFrame management:
def load_dataframe(df_id: str) -> pd.DataFrame:
    """Return the DataFrame previously stored under *df_id*.

    Raises:
        ValueError: if no DataFrame with that id exists.
    """
    entry = _saved_dataframes.get(df_id)
    if entry is None:
        raise ValueError(f"DataFrame {df_id} not found")
    return entry["df"]
def save_dataframe(df: pd.DataFrame, description: str) -> str:
    """Store *df* with its description and hand back the generated id."""
    df_id = f"df_{generate_id()}"
    entry = {"df": df, "description": description}
    _saved_dataframes[df_id] = entry
    return df_id
# End-to-end example: create an executor, upload raw data, run a cleaning
# script remotely, then pull the cleaned DataFrame back out.
executor = PythonExecutor(packages=["pandas", "numpy"])
# Send raw data
executor.send_dataframes({"df_raw": raw_df})
# Execute cleaning code
code = """
import pandas as pd
import numpy as np
# Load data
df = load_dataframe('df_raw')
# Data cleaning
df_cleaned = df.copy()
df_cleaned = df_cleaned.dropna()
df_cleaned['date'] = pd.to_datetime(df_cleaned['date'])
df_cleaned = df_cleaned[df_cleaned['amount'] > 0]
# Save result
df_id = save_dataframe(df_cleaned, "Cleaned data")
print(f"Cleaning complete, {len(df_cleaned)} records total")
print(f"DataFrame ID: {df_id}")
"""
result = executor.execute(code)
# NOTE(review): subscript access (result["stdout"]) assumes the response is
# dict-like, but _execute returns a CodeExecutionResponse built with keyword
# args — confirm it supports __getitem__, or use attribute access.
print(result["stdout"])
# Output: Cleaning complete, 1234 records total
# DataFrame ID: df_abc123
# Get cleaned data
cleaned_df = result["dataframes"]["df_abc123"]["df"]
# Example: statistical analysis — descriptive stats, a correlation matrix,
# and a two-sample t-test, all executed remotely with scipy available.
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
code = """
import pandas as pd
import numpy as np
from scipy import stats
# Load data
df = load_dataframe('df_sales')
# Basic statistics
print("=== Basic Statistics ===")
print(f"Mean: {df['amount'].mean():.2f}")
print(f"Median: {df['amount'].median():.2f}")
print(f"Std Dev: {df['amount'].std():.2f}")
# Correlation analysis
print("\\n=== Correlation Analysis ===")
corr = df[['amount', 'quantity', 'price']].corr()
print(corr)
# Hypothesis testing
print("\\n=== Hypothesis Testing ===")
group_a = df[df['region'] == 'A']['amount']
group_b = df[df['region'] == 'B']['amount']
t_stat, p_value = stats.ttest_ind(group_a, group_b)
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.4f}")
"""
result = executor.execute(code)
# NOTE(review): dict-style access again assumes CodeExecutionResponse is
# subscriptable — verify against the class definition.
print(result["stdout"])
# Example: monthly time-series aggregation run remotely; the summary table is
# saved remotely and retrieved from the response afterwards.
code = """
import pandas as pd
# Load data
df = load_dataframe('df_orders')
# Time series aggregation
df['date'] = pd.to_datetime(df['date'])
df['month'] = df['date'].dt.to_period('M')
monthly = df.groupby('month').agg({
'order_id': 'count',
'amount': 'sum',
'customer_id': 'nunique'
}).reset_index()
monthly.columns = ['Month', 'Orders', 'Revenue', 'Customers']
# Save result
df_id = save_dataframe(monthly, "Monthly Summary")
print(f"Summary complete, {len(monthly)} months total")
"""
result = executor.execute(code)
# The DataFrame id is generated server-side, so grab the first (and only)
# entry in the returned dataframes mapping rather than hard-coding the id.
monthly_df = result["dataframes"][list(result["dataframes"].keys())[0]]["df"]
# Example: variable injection — plain Python values passed to execute() become
# names in the remote scope, so the code template needs no string formatting.
# Inject variables
variables = {
    "threshold": 1000,
    "start_date": "2024-01-01",
    "end_date": "2024-12-31"
}
code = """
import pandas as pd
df = load_dataframe('df_sales')
# Use injected variables
df_filtered = df[
(df['amount'] > threshold) &
(df['date'] >= start_date) &
(df['date'] <= end_date)
]
print(f"Filter: amount > {threshold}, date in [{start_date}, {end_date}]")
print(f"Filtered records: {len(df_filtered)}")
df_id = save_dataframe(df_filtered, "Filtered data")
"""
result = executor.execute(code, variables=variables)
# Server-side whitelist: only these packages may be installed into an executor
# (checked by create_executor before env.install_package).
ALLOWED_PACKAGES = [
    "pandas", "numpy", "scipy",
    "scikit-learn", "statsmodels",
    # Blocked: os, subprocess, requests, socket, etc.
]
# Server-side configuration
EXECUTION_TIMEOUT = 30 # 30-second timeout
@app.post("/python/executor/{executor_id}/code")
async def execute_code(executor_id: str, request: ExecuteCodeRequest):
    """Run submitted code in the given executor, capped at EXECUTION_TIMEOUT seconds."""
    try:
        # Hard wall-clock cap so runaway user code cannot hold the worker.
        return await asyncio.wait_for(
            execute_in_env(executor_id, request.code),
            timeout=EXECUTION_TIMEOUT,
        )
    except asyncio.TimeoutError:
        # Timeouts are reported in-band rather than as an HTTP error.
        return {"error": "Execution timeout"}
# Docker container resource limits
docker run \
--memory="512m" \
--cpus="1.0" \
--network="none" \
python-executor
# Read-only file system
docker run \
--read-only \
--tmpfs /tmp:rw,noexec,nosuid,size=100m \
python-executor
class DataAnalysisAgent:
    """Agent that owns one long-lived PythonExecutor and reuses it per call."""

    def __init__(self):
        # Provisioning an executor is expensive (remote env + package
        # install), so create it once and reuse it for every analyze() call.
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

    def analyze(self, df: pd.DataFrame):
        """Upload *df* and run the (module-level) analysis_code against it."""
        self.executor.send_dataframes({"df": df})
        return self.executor.execute(analysis_code)
# DataFrames persist inside the executor between calls: upload once, then
# reference them by id in any number of later code executions.
# First send
executor.send_dataframes({"df_1": df1, "df_2": df2})
# Subsequent code can use directly, no need to resend
code1 = "df = load_dataframe('df_1'); ..."
code2 = "df = load_dataframe('df_2'); ..."
# Batching: each execute() call is a full HTTP round trip, so combining
# related steps into a single submission cuts latency.
# Combine multiple operations into one execution
code = """
df1 = load_dataframe('df_1')
df2 = load_dataframe('df_2')
# Operation 1
result1 = df1.groupby('region')['sales'].sum()
# Operation 2
result2 = df2.merge(df1, on='id')
# Operation 3
final = result2[result2['amount'] > 1000]
save_dataframe(final, "Final Result")
"""
# Use connection pool to reuse HTTP connections
# FIX: the snippet used requests/HTTPAdapter without showing their imports.
import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# Keep up to 10 distinct host pools, 20 pooled connections per host.
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
session.mount('http://', adapter)
# Pattern 1: remote execution errors are reported in the response payload,
# not raised as exceptions on the client.
result = executor.execute(code)
# NOTE(review): subscript access assumes CodeExecutionResponse is dict-like —
# confirm, or use result.error / result.stdout.
if result["error"]:
    print(f"Execution error: {result['error']}")
else:
    print(f"Execution successful: {result['stdout']}")
# Pattern 2: serialization/transport problems surface as client exceptions.
try:
    executor.send_dataframes({"df": df})
except Exception as e:
    print(f"DataFrame serialization failed: {e}")
# Pattern 3: client-side timeout handling.
# NOTE(review): execute() as defined earlier takes no `timeout` parameter and
# nothing in the shown client raises TimeoutError — this example does not
# match that signature; confirm the intended API.
try:
    result = executor.execute(code, timeout=30)
except TimeoutError:
    print("Code execution timed out")
AskTable's PythonExecutor achieves secure, efficient Python code execution through a remote execution architecture:
This architecture not only guarantees security but also provides a reliable execution environment for LLM-generated data analysis code.
sidebar.noProgrammingNeeded
sidebar.startFreeTrial