UI_CVS_MIX/system_manager.py
2025-11-20 11:40:03 +08:00

410 lines
15 KiB
Python

"""
系统管理器
管理所有子系统的生命周期,提供动态挂载/卸载、故障监控等功能
"""
import logging
import threading
import time
import importlib.util
import pathlib
from typing import Dict, Optional, List, Any
from datetime import datetime
from base_subsystem import BaseSubsystem
# Module logger.
logger: logging.Logger = logging.getLogger(name=__name__)
# Re-entrant lock serializing creation of the global SystemManager singleton
# (see get_system_manager).
system_manager_lock = threading.RLock()
# Subsystem configuration entries gathered from the "subsystems" plugin
# folder: import_all_from_folder appends each plugin module's own ``config``
# list here. Entries are dicts consumed by
# SystemManager.register_subsystem_from_config ('class' and 'name' are
# required; 'params' and 'autostart' are optional).
config: List[Dict[str, Any]] = list()
def import_all_from_folder(folder_path: str):
    """Dynamically import every ``*.py`` module found directly in *folder_path*.

    Each module is executed, bound as a global of this module under its file
    stem, and its module-level ``config`` list is appended to the global
    ``config`` list (consumed later by
    ``SystemManager.register_subsystem_from_config``).

    Modules are processed in sorted file-name order so registration order is
    deterministic. A module that raises while executing, or that defines no
    ``config``, is logged and skipped instead of aborting the whole import:
    one broken plugin must not take the system manager down with it.

    Args:
        folder_path: folder to scan (a ``pathlib.Path`` is accepted too).
    """
    global config
    folder = pathlib.Path(folder_path)
    if not folder.is_dir():
        logger.warning(f"路径 '{folder_path}' 不是有效的文件夹")
        return
    for file in sorted(folder.glob("*.py")):
        module_name = file.stem
        spec = importlib.util.spec_from_file_location(module_name, str(file))
        if not (spec and spec.loader):
            logger.warning(f"无法导入模块: {module_name} from {file}")
            continue
        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except Exception:
            logger.error(f"导入模块失败: {module_name} from {file}", exc_info=True)
            continue
        # Never rebind an unrelated global of this module (e.g. a plugin file
        # named config.py / logger.py / time.py would otherwise replace the
        # ``config`` list, the logger or a stdlib module). Rebinding a module
        # previously loaded from the same file is fine.
        existing = globals().get(module_name)
        if existing is None or getattr(existing, "__file__", None) == getattr(module, "__file__", None):
            globals()[module_name] = module
        else:
            logger.warning(
                f"模块名 '{module_name}' 与已有全局名称冲突,未绑定到全局命名空间 ({file})"
            )
        module_config = getattr(module, "config", None)
        if module_config is None:
            logger.warning(f"模块 {module_name} 未定义 config,已跳过 ({file})")
            continue
        config.extend(module_config)
# The subsystem plugin folder sits next to this file; every module in it is
# loaded as a side effect of importing this module.
folder_path = pathlib.Path(__file__).parent.joinpath("subsystems")
import_all_from_folder(f"{folder_path}")
class SystemManager:
    """
    System manager.

    Responsibilities:
    1. Manage multiple independent subsystems.
    2. Mount (register) and unmount (unregister) subsystems at runtime.
    3. Monitor subsystem health from a background thread and, optionally,
       restart subsystems that are found not running.
    4. Isolate subsystem failures: an exception raised by one subsystem while
       it is started, stopped or health-checked is logged and does not prevent
       the remaining subsystems from being processed.
    5. Provide a single status query (``get_system_status``).
    """

    def __init__(self):
        """Create an idle manager with no subsystems; call ``start()`` to run it."""
        # Subsystem registry, keyed by ``BaseSubsystem.name``.
        self._subsystems: Dict[str, BaseSubsystem] = {}
        # Re-entrant lock: nested acquisition from the same thread is allowed.
        self._subsystems_lock = threading.RLock()
        # Manager state
        self._running = False
        self._monitor_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # Monitoring configuration
        self._monitor_interval = 30  # seconds between health-check rounds
        self._health_check_timeout = 5  # seconds; NOTE: not enforced anywhere in this class yet
        self._auto_restart = True  # restart subsystems whose health check reports not running
        # Statistics, exposed through get_system_status()
        self._stats = {
            'started_at': None,
            'stopped_at': None,
            'total_subsystems_added': 0,
            'total_subsystems_removed': 0,
            'subsystem_restarts': {},  # {subsystem_name: restart_count}
        }
        logger.info("系统管理器已初始化")

    # ========================================================================
    # Subsystem management
    # ========================================================================

    def register_subsystem(self, subsystem: BaseSubsystem, autostart: bool = True) -> bool:
        """
        Register (mount) a subsystem under ``subsystem.name``.

        Args:
            subsystem: subsystem instance.
            autostart: if the manager is already running, start the subsystem
                immediately.

        Returns:
            True on success. False if the name is already taken, if an
            exception is raised, or if the autostart fails. In the last two
            cases the subsystem stays registered (the monitor may restart it
            when auto-restart is enabled), so registering the same name again
            is rejected as a duplicate.
        """
        with self._subsystems_lock:
            if subsystem.name in self._subsystems:
                logger.warning(f"子系统 '{subsystem.name}' 已存在,跳过注册")
                return False
            try:
                self._subsystems[subsystem.name] = subsystem
                self._stats['total_subsystems_added'] += 1
                self._stats['subsystem_restarts'][subsystem.name] = 0
                logger.info(f"子系统 '{subsystem.name}' 已注册")
                # Only start right away if the manager itself is running.
                if self._running and autostart:
                    if subsystem.start():
                        logger.info(f"子系统 '{subsystem.name}' 已自动启动")
                    else:
                        logger.error(f"自动启动子系统 '{subsystem.name}' 失败")
                        return False
                return True
            except Exception as e:
                logger.error(f"注册子系统 '{subsystem.name}' 失败: {e}", exc_info=True)
                return False

    def unregister_subsystem(self, subsystem_name: str, autostop: bool = True,
                             timeout: int = 10) -> bool:
        """
        Unregister (unmount) a subsystem.

        Args:
            subsystem_name: name the subsystem was registered under.
            autostop: stop the subsystem first if it is running.
            timeout: timeout passed to ``subsystem.stop`` (default 10, the
                previously hard-coded value).

        Returns:
            True if the subsystem was removed from the registry. False if the
            name is unknown or an exception occurred (the subsystem then stays
            registered). A stop that reports failure is logged, but the
            subsystem is still removed.
        """
        with self._subsystems_lock:
            if subsystem_name not in self._subsystems:
                logger.warning(f"子系统 '{subsystem_name}' 不存在")
                return False
            try:
                subsystem = self._subsystems[subsystem_name]
                if autostop and subsystem.is_running():
                    if not subsystem.stop(timeout=timeout):
                        logger.warning(f"停止子系统 '{subsystem_name}' 失败或超时,仍将卸载")
                del self._subsystems[subsystem_name]
                self._stats['total_subsystems_removed'] += 1
                logger.info(f"子系统 '{subsystem_name}' 已卸载")
                return True
            except Exception as e:
                logger.error(f"卸载子系统 '{subsystem_name}' 失败: {e}", exc_info=True)
                return False

    def get_subsystem(self, subsystem_name: str) -> Optional[BaseSubsystem]:
        """Return the subsystem registered as *subsystem_name*, or None."""
        with self._subsystems_lock:
            return self._subsystems.get(subsystem_name)

    def list_subsystems(self) -> List[str]:
        """Return the names of all registered subsystems."""
        with self._subsystems_lock:
            return list(self._subsystems.keys())

    def register_subsystem_from_config(self, config: Optional[List[Dict[str, Any]]] = None) -> None:
        """
        Instantiate and register subsystems from configuration entries.

        Each entry is a dict with:
            'class'     -- callable invoked as ``cls(name, **params)``
            'name'      -- subsystem name
            'params'    -- optional keyword arguments (default ``{}``)
            'autostart' -- optional, forwarded to ``register_subsystem``
                           (default True)
        A malformed entry or a failing constructor is logged and skipped.

        Args:
            config: optional list of entries. When omitted, the module-level
                ``config`` list (filled from the subsystems folder when this
                module is imported) is used.
        """
        # The parameter shadows the module-level ``config``; read that one
        # explicitly from the module namespace.
        entries = config if config is not None else (globals().get('config') or [])
        for subsys_conf in entries:
            try:
                cls = subsys_conf['class']
                name = subsys_conf['name']
                params = subsys_conf.get('params', {})
                subsystem = cls(name, **params)
                self.register_subsystem(subsystem, autostart=subsys_conf.get('autostart', True))
            except Exception as e:
                logger.error(f"从配置注册子系统失败: {e}", exc_info=True)

    # ========================================================================
    # Lifecycle management
    # ========================================================================

    @staticmethod
    def _utcnow_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Same output format as ``datetime.utcnow().isoformat()`` (no ``+00:00``
        suffix), without the DeprecationWarning ``utcnow()`` emits on
        Python 3.12+.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def start(self) -> bool:
        """
        Start all registered subsystems and the health-monitor thread.

        Subsystems that fail to start are logged but do not make the manager
        fail. Calling ``start()`` on a manager that is already running is a
        no-op, which prevents a second monitor thread from being spawned.

        Returns:
            True if the manager is running afterwards, False if starting the
            manager itself raised.
        """
        if self._running:
            logger.warning("系统管理器已在运行,忽略重复启动")
            return True
        try:
            logger.info("系统管理器启动中...")
            with self._subsystems_lock:
                failed = []
                # Iterate a snapshot so a subsystem that registers others
                # from start() cannot break the iteration.
                for name, subsystem in list(self._subsystems.items()):
                    try:
                        if not subsystem.start():
                            failed.append(name)
                            logger.error(f"启动子系统 '{name}' 失败")
                    except Exception as e:
                        failed.append(name)
                        logger.error(f"启动子系统 '{name}' 异常: {e}", exc_info=True)
                if failed:
                    logger.warning(f"{len(failed)} 个子系统启动失败: {failed}")
            # A fresh event per run: a monitor thread that outlived stop()'s
            # join timeout keeps its own (already set) event and exits,
            # instead of resuming next to the new monitor thread.
            stop_event = threading.Event()
            self._stop_event = stop_event
            self._running = True
            self._stats['started_at'] = self._utcnow_iso()
            self._monitor_thread = threading.Thread(
                target=self._monitor_loop,
                args=(stop_event,),
                name="system-monitor",
                daemon=False,
            )
            self._monitor_thread.start()
            logger.info(f"系统管理器已启动({len(self._subsystems)} 个子系统)")
            return True
        except Exception as e:
            logger.error(f"启动系统管理器失败: {e}", exc_info=True)
            self._running = False
            return False

    def stop(self, timeout: int = 30) -> bool:
        """
        Stop the monitor thread and then every registered subsystem.

        Args:
            timeout: total time budget in seconds, split evenly between the
                subsystems (each one gets at least 1 second).

        Returns:
            True if every subsystem stopped. False if any stop failed, timed
            out or raised, or if stopping the manager itself raised.
        """
        try:
            logger.info("系统管理器停止中...")
            # Signal the monitor; it re-checks _running under the registry
            # lock before any restart, so it starts no new restarts once
            # shutdown has begun.
            self._running = False
            self._stop_event.set()
            monitor = self._monitor_thread
            if monitor and monitor.is_alive():
                monitor.join(timeout=5)
                if monitor.is_alive():
                    logger.warning("监控线程未能在 5 秒内退出")
            with self._subsystems_lock:
                subsystems_list = list(self._subsystems.items())
            # Floor division alone would hand every subsystem a zero timeout
            # once there are more subsystems than seconds in the budget.
            per_subsystem_timeout = max(1, timeout // max(len(subsystems_list), 1))
            failed = []
            for name, subsystem in subsystems_list:
                try:
                    if not subsystem.stop(timeout=per_subsystem_timeout):
                        failed.append(name)
                        logger.error(f"停止子系统 '{name}' 失败或超时")
                except Exception as e:
                    failed.append(name)
                    logger.error(f"停止子系统 '{name}' 异常: {e}", exc_info=True)
            self._stats['stopped_at'] = self._utcnow_iso()
            if failed:
                logger.warning(f"{len(failed)} 个子系统停止失败: {failed}")
                return False
            logger.info("系统管理器已停止")
            return True
        except Exception as e:
            logger.error(f"停止系统管理器失败: {e}", exc_info=True)
            return False

    # ========================================================================
    # Monitoring and maintenance
    # ========================================================================

    def _monitor_loop(self, stop_event: Optional[threading.Event] = None):
        """
        Monitor-thread body: periodically health-check every subsystem.

        Each round logs subsystems that report not running (restarting them
        when auto-restart is enabled) and logs the last error reported in a
        subsystem's stats. Then it waits ``_monitor_interval`` seconds or
        until *stop_event* is set.

        Args:
            stop_event: event that ends this loop; defaults to the manager's
                current ``_stop_event``.
        """
        if stop_event is None:
            stop_event = self._stop_event
        logger.info("监控线程已启动")
        while self._running and not stop_event.is_set():
            try:
                # Health checks run on a snapshot, without holding the lock.
                with self._subsystems_lock:
                    subsystems_snapshot = list(self._subsystems.items())
                for name, subsystem in subsystems_snapshot:
                    if stop_event.is_set():
                        break
                    try:
                        health = subsystem.health_check()
                        if not health['running']:
                            logger.warning(
                                f"检测到子系统 '{name}' 已停止运行\n"
                                f"  - 线程活跃: {health['thread_alive']}\n"
                                f"  - 统计: {health['stats']}"
                            )
                            if self._auto_restart:
                                self._restart_subsystem(name, subsystem)
                        if health['stats']['last_error']:
                            logger.warning(
                                f"子系统 '{name}' 上次错误: {health['stats']['last_error']}\n"
                                f"  时间: {health['stats']['last_error_time']}\n"
                                f"  失败消息数: {health['stats']['failed_messages']}"
                            )
                    except Exception as e:
                        logger.error(f"检查子系统 '{name}' 健康状态失败: {e}", exc_info=True)
                stop_event.wait(timeout=self._monitor_interval)
            except Exception as e:
                logger.error(f"监控循环异常: {e}", exc_info=True)
                stop_event.wait(timeout=self._monitor_interval)
        logger.info("监控线程已停止")

    def _restart_subsystem(self, name: str, subsystem: BaseSubsystem) -> None:
        """Restart *subsystem* unless it was unregistered or the manager stopped meanwhile."""
        with self._subsystems_lock:
            # The health check ran on an unlocked snapshot. Since then the
            # subsystem may have been unregistered (and deliberately stopped),
            # or shutdown may have begun; restarting it now would leave an
            # orphaned, running subsystem.
            if not self._running or self._subsystems.get(name) is not subsystem:
                return
            logger.info(f"正在自动重启子系统 '{name}'...")
            if subsystem.start():
                restarts = self._stats['subsystem_restarts']
                restarts[name] = restarts.get(name, 0) + 1
                logger.info(
                    f"子系统 '{name}' 已重启 "
                    f"(共 {restarts[name]} 次)"
                )
            else:
                logger.error(f"重启子系统 '{name}' 失败")

    def get_system_status(self) -> Dict[str, Any]:
        """
        Return a snapshot of the manager and all subsystems.

        A subsystem whose ``health_check`` raises is reported as
        ``{'name', 'running': False, 'error'}``.

        Returns:
            Dict with manager flags, per-subsystem health, and a copy of the
            statistics that is independent of internal state.
        """
        with self._subsystems_lock:
            subsystems_status = {}
            for name, subsystem in self._subsystems.items():
                try:
                    subsystems_status[name] = subsystem.health_check()
                except Exception as e:
                    subsystems_status[name] = {
                        'name': name,
                        'running': False,
                        'error': str(e),
                    }
            # Copy the nested restart counters as well: a shallow copy would
            # hand out the live dict that the monitor keeps updating.
            stats = dict(self._stats)
            stats['subsystem_restarts'] = dict(self._stats['subsystem_restarts'])
            return {
                'manager_running': self._running,
                'timestamp': self._utcnow_iso(),
                'total_subsystems': len(self._subsystems),
                'subsystems': subsystems_status,
                'stats': stats,
                'auto_restart': self._auto_restart,
                'monitor_interval': self._monitor_interval,
            }

    def set_monitor_interval(self, interval: int):
        """
        Set the pause between health-check rounds, in seconds.

        Takes effect after the monitor's current wait finishes.

        Raises:
            ValueError: if *interval* is not positive. A zero or negative wait
                returns immediately, which would make the monitor spin.
        """
        if interval <= 0:
            raise ValueError(f"监控间隔必须为正数: {interval!r}")
        self._monitor_interval = interval
        logger.info(f"监控间隔已设置为 {interval}")

    def set_auto_restart(self, enabled: bool):
        """Enable or disable automatic restart of subsystems found not running."""
        self._auto_restart = enabled
        logger.info(f"自动重启已设置为 {enabled}")
# Process-wide SystemManager singleton, created lazily by get_system_manager().
_system_manager: Optional[SystemManager] = None


def get_system_manager(db_manager=None, message_bus=None) -> SystemManager:
    """Return the process-wide SystemManager, creating it on the first call.

    Creation is serialized by ``system_manager_lock``, so concurrent first
    callers all receive the same instance.

    Args:
        db_manager: currently ignored, because ``SystemManager()`` takes no
            constructor arguments. Now optional; existing
            ``get_system_manager(db, bus)`` call sites keep working.
        message_bus: currently ignored; see ``db_manager``.

    Returns:
        The shared SystemManager instance.
    """
    global _system_manager
    with system_manager_lock:
        if _system_manager is None:
            _system_manager = SystemManager()
        return _system_manager