AskTable

MCP 架构与原理:深入理解 Model Context Protocol

AskTable 团队
AskTable 团队 2026年3月8日

理解 MCP 的技术架构和工作原理,可以帮助你更好地使用和优化 MCP 服务。本文将深入剖析 MCP 的核心技术。


一、整体架构

1. 三层架构

加载图表中...

应用层

协议层

服务层

2. 数据流

加载图表中...

二、通信协议详解

1. JSON-RPC 2.0

MCP 基于 JSON-RPC 2.0 协议,这是一个轻量级的远程过程调用协议。

请求格式

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "query",
    "arguments": {
      "question": "查询订单总额",
      "role_id": null,
      "role_variables": null
    }
  }
}

响应格式

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"status\": \"success\", \"data\": \"订单总额为 ¥123,456\"}"
      }
    ]
  }
}

2. Stdio 模式

工作原理

加载图表中...

特点

启动流程

# AI 应用启动 MCP Server
$ uvx asktable-mcp-server@latest

# 环境变量
API_KEY=xxx
DATASOURCE_ID=yyy

# 通信
stdin  ← JSON-RPC 请求
stdout → JSON-RPC 响应
stderr → 日志输出

优势

劣势

3. SSE 模式

工作原理

加载图表中...

特点

连接流程

加载图表中...

SSE 事件格式

event: message
data: {"jsonrpc":"2.0","id":1,"result":{...}}

event: endpoint
data: http://localhost:8095/messages/

优势

劣势


三、安全机制

1. 认证与授权

API Key 认证

加载图表中...

认证流程

  1. 客户端在请求中携带 API Key
  2. MCP Server 转发到 AskTable API
  3. AskTable API 验证 API Key 的有效性
  4. 验证通过后执行查询

SSE 模式的凭证传递

在 SSE 模式中,API Key 通过两种方式传递:

方式 1:URL 参数(推荐)

GET /sse/?api_key=xxx&datasource_id=yyy

方式 2:Session 存储

# 连接时存储凭证
pending_credentials[f"{client_ip}_{timestamp}"] = {
    "api_key": api_key,
    "datasource_id": datasource_id
}

# 首次请求时关联到 session
session_credentials[session_id] = pending_credentials[key]

# 后续请求从 session 读取
credentials = session_credentials[session_id]

2. 权限控制

行级权限

# 用户查询
query(
    question="查询订单",
    role_id="role_sales",
    role_variables={"region": "华东"}
)

# 实际执行的 SQL
SELECT * FROM orders
WHERE region = '华东'  -- 自动注入权限条件

权限注入流程

加载图表中...

权限规则示例

# 角色定义
role = {
    "id": "role_sales",
    "rules": [
        "orders.region = {{region}}",
        "orders.status IN ('approved', 'completed')"
    ]
}

# 变量替换
variables = {"region": "华东"}

# 最终 SQL
SELECT * FROM orders
WHERE region = '华东'
  AND status IN ('approved', 'completed')

3. 数据安全

只读访问

SQL 注入防护

敏感数据脱敏


四、性能优化

1. 查询优化

智能 Schema Linking

加载图表中...

优化策略

示例

# 用户问题
"查询学生姓名"

# 优化前
SELECT * FROM students

# 优化后
SELECT name FROM students LIMIT 1000

2. 缓存机制

多层缓存

加载图表中...

缓存类型

缓存策略

3. 连接池

数据库连接池

# 连接池配置
pool = create_pool(
    min_size=5,      # 最小连接数
    max_size=20,     # 最大连接数
    timeout=30,      # 获取连接超时
    recycle=3600     # 连接回收时间
)

优势

4. 异步处理

异步查询

async def query(question: str):
    # 异步生成 SQL
    sql = await generate_sql(question)

    # 异步执行查询
    result = await execute_query(sql)

    # 异步格式化结果
    formatted = await format_result(result)

    return formatted

优势


五、AskTable MCP Server 实现

1. 核心组件

# server.py
from fastmcp import FastMCP

mcp = FastMCP(name="Asktable MCP Server")

@mcp.tool(name='使用 AskTable 查询数据')
async def query(
    question: str,
    role_id: str = None,
    role_variables: dict = None
) -> dict:
    # 调用 AskTable API
    result = await get_asktable_answer(
        api_key=os.getenv("API_KEY"),
        datasource_id=os.getenv("DATASOURCE_ID"),
        question=question,
        role_id=role_id,
        role_variables=role_variables
    )
    return result

2. API 调用

# at_apis.py
from asktable import Asktable

async def get_asktable_answer(
    api_key, datasource_id, question,
    base_url=None, role_id=None, role_variables=None
) -> dict:
    # 创建客户端
    client = Asktable(api_key=api_key, base_url=base_url)

    # 调用 API
    response = client.answers.create(
        datasource_id=datasource_id,
        question=question,
        role_id=role_id,
        role_variables=role_variables
    )

    # 返回结果
    return {
        "status": "success" if response.answer else "failure",
        "data": response.answer.text if response.answer else None
    }

3. SSE Server

# sse_server.py
from fastapi import FastAPI
from fastmcp.server import FastMCPServer

app = FastAPI()

# 创建 MCP Server
mcp_server = FastMCPServer(mcp)

# SSE 端点
@app.get("/sse/")
async def sse_endpoint(
    api_key: str,
    datasource_id: str
):
    # 存储凭证
    store_credentials(api_key, datasource_id)

    # 返回 SSE 流
    return mcp_server.sse_handler()

# 消息端点
@app.post("/messages/")
async def messages_endpoint(
    session_id: str,
    request: dict
):
    # 获取凭证
    credentials = get_credentials(session_id)

    # 处理请求
    return await mcp_server.handle_request(request, credentials)

六、监控与调试

1. 日志

日志级别

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

关键日志

日志示例

2026-03-08 10:30:15 - INFO - Received query: 查询订单总额
2026-03-08 10:30:16 - INFO - Generated SQL: SELECT SUM(amount) FROM orders
2026-03-08 10:30:17 - INFO - Query completed in 1.2s

2. 性能监控

关键指标

监控工具

3. 调试技巧

Stdio 模式调试

# 手动运行 MCP Server
$ uvx asktable-mcp-server@latest

# 输入 JSON-RPC 请求
{"jsonrpc":"2.0","id":1,"method":"tools/list"}

# 查看响应
{"jsonrpc":"2.0","id":1,"result":{...}}

SSE 模式调试

# 测试 SSE 连接
$ curl -N http://localhost:8095/sse/?api_key=xxx&datasource_id=yyy

# 测试消息端点
$ curl -X POST http://localhost:8095/messages/?session_id=abc \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"tools/list"}'

七、扩展与定制

1. 添加自定义工具

@mcp.tool(name='自定义工具')
async def custom_tool(param: str) -> dict:
    # 实现自定义逻辑
    result = await do_something(param)
    return {"status": "success", "data": result}

2. 添加资源

@mcp.resource("config://settings")
async def get_settings() -> str:
    # 返回配置信息
    return json.dumps({"key": "value"})

3. 添加提示词模板

@mcp.prompt(name="数据分析模板")
async def analysis_prompt(topic: str) -> str:
    return f"请分析 {topic} 的数据,包括趋势、异常和建议"

八、总结

MCP 的技术架构体现了以下设计理念:

简单性

安全性

性能

可扩展性

下一步


相关阅读

技术交流