
sidebar.wechat

sidebar.feishu
sidebar.chooseYourWayToJoin

sidebar.scanToAddConsultant
In enterprise data analysis scenarios, data is often scattered across different types of databases: MySQL stores business data, ClickHouse does real-time analysis, Oracle manages financial data, PostgreSQL handles user data. How can AI seamlessly access these heterogeneous datasources? AskTable achieves unified support for 33 databases through a plugin-based database adapter architecture.
Typical Enterprise Database Distribution:
Challenges:
AskTable defines unified interfaces through abstract base classes, with each database implementing its own adapter:
class DataSourceAdmin:
    """Admin-side view of a datasource: identity, engine, and access config."""

    project_id: str
    id: str
    name: str
    engine: DBEngine  # Database type
    access_config: AccessConfigPrivate | None
    file_paths: dict[str, str] | None

    @property
    def accessor(self):
        """Lazily build and cache the engine-specific data accessor."""
        cached = self._accessor
        if cached:
            return cached
        if not self.access_config:
            raise errors.DataSourceConfigError
        created = BaseAccessor.create(
            engine=self.engine,
            access_config=self.access_config.model_dump(),
            file_paths=self.file_paths,
        )
        self._accessor = created
        return created

    @property
    def dialect(self) -> str:
        """Resolve the SQL dialect for this datasource."""
        # DAP sources report their own dialect; all other engines are
        # mapped through sql_dialect_adaptor().
        return (
            self.accessor.dialect
            if self.engine == DBEngine.dap
            else sql_dialect_adaptor(self.engine)
        )

    @property
    def runtime_meta(self) -> MetaBase:
        """Fetch live metadata directly from the underlying datasource."""
        log.info(f"get runtime meta for {self.id}")
        return self.accessor.get_meta()

    @property
    def brain_meta(self) -> MetaAdmin:
        """Return metadata preloaded from Brain; raise if it was not loaded."""
        if self._meta:
            return self._meta
        raise RuntimeError("brain_meta not loaded, use get_admin(with_meta=True)")
Design Highlights:
# All databases are accessed through the same interface
accessor = datasource.accessor
# Connect to database
# NOTE(review): connect() appears to return the accessor itself (see the
# MySQL adapter), so the with-block presumably manages its lifetime —
# confirm BaseAccessor defines __enter__/__exit__.
with accessor.connect():
    # Execute query
    df = accessor.query("SELECT * FROM users LIMIT 10")
    # Get metadata
    meta = accessor.get_meta()
Advantages:
# Excerpt from DataSourceAdmin.accessor: the factory call that builds the
# engine-specific adapter from the stored access configuration.
self._accessor = BaseAccessor.create(
    engine=self.engine,
    access_config=self.access_config.model_dump(),
    file_paths=self.file_paths,
)
Factory Pattern: Dynamically create corresponding adapters based on engine type
# Pseudocode example
class BaseAccessor:
    @classmethod
    def create(cls, engine: DBEngine, access_config: dict, file_paths: dict):
        """Factory: instantiate the adapter class matching *engine*."""
        registry = {
            DBEngine.mysql: MySQLAccessor,
            DBEngine.postgresql: PostgreSQLAccessor,
            DBEngine.clickhouse: ClickHouseAccessor,
            DBEngine.oracle: OracleAccessor,
            # ... 33 databases
        }
        adapter_cls = registry.get(engine)
        if adapter_cls is None:
            raise UnsupportedDatabaseError(f"Unsupported engine: {engine}")
        return adapter_cls(access_config)
# Excerpt from DataSourceAdmin: per-engine dialect resolution.
@property
def dialect(self) -> str:
    # DAP sources expose their own dialect; every other engine goes
    # through the static engine -> dialect mapping.
    if self.engine == DBEngine.dap:
        return self.accessor.dialect
    return sql_dialect_adaptor(self.engine)
Dialect Mapping:
def sql_dialect_adaptor(engine: DBEngine) -> str:
    """Map database engine to SQL dialect.

    Engines without an explicit entry fall back to the "generic" dialect.
    """
    known = {
        DBEngine.mysql: "mysql",
        DBEngine.postgresql: "postgres",
        DBEngine.clickhouse: "clickhouse",
        DBEngine.oracle: "oracle",
        DBEngine.sqlserver: "tsql",
        DBEngine.hive: "hive",
        # Doris / StarRocks speak the MySQL wire protocol, so they reuse
        # the MySQL dialect.
        DBEngine.doris: "mysql",
        DBEngine.starrocks: "mysql",
        # ...
    }
    if engine in known:
        return known[engine]
    return "generic"
Uses:
From the code, you can see the database types AskTable supports:
# Relational Databases
- MySQL / MariaDB
- PostgreSQL
- Oracle
- SQL Server
- SQLite
# Cloud Databases
- Alibaba Cloud ADB MySQL / ADB PostgreSQL
- Alibaba Cloud PolarDB MySQL / PolarDB PostgreSQL
- Alibaba Cloud Hologres
- Tencent Cloud TDSQL MySQL / TDSQL PostgreSQL
- Huawei Cloud GaussDB / GaussDB DWS
# Analytic Databases
- ClickHouse
- Doris
- StarRocks
- SelectDB
- Databend
# Big Data Platforms
- Hive
- MaxCompute
# Domestic Databases
- DaMeng
- KingBase ES
- GBase 8a / 8c
- HighGo
- XuGu
- UXDB
- Oscar
- MogDB
- Vastbase
- YashanDB
- GreenPlum
- SequoiaDB
- RapidsDB
- OceanBase
# Document Data
- Excel
- Feishu Bitable
# Data Lake
- DAP (Data Access Platform)
Total: 33 Database Types!
Using MySQL as an example, let's look at the adapter implementation:
class MySQLAccessor(BaseAccessor):
    """MySQL adapter built on pymysql with dict-shaped result rows."""

    def __init__(self, access_config: dict):
        # Connection parameters come straight from the stored access config.
        self.host = access_config["host"]
        self.port = access_config["port"]
        self.database = access_config["database"]
        self.username = access_config["username"]
        self.password = access_config["password"]
        self.connection = None  # established lazily by connect()

    def connect(self):
        """Establish database connection"""
        import pymysql

        self.connection = pymysql.connect(
            host=self.host,
            port=self.port,
            database=self.database,
            user=self.username,
            password=self.password,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Execute query and return DataFrame"""
        if not self.connection:
            raise ConnectionError("Not connected to database")
        with self.connection.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return pd.DataFrame(rows)

    def get_meta(self) -> MetaBase:
        """Get database metadata"""
        schemas = []
        # Enumerate tables in the configured schema.
        tables_sql = """
        SELECT
            TABLE_SCHEMA,
            TABLE_NAME,
            TABLE_COMMENT
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s
        """
        for table in self.query_raw(tables_sql, (self.database,)):
            # Fetch column details for each table, in declaration order.
            fields_sql = """
            SELECT
                COLUMN_NAME,
                DATA_TYPE,
                COLUMN_COMMENT,
                IS_NULLABLE,
                COLUMN_KEY
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
            ORDER BY ORDINAL_POSITION
            """
            fields = self.query_raw(fields_sql, (self.database, table['TABLE_NAME']))
            # Build metadata structure
            # ...
        return MetaBase(schemas=schemas)

    def close(self):
        """Close connection"""
        if self.connection:
            self.connection.close()
            self.connection = None
Key Points:
ClickHouse is a columnar database with special query optimization:
class ClickHouseAccessor(BaseAccessor):
    """ClickHouse adapter that queries the server over its HTTP interface."""

    def query(self, sql: str) -> pd.DataFrame:
        """ClickHouse special handling"""
        # Ask for newline-delimited JSON so results can be parsed without a
        # native driver; only append when the caller didn't specify one.
        if "FORMAT" not in sql.upper():
            sql = f"{sql} FORMAT JSONEachRow"
        # Use HTTP interface
        response = requests.post(
            f"http://{self.host}:{self.port}",
            params={"database": self.database},
            data=sql,
            auth=(self.username, self.password),
            timeout=60,  # fix: a request without timeout can hang forever
        )
        # fix: surface HTTP-level failures (bad SQL, auth errors) instead of
        # silently parsing an error page as data.
        response.raise_for_status()
        # Parse JSONEachRow: one JSON object per non-empty line.  Skipping
        # blank lines fixes a crash on empty result sets, where splitting ""
        # yields [""] and json.loads("") raises.
        rows = [
            json.loads(line)
            for line in response.text.splitlines()
            if line.strip()
        ]
        return pd.DataFrame(rows)

    def get_meta(self) -> MetaBase:
        """Get metadata from system.tables and system.columns"""
        # NOTE(review): {database} is interpolated into the SQL string —
        # ensure the database name is validated/escaped upstream to avoid
        # SQL injection.
        tables_sql = """
        SELECT
            database,
            name as table_name,
            comment
        FROM system.tables
        WHERE database = '{database}'
        """
        # ...
Characteristics:
Oracle has unique pagination and metadata query methods:
class OracleAccessor(BaseAccessor):
    """Oracle adapter with Oracle-specific pagination and metadata queries."""

    def query(self, sql: str, limit: int = None) -> pd.DataFrame:
        """Oracle pagination handling"""
        if not limit:
            return super().query(sql)
        # Oracle 11g uses ROWNUM
        wrapped = f"""
        SELECT * FROM (
            {sql}
        ) WHERE ROWNUM <= {limit}
        """
        # Oracle 12c+ can use FETCH FIRST
        # sql = f"{sql} FETCH FIRST {limit} ROWS ONLY"
        return super().query(wrapped)

    def get_meta(self) -> MetaBase:
        """Get metadata from ALL_TABLES and ALL_TAB_COLUMNS"""
        tables_sql = """
        SELECT
            OWNER,
            TABLE_NAME,
            COMMENTS
        FROM ALL_TAB_COMMENTS
        WHERE OWNER = :owner
        """
        # ...
Characteristics:
Excel is not a database, but AskTable also supports it:
class ExcelAccessor(BaseAccessor):
    """Adapter that exposes an Excel workbook's sheets as SQL-queryable tables."""

    def __init__(self, access_config: dict, file_paths: dict):
        # The workbook path arrives via file_paths, not access_config.
        self.file_path = file_paths.get("excel_file")
        self.sheets = {}  # sheet name -> DataFrame, populated by connect()

    def connect(self):
        """Load Excel file"""
        import openpyxl

        self.workbook = openpyxl.load_workbook(self.file_path)
        # Convert each sheet to DataFrame; the first row is the header.
        for sheet_name in self.workbook.sheetnames:
            rows = self.workbook[sheet_name].values
            header = next(rows, None)
            if header is None:
                # fix: an empty sheet used to raise StopIteration here
                continue
            self.sheets[sheet_name] = pd.DataFrame(rows, columns=header)
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Use DuckDB to execute SQL"""
        import duckdb

        # Create temporary in-memory database
        conn = duckdb.connect(':memory:')
        try:
            # Register all sheets as tables
            for sheet_name, df in self.sheets.items():
                conn.register(sheet_name, df)
            # Execute SQL
            return conn.execute(sql).fetchdf()
        finally:
            # fix: the connection used to leak when execute() raised
            conn.close()

    def get_meta(self) -> MetaBase:
        """Infer metadata from DataFrame"""
        schemas = []
        for sheet_name, df in self.sheets.items():
            fields = [
                {
                    "name": col,
                    "data_type": str(df[col].dtype),
                    "description": ""
                }
                for col in df.columns
            ]
            # ...
        return MetaBase(schemas=schemas)
Characteristics:
Requirement: User wants to know "correlated analysis of order data in MySQL and user behavior data in ClickHouse"
Implementation:
# 1. Query order data from MySQL
mysql_ds = get_datasource("mysql_orders")
orders_df = mysql_ds.accessor.query("""
    SELECT user_id, order_amount, order_date
    FROM orders
    WHERE order_date >= '2024-01-01'
""")
# 2. Query user behavior data from ClickHouse
clickhouse_ds = get_datasource("clickhouse_events")
events_df = clickhouse_ds.accessor.query("""
    SELECT user_id, event_type, event_count
    FROM user_events
    WHERE event_date >= '2024-01-01'
    GROUP BY user_id, event_type
""")
# 3. Correlate in memory
import duckdb
conn = duckdb.connect(':memory:')
conn.register('orders', orders_df)
conn.register('events', events_df)
# fix: the event-type filter belongs in the ON clause — filtering the
# right-hand table in WHERE silently turned the LEFT JOIN into an INNER
# JOIN, dropping users with no 'page_view' events from the result.
result = conn.execute("""
    SELECT
        o.user_id,
        SUM(o.order_amount) as total_amount,
        e.event_count
    FROM orders o
    LEFT JOIN events e
        ON o.user_id = e.user_id AND e.event_type = 'page_view'
    GROUP BY o.user_id, e.event_count
    ORDER BY total_amount DESC
    LIMIT 100
""").fetchdf()
Background: Enterprise migrating from Oracle to DaMeng database
AskTable's Advantage:
# Only need to modify configuration, no code changes needed
# Original
datasource_config = {
"engine": "oracle",
"host": "oracle.example.com",
"port": 1521,
"database": "ORCL",
# ...
}
# After migration
datasource_config = {
"engine": "dameng", # Only change this line
"host": "dameng.example.com",
"port": 5236,
"database": "DMDB",
# ...
}
Seamless Switch:
Background:
AskTable Unified Access:
# Configure multiple datasources
datasources = [
{
"name": "Local Business Database",
"engine": "mysql",
"host": "192.168.1.100",
# ...
},
{
"name": "Alibaba Cloud Analytics Database",
"engine": "clickhouse",
"host": "xxx.clickhouse.aliyuncs.com",
# ...
},
{
"name": "Tencent Cloud Logs",
"engine": "cls",
"endpoint": "xxx.cls.tencentyun.com",
# ...
}
]
# User asks: "Compare local orders with cloud analytics data"
# AskTable automatically:
# 1. Identify which datasources need to be accessed
# 2. Generate SQL in corresponding dialect
# 3. Query in parallel
# 4. Merge results
class ConnectionPool:
    """Fixed-size pool of pre-connected accessors.

    All connections are opened eagerly in the constructor.  Callers borrow
    an accessor with get_connection() and must hand it back with
    return_connection() when done.
    """

    def __init__(self, accessor_factory, max_connections=10):
        self.accessor_factory = accessor_factory  # zero-arg callable producing accessors
        self.max_connections = max_connections
        self.pool = queue.Queue(maxsize=max_connections)
        self._init_pool()

    def _init_pool(self):
        # Eagerly create and connect every accessor up-front.
        for _ in range(self.max_connections):
            accessor = self.accessor_factory()
            accessor.connect()
            self.pool.put(accessor)

    def get_connection(self):
        """Borrow an accessor.

        Raises queue.Empty if no connection is free within 30 seconds.
        """
        return self.pool.get(timeout=30)

    def return_connection(self, accessor):
        """Return a previously borrowed accessor to the pool."""
        self.pool.put(accessor)

    def close_all(self):
        """Close every idle connection.

        fix: the pool previously offered no way to release its connections,
        leaking them for the life of the process.
        """
        while True:
            try:
                accessor = self.pool.get_nowait()
            except queue.Empty:
                break
            accessor.close()
Advantages:
class DataSourceAdmin:
    # Class-level metadata cache shared by all instances.  Entries are
    # (inserted_at, meta) pairs so they can expire.
    _meta_cache: dict[str, tuple[float, "MetaAdmin"]] = {}
    _cache_ttl: int = 3600  # seconds (1 hour)

    @property
    def brain_meta(self) -> "MetaAdmin":
        """Return metadata for this datasource, cached with a TTL.

        fix: _cache_ttl was declared but never checked, so cached entries
        never expired; entries older than the TTL are now evicted and
        reloaded.
        """
        import time  # local import keeps the snippet self-contained

        cache_key = f"{self.id}:{self.modified_at}"
        cached = self._meta_cache.get(cache_key)
        if cached is not None:
            inserted_at, meta = cached
            if time.monotonic() - inserted_at < self._cache_ttl:
                return meta
            # Entry expired — drop it and reload below.
            del self._meta_cache[cache_key]
        # Load metadata from database
        meta = self._load_meta_from_db()
        self._meta_cache[cache_key] = (time.monotonic(), meta)
        return meta
Advantages:
def optimize_query(sql: str, dialect: str) -> str:
    """Optimize SQL based on dialect.

    The dialect-specific rewrites below are placeholders; until they are
    implemented, the input SQL is returned unchanged.
    """
    if dialect == "clickhouse":
        # ClickHouse optimization
        # 1. Add PREWHERE clause
        # 2. Use FINAL to handle duplicate data
        # 3. Optimize JOIN order
        pass
    elif dialect == "mysql":
        # MySQL optimization
        # 1. Add index hints
        # 2. Optimize subqueries
        # 3. Use STRAIGHT_JOIN
        pass
    # ...
    # fix: `optimized_sql` was never assigned, so every call raised
    # NameError; return the (currently unmodified) input SQL instead.
    return sql
Security:
# ✅ Recommended: Use key management service
access_config = {
"host": "db.example.com",
"port": 3306,
"database": "mydb",
"username": "user",
"password": get_secret("db_password") # Get from key management service
}
# ❌ Not recommended: Store password in plain text
access_config = {
"password": "plain_text_password"
}
# Handle each failure mode with a targeted except clause rather than a
# broad catch-all.
try:
    result = datasource.accessor.query(sql)
except ConnectionError:
    # Connection failed, retry
    retry_with_backoff()
except QueryTimeoutError:
    # Query timeout, optimize SQL or increase timeout
    log.warning(f"Query timeout: {sql}")
except PermissionError:
    # Insufficient permissions, check user permissions
    log.error(f"Permission denied for user {username}")
# Record query performance
@monitor_query_performance
def query(self, sql: str) -> pd.DataFrame:
start_time = time.time()
result = self._execute_query(sql)
duration = time.time() - start_time
# Record slow queries
if duration > 5.0:
log.warning(f"Slow query ({duration:.2f}s): {sql}")
# Report metrics
metrics.record("query_duration", duration, tags={
"database": self.engine,
"datasource": self.id
})
return result
AskTable achieves unified support for 33 databases through a plugin-based database adapter architecture.
This architecture allows AskTable to adapt to various complex enterprise data environments, whether traditional databases, cloud databases, domestic databases, or big data platforms, seamlessly integrated to provide users with a unified AI data analysis experience.
sidebar.noProgrammingNeeded
sidebar.startFreeTrial