Isolated Python Executor - Secure Remote Code Execution

AskTable Team · 2026-03-04

In AI-driven data analysis systems, securely executing Python code generated by LLMs is a core challenge. Directly executing user code in the main process poses significant security risks. AskTable's PythonExecutor adopts a remote execution architecture to achieve secure, isolated code execution.

Architecture Design

(Architecture diagram)

Core Design Principles

1. Executor Isolation

Each executor instance runs in an isolated environment:

class PythonExecutor:
    def __init__(self, packages: list[str] = DEFAULT_PACKAGES):
        self._base_url = BASE_URL
        self._packages = packages
        # Create an isolated executor instance
        self._executor_id = self._create_executor(packages)

Isolation Features:

  • Independent Python process or container
  • Independent package environment
  • Independent global variable space
  • Independent file system (optional)
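
One way to get these properties is to give every executor its own virtual environment and worker process; containers are an equally common choice. The sketch below is illustrative only, not AskTable's actual service code, and the executor_worker module it launches is a hypothetical placeholder:

import subprocess
import uuid
import venv
from pathlib import Path

def create_isolated_environment(executor_id: str, base_dir: str = "/tmp/executors") -> Path:
    """Give one executor its own virtualenv and worker process (illustrative sketch)."""
    env_dir = Path(base_dir) / executor_id
    # Independent package environment: a dedicated virtualenv per executor
    venv.create(env_dir, with_pip=True)
    # Independent process and global namespace: a dedicated worker process
    # ("executor_worker" is a hypothetical module name; Linux venv layout assumed)
    python_bin = env_dir / "bin" / "python"
    subprocess.Popen([str(python_bin), "-m", "executor_worker", executor_id])
    return env_dir

executor_id = uuid.uuid4().hex
create_isolated_environment(executor_id)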

2. Package Whitelist

Only packages specified at creation time are allowed:

DEFAULT_PACKAGES = ["pandas", "numpy", "scipy"]

# Specify allowed packages when creating the executor
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

Security Advantages:

  • Blocks dangerous modules such as os and subprocess
  • Blocks network modules such as requests and urllib
  • Blocks pickle (except for internal serialization)
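
Because the whitelist is enforced when the executor is created, a request for a non-whitelisted package fails before any code runs. An illustrative example (the error text follows the client and server code shown later; the exact exception type may differ):

try:
    PythonExecutor(packages=["requests"])
except Exception as e:
    print(e)
    # e.g. Failed to create executor: Package requests is not allowed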

3. Remote Execution

Code executes in a remote service, completely isolated from the main process:

def _execute(self, code: str) -> CodeExecutionResponse:
    # Simplified version; the full version that rebuilds DataFrames appears later
    response = requests.post(
        f"{self._base_url}/python/executor/{self._executor_id}/code",
        json={"code": code},
    )
    return response.json()

Executor Lifecycle

1. Creating an Executor

def _create_executor(self, packages: list[str]) -> str:
    """Call POST /python/executor to create an executor"""
    response = requests.post(
        f"{self._base_url}/python/executor",
        json={"packages": packages},
    )
    if response.status_code != 201:
        raise Exception(f"Failed to create executor: {response.text}")

    data = response.json()
    return data["executor_id"]

Server-Side Implementation (pseudocode):

@app.post("/python/executor")
def create_executor(request: CreateExecutorRequest):
    # 1. Create isolated environment
    executor_id = generate_unique_id()
    env = create_isolated_environment(executor_id)

    # 2. Install specified packages
    for package in request.packages:
        if package not in ALLOWED_PACKAGES:
            raise ValueError(f"Package {package} is not allowed")
        env.install_package(package)

    # 3. Initialize global variables
    env.globals["_saved_dataframes"] = {}
    env.globals["load_dataframe"] = load_dataframe_func
    env.globals["save_dataframe"] = save_dataframe_func

    # 4. Save executor
    executors[executor_id] = env

    return {"executor_id": executor_id}

2. Executing Code

def execute(
    self,
    code: str,
    variables: dict[str, Any] | None = None,
    dataframes: dict[str, pd.DataFrame] | None = None,
) -> CodeExecutionResponse:
    # 1. Inject variables
    if variables:
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        code_inject = f"""
import base64
import pickle
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
locals().update(vars_dict)
"""
        self._execute(code=code_inject)

    # 2. Inject DataFrames
    if dataframes:
        self.send_dataframes(dataframes)

    # 3. Execute user code
    return self._execute(code=code)
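
On the server side, the corresponding step can be as simple as running exec() against the executor's persistent globals while capturing stdout and tracebacks. A minimal sketch of the execute_in_env helper referenced in the Timeout Control section below (illustrative pseudocode, not AskTable's actual implementation; serializing _saved_dataframes into the response is covered in the DataFrame section):

import asyncio
import io
import traceback
from contextlib import redirect_stdout

async def execute_in_env(executor_id: str, code: str) -> dict:
    """Run code in the executor's persistent namespace (illustrative sketch)."""
    env = executors[executor_id]

    def _run() -> dict:
        stdout_buffer = io.StringIO()
        error = None
        try:
            # exec() against env.globals keeps variables and _saved_dataframes
            # alive between calls to the same executor
            with redirect_stdout(stdout_buffer):
                exec(code, env.globals)
        except Exception:
            error = traceback.format_exc()
        return {
            "stdout": stdout_buffer.getvalue(),
            "error": error,
            "executor_id": executor_id,
        }

    # Offload the blocking exec() to a thread so the event loop stays responsive
    return await asyncio.to_thread(_run)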

3. Destroying an Executor

def close(self):
    """Destroy the executor and release resources"""
    response = requests.delete(
        f"{self._base_url}/python/executor/{self._executor_id}"
    )
    if response.status_code != 204:
        raise Exception(f"Failed to close executor: {response.text}")
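
Because an executor holds server-side resources until it is destroyed, close() should run even when execution raises. A simple usage pattern (raw_df and code stand for whatever data and analysis code you are running):

executor = PythonExecutor(packages=["pandas", "numpy"])
try:
    executor.send_dataframes({"df_raw": raw_df})
    result = executor.execute(code)
finally:
    # Always release the remote environment, even if execution failed
    executor.close()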

DataFrame Serialization

1. Sending DataFrames

Using Pickle + Base64 serialization:

def send_dataframes(self, dataframes: dict[str, pd.DataFrame]) -> None:
    for df_name, df in dataframes.items():
        # Serialize DataFrame
        pickled_df = base64.b64encode(pickle.dumps(df)).decode()

        # Inject into execution environment
        code = dedent(f"""
            import pickle
            import base64
            df = pickle.loads(base64.b64decode('{pickled_df}'))
            _saved_dataframes['{df_name}'] = {{
                "df": df,
                "description": "initial dataframe"
            }}
        """)
        self._execute(code=code)

Why Use Pickle + Base64?

  1. Pickle: Python's native serialization, supports complex objects
  2. Base64: Text encoding, suitable for JSON transmission
  3. Efficient: Typically faster than JSON round-trips for DataFrames and preserves dtypes and other complex objects
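
The round trip is symmetric: a minimal sketch of what happens to a DataFrame in transit (illustrative; the real payload also carries the description field alongside the DataFrame):

import base64
import pickle

import pandas as pd

df = pd.DataFrame({"amount": [100, 250], "region": ["A", "B"]})

# Sender: binary pickle -> base64 text that fits in a JSON body
payload = base64.b64encode(pickle.dumps(df)).decode()

# Receiver: reverse both steps to get an identical DataFrame back
restored = pickle.loads(base64.b64decode(payload))
assert restored.equals(df)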

2. Receiving DataFrames

def _execute(self, code: str) -> CodeExecutionResponse:
    response = requests.post(
        f"{self._base_url}/python/executor/{self._executor_id}/code",
        json={"code": code},
    )
    response_json = response.json()

    # Deserialize DataFrames
    dataframes = {}
    for df_name, df_data in response_json.get("dataframes", {}).items():
        raw_df = pickle.loads(base64.b64decode(df_data))
        if raw_df:
            dataframes[df_name] = ResponseDataframe(
                df=raw_df.get("df"),
                description=raw_df.get("description"),
                sql=None,
            )

    return CodeExecutionResponse(
        stdout=response_json.get("stdout"),
        error=response_json.get("error"),
        executor_id=response_json.get("executor_id"),
        dataframes=dataframes,
    )
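
The server-side counterpart is to pickle each entry of _saved_dataframes into the response. A minimal sketch (the helper name serialize_saved_dataframes is an assumption; it produces exactly the entry shape the client code above expects):

import base64
import pickle

def serialize_saved_dataframes(env) -> dict[str, str]:
    """Pickle each saved entry so the client's _execute() can rebuild it."""
    result = {}
    for df_id, entry in env.globals["_saved_dataframes"].items():
        # entry has the shape {"df": DataFrame, "description": str}, the same
        # structure produced by save_dataframe() and send_dataframes()
        result[df_id] = base64.b64encode(pickle.dumps(entry)).decode()
    return result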

Built-in Functions

The executor provides two built-in functions for DataFrame management:

1. load_dataframe

def load_dataframe(df_id: str) -> pd.DataFrame:
    """Load a DataFrame"""
    if df_id not in _saved_dataframes:
        raise ValueError(f"DataFrame {df_id} not found")
    return _saved_dataframes[df_id]["df"]

2. save_dataframe

def save_dataframe(df: pd.DataFrame, description: str) -> str:
    """Save a DataFrame and return its ID"""
    df_id = f"df_{generate_id()}"
    _saved_dataframes[df_id] = {
        "df": df,
        "description": description
    }
    return df_id

Practical Examples

Example 1: Data Cleaning

executor = PythonExecutor(packages=["pandas", "numpy"])

# Send raw data
executor.send_dataframes({"df_raw": raw_df})

# Execute cleaning code
code = """
import pandas as pd
import numpy as np

# Load data
df = load_dataframe('df_raw')

# Data cleaning
df_cleaned = df.copy()
df_cleaned = df_cleaned.dropna()
df_cleaned['date'] = pd.to_datetime(df_cleaned['date'])
df_cleaned = df_cleaned[df_cleaned['amount'] > 0]

# Save result
df_id = save_dataframe(df_cleaned, "Cleaned data")
print(f"Cleaning complete, {len(df_cleaned)} records total")
print(f"DataFrame ID: {df_id}")
"""

result = executor.execute(code)
print(result["stdout"])
# Output: Cleaning complete, 1234 records total
#        DataFrame ID: df_abc123

# Get cleaned data
cleaned_df = result["dataframes"]["df_abc123"]["df"]

Example 2: Statistical Analysis

executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
# (assumes df_sales was previously sent with executor.send_dataframes)

code = """
import pandas as pd
import numpy as np
from scipy import stats

# Load data
df = load_dataframe('df_sales')

# Basic statistics
print("=== Basic Statistics ===")
print(f"Mean: {df['amount'].mean():.2f}")
print(f"Median: {df['amount'].median():.2f}")
print(f"Std Dev: {df['amount'].std():.2f}")

# Correlation analysis
print("\\n=== Correlation Analysis ===")
corr = df[['amount', 'quantity', 'price']].corr()
print(corr)

# Hypothesis testing
print("\\n=== Hypothesis Testing ===")
group_a = df[df['region'] == 'A']['amount']
group_b = df[df['region'] == 'B']['amount']
t_stat, p_value = stats.ttest_ind(group_a, group_b)
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.4f}")
"""

result = executor.execute(code)
print(result["stdout"])

Example 3: Data Transformation

code = """
import pandas as pd

# Load data
df = load_dataframe('df_orders')

# Time series aggregation
df['date'] = pd.to_datetime(df['date'])
df['month'] = df['date'].dt.to_period('M')

monthly = df.groupby('month').agg({
    'order_id': 'count',
    'amount': 'sum',
    'customer_id': 'nunique'
}).reset_index()

monthly.columns = ['Month', 'Orders', 'Revenue', 'Customers']

# Save result
df_id = save_dataframe(monthly, "Monthly Summary")
print(f"Summary complete, {len(monthly)} months total")
"""

result = executor.execute(code)
monthly_df = result["dataframes"][list(result["dataframes"].keys())[0]]["df"]

Example 4: Variable Injection

# Inject variables
variables = {
    "threshold": 1000,
    "start_date": "2024-01-01",
    "end_date": "2024-12-31"
}

code = """
import pandas as pd

df = load_dataframe('df_sales')

# Use injected variables
df_filtered = df[
    (df['amount'] > threshold) &
    (df['date'] >= start_date) &
    (df['date'] <= end_date)
]

print(f"Filter: amount > {threshold}, date in [{start_date}, {end_date}]")
print(f"Filtered records: {len(df_filtered)}")

df_id = save_dataframe(df_filtered, "Filtered data")
"""

result = executor.execute(code, variables=variables)

Security Mechanisms

1. Package Whitelist

ALLOWED_PACKAGES = [
    "pandas", "numpy", "scipy",
    "scikit-learn", "statsmodels",
    # Blocked: os, subprocess, requests, socket, etc.
]

2. Timeout Control

# Server-side configuration
EXECUTION_TIMEOUT = 30  # 30-second timeout

@app.post("/python/executor/{executor_id}/code")
async def execute_code(executor_id: str, request: ExecuteCodeRequest):
    try:
        result = await asyncio.wait_for(
            execute_in_env(executor_id, request.code),
            timeout=EXECUTION_TIMEOUT
        )
        return result
    except asyncio.TimeoutError:
        return {"error": "Execution timeout"}

3. Resource Limits

# Docker container resource limits
docker run \
    --memory="512m" \
    --cpus="1.0" \
    --network="none" \
    python-executor

4. File System Isolation

# Read-only file system
docker run \
    --read-only \
    --tmpfs /tmp:rw,noexec,nosuid,size=100m \
    python-executor

Performance Optimization

1. Executor Reuse

class DataAnalysisAgent:
    def __init__(self):
        # Create once, reuse multiple times
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

    def analyze(self, df: pd.DataFrame):
        # Reuse executor
        self.executor.send_dataframes({"df": df})
        result = self.executor.execute(analysis_code)
        return result

2. DataFrame Caching

# First send
executor.send_dataframes({"df_1": df1, "df_2": df2})

# Subsequent code can use directly, no need to resend
code1 = "df = load_dataframe('df_1'); ..."
code2 = "df = load_dataframe('df_2'); ..."
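
A concrete version of the same pattern (the column names are assumptions for illustration):

# df_1 and df_2 were sent once above; each execute() reuses them server-side
result1 = executor.execute("df = load_dataframe('df_1'); print(df['amount'].sum())")
result2 = executor.execute("df = load_dataframe('df_2'); print(df['region'].nunique())")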

3. Batch Operations

# Combine multiple operations into one execution
code = """
df1 = load_dataframe('df_1')
df2 = load_dataframe('df_2')

# Operation 1
result1 = df1.groupby('region')['sales'].sum()

# Operation 2
result2 = df2.merge(df1, on='id')

# Operation 3
final = result2[result2['amount'] > 1000]

save_dataframe(final, "Final Result")
"""

4. Connection Pooling

# Use a connection pool to reuse HTTP connections across executor calls
import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
session.mount('http://', adapter)
# The executor's _execute() would then call session.post(...) instead of requests.post(...)

Error Handling

1. Execution Errors

result = executor.execute(code)

if result["error"]:
    print(f"Execution error: {result['error']}")
else:
    print(f"Execution successful: {result['stdout']}")

2. Serialization Errors

try:
    executor.send_dataframes({"df": df})
except Exception as e:
    print(f"DataFrame serialization failed: {e}")

3. Timeout Errors

# Assumes execute() forwards a timeout to the underlying HTTP request (not shown
# in the execute() signature above); otherwise rely on the server-side timeout,
# which is returned in result["error"]
try:
    result = executor.execute(code, timeout=30)
except TimeoutError:
    print("Code execution timed out")

Technical Advantages

1. Security

  • Process Isolation: Independent processes or containers
  • Package Whitelist: Only allows safe packages
  • Resource Limits: CPU, memory, network restrictions
  • Timeout Protection: Prevents infinite loops

2. Flexibility

  • Dynamic Package Management: Install different packages as needed
  • Variable Injection: Supports dynamic parameters
  • DataFrame Management: Built-in load/save functions

3. Performance

  • Executor Reuse: Reduces creation overhead
  • DataFrame Caching: Avoids redundant transfers
  • Batch Operations: Reduces network round trips

Summary

AskTable's PythonExecutor achieves secure, efficient Python code execution through a remote execution architecture:

  1. Isolated Execution: Independent execution environments, fully isolated
  2. Package Whitelist: Only allows safe data analysis packages
  3. DataFrame Serialization: Efficient Pickle + Base64 transfer
  4. Resource Control: Timeout, memory, CPU limits
  5. Ease of Use: Clean API with built-in DataFrame management

This architecture not only guarantees security but also provides a reliable execution environment for LLM-generated data analysis code.
