"""
System manager.

Manages the lifecycle of all subsystems and provides dynamic
mounting/unmounting, fault monitoring and related features.
"""
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
import importlib.util
|
|
import pathlib
|
|
from typing import Dict, Optional, List, Any
|
|
from datetime import datetime
|
|
from base_subsystem import BaseSubsystem
|
|
# Module-wide logger; every function and method in this file logs through it.
logger = logging.getLogger(__name__)

# Re-entrant lock that serialises creation of the global SystemManager
# instance in get_system_manager().
system_manager_lock = threading.RLock()

# Subsystem configuration entries collected from the plug-in modules loaded by
# import_all_from_folder(). register_subsystem_from_config() reads the keys
# 'class', 'name', 'params' and 'autostart' from each entry.
config: List[Dict[str, Any]] = []
|
|
|
|
def import_all_from_folder(folder_path: str):
    """Import every ``*.py`` file in *folder_path* and collect its ``config``.

    Each file is loaded as a module named after its file stem. A module that
    executes successfully is published in this module's globals under that
    name, and the items of its module-level ``config`` are appended to the
    global ``config`` list.

    A file that raises while being executed, or that defines no ``config``
    attribute, is logged and skipped so that one faulty file does not prevent
    the remaining files from being loaded.

    Args:
        folder_path: Directory to scan. Only files directly inside it are
            considered (the glob is not recursive).

    Returns:
        None. A warning is logged and nothing is imported when *folder_path*
        is not a directory.
    """
    folder = pathlib.Path(folder_path)
    if not folder.is_dir():
        logger.warning(f"路径 '{folder_path}' 不是有效的文件夹")
        return

    # Sorted so the order of the collected entries does not depend on the
    # order in which the file system happens to list the directory.
    for file in sorted(folder.glob("*.py")):
        module_name = file.stem
        spec = importlib.util.spec_from_file_location(module_name, str(file))
        if not (spec and spec.loader):
            logger.warning(f"无法导入模块: {module_name} from {file}")
            continue

        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except Exception as e:
            # Boundary around third-party plug-in code: log and keep going.
            logger.error(f"导入模块失败: {module_name} from {file}: {e}", exc_info=True)
            continue

        # NOTE(review): publishing the module under its file stem can overwrite
        # an existing global of the same name (e.g. a plug-in file called
        # "config.py" or "logging.py"). Kept for backward compatibility.
        globals()[module_name] = module

        module_config = getattr(module, "config", None)
        if module_config is None:
            logger.warning(f"模块 '{module_name}' 未定义 config,已跳过")
            continue

        # Mutates the shared list in place, so no ``global`` statement is needed.
        config.extend(module_config)
|
|
|
|
# Plug-in modules live in the "subsystems" directory next to this file. They
# are loaded once, when this module is imported, so that the global ``config``
# list is filled before any SystemManager is created.
folder_path = pathlib.Path(__file__).parent.joinpath("subsystems")
import_all_from_folder(str(folder_path))
|
|
|
|
|
|
|
|
|
|
class SystemManager:
    """Own and supervise a set of independent subsystems.

    Responsibilities:
        1. Keep a registry of named subsystems.
        2. Mount (register) and unmount (unregister) subsystems at runtime.
        3. Poll subsystem health from a background monitor thread.
        4. Isolate failures: an exception raised by one subsystem is logged
           and does not stop the others from being processed.
        5. Expose one status snapshot for the whole system.

    The only subsystem members this class uses are ``name``, ``start()``,
    ``stop(timeout=...)``, ``is_running()`` and ``health_check()``.
    """

    def __init__(self):
        """Create an idle manager with no subsystems registered.

        The constructor takes no arguments. Nothing is started until
        :meth:`start` is called.
        """
        # Registry of subsystems keyed by their name.
        self._subsystems: Dict[str, "BaseSubsystem"] = {}
        # Re-entrant, so a method holding the lock may call another method
        # that acquires it again.
        self._subsystems_lock = threading.RLock()

        # Manager state.
        self._running = False
        self._monitor_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

        # Monitoring configuration.
        self._monitor_interval = 30  # seconds between two health-check passes
        self._health_check_timeout = 5  # stored, but not read anywhere in this class
        self._auto_restart = True  # restart subsystems that report not running

        # System statistics.
        self._stats = {
            'started_at': None,
            'stopped_at': None,
            'total_subsystems_added': 0,
            'total_subsystems_removed': 0,
            'subsystem_restarts': {},  # {subsystem_name: restart_count}
        }

        logger.info("系统管理器已初始化")

    # ========================================================================
    # Subsystem registry
    # ========================================================================

    def register_subsystem(self, subsystem: "BaseSubsystem", autostart: bool = True) -> bool:
        """Register (mount) a subsystem under ``subsystem.name``.

        Args:
            subsystem: The subsystem instance to add.
            autostart: Start the subsystem immediately when the manager is
                already running.

        Returns:
            True when the subsystem was added and, if an automatic start was
            attempted, that start succeeded. False when the name is already
            taken, the automatic start reported failure, or an exception was
            raised. Note that in the last two cases the subsystem stays in the
            registry.
        """
        with self._subsystems_lock:
            if subsystem.name in self._subsystems:
                logger.warning(f"子系统 '{subsystem.name}' 已存在,跳过注册")
                return False

            try:
                self._subsystems[subsystem.name] = subsystem
                self._stats['total_subsystems_added'] += 1
                self._stats['subsystem_restarts'][subsystem.name] = 0

                logger.info(f"子系统 '{subsystem.name}' 已注册")

                # Only start right away when the manager itself is running;
                # otherwise start() will launch it later.
                if self._running and autostart:
                    if subsystem.start():
                        logger.info(f"子系统 '{subsystem.name}' 已自动启动")
                    else:
                        logger.error(f"自动启动子系统 '{subsystem.name}' 失败")
                        return False

                return True

            except Exception as e:
                logger.error(f"注册子系统 '{subsystem.name}' 失败: {e}", exc_info=True)
                return False

    def unregister_subsystem(self, subsystem_name: str, autostop: bool = True) -> bool:
        """Unregister (unmount) a subsystem.

        Args:
            subsystem_name: Name the subsystem was registered under.
            autostop: Stop the subsystem first if it reports that it is
                running (it is given a 10 second timeout).

        Returns:
            True when the subsystem was removed; False when the name is
            unknown or an exception was raised while stopping/removing it.
        """
        with self._subsystems_lock:
            if subsystem_name not in self._subsystems:
                logger.warning(f"子系统 '{subsystem_name}' 不存在")
                return False

            try:
                subsystem = self._subsystems[subsystem_name]

                if autostop and subsystem.is_running():
                    subsystem.stop(timeout=10)

                del self._subsystems[subsystem_name]
                self._stats['total_subsystems_removed'] += 1

                logger.info(f"子系统 '{subsystem_name}' 已卸载")
                return True

            except Exception as e:
                logger.error(f"卸载子系统 '{subsystem_name}' 失败: {e}", exc_info=True)
                return False

    def get_subsystem(self, subsystem_name: str) -> Optional["BaseSubsystem"]:
        """Return the subsystem registered as *subsystem_name*, or None."""
        with self._subsystems_lock:
            return self._subsystems.get(subsystem_name)

    def list_subsystems(self) -> List[str]:
        """Return the names of all registered subsystems."""
        with self._subsystems_lock:
            return list(self._subsystems.keys())

    def register_subsystem_from_config(self) -> None:
        """Instantiate and register every subsystem listed in the global ``config``.

        The method takes no arguments; it reads the module-level ``config``
        list. Each entry must provide ``'class'`` and ``'name'`` and may
        provide ``'params'`` (keyword arguments for the class, default ``{}``)
        and ``'autostart'`` (default True). An entry that fails is logged and
        the remaining entries are still processed.
        """
        global config
        if config is None:
            # Fall back to an empty configuration.
            config = []

        for subsys_conf in config:
            try:
                cls = subsys_conf['class']
                name = subsys_conf['name']
                params = subsys_conf.get('params', {})

                subsystem = cls(name, **params)
                self.register_subsystem(subsystem, autostart=subsys_conf.get('autostart', True))

            except Exception as e:
                logger.error(f"从配置注册子系统失败: {e}", exc_info=True)

    # ========================================================================
    # Lifecycle
    # ========================================================================

    def start(self) -> bool:
        """Start all registered subsystems and the monitor thread.

        Subsystems that fail to start are logged; they do not prevent the
        manager from starting. Calling this method while the manager is
        already running is a no-op, so that no second monitor thread is
        created.

        Returns:
            True when the manager is running after the call, False when an
            unexpected exception prevented the start.
        """
        try:
            if self._running:
                logger.warning("系统管理器已在运行,忽略重复启动")
                return True

            logger.info("系统管理器启动中...")

            with self._subsystems_lock:
                failed = []
                for name, subsystem in self._subsystems.items():
                    try:
                        if not subsystem.start():
                            failed.append(name)
                            logger.error(f"启动子系统 '{name}' 失败")
                    except Exception as e:
                        failed.append(name)
                        logger.error(f"启动子系统 '{name}' 异常: {e}", exc_info=True)

                if failed:
                    logger.warning(f"有 {len(failed)} 个子系统启动失败: {failed}")

            # Launch the monitor thread.
            self._running = True
            self._stop_event.clear()
            # NOTE(review): utcnow() yields a naive timestamp; kept so the
            # reported string format stays unchanged.
            self._stats['started_at'] = datetime.utcnow().isoformat()

            self._monitor_thread = threading.Thread(
                target=self._monitor_loop,
                name="system-monitor",
                daemon=False,
            )
            self._monitor_thread.start()

            logger.info(f"系统管理器已启动({len(self._subsystems)} 个子系统)")
            return True

        except Exception as e:
            logger.error(f"启动系统管理器失败: {e}", exc_info=True)
            self._running = False
            return False

    def stop(self, timeout: int = 30) -> bool:
        """Stop the monitor thread and every registered subsystem.

        Args:
            timeout: Total time budget, in seconds, shared evenly between the
                subsystems. For a positive budget each subsystem gets at
                least one second, even when there are more subsystems than
                seconds.

        Returns:
            True when every subsystem reported a successful stop; False when
            at least one failed or an unexpected exception was raised.
        """
        try:
            logger.info("系统管理器停止中...")

            # Tell the monitor thread to finish.
            self._running = False
            self._stop_event.set()

            if self._monitor_thread and self._monitor_thread.is_alive():
                self._monitor_thread.join(timeout=5)

            # Work on a snapshot so the lock is not held while stopping.
            with self._subsystems_lock:
                subsystems_list = list(self._subsystems.items())

            per_subsystem_timeout = timeout // max(len(subsystems_list), 1)
            if timeout > 0:
                # Integer division yields 0 when subsystems outnumber seconds.
                per_subsystem_timeout = max(per_subsystem_timeout, 1)

            failed = []
            for name, subsystem in subsystems_list:
                try:
                    if not subsystem.stop(timeout=per_subsystem_timeout):
                        failed.append(name)
                        logger.error(f"停止子系统 '{name}' 失败或超时")
                except Exception as e:
                    failed.append(name)
                    logger.error(f"停止子系统 '{name}' 异常: {e}", exc_info=True)

            self._stats['stopped_at'] = datetime.utcnow().isoformat()

            if failed:
                logger.warning(f"有 {len(failed)} 个子系统停止失败: {failed}")
                return False

            logger.info("系统管理器已停止")
            return True

        except Exception as e:
            logger.error(f"停止系统管理器失败: {e}", exc_info=True)
            return False

    # ========================================================================
    # Monitoring and maintenance
    # ========================================================================

    def _monitor_loop(self):
        """Body of the monitor thread.

        While the manager is running, takes a snapshot of the registry, checks
        every subsystem in it, then sleeps for the monitor interval or until a
        stop is requested, whichever comes first.
        """
        logger.info("监控线程已启动")

        while self._running:
            try:
                with self._subsystems_lock:
                    subsystems_snapshot = list(self._subsystems.items())

                for name, subsystem in subsystems_snapshot:
                    if self._stop_event.is_set():
                        # Shutdown requested: abandon the rest of this pass.
                        break
                    try:
                        self._check_subsystem(name, subsystem)
                    except Exception as e:
                        logger.error(f"检查子系统 '{name}' 健康状态失败: {e}", exc_info=True)

                # Sleep until the next pass or until stop() sets the event.
                self._stop_event.wait(timeout=self._monitor_interval)

            except Exception as e:
                logger.error(f"监控循环异常: {e}", exc_info=True)
                self._stop_event.wait(timeout=self._monitor_interval)

        logger.info("监控线程已停止")

    def _check_subsystem(self, name, subsystem):
        """Run one health check and restart the subsystem if it is not running.

        Reads ``running``, ``thread_alive`` and ``stats`` (with
        ``last_error``, ``last_error_time`` and ``failed_messages``) from the
        dictionary returned by ``subsystem.health_check()``. Exceptions
        propagate to the caller.
        """
        health = subsystem.health_check()

        if not health['running']:
            logger.warning(
                f"检测到子系统 '{name}' 已停止运行\n"
                f" - 线程活跃: {health['thread_alive']}\n"
                f" - 统计: {health['stats']}"
            )

            # Never restart once a shutdown has been requested; otherwise a
            # subsystem that stop() just stopped could be brought back up.
            if self._auto_restart and not self._stop_event.is_set():
                logger.info(f"正在自动重启子系统 '{name}'...")
                if subsystem.start():
                    restarts = self._stats['subsystem_restarts']
                    restarts[name] = restarts.get(name, 0) + 1
                    logger.info(
                        f"子系统 '{name}' 已重启 "
                        f"(共 {restarts[name]} 次)"
                    )
                else:
                    logger.error(f"重启子系统 '{name}' 失败")

        # Report the most recent error, if any.
        if health['stats']['last_error']:
            logger.warning(
                f"子系统 '{name}' 上次错误: {health['stats']['last_error']}\n"
                f" 时间: {health['stats']['last_error_time']}\n"
                f" 失败消息数: {health['stats']['failed_messages']}"
            )

    def get_system_status(self) -> Dict[str, Any]:
        """Return a snapshot of the manager and of every subsystem.

        Returns:
            A dictionary with the keys ``manager_running``, ``timestamp``,
            ``total_subsystems``, ``subsystems`` (health-check result per
            subsystem, or an error record when the check raised), ``stats``,
            ``auto_restart`` and ``monitor_interval``. ``stats`` is a copy:
            changing it does not affect the manager's own counters.
        """
        with self._subsystems_lock:
            subsystems_status = {}
            for name, subsystem in self._subsystems.items():
                try:
                    subsystems_status[name] = subsystem.health_check()
                except Exception as e:
                    subsystems_status[name] = {
                        'name': name,
                        'running': False,
                        'error': str(e),
                    }

            # Copy the nested restart counters too; a shallow copy would hand
            # the caller the manager's own mutable dictionary.
            stats_snapshot = dict(self._stats)
            stats_snapshot['subsystem_restarts'] = dict(self._stats['subsystem_restarts'])

            return {
                'manager_running': self._running,
                'timestamp': datetime.utcnow().isoformat(),
                'total_subsystems': len(self._subsystems),
                'subsystems': subsystems_status,
                'stats': stats_snapshot,
                'auto_restart': self._auto_restart,
                'monitor_interval': self._monitor_interval,
            }

    def set_monitor_interval(self, interval: int):
        """Set the pause between two health-check passes, in seconds.

        Args:
            interval: New interval; must be greater than zero.

        Raises:
            ValueError: If *interval* is zero or negative. Such a value would
                make the monitor loop run without any pause.
        """
        if interval <= 0:
            raise ValueError(f"监控间隔必须大于 0: {interval}")
        self._monitor_interval = interval
        logger.info(f"监控间隔已设置为 {interval} 秒")

    def set_auto_restart(self, enabled: bool):
        """Enable or disable automatic restart of subsystems found not running."""
        self._auto_restart = enabled
        logger.info(f"自动重启已设置为 {enabled}")
|
|
|
|
# The process-wide SystemManager instance; created on first request.
_system_manager: Optional[SystemManager] = None


def get_system_manager(db_manager, message_bus) -> SystemManager:
    """Return the global SystemManager, creating it on the first call.

    Creation happens while holding ``system_manager_lock``. Both parameters
    are accepted but not used by this function.

    Args:
        db_manager: Unused.
        message_bus: Unused.

    Returns:
        The single shared SystemManager instance.
    """
    global _system_manager

    with system_manager_lock:
        manager = _system_manager
        if manager is None:
            manager = SystemManager()
            _system_manager = manager
        return manager
|