""" 系统管理器 管理所有子系统的生命周期,提供动态挂载/卸载、故障监控等功能 """ import logging import threading import time import importlib.util import pathlib from typing import Dict, Optional, List, Any from datetime import datetime from base_subsystem import BaseSubsystem logger = logging.getLogger(__name__) system_manager_lock = threading.RLock() config=[] def import_all_from_folder(folder_path: str): global config """动态导入指定文件夹下的所有Python模块""" folder = pathlib.Path(folder_path) if not folder.is_dir(): logger.warning(f"路径 '{folder_path}' 不是有效的文件夹") return for file in folder.glob("*.py"): module_name = file.stem spec = importlib.util.spec_from_file_location(module_name, str(file)) if spec and spec.loader: module = importlib.util.module_from_spec(spec) globals()[module_name] = module spec.loader.exec_module(module) #logger.info(f"已导入模块: {module_name} from {file}") #print(f"已导入模块: {module_name} from {file}") config.extend(module.config) else: logger.warning(f"无法导入模块: {module_name} from {file}") folder_path = pathlib.Path(__file__).parent / "subsystems" import_all_from_folder(str(folder_path)) class SystemManager: """ 系统管理器 主要功能: 1. 管理多个独立的子系统 2. 支持动态挂载和卸载子系统 3. 监控子系统健康状态 4. 隔离子系统故障,防止连锁反应 5. 提供统一的系统状态查询接口 """ def __init__(self): """ 初始化系统管理器 Args: db_manager: 数据库管理器(所有子系统共享) message_bus: 消息总线(所有子系统共享) """ # 子系统容器 self._subsystems: Dict[str, BaseSubsystem] = {} self._subsystems_lock = threading.RLock() # 递归锁,支持嵌套锁定 # 管理器状态 self._running = False self._monitor_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() # 监控配置 self._monitor_interval = 30 # 检查间隔(秒) self._health_check_timeout = 5 # 健康检查超时 self._auto_restart = True # 是否自动重启失败的子系统 # 系统统计 self._stats = { 'started_at': None, 'stopped_at': None, 'total_subsystems_added': 0, 'total_subsystems_removed': 0, 'subsystem_restarts': {} # {subsystem_name: restart_count} } logger.info("系统管理器已初始化") # ======================================================================== # 子系统管理 # ======================================================================== def register_subsystem(self, subsystem: BaseSubsystem, autostart: bool = True) -> bool: """ 注册(挂载)一个子系统 Args: subsystem: 子系统实例 autostart: 是否自动启动 Returns: 是否注册成功 """ with self._subsystems_lock: if subsystem.name in self._subsystems: logger.warning(f"子系统 '{subsystem.name}' 已存在,跳过注册") return False try: self._subsystems[subsystem.name] = subsystem self._stats['total_subsystems_added'] += 1 self._stats['subsystem_restarts'][subsystem.name] = 0 logger.info(f"子系统 '{subsystem.name}' 已注册") # 如果系统已运行且需要自动启动 if self._running and autostart: if subsystem.start(): logger.info(f"子系统 '{subsystem.name}' 已自动启动") else: logger.error(f"自动启动子系统 '{subsystem.name}' 失败") return False return True except Exception as e: logger.error(f"注册子系统 '{subsystem.name}' 失败: {e}", exc_info=True) return False def unregister_subsystem(self, subsystem_name: str, autostop: bool = True) -> bool: """ 卸载一个子系统 Args: subsystem_name: 子系统名称 autostop: 是否自动停止 Returns: 是否卸载成功 """ with self._subsystems_lock: if subsystem_name not in self._subsystems: logger.warning(f"子系统 '{subsystem_name}' 不存在") return False try: subsystem = self._subsystems[subsystem_name] # 如果需要自动停止 if autostop and subsystem.is_running(): subsystem.stop(timeout=10) del self._subsystems[subsystem_name] self._stats['total_subsystems_removed'] += 1 logger.info(f"子系统 '{subsystem_name}' 已卸载") return True except Exception as e: logger.error(f"卸载子系统 '{subsystem_name}' 失败: {e}", exc_info=True) return False def get_subsystem(self, subsystem_name: str) -> Optional[BaseSubsystem]: """获取指定的子系统""" with self._subsystems_lock: return self._subsystems.get(subsystem_name) def list_subsystems(self) -> List[str]: """列出所有子系统名称""" with self._subsystems_lock: return list(self._subsystems.keys()) def register_subsystem_from_config(self) -> None: """ 从配置注册子系统 Args: config: 可选的子系统配置列表 """ global config if config is None: # 使用默认配置 config = [] for subsys_conf in config: try: cls = subsys_conf['class'] name = subsys_conf['name'] params = subsys_conf.get('params', {}) subsystem = cls(name, **params) self.register_subsystem(subsystem, autostart=subsys_conf.get('autostart', True)) except Exception as e: logger.error(f"从配置注册子系统失败: {e}", exc_info=True) # ======================================================================== # 生命周期管理 # ======================================================================== def start(self) -> bool: """ 启动系统管理器和所有已注册的子系统 Returns: 是否启动成功 """ try: logger.info("系统管理器启动中...") # 启动所有子系统 with self._subsystems_lock: failed = [] for name, subsystem in self._subsystems.items(): try: if not subsystem.start(): failed.append(name) logger.error(f"启动子系统 '{name}' 失败") except Exception as e: failed.append(name) logger.error(f"启动子系统 '{name}' 异常: {e}", exc_info=True) if failed: logger.warning(f"有 {len(failed)} 个子系统启动失败: {failed}") # 启动监控线程 self._running = True self._stop_event.clear() self._stats['started_at'] = datetime.utcnow().isoformat() self._monitor_thread = threading.Thread( target=self._monitor_loop, name="system-monitor", daemon=False ) self._monitor_thread.start() logger.info(f"系统管理器已启动({len(self._subsystems)} 个子系统)") return True except Exception as e: logger.error(f"启动系统管理器失败: {e}", exc_info=True) self._running = False return False def stop(self, timeout: int = 30) -> bool: """ 停止系统管理器和所有子系统 Args: timeout: 等待所有子系统停止的总超时时间 Returns: 是否成功停止 """ try: logger.info("系统管理器停止中...") # 通知监控线程停止 self._running = False self._stop_event.set() # 等待监控线程 if self._monitor_thread and self._monitor_thread.is_alive(): self._monitor_thread.join(timeout=5) # 停止所有子系统 with self._subsystems_lock: subsystems_list = list(self._subsystems.items()) # 计算每个子系统的超时时间 per_subsystem_timeout = timeout // max(len(subsystems_list), 1) failed = [] for name, subsystem in subsystems_list: try: if not subsystem.stop(timeout=per_subsystem_timeout): failed.append(name) logger.error(f"停止子系统 '{name}' 失败或超时") except Exception as e: failed.append(name) logger.error(f"停止子系统 '{name}' 异常: {e}", exc_info=True) self._stats['stopped_at'] = datetime.utcnow().isoformat() if failed: logger.warning(f"有 {len(failed)} 个子系统停止失败: {failed}") return False logger.info("系统管理器已停止") return True except Exception as e: logger.error(f"停止系统管理器失败: {e}", exc_info=True) return False # ======================================================================== # 监控和维护 # ======================================================================== def _monitor_loop(self): """ 监控循环 持续检查子系统健康状态,检测故障并自动恢复 """ logger.info("监控线程已启动") while self._running: try: with self._subsystems_lock: subsystems_snapshot = list(self._subsystems.items()) # 检查每个子系统的健康状态 for name, subsystem in subsystems_snapshot: try: health = subsystem.health_check() # 检查子系统是否运行异常 if not health['running']: logger.warning( f"检测到子系统 '{name}' 已停止运行\n" f" - 线程活跃: {health['thread_alive']}\n" f" - 统计: {health['stats']}" ) # 自动重启失败的子系统 if self._auto_restart: logger.info(f"正在自动重启子系统 '{name}'...") if subsystem.start(): self._stats['subsystem_restarts'][name] += 1 logger.info( f"子系统 '{name}' 已重启 " f"(共 {self._stats['subsystem_restarts'][name]} 次)" ) else: logger.error(f"重启子系统 '{name}' 失败") # 检查最后一次错误 if health['stats']['last_error']: logger.warning( f"子系统 '{name}' 上次错误: {health['stats']['last_error']}\n" f" 时间: {health['stats']['last_error_time']}\n" f" 失败消息数: {health['stats']['failed_messages']}" ) except Exception as e: logger.error(f"检查子系统 '{name}' 健康状态失败: {e}", exc_info=True) # 等待直到下一次检查或接收停止信号 self._stop_event.wait(timeout=self._monitor_interval) except Exception as e: logger.error(f"监控循环异常: {e}", exc_info=True) self._stop_event.wait(timeout=self._monitor_interval) logger.info("监控线程已停止") def get_system_status(self) -> Dict[str, Any]: """ 获取完整的系统状态 Returns: 系统状态字典 """ with self._subsystems_lock: subsystems_status = {} for name, subsystem in self._subsystems.items(): try: subsystems_status[name] = subsystem.health_check() except Exception as e: subsystems_status[name] = { 'name': name, 'running': False, 'error': str(e) } return { 'manager_running': self._running, 'timestamp': datetime.utcnow().isoformat(), 'total_subsystems': len(self._subsystems), 'subsystems': subsystems_status, 'stats': self._stats.copy(), 'auto_restart': self._auto_restart, 'monitor_interval': self._monitor_interval } def set_monitor_interval(self, interval: int): """设置监控间隔时间(秒)""" self._monitor_interval = interval logger.info(f"监控间隔已设置为 {interval} 秒") def set_auto_restart(self, enabled: bool): """设置是否自动重启失败的子系统""" self._auto_restart = enabled logger.info(f"自动重启已设置为 {enabled}") _system_manager:Optional[SystemManager]=None def get_system_manager(db_manager, message_bus) -> SystemManager: """获取全局系统管理器实例""" global _system_manager with system_manager_lock: if _system_manager is None: _system_manager = SystemManager() return _system_manager