Xyzen 架构设计

Xyzen 采用现代化的分层架构设计,通过清晰的关注点分离和模块化设计,实现了一个高效、可扩展的 AI 服务平台。本文档详细介绍系统的整体架构、核心模块、数据流以及技术选型。

系统架构概览

分层架构详解

1. 表现层 (Presentation Layer)

表现层作为系统的入口点,处理来自客户端的所有请求。

API 入口点

REST API 路由

/xyzen/api/v1/
  ├── /agents          # 代理管理
  ├── /sessions        # 会话管理
  ├── /topics          # 主题管理
  ├── /mcps            # MCP 服务器管理
  ├── /providers       # 提供者管理
  └── /auth            # 身份认证

WebSocket 入口

/xyzen/ws/v1/
  └── /chat/[session_id]  # 实时聊天连接

健康检查

/xyzen/api/health  # 服务健康状态检查

FastAPI 应用结构

# FastAPI 应用初始化流程
app = FastAPI(
    title="Xyzen FastAPI Service",
    lifespan=lifespan,  # 生命周期管理
    docs_url="/xyzen/api/docs",
)

# CORS 中间件
app.add_middleware(CORSMiddleware, allow_origins=["*"])

# 根路由器
app.include_router(root_router, prefix="/xyzen")

# MCP 路由自动注册
app.router.routes.extend(setup_mcp_routes(app.state))

2. 中间件层 (Middleware Layer)

中间件负责横切关注点的处理,在请求-响应周期中执行统一的处理逻辑。

身份认证中间件 (auth/)

验证 Bearer Token JWT,提取用户身份信息:

  • 支持多个认证提供者 (Casdoor, Bohrium, BohrApp)
  • 从 Token 中提取 `user_id` 和 `provider`
  • 为后续服务层注入当前用户上下文

数据库连接中间件 (database/)

管理 SQLAlchemy 数据库连接池:

  • 创建 `AsyncSessionLocal` 连接
  • 使用 FastAPI 的 `Depends` 机制依赖注入
  • 支持事务管理和自动回滚

日志中间件 (logger/)

结构化日志记录:

  • 请求入口日志
  • 错误堆栈跟踪
  • 服务启动和关闭日志

动态 MCP 服务器中间件 (dynamic_mcp_server.py)

动态创建和挂载 MCP HTTP 服务器:

  • 为每个注册的 MCP 服务器创建独立的 FastMCP 应用
  • 处理 MCP 生命周期管理
  • 错误隔离和恢复

3. API 处理层 (Handler Layer)

处理层包含所有的 HTTP 路由处理器和 WebSocket 事件处理器。

REST API 处理器 (handler/api/v1/)

模块职责核心端点
agents.py代理 CRUD 和配置POST /agents,GET /agents/id,PUT /agents/id,DELETE /agents/id
sessions.py会话和主题管理POST /sessions,GET /sessions,自动创建默认主题
topics.py主题和消息GET /topics/id/messages,DELETE /topics/id
mcps.pyMCP 服务器集成GET /mcps,POST /mcps/tools/test,Smithery 激活
providers.py提供者配置GET /providers,POST /providers,PUT /providers/id,DELETE /providers/id
auth.py认证状态GET /auth/status,GET /auth/config

WebSocket 处理器 (handler/ws/v1/)

实时聊天通信通道:

  • 连接建立与关闭
  • 消息发送和接收
  • 流式响应处理

内置代理处理器 (handler/builtin_agents/)

预定义的系统级代理:

  • 注册表管理
  • 代理发现和加载
  • 代理启动时数据库种子化

4. 服务层 (Service Layer)

服务层包含业务逻辑和复杂的计算,是应用的核心逻辑所在。

Agent Service (core/agent_service.py)

统一的代理管理服务:

  • UnifiedAgentRead 模型用于读取常规代理和图代理
  • 代理类型检测 (regular vs graph vs builtin)
  • 代理验证和配置加载

Chat Service (core/chat/)

对话执行引擎:

  • 与 LangChain 集成用于标准 LLM 调用
  • 与 LangGraph 集成用于复杂工作流
  • 流式响应处理
  • 工具调用和确认机制

MCP Service (core/mcp.py)

MCP (Model Context Protocol) 服务:

  • MCP 服务器客户端管理
  • Smithery 注册表集成
  • 工具状态检查和健康检测
  • 异步广播更新

Provider Service (core/providers/)

LLM 提供者管理:

  • 提供者初始化 (initialize_providers_on_startup)
  • 凭证安全存储和掩码
  • 多提供者支持 (OpenAI, Azure, Anthropic, Google)
  • 模板管理

Auth Service (core/auth/)

身份认证和授权:

  • Token 验证
  • 用户上下文管理
  • 多提供者认证

Consumption Service (core/consume.py)

使用量跟踪和计费:

  • 记录每次 API 调用的消费
  • 远程计费集成 (Bohrium)
  • 消费记录持久化
  • 幂等性保证

5. 数据层 (Data Layer)

Repository 层 (repo/)

数据访问抽象:

文件功能
agent.pyAgent CRUD
session.pySession/Topic CRUD
message.pyMessage CRUD
provider.pyProvider CRUD
mcp.pyMCP 服务器 CRUD
consume.pyConsumeRecord CRUD
graph.pyGraphAgent CRUD

Repository 模式的好处:

  • 解耦业务逻辑和数据访问
  • 支持单元测试和 Mock
  • 便于切换数据源

SQLModel ORM (models/)

类型安全的 ORM 模型,基于 Pydantic V2 和 SQLAlchemy 2.0:

class Agent(SQLModel, table=True):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    name: str
    description: str | None = None
    temperature: float | None = None
    prompt: str | None = None
    user_id: str = Field(index=True)
    provider_id: UUID | None = Field(index=True)
    require_tool_confirmation: bool = False
    created_at: datetime  # TIMESTAMP with timezone
    updated_at: datetime  # TIMESTAMP with timezone

数据库支持

PostgreSQL (生产环境)

  • 完整的事务支持
  • JSONB 用于存储复杂结构
  • 全文搜索能力
  • 高并发性能

SQLite (开发环境)

  • 无需外部服务
  • 快速本地开发

核心数据模型

数据模型关系

系统的核心数据模型包括以下实体及其关系:

用户与资源关系

  • 用户拥有多个代理 (Agent)
  • 用户创建多个会话 (Session)
  • 用户创建多个图代理 (GraphAgent)
  • 用户生成消费记录 (ConsumeRecord)

代理与配置关系

  • 代理使用多个 MCP 服务器
  • 代理关联一个提供者 (Provider)

会话层级关系

  • 会话包含多个主题 (Topic)
  • 主题包含多个消息 (Message)

工作流关系

  • 图代理包含多个节点 (GraphNode)
  • 图代理定义多个边 (GraphEdge)

提供者关系

  • 提供者定义多个工具 (Tool)

关键模型详解

User 上下文

虽然 User 信息不直接存储在数据库中,但通过 `user_id` 和 `auth_provider` 字段维护用户上下文:

  • 多租户隔离:每个 user_id 拥有独立的 agents、sessions 等
  • 认证提供者跟踪:支持多个身份提供者

Agent (代理)

class Agent(SQLModel, table=True):
    # 基本信息
    name: str              # 代理名称
    description: str       # 代理描述
    avatar: str            # 头像 URL
    tags: list[str]        # 标签分类
    
    # 配置
    model: str             # 使用的 LLM 模型
    temperature: float     # 温度参数 (0-1)
    prompt: str            # 系统提示词
    
    # 关联
    user_id: str           # 所有者用户 ID
    provider_id: UUID      # 关联的提供者 UUID
    
    # 工具确认
    require_tool_confirmation: bool  # 调用工具前是否需要用户确认
    
    # 时间戳
    created_at: datetime   # 创建时间
    updated_at: datetime   # 更新时间

Session & Topic & Message

会话是多轮对话的容器:

  • Session: 一次完整的会话,包含一个默认主题
  • Topic: 主题是消息的分组,一个会话可有多个主题
  • Message: 单条消息,包含 role (user/assistant) 和 content
# 创建流程
POST /sessions
  └─ 自动创建一个 Topic
      └─ Topic 包含 Messages

GraphAgent (图代理)

基于 LangGraph 的工作流代理:

class GraphAgent(SQLModel, table=True):
    name: str
    description: str
    state_schema: dict    # JSON - 定义状态结构
    is_active: bool       # 是否激活
    is_published: bool    # 是否发布
    is_official: bool     # 是否为官方代理
    parent_agent_id: UUID # 版本控制 - 指向上一版本

ConsumeRecord (消费记录)

记录每次 API 使用的消费:

class ConsumeRecord(SQLModel, table=True):
    user_id: str           # 用户 ID
    amount: int            # 消费数量
    auth_provider: str     # 认证提供者
    
    # 业务关联
    session_id: UUID       # 关联的会话
    topic_id: UUID         # 关联的主题
    message_id: UUID       # 关联的消息
    
    # 计费状态
    consume_state: str     # pending/success/failed
    remote_response: str   # 远程计费系统响应
    
    # 幂等性
    biz_no: int           # 业务编号(自增,用于去重)

通信流程

请求-响应流程

WebSocket 流程

生命周期管理

应用启动流程

关键启动步骤

  1. 数据库表创建: 使用 SQLModel 创建或迁移所有表
  2. 提供者初始化: 从环境变量加载 API 密钥
  3. 系统代理创建: 确保 Chat 和 Workshop 代理存在
  4. 内置代理种子化: 将预定义的图代理加载到数据库
  5. MCP 服务器设置: 为每个已注册的 MCP 服务器创建 HTTP 应用
  6. 生命周期上下文管理: 使用 AsyncExitStack 管理所有异步资源

API 设计模式

RESTful 设计

遵循 REST 最佳实践:

HTTP 方法操作路由示例
GET读取/agents 列表,/agents/id 详情
POST创建/agents 创建新代理,自动关联 user_id
PUT更新/agents/id 更新代理配置
DELETE删除/agents/id 删除代理

请求/响应模式

请求模型 (Pydantic)

class AgentCreate(SQLModel):
    name: str
    description: str | None = None
    model: str | None = None
    temperature: float | None = None
    provider_id: UUID | None = None
    mcp_server_ids: list[UUID] = []

响应模式

class AgentRead(AgentBase):
    id: UUID
    created_at: datetime
    updated_at: datetime
    # 关联数据
    mcp_servers: list[MCPServerRead]
    provider: ProviderRead

错误处理

统一的 HTTP 异常处理:

  • 400 Bad Request - 请求参数错误
  • 401 Unauthorized - 认证失败
  • 403 Forbidden - 权限不足
  • 404 Not Found - 资源不存在
  • 409 Conflict - 业务冲突 (如重名)
  • 500 Internal Server Error - 服务错误

外部集成

LLM 提供者集成

```mermaid graph LR ChatService["Chat Service
core/chat/"] -->|langchain| OpenAI["OpenAI"] ChatService -->|langchain| Claude["Anthropic Claude"] ChatService -->|langchain| Gemini["Google Gemini"] ChatService -->|langchain| Azure["Azure OpenAI"]

style ChatService fill:#fce4ec style OpenAI fill:#bbdefb style Claude fill:#bbdefb style Gemini fill:#bbdefb style Azure fill:#bbdefb ```

通过 LangChain 的 LLMBase 抽象,支持多个 LLM 提供者。每个提供者通过 `Provider` 模型存储其 API 密钥和配置。

MCP (Model Context Protocol) 集成

MCP 提供工具和资源的标准化接口:

  1. MCP 服务器注册: 在 handler/mcp/ 中注册 MCP 服务器
  2. 工具发现: 通过 Smithery 注册表发现可用工具
  3. 工具调用: Chat Service 通过 MCP 调用外部工具
  4. 健康检查: 定期检查 MCP 服务器可用性

LangChain & LangGraph

  • LangChain: 用于构建简单的 LLM 调用链
  • LangGraph: 用于构建复杂的工作流图 (状态机)

Graph Agents 利用 LangGraph 定义:

  • Nodes: 执行步骤 (LLM 调用、工具调用等)
  • Edges: 条件转移逻辑
  • State Schema: 整个工作流的状态定义

部署架构

Docker 容器化

# 基础镜像: Python 3.13.5-slim
FROM python:3.13.5-slim

# 依赖管理: uv (快速 Python 包管理器)
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# 环境变量
ENV TZ=Asia/Shanghai
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

# 安装依赖
RUN uv sync --locked

# 暴露端口
EXPOSE 48196

# 启动命令
CMD ["uv", "run", "python", "-m", "app.main"]

环境配置

通过环境变量配置应用:

# 数据库
DATABASE_URL=postgresql://user:password@host:5432/xyzen

# LLM 提供者
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_API_KEY=...

# 认证
CASDOOR_ENDPOINT=https://...
BOHRIUM_API_KEY=...

# 应用
DEBUG=false
HOST=0.0.0.0
PORT=48196

多环境部署

  • 开发环境: SQLite + 快速迭代
  • 测试环境: PostgreSQL + 完整功能测试
  • 生产环境: PostgreSQL + 高可用配置 + 监控日志

技术栈总结

分层技术说明
Web 框架FastAPI高性能异步 Web 框架
异步运行时AsyncIOPython 异步编程
ORMSQLModel + SQLAlchemy类型安全的 ORM
数据库PostgreSQL / SQLite关系型数据库
迁移工具Alembic数据库版本管理
AI/AgentLangChainLLM 调用框架
工作流引擎LangGraph复杂业务流程编排
实时通信WebSocket双向实时通信
工具协议MCP标准化工具接口
容器化Docker标准化部署
包管理uv快速依赖管理
认证JWT + 多提供者安全身份验证
日志结构化日志便于调试和监控

设计原则

1. 关注点分离 (Separation of Concerns)

每个模块专注于单一职责:

  • API Handler: 请求/响应处理
  • Service: 业务逻辑
  • Repository: 数据访问
  • Model: 数据定义

2. 依赖注入 (Dependency Injection)

使用 FastAPI 的 Depends 机制实现:

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    return validate_token(token)

@router.get("/agents")
async def list_agents(current_user: User = Depends(get_current_user)):
    return await agent_service.list_agents(current_user.id)

3. 异步优先 (Async-First)

充分利用 Python AsyncIO:

  • 非阻塞 I/O 操作
  • 高并发处理能力
  • 更好的资源利用

4. 类型安全 (Type Safety)

使用 Python 类型注解和 Pydantic:

  • 编译时类型检查
  • 运行时数据验证
  • 更好的 IDE 支持

5. 多租户隔离 (Multi-Tenancy)

通过 user_id 字段实现:

  • 每个用户的数据完全隔离
  • 数据库查询自动带上 user_id 过滤
  • 无需修改核心逻辑即可支持多用户

可扩展性考虑

水平扩展

  • 无状态 API: 可运行多个副本
  • 共享数据库: PostgreSQL 支持并发访问
  • 负载均衡: 通过 Nginx/HAProxy 分发请求

垂直扩展

  • 异步处理: 使用后台任务处理耗时操作
  • 缓存层: 可添加 Redis 缓存热数据
  • CDN: 静态资源通过 CDN 加速

功能扩展

  • 插件系统: MCP 服务器作为插件机制
  • 自定义 Agent: 用户可创建自己的代理
  • 工作流编排: LangGraph 支持复杂工作流

Was this page helpful?