AskTable

支持 40+ 数据库:AskTable 的多数据源适配架构

AskTable 团队
AskTable 团队 2026年3月4日

在企业数据分析场景中,数据往往分散在不同类型的数据库中:MySQL 存储业务数据,ClickHouse 做实时分析,Oracle 管理财务数据,PostgreSQL 处理用户数据。如何让 AI 无缝访问这些异构数据源?AskTable 通过插件化的数据库适配器架构,实现了对 40+ 种数据库的统一支持。

为什么需要多数据库支持?

企业数据现状

典型企业的数据库分布

挑战

核心架构设计

1. 插件化适配器架构

AskTable 通过抽象基类定义统一接口,每种数据库实现自己的适配器:

class DataSourceAdmin:
    project_id: str
    id: str
    name: str
    engine: DBEngine  # 数据库类型
    access_config: AccessConfigPrivate | None
    file_paths: dict[str, str] | None

    @property
    def accessor(self):
        """获取数据源的数据访问器"""
        if self._accessor:
            return self._accessor
        if not self.access_config:
            raise errors.DataSourceConfigError

        self._accessor = BaseAccessor.create(
            engine=self.engine,
            access_config=self.access_config.model_dump(),
            file_paths=self.file_paths,
        )
        return self._accessor

    @property
    def dialect(self) -> str:
        """获取 SQL 方言"""
        if self.engine == DBEngine.dap:
            return self.accessor.dialect
        return sql_dialect_adaptor(self.engine)

    @property
    def runtime_meta(self) -> MetaBase:
        """实时获取数据源的 MetaData"""
        log.info(f"get runtime meta for {self.id}")
        meta = self.accessor.get_meta()
        return meta

    @property
    def brain_meta(self) -> MetaAdmin:
        """获取 Brain 中的 MetaData(由 get_admin 预加载)"""
        if not self._meta:
            raise RuntimeError("brain_meta not loaded, use get_admin(with_meta=True)")
        return self._meta

设计亮点

1.1 统一的访问接口

# 所有数据库都通过相同的接口访问
accessor = datasource.accessor

# 连接数据库
with accessor.connect():
    # 执行查询
    df = accessor.query("SELECT * FROM users LIMIT 10")

    # 获取元数据
    meta = accessor.get_meta()

优势

1.2 动态适配器创建

self._accessor = BaseAccessor.create(
    engine=self.engine,
    access_config=self.access_config.model_dump(),
    file_paths=self.file_paths,
)

工厂模式:根据 engine 类型动态创建对应的适配器

# 伪代码示例
class BaseAccessor:
    @classmethod
    def create(cls, engine: DBEngine, access_config: dict, file_paths: dict):
        if engine == DBEngine.mysql:
            return MySQLAccessor(access_config)
        elif engine == DBEngine.postgresql:
            return PostgreSQLAccessor(access_config)
        elif engine == DBEngine.clickhouse:
            return ClickHouseAccessor(access_config)
        elif engine == DBEngine.oracle:
            return OracleAccessor(access_config)
        # ... 40+ 种数据库
        else:
            raise UnsupportedDatabaseError(f"Unsupported engine: {engine}")

1.3 SQL 方言适配

@property
def dialect(self) -> str:
    if self.engine == DBEngine.dap:
        return self.accessor.dialect
    return sql_dialect_adaptor(self.engine)

方言映射

def sql_dialect_adaptor(engine: DBEngine) -> str:
    """将数据库引擎映射到 SQL 方言"""
    mapping = {
        DBEngine.mysql: "mysql",
        DBEngine.postgresql: "postgres",
        DBEngine.clickhouse: "clickhouse",
        DBEngine.oracle: "oracle",
        DBEngine.sqlserver: "tsql",
        DBEngine.hive: "hive",
        DBEngine.doris: "mysql",  # Doris 兼容 MySQL 协议
        DBEngine.starrocks: "mysql",  # StarRocks 兼容 MySQL 协议
        # ...
    }
    return mapping.get(engine, "generic")

用途

2. 支持的数据库类型

从代码中可以看到 AskTable 支持的数据库类型:

# 关系型数据库
- MySQL / MariaDB
- PostgreSQL
- Oracle
- SQL Server
- SQLite

# 云数据库
- 阿里云 ADB MySQL / ADB PostgreSQL
- 阿里云 PolarDB MySQL / PolarDB PostgreSQL
- 阿里云 Hologres
- 腾讯云 TDSQL MySQL / TDSQL PostgreSQL
- 华为云 GaussDB / GaussDB DWS

# 分析型数据库
- ClickHouse
- Doris
- StarRocks
- SelectDB
- Databend

# 大数据平台
- Hive
- MaxCompute

# 国产数据库
- 达梦 (DaMeng)
- 人大金仓 (KingBase ES)
- 南大通用 (GBase 8a / 8c)
- 瀚高 (HighGo)
- 虚谷 (XuGu)
- 优炫 (UXDB)
- 神舟通用 (Oscar)
- 翰高 (MogDB)
- 海量 (Vastbase)
- 沃趣 (YashanDB)
- 南大通用 (GreenPlum)
- 巨杉 (SequoiaDB)
- 柏睿 (RapidsDB)
- 偶数 (OceanBase)

# 文档数据
- Excel
- 飞书多维表格 (Bitable)

# 数据湖
- DAP (Data Access Platform)

总计 40+ 种数据库!

3. 适配器实现示例

以 MySQL 为例,看看适配器的实现:

class MySQLAccessor(BaseAccessor):
    def __init__(self, access_config: dict):
        self.host = access_config["host"]
        self.port = access_config["port"]
        self.database = access_config["database"]
        self.username = access_config["username"]
        self.password = access_config["password"]
        self.connection = None

    def connect(self):
        """建立数据库连接"""
        import pymysql
        self.connection = pymysql.connect(
            host=self.host,
            port=self.port,
            database=self.database,
            user=self.username,
            password=self.password,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """执行查询并返回 DataFrame"""
        if not self.connection:
            raise ConnectionError("Not connected to database")

        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            result = cursor.fetchall()

        return pd.DataFrame(result)

    def get_meta(self) -> MetaBase:
        """获取数据库元数据"""
        schemas = []

        # 获取所有表
        tables_sql = """
        SELECT
            TABLE_SCHEMA,
            TABLE_NAME,
            TABLE_COMMENT
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s
        """
        tables = self.query_raw(tables_sql, (self.database,))

        for table in tables:
            # 获取表的字段信息
            fields_sql = """
            SELECT
                COLUMN_NAME,
                DATA_TYPE,
                COLUMN_COMMENT,
                IS_NULLABLE,
                COLUMN_KEY
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
            ORDER BY ORDINAL_POSITION
            """
            fields = self.query_raw(fields_sql, (self.database, table['TABLE_NAME']))

            # 构建元数据结构
            # ...

        return MetaBase(schemas=schemas)

    def close(self):
        """关闭连接"""
        if self.connection:
            self.connection.close()
            self.connection = None

关键点

4. 特殊数据库的处理

4.1 ClickHouse 适配

ClickHouse 是列式数据库,有特殊的查询优化:

class ClickHouseAccessor(BaseAccessor):
    def query(self, sql: str) -> pd.DataFrame:
        """ClickHouse 特殊处理"""
        # 添加 FORMAT 子句
        if "FORMAT" not in sql.upper():
            sql = f"{sql} FORMAT JSONEachRow"

        # 使用 HTTP 接口
        response = requests.post(
            f"http://{self.host}:{self.port}",
            params={"database": self.database},
            data=sql,
            auth=(self.username, self.password)
        )

        # 解析 JSON 结果
        lines = response.text.strip().split('\n')
        data = [json.loads(line) for line in lines]
        return pd.DataFrame(data)

    def get_meta(self) -> MetaBase:
        """从 system.tables 和 system.columns 获取元数据"""
        tables_sql = """
        SELECT
            database,
            name as table_name,
            comment
        FROM system.tables
        WHERE database = '{database}'
        """
        # ...

特点

4.2 Oracle 适配

Oracle 有独特的分页和元数据查询方式:

class OracleAccessor(BaseAccessor):
    def query(self, sql: str, limit: int = None) -> pd.DataFrame:
        """Oracle 分页处理"""
        if limit:
            # Oracle 11g 使用 ROWNUM
            sql = f"""
            SELECT * FROM (
                {sql}
            ) WHERE ROWNUM <= {limit}
            """
            # Oracle 12c+ 可以使用 FETCH FIRST
            # sql = f"{sql} FETCH FIRST {limit} ROWS ONLY"

        return super().query(sql)

    def get_meta(self) -> MetaBase:
        """从 ALL_TABLES 和 ALL_TAB_COLUMNS 获取元数据"""
        tables_sql = """
        SELECT
            OWNER,
            TABLE_NAME,
            COMMENTS
        FROM ALL_TAB_COMMENTS
        WHERE OWNER = :owner
        """
        # ...

特点

4.3 Excel 适配

Excel 不是数据库,但 AskTable 也支持:

class ExcelAccessor(BaseAccessor):
    def __init__(self, access_config: dict, file_paths: dict):
        self.file_path = file_paths.get("excel_file")
        self.sheets = {}

    def connect(self):
        """加载 Excel 文件"""
        import openpyxl
        self.workbook = openpyxl.load_workbook(self.file_path)

        # 将每个 sheet 转换为 DataFrame
        for sheet_name in self.workbook.sheetnames:
            sheet = self.workbook[sheet_name]
            data = sheet.values
            cols = next(data)
            self.sheets[sheet_name] = pd.DataFrame(data, columns=cols)

        return self

    def query(self, sql: str) -> pd.DataFrame:
        """使用 DuckDB 执行 SQL"""
        import duckdb

        # 创建临时数据库
        conn = duckdb.connect(':memory:')

        # 注册所有 sheet 为表
        for sheet_name, df in self.sheets.items():
            conn.register(sheet_name, df)

        # 执行 SQL
        result = conn.execute(sql).fetchdf()
        conn.close()

        return result

    def get_meta(self) -> MetaBase:
        """从 DataFrame 推断元数据"""
        schemas = []
        for sheet_name, df in self.sheets.items():
            fields = []
            for col in df.columns:
                fields.append({
                    "name": col,
                    "data_type": str(df[col].dtype),
                    "description": ""
                })
            # ...
        return MetaBase(schemas=schemas)

特点

实际应用场景

场景 1:跨数据库联合查询

需求:用户想知道"MySQL 中的订单数据和 ClickHouse 中的用户行为数据的关联分析"

实现

# 1. 从 MySQL 查询订单数据
mysql_ds = get_datasource("mysql_orders")
orders_df = mysql_ds.accessor.query("""
    SELECT user_id, order_amount, order_date
    FROM orders
    WHERE order_date >= '2024-01-01'
""")

# 2. 从 ClickHouse 查询用户行为数据
clickhouse_ds = get_datasource("clickhouse_events")
events_df = clickhouse_ds.accessor.query("""
    SELECT user_id, event_type, event_count
    FROM user_events
    WHERE event_date >= '2024-01-01'
    GROUP BY user_id, event_type
""")

# 3. 在内存中关联
import duckdb
conn = duckdb.connect(':memory:')
conn.register('orders', orders_df)
conn.register('events', events_df)

result = conn.execute("""
    SELECT
        o.user_id,
        SUM(o.order_amount) as total_amount,
        e.event_count
    FROM orders o
    LEFT JOIN events e ON o.user_id = e.user_id
    WHERE e.event_type = 'page_view'
    GROUP BY o.user_id, e.event_count
    ORDER BY total_amount DESC
    LIMIT 100
""").fetchdf()

场景 2:国产数据库替换

背景:企业从 Oracle 迁移到达梦数据库

AskTable 的优势

# 只需修改配置,无需修改代码
# 原来
datasource_config = {
    "engine": "oracle",
    "host": "oracle.example.com",
    "port": 1521,
    "database": "ORCL",
    # ...
}

# 迁移后
datasource_config = {
    "engine": "dameng",  # 只改这一行
    "host": "dameng.example.com",
    "port": 5236,
    "database": "DMDB",
    # ...
}

无缝切换

场景 3:混合云部署

背景

AskTable 统一接入

# 配置多个数据源
datasources = [
    {
        "name": "本地业务库",
        "engine": "mysql",
        "host": "192.168.1.100",
        # ...
    },
    {
        "name": "阿里云分析库",
        "engine": "clickhouse",
        "host": "xxx.clickhouse.aliyuncs.com",
        # ...
    },
    {
        "name": "腾讯云日志",
        "engine": "cls",
        "endpoint": "xxx.cls.tencentyun.com",
        # ...
    }
]

# 用户提问:"对比本地订单和云端分析数据"
# AskTable 自动:
# 1. 识别需要访问哪些数据源
# 2. 生成对应方言的 SQL
# 3. 并行查询
# 4. 合并结果

性能优化实践

1. 连接池管理

class ConnectionPool:
    def __init__(self, accessor_factory, max_connections=10):
        self.accessor_factory = accessor_factory
        self.max_connections = max_connections
        self.pool = queue.Queue(maxsize=max_connections)
        self._init_pool()

    def _init_pool(self):
        for _ in range(self.max_connections):
            accessor = self.accessor_factory()
            accessor.connect()
            self.pool.put(accessor)

    def get_connection(self):
        return self.pool.get(timeout=30)

    def return_connection(self, accessor):
        self.pool.put(accessor)

优势

2. 元数据缓存

class DataSourceAdmin:
    _meta_cache: dict[str, MetaAdmin] = {}
    _cache_ttl: int = 3600  # 1 小时

    @property
    def brain_meta(self) -> MetaAdmin:
        cache_key = f"{self.id}:{self.modified_at}"

        if cache_key in self._meta_cache:
            return self._meta_cache[cache_key]

        # 从数据库加载元数据
        meta = self._load_meta_from_db()
        self._meta_cache[cache_key] = meta

        return meta

优势

3. 查询优化

def optimize_query(sql: str, dialect: str) -> str:
    """根据方言优化 SQL"""
    if dialect == "clickhouse":
        # ClickHouse 优化
        # 1. 添加 PREWHERE 子句
        # 2. 使用 FINAL 处理重复数据
        # 3. 优化 JOIN 顺序
        pass
    elif dialect == "mysql":
        # MySQL 优化
        # 1. 添加索引提示
        # 2. 优化子查询
        # 3. 使用 STRAIGHT_JOIN
        pass
    # ...
    return optimized_sql

最佳实践建议

1. 数据源配置

安全性

# ✅ 推荐:使用密钥管理服务
access_config = {
    "host": "db.example.com",
    "port": 3306,
    "database": "mydb",
    "username": "user",
    "password": get_secret("db_password")  # 从密钥管理服务获取
}

# ❌ 不推荐:明文存储密码
access_config = {
    "password": "plain_text_password"
}

2. 错误处理

try:
    result = datasource.accessor.query(sql)
except ConnectionError:
    # 连接失败,重试
    retry_with_backoff()
except QueryTimeoutError:
    # 查询超时,优化 SQL 或增加超时时间
    log.warning(f"Query timeout: {sql}")
except PermissionError:
    # 权限不足,检查用户权限
    log.error(f"Permission denied for user {username}")

3. 监控和告警

# 记录查询性能
@monitor_query_performance
def query(self, sql: str) -> pd.DataFrame:
    start_time = time.time()
    result = self._execute_query(sql)
    duration = time.time() - start_time

    # 记录慢查询
    if duration > 5.0:
        log.warning(f"Slow query ({duration:.2f}s): {sql}")

    # 上报指标
    metrics.record("query_duration", duration, tags={
        "database": self.engine,
        "datasource": self.id
    })

    return result

总结

AskTable 通过插件化的数据库适配器架构,实现了对 40+ 种数据库的统一支持:

  1. 统一接口:上层业务逻辑无需关心底层数据库类型
  2. 灵活扩展:新增数据库支持只需实现适配器
  3. 方言适配:自动处理不同数据库的 SQL 语法差异
  4. 性能优化:连接池、缓存、查询优化
  5. 企业级特性:安全、监控、错误处理

这种架构让 AskTable 能够适应各种复杂的企业数据环境,无论是传统数据库、云数据库、国产数据库,还是大数据平台,都能无缝接入,为用户提供统一的 AI 数据分析体验。