AskTable

Supporting 33 Databases: AskTable's Multi-Datasource Adapter Architecture

AskTable Team
2026-03-04

In enterprise data analysis scenarios, data is often scattered across different types of databases: MySQL stores business data, ClickHouse handles real-time analytics, Oracle manages financial data, and PostgreSQL holds user data. How can AI seamlessly access these heterogeneous datasources? AskTable achieves unified support for 33 databases through a plugin-based database adapter architecture.

Why Do We Need Multi-Database Support?

Enterprise Data Status

Typical Enterprise Database Distribution:

  • Business Systems: MySQL, SQL Server, Oracle
  • Data Warehouses: ClickHouse, Doris, StarRocks
  • Big Data Platforms: Hive, MaxCompute, Databend
  • Cloud Databases: Alibaba Cloud RDS, Tencent Cloud TDSQL, Huawei Cloud GaussDB
  • Domestic Databases: DaMeng, KingBase, GBase, XuGu
  • Document Data: Excel, Feishu Bitable

Challenges:

  • SQL dialect differences: Each database has subtle SQL syntax differences
  • Different connection methods: JDBC, ODBC, HTTP API, SDK
  • Different metadata acquisition methods: information_schema, system tables, API
  • Data type mapping: Different databases have different data type definitions
  • Different performance optimization strategies: indexes, partitions, query optimizers

Core Architecture Design

1. Plugin Adapter Architecture

AskTable defines unified interfaces through abstract base classes, with each database implementing its own adapter:

class DataSourceAdmin:
    project_id: str
    id: str
    name: str
    engine: DBEngine  # Database type
    access_config: AccessConfigPrivate | None
    file_paths: dict[str, str] | None

    @property
    def accessor(self):
        """Get the data accessor for the datasource"""
        if self._accessor:
            return self._accessor
        if not self.access_config:
            raise errors.DataSourceConfigError

        self._accessor = BaseAccessor.create(
            engine=self.engine,
            access_config=self.access_config.model_dump(),
            file_paths=self.file_paths,
        )
        return self._accessor

    @property
    def dialect(self) -> str:
        """Get SQL dialect"""
        if self.engine == DBEngine.dap:
            return self.accessor.dialect
        return sql_dialect_adaptor(self.engine)

    @property
    def runtime_meta(self) -> MetaBase:
        """Get real-time metadata of the datasource"""
        log.info(f"get runtime meta for {self.id}")
        meta = self.accessor.get_meta()
        return meta

    @property
    def brain_meta(self) -> MetaAdmin:
        """Get metadata from Brain (pre-loaded by get_admin)"""
        if not self._meta:
            raise RuntimeError("brain_meta not loaded, use get_admin(with_meta=True)")
        return self._meta

Design Highlights:

1.1 Unified Access Interface

# All databases are accessed through the same interface
accessor = datasource.accessor

# Connect to database
with accessor.connect():
    # Execute query
    df = accessor.query("SELECT * FROM users LIMIT 10")

    # Get metadata
    meta = accessor.get_meta()

Advantages:

  • Upper-layer business logic does not need to care about the underlying database type
  • Adding support for a new database only requires implementing an adapter, without touching existing code
  • Easy to test and maintain
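The unified interface behind this can be sketched as an abstract base class. The following is a minimal sketch, not AskTable's actual code: the method names mirror the snippets in this article, while the toy `InMemoryAccessor` subclass and its return shapes are assumptions for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any


class BaseAccessor(ABC):
    """Sketch of the adapter contract every database implements."""

    def __init__(self, access_config: dict[str, Any]):
        self.access_config = access_config

    @abstractmethod
    def connect(self) -> "BaseAccessor":
        """Establish a connection to the underlying datasource."""

    @abstractmethod
    def query(self, sql: str) -> list[dict[str, Any]]:
        """Execute SQL and return rows."""

    @abstractmethod
    def get_meta(self) -> dict[str, Any]:
        """Return schema metadata in a normalized shape."""


class InMemoryAccessor(BaseAccessor):
    """Toy adapter showing that any subclass satisfies the same contract."""

    def connect(self) -> "BaseAccessor":
        self.rows = [{"id": 1, "name": "alice"}]
        return self

    def query(self, sql: str) -> list[dict[str, Any]]:
        return self.rows

    def get_meta(self) -> dict[str, Any]:
        return {"tables": ["users"]}
```

Because callers only ever see the `BaseAccessor` surface, swapping `InMemoryAccessor` for a real MySQL or ClickHouse adapter changes nothing upstream.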

1.2 Dynamic Adapter Creation

self._accessor = BaseAccessor.create(
    engine=self.engine,
    access_config=self.access_config.model_dump(),
    file_paths=self.file_paths,
)

Factory Pattern: Dynamically create corresponding adapters based on engine type

# Pseudocode example
class BaseAccessor:
    @classmethod
    def create(cls, engine: DBEngine, access_config: dict, file_paths: dict):
        if engine == DBEngine.mysql:
            return MySQLAccessor(access_config)
        elif engine == DBEngine.postgresql:
            return PostgreSQLAccessor(access_config)
        elif engine == DBEngine.clickhouse:
            return ClickHouseAccessor(access_config)
        elif engine == DBEngine.oracle:
            return OracleAccessor(access_config)
        # ... 33 databases
        else:
            raise UnsupportedDatabaseError(f"Unsupported engine: {engine}")
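With 33 engines, a long if/elif chain becomes unwieldy. A common alternative is a registration decorator so each adapter module registers itself; this is a sketch of that pattern, not necessarily how AskTable implements `create`:

```python
# Registry-based factory: each adapter registers itself under its engine name,
# so adding a new database never touches the factory code.
_REGISTRY: dict[str, type] = {}


def register(engine: str):
    def decorator(cls: type) -> type:
        _REGISTRY[engine] = cls
        return cls
    return decorator


def create_accessor(engine: str, access_config: dict):
    try:
        return _REGISTRY[engine](access_config)
    except KeyError:
        raise ValueError(f"Unsupported engine: {engine}") from None


@register("mysql")
class MySQLAccessor:
    def __init__(self, access_config: dict):
        self.access_config = access_config


@register("clickhouse")
class ClickHouseAccessor:
    def __init__(self, access_config: dict):
        self.access_config = access_config
```

The open/closed benefit is the same as the if/elif factory, but the 34th adapter only needs a one-line decorator in its own module.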

1.3 SQL Dialect Adaptation

@property
def dialect(self) -> str:
    if self.engine == DBEngine.dap:
        return self.accessor.dialect
    return sql_dialect_adaptor(self.engine)

Dialect Mapping:

def sql_dialect_adaptor(engine: DBEngine) -> str:
    """Map database engine to SQL dialect"""
    mapping = {
        DBEngine.mysql: "mysql",
        DBEngine.postgresql: "postgres",
        DBEngine.clickhouse: "clickhouse",
        DBEngine.oracle: "oracle",
        DBEngine.sqlserver: "tsql",
        DBEngine.hive: "hive",
        DBEngine.doris: "mysql",  # Doris compatible with MySQL protocol
        DBEngine.starrocks: "mysql",  # StarRocks compatible with MySQL protocol
        # ...
    }
    return mapping.get(engine, "generic")

Uses:

  • SQL generation: Generate correct SQL syntax based on dialect
  • SQL parsing: Use tools like sqlglot to parse and convert SQL
  • Query optimization: Apply specific optimization strategies based on dialect
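To make the dialect differences concrete: even something as basic as limiting rows differs by engine. Below is a hedged sketch of a dialect-aware limiter, not AskTable's actual SQL generator; the function name and the handled dialects are assumptions.

```python
def apply_limit(sql: str, limit: int, dialect: str) -> str:
    """Rewrite a query so it returns at most `limit` rows,
    using the syntax the target dialect understands."""
    if dialect in ("mysql", "postgres", "clickhouse"):
        return f"{sql} LIMIT {limit}"
    if dialect == "tsql":  # SQL Server: TOP goes right after SELECT
        return sql.replace("SELECT", f"SELECT TOP {limit}", 1)
    if dialect == "oracle":  # pre-12c Oracle: wrap with ROWNUM
        return f"SELECT * FROM ({sql}) WHERE ROWNUM <= {limit}"
    return sql  # generic fallback: leave the query untouched
```

In practice a dedicated tool like sqlglot handles these rewrites far more robustly, which is why the dialect string matters so much.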

2. Supported Database Types

From the code, you can see the database types AskTable supports:

# Relational Databases
- MySQL / MariaDB
- PostgreSQL
- Oracle
- SQL Server
- SQLite

# Cloud Databases
- Alibaba Cloud ADB MySQL / ADB PostgreSQL
- Alibaba Cloud PolarDB MySQL / PolarDB PostgreSQL
- Alibaba Cloud Hologres
- Tencent Cloud TDSQL MySQL / TDSQL PostgreSQL
- Huawei Cloud GaussDB / GaussDB DWS

# Analytic Databases
- ClickHouse
- Doris
- StarRocks
- SelectDB
- Databend

# Big Data Platforms
- Hive
- MaxCompute

# Domestic Databases
- DaMeng
- KingBase ES
- GBase 8a / 8c
- HighGo
- XuGu
- UXDB
- Oscar
- MogDB
- Vastbase
- YashanDB
- GreenPlum
- SequoiaDB
- RapidsDB
- OceanBase

# Document Data
- Excel
- Feishu Bitable

# Data Lake
- DAP (Data Access Platform)

Total: 33 Database Types!

3. Adapter Implementation Example

Using MySQL as an example, let's look at the adapter implementation:

class MySQLAccessor(BaseAccessor):
    def __init__(self, access_config: dict):
        self.host = access_config["host"]
        self.port = access_config["port"]
        self.database = access_config["database"]
        self.username = access_config["username"]
        self.password = access_config["password"]
        self.connection = None

    def connect(self):
        """Establish database connection"""
        import pymysql
        self.connection = pymysql.connect(
            host=self.host,
            port=self.port,
            database=self.database,
            user=self.username,
            password=self.password,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Execute query and return DataFrame"""
        if not self.connection:
            raise ConnectionError("Not connected to database")

        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            result = cursor.fetchall()

        return pd.DataFrame(result)

    def get_meta(self) -> MetaBase:
        """Get database metadata"""
        schemas = []

        # Get all tables
        tables_sql = """
        SELECT
            TABLE_SCHEMA,
            TABLE_NAME,
            TABLE_COMMENT
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s
        """
        # query_raw (helper, not shown here) executes parameterized SQL and returns raw rows
        tables = self.query_raw(tables_sql, (self.database,))

        for table in tables:
            # Get field information for the table
            fields_sql = """
            SELECT
                COLUMN_NAME,
                DATA_TYPE,
                COLUMN_COMMENT,
                IS_NULLABLE,
                COLUMN_KEY
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
            ORDER BY ORDINAL_POSITION
            """
            fields = self.query_raw(fields_sql, (self.database, table['TABLE_NAME']))

            # Build metadata structure
            # ...

        return MetaBase(schemas=schemas)

    def close(self):
        """Close connection"""
        if self.connection:
            self.connection.close()
            self.connection = None

Key Points:

  • Connection Management: Support connection pooling, timeout control
  • Query Execution: Handle parameterized queries, transactions
  • Metadata Acquisition: Get from information_schema or system tables
  • Error Handling: Unified exception handling and retry mechanism

4. Special Database Handling

4.1 ClickHouse Adapter

ClickHouse is a columnar database with special query optimization:

class ClickHouseAccessor(BaseAccessor):
    def query(self, sql: str) -> pd.DataFrame:
        """ClickHouse-specific handling"""
        import json
        import requests

        # Append a FORMAT clause so results come back as line-delimited JSON
        if "FORMAT" not in sql.upper():
            sql = f"{sql} FORMAT JSONEachRow"

        # Use HTTP interface
        response = requests.post(
            f"http://{self.host}:{self.port}",
            params={"database": self.database},
            data=sql,
            auth=(self.username, self.password)
        )

        # Parse JSON results
        lines = response.text.strip().split('\n')
        data = [json.loads(line) for line in lines]
        return pd.DataFrame(data)

    def get_meta(self) -> MetaBase:
        """Get metadata from system.tables and system.columns"""
        tables_sql = f"""
        SELECT
            database,
            name AS table_name,
            comment
        FROM system.tables
        WHERE database = '{self.database}'
        """
        # ...

Characteristics:

  • Use HTTP interface instead of JDBC
  • Support ClickHouse-specific data types (Array, Tuple, Nested)
  • Optimize large data volume queries

4.2 Oracle Adapter

Oracle has unique pagination and metadata query methods:

class OracleAccessor(BaseAccessor):
    def query(self, sql: str, limit: int | None = None) -> pd.DataFrame:
        """Oracle pagination handling"""
        if limit:
            # Oracle 11g uses ROWNUM
            sql = f"""
            SELECT * FROM (
                {sql}
            ) WHERE ROWNUM <= {limit}
            """
            # Oracle 12c+ can use FETCH FIRST
            # sql = f"{sql} FETCH FIRST {limit} ROWS ONLY"

        return super().query(sql)

    def get_meta(self) -> MetaBase:
        """Get metadata from ALL_TABLES and ALL_TAB_COLUMNS"""
        tables_sql = """
        SELECT
            OWNER,
            TABLE_NAME,
            COMMENTS
        FROM ALL_TAB_COMMENTS
        WHERE OWNER = :owner
        """
        # ...

Characteristics:

  • Handle Oracle-specific pagination syntax
  • Support Oracle data types (NUMBER, VARCHAR2, CLOB)
  • Handle case sensitivity issues

4.3 Excel Adapter

Excel is not a database, but AskTable also supports it:

class ExcelAccessor(BaseAccessor):
    def __init__(self, access_config: dict, file_paths: dict):
        self.file_path = file_paths.get("excel_file")
        self.sheets = {}

    def connect(self):
        """Load Excel file"""
        import openpyxl
        self.workbook = openpyxl.load_workbook(self.file_path)

        # Convert each sheet to DataFrame
        for sheet_name in self.workbook.sheetnames:
            sheet = self.workbook[sheet_name]
            data = sheet.values
            cols = next(data)
            self.sheets[sheet_name] = pd.DataFrame(data, columns=cols)

        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Use DuckDB to execute SQL"""
        import duckdb

        # Create temporary database
        conn = duckdb.connect(':memory:')

        # Register all sheets as tables
        for sheet_name, df in self.sheets.items():
            conn.register(sheet_name, df)

        # Execute SQL
        result = conn.execute(sql).fetchdf()
        conn.close()

        return result

    def get_meta(self) -> MetaBase:
        """Infer metadata from DataFrame"""
        schemas = []
        for sheet_name, df in self.sheets.items():
            fields = []
            for col in df.columns:
                fields.append({
                    "name": col,
                    "data_type": str(df[col].dtype),
                    "description": ""
                })
            # ...
        return MetaBase(schemas=schemas)

Characteristics:

  • Map Excel sheets to database tables
  • Use DuckDB to provide SQL query capability
  • Automatically infer data types
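The type-inference step above can be as simple as a mapping from pandas dtype names to SQL column types. This is a minimal sketch; the mapping values are assumptions, not AskTable's canonical type system.

```python
# Map pandas dtype names onto generic SQL column types.
_DTYPE_TO_SQL = {
    "int64": "BIGINT",
    "float64": "DOUBLE",
    "bool": "BOOLEAN",
    "datetime64[ns]": "TIMESTAMP",
    "object": "VARCHAR",
}


def infer_sql_type(dtype_name: str) -> str:
    """Fall back to VARCHAR for any dtype we do not recognize."""
    return _DTYPE_TO_SQL.get(dtype_name, "VARCHAR")
```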

Practical Application Scenarios

Scenario 1: Cross-Database Joint Query

Requirement: A user wants to correlate order data stored in MySQL with user-behavior data stored in ClickHouse.

Implementation:

# 1. Query order data from MySQL
mysql_ds = get_datasource("mysql_orders")
orders_df = mysql_ds.accessor.query("""
    SELECT user_id, order_amount, order_date
    FROM orders
    WHERE order_date >= '2024-01-01'
""")

# 2. Query user behavior data from ClickHouse
clickhouse_ds = get_datasource("clickhouse_events")
events_df = clickhouse_ds.accessor.query("""
    SELECT user_id, event_type, event_count
    FROM user_events
    WHERE event_date >= '2024-01-01'
    GROUP BY user_id, event_type
""")

# 3. Correlate in memory
import duckdb
conn = duckdb.connect(':memory:')
conn.register('orders', orders_df)
conn.register('events', events_df)

result = conn.execute("""
    SELECT
        o.user_id,
        SUM(o.order_amount) as total_amount,
        e.event_count
    FROM orders o
    LEFT JOIN events e ON o.user_id = e.user_id
    WHERE e.event_type = 'page_view'
    GROUP BY o.user_id, e.event_count
    ORDER BY total_amount DESC
    LIMIT 100
""").fetchdf()

Scenario 2: Domestic Database Replacement

Background: Enterprise migrating from Oracle to DaMeng database

AskTable's Advantage:

# Only need to modify configuration, no code changes needed
# Original
datasource_config = {
    "engine": "oracle",
    "host": "oracle.example.com",
    "port": 1521,
    "database": "ORCL",
    # ...
}

# After migration
datasource_config = {
    "engine": "dameng",  # Only change this line
    "host": "dameng.example.com",
    "port": 5236,
    "database": "DMDB",
    # ...
}

Seamless Switch:

  • SQL generation automatically adapts to DaMeng dialect
  • Metadata acquisition automatically uses DaMeng system tables
  • Users don't notice any difference

Scenario 3: Hybrid Cloud Deployment

Background:

  • Core business data in local MySQL
  • Analytics data in Alibaba Cloud ClickHouse
  • Log data in Tencent Cloud CLS

AskTable Unified Access:

# Configure multiple datasources
datasources = [
    {
        "name": "Local Business Database",
        "engine": "mysql",
        "host": "192.168.1.100",
        # ...
    },
    {
        "name": "Alibaba Cloud Analytics Database",
        "engine": "clickhouse",
        "host": "xxx.clickhouse.aliyuncs.com",
        # ...
    },
    {
        "name": "Tencent Cloud Logs",
        "engine": "cls",
        "endpoint": "xxx.cls.tencentyun.com",
        # ...
    }
]

# User asks: "Compare local orders with cloud analytics data"
# AskTable automatically:
# 1. Identify which datasources need to be accessed
# 2. Generate SQL in corresponding dialect
# 3. Query in parallel
# 4. Merge results
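Steps 3 and 4 above can be sketched with a thread pool, since each accessor spends most of its time blocked on network I/O. This is a sketch with a stand-in accessor, not AskTable's actual scheduler; `query_all` and `FakeAccessor` are names invented for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def query_all(datasources: list, sql_by_name: dict[str, str]) -> dict[str, list]:
    """Run one query per datasource in parallel and collect results by name."""
    def run(ds):
        return ds["name"], ds["accessor"].query(sql_by_name[ds["name"]])

    with ThreadPoolExecutor(max_workers=len(datasources)) as pool:
        return dict(pool.map(run, datasources))


# Stand-in accessor so the sketch runs without real databases
class FakeAccessor:
    def __init__(self, rows):
        self.rows = rows

    def query(self, sql):
        return self.rows
```

Threads (rather than asyncio) keep the sketch compatible with blocking drivers like pymysql; the merge step then operates on the returned dict.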

Performance Optimization Practices

1. Connection Pool Management

import queue

class ConnectionPool:
    def __init__(self, accessor_factory, max_connections=10):
        self.accessor_factory = accessor_factory
        self.max_connections = max_connections
        self.pool = queue.Queue(maxsize=max_connections)
        self._init_pool()

    def _init_pool(self):
        for _ in range(self.max_connections):
            accessor = self.accessor_factory()
            accessor.connect()
            self.pool.put(accessor)

    def get_connection(self):
        return self.pool.get(timeout=30)

    def return_connection(self, accessor):
        self.pool.put(accessor)

Advantages:

  • Avoid frequent connection establishment
  • Control concurrent connection count
  • Improve query performance
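To guarantee that connections always return to the pool, even when a query raises, a context-manager wrapper is a natural companion. This is a sketch building on the `ConnectionPool` above; the wrapper name is an assumption.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Check a connection out of the pool and guarantee it is returned,
    even if the body raises."""
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        pool.return_connection(conn)
```

Usage: `with pooled_connection(pool) as conn: conn.query(sql)` — the `finally` block makes leaks impossible regardless of query outcome.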

2. Metadata Caching

import time

class DataSourceAdmin:
    _meta_cache: dict[str, tuple[MetaAdmin, float]] = {}
    _cache_ttl: int = 3600  # 1 hour

    @property
    def brain_meta(self) -> MetaAdmin:
        cache_key = f"{self.id}:{self.modified_at}"

        cached = self._meta_cache.get(cache_key)
        if cached:
            meta, cached_at = cached
            # Honor the TTL so stale metadata is eventually refreshed
            if time.time() - cached_at < self._cache_ttl:
                return meta

        # Load metadata from the database and refresh the cache
        meta = self._load_meta_from_db()
        self._meta_cache[cache_key] = (meta, time.time())

        return meta

Advantages:

  • Avoid frequent metadata queries
  • Reduce database load
  • Improve response speed

3. Query Optimization

def optimize_query(sql: str, dialect: str) -> str:
    """Optimize SQL based on dialect (illustrative skeleton)"""
    optimized_sql = sql
    if dialect == "clickhouse":
        # ClickHouse optimizations:
        # 1. Add PREWHERE clause
        # 2. Use FINAL to handle duplicate data
        # 3. Optimize JOIN order
        pass
    elif dialect == "mysql":
        # MySQL optimizations:
        # 1. Add index hints
        # 2. Rewrite subqueries
        # 3. Use STRAIGHT_JOIN
        pass
    # ...
    return optimized_sql

Best Practice Recommendations

1. Datasource Configuration

Security:

# ✅ Recommended: Use key management service
access_config = {
    "host": "db.example.com",
    "port": 3306,
    "database": "mydb",
    "username": "user",
    "password": get_secret("db_password")  # Get from key management service
}

# ❌ Not recommended: Store password in plain text
access_config = {
    "password": "plain_text_password"
}
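The `get_secret` helper above is left abstract. A minimal stand-in that resolves secrets from environment variables might look like the following; `get_secret` and the uppercase-variable convention are assumptions, and a real deployment would call a secrets manager such as Vault or a cloud KMS instead.

```python
import os


def get_secret(key: str) -> str:
    """Resolve a secret from the environment. A real deployment would
    call a secrets manager (e.g. Vault or a cloud KMS) instead."""
    value = os.environ.get(key.upper())
    if value is None:
        raise KeyError(f"Secret not configured: {key}")
    return value
```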

2. Error Handling

try:
    result = datasource.accessor.query(sql)
except ConnectionError:
    # Connection failed: retry with exponential backoff
    retry_with_backoff()
except QueryTimeoutError:
    # Query timed out: optimize the SQL or raise the timeout limit
    log.warning(f"Query timeout: {sql}")
except PermissionError:
    # Insufficient privileges: check the datasource user's grants
    log.error(f"Permission denied for query: {sql}")
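The `retry_with_backoff` call above is not defined in the snippet. One minimal sketch of exponential backoff follows; the attempt count and delays are arbitrary choices, not AskTable's actual retry policy.

```python
import time


def retry_with_backoff(fn, attempts: int = 3, base_delay: float = 0.1):
    """Call fn, retrying on ConnectionError with exponentially growing delays."""
    for i in range(attempts):
        try:
            return fn()
        except ConnectionError:
            if i == attempts - 1:
                raise  # out of attempts: surface the error
            time.sleep(base_delay * (2 ** i))
```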

3. Monitoring and Alerting

# Record query performance
@monitor_query_performance
def query(self, sql: str) -> pd.DataFrame:
    start_time = time.time()
    result = self._execute_query(sql)
    duration = time.time() - start_time

    # Record slow queries
    if duration > 5.0:
        log.warning(f"Slow query ({duration:.2f}s): {sql}")

    # Report metrics
    metrics.record("query_duration", duration, tags={
        "database": self.engine,
        "datasource": self.id
    })

    return result

Summary

AskTable achieves unified support for 33 databases through a plugin-based database adapter architecture:

  1. Unified Interface: Upper-layer business logic doesn't need to care about underlying database type
  2. Flexible Extension: Adding support for a new database only requires implementing an adapter
  3. Dialect Adaptation: Automatically handle SQL syntax differences across different databases
  4. Performance Optimization: Connection pooling, caching, query optimization
  5. Enterprise Features: Security, monitoring, error handling

This architecture lets AskTable adapt to a wide range of complex enterprise data environments. Whether the data lives in traditional databases, cloud databases, domestic databases, or big data platforms, it can be integrated seamlessly, giving users a unified AI data analysis experience.
