
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在企业数据分析场景中,数据往往分散在不同类型的数据库中:MySQL 存储业务数据,ClickHouse 做实时分析,Oracle 管理财务数据,PostgreSQL 处理用户数据。如何让 AI 无缝访问这些异构数据源?AskTable 通过插件化的数据库适配器架构,实现了对 40+ 种数据库的统一支持。
典型企业的数据库分布:
挑战:
AskTable 通过抽象基类定义统一接口,每种数据库实现自己的适配器:
class DataSourceAdmin:
    """Admin-side representation of a single data source.

    Holds the connection configuration and lazily builds the concrete
    accessor (adapter) for the source's database engine.
    """
    project_id: str
    id: str
    name: str
    engine: DBEngine  # database engine type
    access_config: AccessConfigPrivate | None
    file_paths: dict[str, str] | None

    @property
    def accessor(self):
        """Return the data accessor for this source (created lazily on first use)."""
        if self._accessor:
            return self._accessor
        # Without access configuration there is nothing to connect to.
        if not self.access_config:
            raise errors.DataSourceConfigError
        # Factory picks the concrete accessor class from the engine type.
        self._accessor = BaseAccessor.create(
            engine=self.engine,
            access_config=self.access_config.model_dump(),
            file_paths=self.file_paths,
        )
        return self._accessor

    @property
    def dialect(self) -> str:
        """Return the SQL dialect name for this source."""
        # DAP sources report their own dialect; everything else is a
        # static mapping from the engine enum.
        if self.engine == DBEngine.dap:
            return self.accessor.dialect
        return sql_dialect_adaptor(self.engine)

    @property
    def runtime_meta(self) -> MetaBase:
        """Fetch the source's metadata live from the database."""
        log.info(f"get runtime meta for {self.id}")
        meta = self.accessor.get_meta()
        return meta

    @property
    def brain_meta(self) -> MetaAdmin:
        """Return the metadata preloaded into the Brain by get_admin()."""
        # presumably populated by get_admin(with_meta=True) — TODO confirm
        if not self._meta:
            raise RuntimeError("brain_meta not loaded, use get_admin(with_meta=True)")
        return self._meta
设计亮点:
# 所有数据库都通过相同的接口访问 accessor = datasource.accessor # 连接数据库 with accessor.connect(): # 执行查询 df = accessor.query("SELECT * FROM users LIMIT 10") # 获取元数据 meta = accessor.get_meta()
优势:
# Factory call: the engine enum selects which accessor class is instantiated
self._accessor = BaseAccessor.create(
    engine=self.engine,
    access_config=self.access_config.model_dump(),
    file_paths=self.file_paths,
)
工厂模式:根据 engine 类型动态创建对应的适配器
# 伪代码示例 class BaseAccessor: @classmethod def create(cls, engine: DBEngine, access_config: dict, file_paths: dict): if engine == DBEngine.mysql: return MySQLAccessor(access_config) elif engine == DBEngine.postgresql: return PostgreSQLAccessor(access_config) elif engine == DBEngine.clickhouse: return ClickHouseAccessor(access_config) elif engine == DBEngine.oracle: return OracleAccessor(access_config) # ... 40+ 种数据库 else: raise UnsupportedDatabaseError(f"Unsupported engine: {engine}")
@property
def dialect(self) -> str:
    # DAP sources know their own dialect; all other engines are mapped
    # statically via sql_dialect_adaptor.
    if self.engine == DBEngine.dap:
        return self.accessor.dialect
    return sql_dialect_adaptor(self.engine)
方言映射:
def sql_dialect_adaptor(engine: DBEngine) -> str:
    """Map a database engine to the name of its SQL dialect."""
    dialect_by_engine = {
        DBEngine.mysql: "mysql",
        DBEngine.postgresql: "postgres",
        DBEngine.clickhouse: "clickhouse",
        DBEngine.oracle: "oracle",
        DBEngine.sqlserver: "tsql",
        DBEngine.hive: "hive",
        DBEngine.doris: "mysql",      # Doris speaks the MySQL protocol
        DBEngine.starrocks: "mysql",  # StarRocks speaks the MySQL protocol
        # ...
    }
    # Unknown engines fall back to a generic dialect.
    return dialect_by_engine.get(engine, "generic")
用途:
从代码中可以看到 AskTable 支持的数据库类型:
# 关系型数据库 - MySQL / MariaDB - PostgreSQL - Oracle - SQL Server - SQLite # 云数据库 - 阿里云 ADB MySQL / ADB PostgreSQL - 阿里云 PolarDB MySQL / PolarDB PostgreSQL - 阿里云 Hologres - 腾讯云 TDSQL MySQL / TDSQL PostgreSQL - 华为云 GaussDB / GaussDB DWS # 分析型数据库 - ClickHouse - Doris - StarRocks - SelectDB - Databend # 大数据平台 - Hive - MaxCompute # 国产数据库 - 达梦 (DaMeng) - 人大金仓 (KingBase ES) - 南大通用 (GBase 8a / 8c) - 瀚高 (HighGo) - 虚谷 (XuGu) - 优炫 (UXDB) - 神舟通用 (Oscar) - 云和恩墨 (MogDB) - 海量 (Vastbase) - 深算院 (YashanDB) - Greenplum - 巨杉 (SequoiaDB) - 柏睿 (RapidsDB) - 蚂蚁集团 (OceanBase) # 文档数据 - Excel - 飞书多维表格 (Bitable) # 数据湖 - DAP (Data Access Platform)
总计 40+ 种数据库!
以 MySQL 为例,看看适配器的实现:
class MySQLAccessor(BaseAccessor):
    """Adapter for MySQL built on PyMySQL.

    Fixes vs. the original example:
    - implements the context-manager protocol so the documented usage
      ``with accessor.connect(): ...`` actually works and always closes
      the connection, and
    - defines ``query_raw`` (it was called by ``get_meta`` but never
      implemented), using parameterized queries to avoid SQL injection.
    """

    def __init__(self, access_config: dict):
        self.host = access_config["host"]
        self.port = access_config["port"]
        self.database = access_config["database"]
        self.username = access_config["username"]
        self.password = access_config["password"]
        self.connection = None

    def connect(self):
        """Open the database connection; returns self for chaining / `with`."""
        import pymysql
        self.connection = pymysql.connect(
            host=self.host,
            port=self.port,
            database=self.database,
            user=self.username,
            password=self.password,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        return self

    def __enter__(self):
        # connect() already ran when used as `with accessor.connect():`
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always release the connection, even on error; never swallow exceptions.
        self.close()
        return False

    def query(self, sql: str) -> pd.DataFrame:
        """Execute *sql* and return the result set as a DataFrame."""
        if not self.connection:
            raise ConnectionError("Not connected to database")
        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            result = cursor.fetchall()
        return pd.DataFrame(result)

    def query_raw(self, sql: str, params: tuple = ()) -> list:
        """Execute a parameterized query and return raw rows (list of dicts).

        Placeholders (%s) are bound by the driver, so caller-supplied
        values never end up concatenated into the SQL text.
        """
        if not self.connection:
            raise ConnectionError("Not connected to database")
        with self.connection.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchall()

    def get_meta(self) -> MetaBase:
        """Collect database metadata from information_schema."""
        schemas = []
        # All tables in the configured schema
        tables_sql = """
            SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_COMMENT
            FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = %s
        """
        tables = self.query_raw(tables_sql, (self.database,))
        for table in tables:
            # Column details for each table, in declared order
            fields_sql = """
                SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT, IS_NULLABLE, COLUMN_KEY
                FROM information_schema.COLUMNS
                WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
                ORDER BY ORDINAL_POSITION
            """
            fields = self.query_raw(fields_sql, (self.database, table['TABLE_NAME']))
            # Build the metadata structure
            # ...
        return MetaBase(schemas=schemas)

    def close(self):
        """Close the connection if open (idempotent)."""
        if self.connection:
            self.connection.close()
            self.connection = None
关键点:
ClickHouse 是列式数据库,有特殊的查询优化:
class ClickHouseAccessor(BaseAccessor):
    """Adapter for ClickHouse using its HTTP interface.

    Fixes vs. the original example: HTTP errors are now surfaced via
    ``raise_for_status()`` (before, an error body would be fed to the
    JSON parser), and an empty result set no longer crashes on
    ``json.loads('')``.
    """

    def query(self, sql: str) -> pd.DataFrame:
        """ClickHouse-specific query handling."""
        # Ask for newline-delimited JSON unless the caller already set a
        # FORMAT clause. NOTE(review): this substring check can false-match
        # a column literally named FORMAT — acceptable for the example.
        if "FORMAT" not in sql.upper():
            sql = f"{sql} FORMAT JSONEachRow"
        # ClickHouse accepts queries over plain HTTP POST
        response = requests.post(
            f"http://{self.host}:{self.port}",
            params={"database": self.database},
            data=sql,
            auth=(self.username, self.password)
        )
        # Fail loudly on HTTP-level errors instead of mis-parsing the body.
        response.raise_for_status()
        body = response.text.strip()
        if not body:
            # Empty result set -> empty frame (json.loads("") would raise).
            return pd.DataFrame()
        # One JSON object per line (JSONEachRow)
        data = [json.loads(line) for line in body.split('\n')]
        return pd.DataFrame(data)

    def get_meta(self) -> MetaBase:
        """Read metadata from system.tables and system.columns."""
        tables_sql = """
            SELECT database, name as table_name, comment
            FROM system.tables
            WHERE database = '{database}'
        """
        # ...
特点:
Oracle 有独特的分页和元数据查询方式:
class OracleAccessor(BaseAccessor):
    """Adapter for Oracle: ROWNUM-based pagination and ALL_* metadata views.

    Fixes vs. the original example: the ``limit`` annotation is now
    ``int | None`` (it defaulted to None while typed ``int``), and a
    trailing semicolon is stripped before wrapping — ``SELECT ...;``
    inside a subquery is a syntax error in Oracle.
    """

    def query(self, sql: str, limit: int | None = None) -> pd.DataFrame:
        """Execute *sql*, optionally capped at *limit* rows."""
        if limit:
            # A trailing semicolon would break the wrapping subquery.
            inner = sql.rstrip().rstrip(';')
            # Oracle 11g style: wrap the query and filter on ROWNUM
            sql = f"""
                SELECT * FROM (
                    {inner}
                ) WHERE ROWNUM <= {limit}
            """
            # Oracle 12c+ could instead use:
            # sql = f"{sql} FETCH FIRST {limit} ROWS ONLY"
        return super().query(sql)

    def get_meta(self) -> MetaBase:
        """Read metadata from ALL_TAB_COMMENTS / ALL_TAB_COLUMNS."""
        tables_sql = """
            SELECT OWNER, TABLE_NAME, COMMENTS
            FROM ALL_TAB_COMMENTS
            WHERE OWNER = :owner
        """
        # ...
特点:
Excel 不是数据库,但 AskTable 也支持:
class ExcelAccessor(BaseAccessor):
    """Adapter exposing an Excel workbook as queryable tables — one table
    per sheet — with SQL executed by an in-memory DuckDB.

    Fixes vs. the original example: a completely empty sheet no longer
    raises StopIteration on the header row, and the DuckDB connection is
    closed even when the SQL fails (try/finally).
    """

    def __init__(self, access_config: dict, file_paths: dict):
        self.file_path = file_paths.get("excel_file")
        self.sheets = {}

    def connect(self):
        """Load the workbook and turn every sheet into a DataFrame."""
        import openpyxl
        self.workbook = openpyxl.load_workbook(self.file_path)
        for sheet_name in self.workbook.sheetnames:
            sheet = self.workbook[sheet_name]
            data = sheet.values
            # First row is the header; skip sheets with no rows at all.
            cols = next(data, None)
            if cols is None:
                continue
            self.sheets[sheet_name] = pd.DataFrame(data, columns=cols)
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Execute SQL over the loaded sheets via an in-memory DuckDB."""
        import duckdb
        conn = duckdb.connect(':memory:')
        try:
            # Register every sheet as a table named after the sheet.
            for sheet_name, df in self.sheets.items():
                conn.register(sheet_name, df)
            result = conn.execute(sql).fetchdf()
        finally:
            # Always release the in-memory database, even if the SQL fails.
            conn.close()
        return result

    def get_meta(self) -> MetaBase:
        """Infer metadata (column names and dtypes) from the DataFrames."""
        schemas = []
        for sheet_name, df in self.sheets.items():
            fields = []
            for col in df.columns:
                fields.append({
                    "name": col,
                    "data_type": str(df[col].dtype),
                    "description": ""
                })
            # ...
        return MetaBase(schemas=schemas)
特点:
需求:用户想知道"MySQL 中的订单数据和 ClickHouse 中的用户行为数据的关联分析"
实现:
# 1. 从 MySQL 查询订单数据 mysql_ds = get_datasource("mysql_orders") orders_df = mysql_ds.accessor.query(""" SELECT user_id, order_amount, order_date FROM orders WHERE order_date >= '2024-01-01' """) # 2. 从 ClickHouse 查询用户行为数据 clickhouse_ds = get_datasource("clickhouse_events") events_df = clickhouse_ds.accessor.query(""" SELECT user_id, event_type, event_count FROM user_events WHERE event_date >= '2024-01-01' GROUP BY user_id, event_type """) # 3. 在内存中关联 import duckdb conn = duckdb.connect(':memory:') conn.register('orders', orders_df) conn.register('events', events_df) result = conn.execute(""" SELECT o.user_id, SUM(o.order_amount) as total_amount, e.event_count FROM orders o LEFT JOIN events e ON o.user_id = e.user_id WHERE e.event_type = 'page_view' GROUP BY o.user_id, e.event_count ORDER BY total_amount DESC LIMIT 100 """).fetchdf()
背景:企业从 Oracle 迁移到达梦数据库
AskTable 的优势:
# 只需修改配置,无需修改代码 # 原来 datasource_config = { "engine": "oracle", "host": "oracle.example.com", "port": 1521, "database": "ORCL", # ... } # 迁移后 datasource_config = { "engine": "dameng", # 只改这一行 "host": "dameng.example.com", "port": 5236, "database": "DMDB", # ... }
无缝切换:
背景:
AskTable 统一接入:
# 配置多个数据源 datasources = [ { "name": "本地业务库", "engine": "mysql", "host": "192.168.1.100", # ... }, { "name": "阿里云分析库", "engine": "clickhouse", "host": "xxx.clickhouse.aliyuncs.com", # ... }, { "name": "腾讯云日志", "engine": "cls", "endpoint": "xxx.cls.tencentyun.com", # ... } ] # 用户提问:"对比本地订单和云端分析数据" # AskTable 自动: # 1. 识别需要访问哪些数据源 # 2. 生成对应方言的 SQL # 3. 并行查询 # 4. 合并结果
class ConnectionPool:
    """Fixed-size pool of pre-connected accessors backed by a FIFO queue."""

    def __init__(self, accessor_factory, max_connections=10):
        self.accessor_factory = accessor_factory
        self.max_connections = max_connections
        self.pool = queue.Queue(maxsize=max_connections)
        self._init_pool()

    def _init_pool(self):
        # Eagerly open every connection up front so get_connection()
        # never pays the connect cost.
        for _ in range(self.max_connections):
            conn = self.accessor_factory()
            conn.connect()
            self.pool.put(conn)

    def get_connection(self):
        # Blocks for up to 30 seconds waiting for a free connection.
        return self.pool.get(timeout=30)

    def return_connection(self, accessor):
        # Hand the connection back to the pool for reuse.
        self.pool.put(accessor)
优势:
class DataSourceAdmin:
    """Data-source admin object with a process-wide metadata cache.

    Fix vs. the original example: ``_cache_ttl`` was declared but never
    enforced, so cached entries lived forever; the TTL is now checked on
    every read.
    """

    # Class-level cache shared by all instances:
    # "id:modified_at" -> (load timestamp, metadata)
    _meta_cache: dict[str, tuple] = {}
    _cache_ttl: int = 3600  # seconds (1 hour)

    @property
    def brain_meta(self) -> "MetaAdmin":
        """Return cached metadata, reloading from the database when the
        entry is missing or older than ``_cache_ttl`` seconds."""
        cache_key = f"{self.id}:{self.modified_at}"
        entry = self._meta_cache.get(cache_key)
        now = time.time()
        if entry is not None and now - entry[0] < self._cache_ttl:
            return entry[1]
        # Cache miss or expired entry: load metadata from the database
        meta = self._load_meta_from_db()
        self._meta_cache[cache_key] = (now, meta)
        return meta
优势:
def optimize_query(sql: str, dialect: str) -> str:
    """Apply dialect-specific optimizations to *sql*.

    The concrete rewrites are still TODOs; until they are implemented the
    query is returned unchanged. (Fix: the original returned the undefined
    name ``optimized_sql``, raising NameError on every call.)
    """
    optimized_sql = sql
    if dialect == "clickhouse":
        # ClickHouse optimizations:
        # 1. add a PREWHERE clause
        # 2. use FINAL to collapse duplicate rows
        # 3. reorder JOINs
        pass
    elif dialect == "mysql":
        # MySQL optimizations:
        # 1. add index hints
        # 2. rewrite subqueries
        # 3. use STRAIGHT_JOIN
        pass
    # ...
    return optimized_sql
安全性:
# ✅ 推荐:使用密钥管理服务 access_config = { "host": "db.example.com", "port": 3306, "database": "mydb", "username": "user", "password": get_secret("db_password") # 从密钥管理服务获取 } # ❌ 不推荐:明文存储密码 access_config = { "password": "plain_text_password" }
try:
    result = datasource.accessor.query(sql)
except ConnectionError:
    # Connection failed — retry with backoff
    retry_with_backoff()
except QueryTimeoutError:
    # Query timed out — tune the SQL or raise the timeout
    log.warning(f"Query timeout: {sql}")
except PermissionError:
    # Insufficient privileges — check the user's grants
    log.error(f"Permission denied for user {username}")
# 记录查询性能 @monitor_query_performance def query(self, sql: str) -> pd.DataFrame: start_time = time.time() result = self._execute_query(sql) duration = time.time() - start_time # 记录慢查询 if duration > 5.0: log.warning(f"Slow query ({duration:.2f}s): {sql}") # 上报指标 metrics.record("query_duration", duration, tags={ "database": self.engine, "datasource": self.id }) return result
AskTable 通过插件化的数据库适配器架构,实现了对 40+ 种数据库的统一支持:
这种架构让 AskTable 能够适应各种复杂的企业数据环境,无论是传统数据库、云数据库、国产数据库,还是大数据平台,都能无缝接入,为用户提供统一的 AI 数据分析体验。