Xyzen 架构设计
Xyzen 采用现代化的分层架构设计,通过清晰的关注点分离和模块化设计,实现了一个高效、可扩展的 AI 服务平台。本文档详细介绍系统的整体架构、核心模块、数据流以及技术选型。
系统架构概览
分层架构详解
1. 表现层 (Presentation Layer)
表现层作为系统的入口点,处理来自客户端的所有请求。
API 入口点
REST API 路由
/xyzen/api/v1/
├── /agents # 代理管理
├── /sessions # 会话管理
├── /topics # 主题管理
├── /mcps # MCP 服务器管理
├── /providers # 提供者管理
└── /auth # 身份认证
WebSocket 入口
/xyzen/ws/v1/
└── /chat/[session_id] # 实时聊天连接
健康检查
/xyzen/api/health # 服务健康状态检查
FastAPI 应用结构
# FastAPI 应用初始化流程
app = FastAPI(
title="Xyzen FastAPI Service",
lifespan=lifespan, # 生命周期管理
docs_url="/xyzen/api/docs",
)
# CORS 中间件
app.add_middleware(CORSMiddleware, allow_origins=["*"])
# 根路由器
app.include_router(root_router, prefix="/xyzen")
# MCP 路由自动注册
app.router.routes.extend(setup_mcp_routes(app.state))
2. 中间件层 (Middleware Layer)
中间件负责横切关注点的处理,在请求-响应周期中执行统一的处理逻辑。
身份认证中间件 (auth/)
验证 Bearer Token JWT,提取用户身份信息:
- 支持多个认证提供者 (Casdoor, Bohrium, BohrApp)
- 从 Token 中提取 `user_id` 和 `provider`
- 为后续服务层注入当前用户上下文
数据库连接中间件 (database/)
管理 SQLAlchemy 数据库连接池:
- 创建 `AsyncSessionLocal` 连接
- 使用 FastAPI 的 `Depends` 机制依赖注入
- 支持事务管理和自动回滚
日志中间件 (logger/)
结构化日志记录:
- 请求入口日志
- 错误堆栈跟踪
- 服务启动和关闭日志
动态 MCP 服务器中间件 (dynamic_mcp_server.py)
动态创建和挂载 MCP HTTP 服务器:
- 为每个注册的 MCP 服务器创建独立的 FastMCP 应用
- 处理 MCP 生命周期管理
- 错误隔离和恢复
3. API 处理层 (Handler Layer)
处理层包含所有的 HTTP 路由处理器和 WebSocket 事件处理器。
REST API 处理器 (handler/api/v1/)
| 模块 | 职责 | 核心端点 |
|---|---|---|
| agents.py | 代理 CRUD 和配置 | POST /agents,GET /agents/id,PUT /agents/id,DELETE /agents/id |
| sessions.py | 会话和主题管理 | POST /sessions,GET /sessions,自动创建默认主题 |
| topics.py | 主题和消息 | GET /topics/id/messages,DELETE /topics/id |
| mcps.py | MCP 服务器集成 | GET /mcps,POST /mcps/tools/test,Smithery 激活 |
| providers.py | 提供者配置 | GET /providers,POST /providers,PUT /providers/id,DELETE /providers/id |
| auth.py | 认证状态 | GET /auth/status,GET /auth/config |
WebSocket 处理器 (handler/ws/v1/)
实时聊天通信通道:
- 连接建立与关闭
- 消息发送和接收
- 流式响应处理
内置代理处理器 (handler/builtin_agents/)
预定义的系统级代理:
- 注册表管理
- 代理发现和加载
- 代理启动时数据库种子化
4. 服务层 (Service Layer)
服务层包含业务逻辑和复杂的计算,是应用的核心逻辑所在。
Agent Service (core/agent_service.py)
统一的代理管理服务:
UnifiedAgentRead模型用于读取常规代理和图代理- 代理类型检测 (regular vs graph vs builtin)
- 代理验证和配置加载
Chat Service (core/chat/)
对话执行引擎:
- 与 LangChain 集成用于标准 LLM 调用
- 与 LangGraph 集成用于复杂工作流
- 流式响应处理
- 工具调用和确认机制
MCP Service (core/mcp.py)
MCP (Model Context Protocol) 服务:
- MCP 服务器客户端管理
- Smithery 注册表集成
- 工具状态检查和健康检测
- 异步广播更新
Provider Service (core/providers/)
LLM 提供者管理:
- 提供者初始化 (
initialize_providers_on_startup) - 凭证安全存储和掩码
- 多提供者支持 (OpenAI, Azure, Anthropic, Google)
- 模板管理
Auth Service (core/auth/)
身份认证和授权:
- Token 验证
- 用户上下文管理
- 多提供者认证
Consumption Service (core/consume.py)
使用量跟踪和计费:
- 记录每次 API 调用的消费
- 远程计费集成 (Bohrium)
- 消费记录持久化
- 幂等性保证
5. 数据层 (Data Layer)
Repository 层 (repo/)
数据访问抽象:
| 文件 | 功能 |
|---|---|
| agent.py | Agent CRUD |
| session.py | Session/Topic CRUD |
| message.py | Message CRUD |
| provider.py | Provider CRUD |
| mcp.py | MCP 服务器 CRUD |
| consume.py | ConsumeRecord CRUD |
| graph.py | GraphAgent CRUD |
Repository 模式的好处:
- 解耦业务逻辑和数据访问
- 支持单元测试和 Mock
- 便于切换数据源
SQLModel ORM (models/)
类型安全的 ORM 模型,基于 Pydantic V2 和 SQLAlchemy 2.0:
class Agent(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
name: str
description: str | None = None
temperature: float | None = None
prompt: str | None = None
user_id: str = Field(index=True)
provider_id: UUID | None = Field(index=True)
require_tool_confirmation: bool = False
created_at: datetime # TIMESTAMP with timezone
updated_at: datetime # TIMESTAMP with timezone
数据库支持
PostgreSQL (生产环境)
- 完整的事务支持
- JSONB 用于存储复杂结构
- 全文搜索能力
- 高并发性能
SQLite (开发环境)
- 无需外部服务
- 快速本地开发
核心数据模型
数据模型关系
系统的核心数据模型包括以下实体及其关系:
用户与资源关系
- 用户拥有多个代理 (Agent)
- 用户创建多个会话 (Session)
- 用户创建多个图代理 (GraphAgent)
- 用户生成消费记录 (ConsumeRecord)
代理与配置关系
- 代理使用多个 MCP 服务器
- 代理关联一个提供者 (Provider)
会话层级关系
- 会话包含多个主题 (Topic)
- 主题包含多个消息 (Message)
工作流关系
- 图代理包含多个节点 (GraphNode)
- 图代理定义多个边 (GraphEdge)
提供者关系
- 提供者定义多个工具 (Tool)
关键模型详解
User 上下文
虽然 User 信息不直接存储在数据库中,但通过 `user_id` 和 `auth_provider` 字段维护用户上下文:
- 多租户隔离:每个 user_id 拥有独立的 agents、sessions 等
- 认证提供者跟踪:支持多个身份提供者
Agent (代理)
class Agent(SQLModel, table=True):
# 基本信息
name: str # 代理名称
description: str # 代理描述
avatar: str # 头像 URL
tags: list[str] # 标签分类
# 配置
model: str # 使用的 LLM 模型
temperature: float # 温度参数 (0-1)
prompt: str # 系统提示词
# 关联
user_id: str # 所有者用户 ID
provider_id: UUID # 关联的提供者 UUID
# 工具确认
require_tool_confirmation: bool # 调用工具前是否需要用户确认
# 时间戳
created_at: datetime # 创建时间
updated_at: datetime # 更新时间
Session & Topic & Message
会话是多轮对话的容器:
- Session: 一次完整的会话,包含一个默认主题
- Topic: 主题是消息的分组,一个会话可有多个主题
- Message: 单条消息,包含 role (user/assistant) 和 content
# 创建流程
POST /sessions
└─ 自动创建一个 Topic
└─ Topic 包含 Messages
GraphAgent (图代理)
基于 LangGraph 的工作流代理:
class GraphAgent(SQLModel, table=True):
name: str
description: str
state_schema: dict # JSON - 定义状态结构
is_active: bool # 是否激活
is_published: bool # 是否发布
is_official: bool # 是否为官方代理
parent_agent_id: UUID # 版本控制 - 指向上一版本
ConsumeRecord (消费记录)
记录每次 API 使用的消费:
class ConsumeRecord(SQLModel, table=True):
user_id: str # 用户 ID
amount: int # 消费数量
auth_provider: str # 认证提供者
# 业务关联
session_id: UUID # 关联的会话
topic_id: UUID # 关联的主题
message_id: UUID # 关联的消息
# 计费状态
consume_state: str # pending/success/failed
remote_response: str # 远程计费系统响应
# 幂等性
biz_no: int # 业务编号(自增,用于去重)
通信流程
请求-响应流程
WebSocket 流程
生命周期管理
应用启动流程
关键启动步骤
- 数据库表创建: 使用 SQLModel 创建或迁移所有表
- 提供者初始化: 从环境变量加载 API 密钥
- 系统代理创建: 确保 Chat 和 Workshop 代理存在
- 内置代理种子化: 将预定义的图代理加载到数据库
- MCP 服务器设置: 为每个已注册的 MCP 服务器创建 HTTP 应用
- 生命周期上下文管理: 使用 AsyncExitStack 管理所有异步资源
API 设计模式
RESTful 设计
遵循 REST 最佳实践:
| HTTP 方法 | 操作 | 路由示例 |
|---|---|---|
| GET | 读取 | /agents 列表,/agents/id 详情 |
| POST | 创建 | /agents 创建新代理,自动关联 user_id |
| PUT | 更新 | /agents/id 更新代理配置 |
| DELETE | 删除 | /agents/id 删除代理 |
请求/响应模式
请求模型 (Pydantic)
class AgentCreate(SQLModel):
name: str
description: str | None = None
model: str | None = None
temperature: float | None = None
provider_id: UUID | None = None
mcp_server_ids: list[UUID] = []
响应模式
class AgentRead(AgentBase):
id: UUID
created_at: datetime
updated_at: datetime
# 关联数据
mcp_servers: list[MCPServerRead]
provider: ProviderRead
错误处理
统一的 HTTP 异常处理:
- 400 Bad Request - 请求参数错误
- 401 Unauthorized - 认证失败
- 403 Forbidden - 权限不足
- 404 Not Found - 资源不存在
- 409 Conflict - 业务冲突 (如重名)
- 500 Internal Server Error - 服务错误
外部集成
LLM 提供者集成
```mermaid
graph LR
ChatService["Chat Service
core/chat/"] -->|langchain| OpenAI["OpenAI"]
ChatService -->|langchain| Claude["Anthropic Claude"]
ChatService -->|langchain| Gemini["Google Gemini"]
ChatService -->|langchain| Azure["Azure OpenAI"]
style ChatService fill:#fce4ec style OpenAI fill:#bbdefb style Claude fill:#bbdefb style Gemini fill:#bbdefb style Azure fill:#bbdefb ```
通过 LangChain 的 LLMBase 抽象,支持多个 LLM 提供者。每个提供者通过 `Provider` 模型存储其 API 密钥和配置。
MCP (Model Context Protocol) 集成
MCP 提供工具和资源的标准化接口:
- MCP 服务器注册: 在
handler/mcp/中注册 MCP 服务器 - 工具发现: 通过 Smithery 注册表发现可用工具
- 工具调用: Chat Service 通过 MCP 调用外部工具
- 健康检查: 定期检查 MCP 服务器可用性
LangChain & LangGraph
- LangChain: 用于构建简单的 LLM 调用链
- LangGraph: 用于构建复杂的工作流图 (状态机)
Graph Agents 利用 LangGraph 定义:
- Nodes: 执行步骤 (LLM 调用、工具调用等)
- Edges: 条件转移逻辑
- State Schema: 整个工作流的状态定义
部署架构
Docker 容器化
# 基础镜像: Python 3.13.5-slim
FROM python:3.13.5-slim
# 依赖管理: uv (快速 Python 包管理器)
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
# 环境变量
ENV TZ=Asia/Shanghai
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
# 安装依赖
RUN uv sync --locked
# 暴露端口
EXPOSE 48196
# 启动命令
CMD ["uv", "run", "python", "-m", "app.main"]
环境配置
通过环境变量配置应用:
# 数据库
DATABASE_URL=postgresql://user:password@host:5432/xyzen
# LLM 提供者
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_API_KEY=...
# 认证
CASDOOR_ENDPOINT=https://...
BOHRIUM_API_KEY=...
# 应用
DEBUG=false
HOST=0.0.0.0
PORT=48196
多环境部署
- 开发环境: SQLite + 快速迭代
- 测试环境: PostgreSQL + 完整功能测试
- 生产环境: PostgreSQL + 高可用配置 + 监控日志
技术栈总结
| 分层 | 技术 | 说明 |
|---|---|---|
| Web 框架 | FastAPI | 高性能异步 Web 框架 |
| 异步运行时 | AsyncIO | Python 异步编程 |
| ORM | SQLModel + SQLAlchemy | 类型安全的 ORM |
| 数据库 | PostgreSQL / SQLite | 关系型数据库 |
| 迁移工具 | Alembic | 数据库版本管理 |
| AI/Agent | LangChain | LLM 调用框架 |
| 工作流引擎 | LangGraph | 复杂业务流程编排 |
| 实时通信 | WebSocket | 双向实时通信 |
| 工具协议 | MCP | 标准化工具接口 |
| 容器化 | Docker | 标准化部署 |
| 包管理 | uv | 快速依赖管理 |
| 认证 | JWT + 多提供者 | 安全身份验证 |
| 日志 | 结构化日志 | 便于调试和监控 |
设计原则
1. 关注点分离 (Separation of Concerns)
每个模块专注于单一职责:
- API Handler: 请求/响应处理
- Service: 业务逻辑
- Repository: 数据访问
- Model: 数据定义
2. 依赖注入 (Dependency Injection)
使用 FastAPI 的 Depends 机制实现:
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
return validate_token(token)
@router.get("/agents")
async def list_agents(current_user: User = Depends(get_current_user)):
return await agent_service.list_agents(current_user.id)
3. 异步优先 (Async-First)
充分利用 Python AsyncIO:
- 非阻塞 I/O 操作
- 高并发处理能力
- 更好的资源利用
4. 类型安全 (Type Safety)
使用 Python 类型注解和 Pydantic:
- 编译时类型检查
- 运行时数据验证
- 更好的 IDE 支持
5. 多租户隔离 (Multi-Tenancy)
通过 user_id 字段实现:
- 每个用户的数据完全隔离
- 数据库查询自动带上 user_id 过滤
- 无需修改核心逻辑即可支持多用户
可扩展性考虑
水平扩展
- 无状态 API: 可运行多个副本
- 共享数据库: PostgreSQL 支持并发访问
- 负载均衡: 通过 Nginx/HAProxy 分发请求
垂直扩展
- 异步处理: 使用后台任务处理耗时操作
- 缓存层: 可添加 Redis 缓存热数据
- CDN: 静态资源通过 CDN 加速
功能扩展
- 插件系统: MCP 服务器作为插件机制
- 自定义 Agent: 用户可创建自己的代理
- 工作流编排: LangGraph 支持复杂工作流