Phase 58 — 联邦硬化系统设计
版本: v2.0.0 创建日期: 2026-02-28 状态: ✅ 已实现 (v2.0.0)
一、模块概述
Phase 58 引入联邦硬化系统,通过熔断器、健康检查和连接池管理提升 Cowork 联邦网络的稳定性和可靠性。
1.1 核心目标
- 熔断器机制: 故障隔离,防止雪崩效应
- 健康检查: 实时监控节点健康状态
- 连接池管理: 优化网络连接资源
- 自动降级: 故障节点自动摘除和恢复
1.2 技术架构
┌──────────────────────────────────────────────────────┐
│ Cowork Federation │
│ (ai-engine/cowork/) │
└───────────────────────┬──────────────────────────────┘
│
┌───────────────────────┼──────────────────────────────┐
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Federation Hardening │ │
│ │ (federation-hardening.js) │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ Circuit Breaker │ │ │
│ │ │ - CLOSED / OPEN / HALF_OPEN │ │ │
│ │ │ - 故障计数 │ │ │
│ │ │ - 超时检测 │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ Health Check │ │ │
│ │ │ - 心跳监控 │ │ │
│ │ │ - 延迟检测 │ │ │
│ │ │ - 成功率统计 │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ Connection Pool │ │ │
│ │ │ - 连接复用 │ │ │
│ │ │ - 最大连接数 │ │ │
│ │ │ - 空闲连接回收 │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘二、核心模块设计
2.1 Circuit Breaker (熔断器)
熔断器状态机:
javascript
const CIRCUIT_BREAKER_STATES = {
CLOSED: "closed", // 正常状态,允许请求
OPEN: "open", // 熔断状态,拒绝请求
HALF_OPEN: "half_open", // 半开状态,尝试恢复
};状态转换规则:
CLOSED → OPEN: 连续失败次数 >= failureThreshold
OPEN → HALF_OPEN: 超过 openTimeout
HALF_OPEN → CLOSED: 探测成功
HALF_OPEN → OPEN: 探测失败API方法:
javascript
class CircuitBreaker {
constructor(options = {}) {
this.failureThreshold = options.failureThreshold || 5;
this.successThreshold = options.successThreshold || 2;
this.openTimeout = options.openTimeout || 60000; // 60s
this.requestTimeout = options.requestTimeout || 10000; // 10s
}
// 执行请求(带熔断保护)
async execute(fn, fallback) {}
// 获取熔断器状态
getState() {}
// 手动重置熔断器
reset() {}
// 获取统计信息
getStats() {}
}2.2 Health Check (健康检查)
健康检查指标:
javascript
const HEALTH_METRICS = {
HEARTBEAT: "heartbeat", // 心跳
LATENCY: "latency", // 延迟
SUCCESS_RATE: "success_rate", // 成功率
CPU_USAGE: "cpu_usage", // CPU使用率
MEMORY_USAGE: "memory_usage", // 内存使用率
};健康状态:
javascript
const HEALTH_STATUS = {
HEALTHY: "healthy", // 健康
DEGRADED: "degraded", // 降级
UNHEALTHY: "unhealthy", // 不健康
UNKNOWN: "unknown", // 未知
};API方法:
javascript
class HealthChecker {
constructor(db, config) {}
// 执行健康检查
async checkHealth(nodeId) {}
// 获取节点健康状态
async getHealthStatus(nodeId) {}
// 获取所有节点健康状态
async getAllHealthStatus() {}
// 订阅健康状态变化
onHealthChange(callback) {}
// 启动定期健康检查
startPeriodicCheck(interval = 30000) {}
// 停止定期健康检查
stopPeriodicCheck() {}
}2.3 Connection Pool (连接池)
连接池配置:
javascript
const POOL_CONFIG = {
minConnections: 5, // 最小连接数
maxConnections: 50, // 最大连接数
idleTimeout: 300000, // 空闲超时 (5分钟)
acquireTimeout: 30000, // 获取连接超时 (30秒)
validationInterval: 60000, // 验证间隔 (1分钟)
};API方法:
javascript
class ConnectionPool {
constructor(config = POOL_CONFIG) {}
// 获取连接
async acquire(nodeId) {}
// 释放连接
async release(connection) {}
// 销毁连接
async destroy(connection) {}
// 获取池统计信息
getStats() {}
// 清理空闲连接
async cleanup() {}
// 关闭连接池
async close() {}
}三、数据库设计
3.1 熔断器状态表
sql
CREATE TABLE IF NOT EXISTS federation_circuit_breakers (
node_id TEXT PRIMARY KEY,
state TEXT NOT NULL, -- closed/open/half_open
failure_count INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
last_failure_time INTEGER,
last_success_time INTEGER,
state_changed_at INTEGER NOT NULL,
metadata TEXT, -- JSON: 额外元数据
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_circuit_breakers_state
ON federation_circuit_breakers(state);3.2 健康检查记录表
sql
CREATE TABLE IF NOT EXISTS federation_health_checks (
check_id TEXT PRIMARY KEY,
node_id TEXT NOT NULL,
check_type TEXT NOT NULL, -- heartbeat/latency/success_rate
status TEXT NOT NULL, -- healthy/degraded/unhealthy
metrics TEXT, -- JSON: {latency: 50, success_rate: 0.99}
checked_at INTEGER NOT NULL,
FOREIGN KEY (node_id) REFERENCES federation_nodes(node_id)
);
CREATE INDEX IF NOT EXISTS idx_health_checks_node_id
ON federation_health_checks(node_id);
CREATE INDEX IF NOT EXISTS idx_health_checks_checked_at
ON federation_health_checks(checked_at DESC);四、IPC 接口设计
文件: desktop-app-vue/src/main/ai-engine/cowork/federation-hardening-ipc.js
通道列表 (4个):
javascript
const CHANNELS = [
"federation-hardening:get-circuit-breaker-status",
"federation-hardening:reset-circuit-breaker",
"federation-hardening:get-health-checks",
"federation-hardening:get-connection-pool-stats",
];处理器实现:
javascript
function registerFederationHardeningIPC(dependencies) {
const { ipcMain, federationHardening } = dependencies;
// 获取熔断器状态
ipcMain.handle(
"federation-hardening:get-circuit-breaker-status",
async (event, { nodeId }) => {
return await federationHardening.getCircuitBreakerStatus(nodeId);
},
);
// 重置熔断器
ipcMain.handle(
"federation-hardening:reset-circuit-breaker",
async (event, { nodeId }) => {
return await federationHardening.resetCircuitBreaker(nodeId);
},
);
// 获取健康检查结果
ipcMain.handle(
"federation-hardening:get-health-checks",
async (event, { nodeId, limit }) => {
return await federationHardening.getHealthChecks(nodeId, limit);
},
);
// 获取连接池统计
ipcMain.handle("federation-hardening:get-connection-pool-stats", async () => {
return await federationHardening.getConnectionPoolStats();
});
return { handlerCount: 4 };
}五、前端集成
5.1 Pinia Store
文件: desktop-app-vue/src/renderer/stores/federationHardening.ts
typescript
import { defineStore } from "pinia";
export const useFederationHardeningStore = defineStore("federationHardening", {
state: () => ({
circuitBreakers: {} as Record<string, any>,
healthChecks: [] as any[],
poolStats: null as any | null,
loading: false,
}),
getters: {
openCircuitBreakers: (state) => {
return Object.values(state.circuitBreakers).filter(
(cb: any) => cb.state === "open",
);
},
unhealthyNodes: (state) => {
return state.healthChecks.filter((hc: any) => hc.status === "unhealthy");
},
},
actions: {
async loadCircuitBreakerStatus(nodeId: string) {
const status = await (window as any).electronAPI.invoke(
"federation-hardening:get-circuit-breaker-status",
{ nodeId },
);
this.circuitBreakers[nodeId] = status;
},
async resetCircuitBreaker(nodeId: string) {
await (window as any).electronAPI.invoke(
"federation-hardening:reset-circuit-breaker",
{ nodeId },
);
await this.loadCircuitBreakerStatus(nodeId);
},
async loadHealthChecks(nodeId?: string) {
this.healthChecks = await (window as any).electronAPI.invoke(
"federation-hardening:get-health-checks",
{ nodeId, limit: 100 },
);
},
async loadPoolStats() {
this.poolStats = await (window as any).electronAPI.invoke(
"federation-hardening:get-connection-pool-stats",
);
},
},
});5.2 Vue 页面
文件: desktop-app-vue/src/renderer/pages/ai/FederationHardeningPage.vue
主要功能:
- 熔断器监控 - 实时显示熔断器状态,支持手动重置
- 健康检查仪表板 - 可视化节点健康状态
- 连接池监控 - 显示连接池使用情况
- 告警展示 - 展示降级和故障节点
六、配置管理
配置段: federationHardening
javascript
federationHardening: {
enabled: true,
circuitBreaker: {
failureThreshold: 5, // 连续失败5次触发熔断
successThreshold: 2, // 半开状态成功2次恢复
openTimeout: 60000, // 熔断后60秒尝试恢复
requestTimeout: 10000, // 请求超时10秒
},
healthCheck: {
enabled: true,
interval: 30000, // 30秒检查一次
timeout: 5000, // 健康检查超时5秒
unhealthyThreshold: 3, // 连续失败3次标记为不健康
degradedLatency: 1000, // 延迟>1秒标记为降级
},
connectionPool: {
minConnections: 5,
maxConnections: 50,
idleTimeout: 300000, // 5分钟
acquireTimeout: 30000, // 30秒
},
}七、测试覆盖
7.1 单元测试
文件: desktop-app-vue/tests/unit/cowork/federation-hardening.test.js
- ✅ 熔断器状态转换
- ✅ 健康检查执行
- ✅ 连接池获取和释放
- ✅ 自动降级和恢复
7.2 E2E 测试
文件: desktop-app-vue/tests/e2e/ai/federation-hardening.e2e.test.ts
- ✅ 完整的熔断器流程
- ✅ 健康检查和告警
- ✅ 连接池压力测试
八、故障恢复策略
熔断器恢复:
- OPEN → HALF_OPEN: 超过 openTimeout 后自动尝试
- HALF_OPEN → CLOSED: 成功阈值达标后恢复
节点摘除:
- 连续健康检查失败 >= unhealthyThreshold
- 自动从路由表中摘除
节点恢复:
- 健康检查恢复正常
- 自动重新加入路由表
九、监控指标
推荐监控的关键指标:
- 熔断器打开次数 (Circuit Breaker Opens)
- 不健康节点数量 (Unhealthy Nodes Count)
- 平均延迟 (Average Latency)
- 请求成功率 (Request Success Rate)
- 连接池使用率 (Pool Utilization)
相关文档:
