UI_CVS_MIX/base_subsystem.py

300 lines
9.5 KiB
Python
Raw Normal View History

2025-11-20 11:25:47 +08:00
"""
子系统基类
所有子系统都应继承此类
"""
import logging
import threading
import message_bus
import database_manager
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from datetime import datetime
import os
class BaseSubsystem(ABC):
"""
子系统基类
特点
- 独立运行线程
- 共享数据库和消息总线
- 通过消息总线进行通信
- 故障隔离一个子系统故障不影响其他子系统
"""
_instances = {}
_instance_lock = threading.Lock()
def __new__(cls, name: str):
"""确保每个子系统名称只有一个实例"""
with cls._instance_lock:
if name not in cls._instances:
instance = super().__new__(cls)
instance._initialized = False
cls._instances[name] = instance
return cls._instances[name]
def __init__(self, name: str):
"""
初始化子系统
Args:
name: 子系统名称
"""
self.name = name
self.db_manager = database_manager.get_db_manager()
self.message_bus = message_bus.get_message_bus()
# 运行状态
self._running = False
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._lock = threading.Lock()
# 统计信息
self._stats = {
'started_at': None,
'stopped_at': None,
'processed_messages': 0,
'failed_messages': 0,
'last_error': None,
'last_error_time': None,
'restart_count': 0
}
# 日志
self.logger = logging.getLogger(f"subsystem.{name}")
# 如果这个Logger还没有Handler则添加Handler
if not self.logger.handlers:
# 创建FileHandler
# 假设我们希望每个子系统有一个单独的日志文件
log_file = f"logs/{name}.log"
os.makedirs(os.path.dirname(log_file), exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
# 创建控制台Handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 设置格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
# 设置日志级别默认为INFO
self.logger.setLevel(logging.INFO)
#self.logger = logging.getLogger(f"subsystem.{name}")
self.logger.info(f"子系统 '{name}' 已初始化")
def start(self) -> bool:
"""
启动子系统
Returns:
是否启动成功
"""
with self._lock:
if self._running:
self.logger.warning(f"子系统 '{self.name}' 已在运行中")
return False
try:
# 执行子系统特定的启动逻辑
self._startup()
# 启动运行线程
self._running = True
self._stop_event.clear()
self._thread = threading.Thread(
target=self._run,
name=f"subsystem-{self.name}",
daemon=False
)
self._thread.start()
# 更新统计
self._stats['started_at'] = datetime.utcnow().isoformat()
self._stats['restart_count'] += 1
self.logger.info(f"子系统 '{self.name}' 已启动 (第 {self._stats['restart_count']} 次)")
return True
except Exception as e:
self.logger.error(f"启动子系统 '{self.name}' 失败: {e}", exc_info=True)
self._running = False
return False
def stop(self, timeout: int = 10) -> bool:
"""
停止子系统
Args:
timeout: 等待超时时间
Returns:
是否成功停止
"""
with self._lock:
if not self._running:
self.logger.warning(f"子系统 '{self.name}' 未运行")
return True
try:
# 通知线程停止
self._stop_event.set()
# 等待线程完成
if self._thread and self._thread.is_alive():
self._thread.join(timeout=timeout)
if self._thread.is_alive():
self.logger.warning(f"子系统 '{self.name}' 线程未在 {timeout}s 内停止")
return False
# 执行子系统特定的清理逻辑
self._shutdown()
self._running = False
self._stats['stopped_at'] = datetime.utcnow().isoformat()
self.logger.info(f"子系统 '{self.name}' 已停止")
return True
except Exception as e:
self.logger.error(f"停止子系统 '{self.name}' 失败: {e}", exc_info=True)
self._running = False
return False
def is_running(self) -> bool:
"""检查子系统是否运行中"""
with self._lock:
return self._running and (self._thread is not None and self._thread.is_alive())
def _run(self):
"""
子系统主循环
在线程中持续运行处理消息和业务逻辑
"""
try:
self.logger.info(f"子系统 '{self.name}' 运行循环已启动")
while not self._stop_event.is_set():
try:
# 执行子系统特定的循环逻辑
self.process()
# 使用事件等待而不是 sleep以便快速响应停止信号 100Hz 循环频率
self._stop_event.wait(timeout=0.01)
except Exception as e:
# 捕获业务逻辑错误,不让其中断主循环
self._stats['failed_messages'] += 1
self._stats['last_error'] = str(e)
self._stats['last_error_time'] = datetime.utcnow().isoformat()
self.logger.error(
f"子系统 '{self.name}' 处理错误: {e}",
exc_info=True
)
# 等待一段时间后继续,避免频繁重试
self._stop_event.wait(timeout=5)
self.logger.info(f"子系统 '{self.name}' 运行循环已结束")
except Exception as e:
self.logger.error(
f"子系统 '{self.name}' 运行循环异常: {e}",
exc_info=True
)
self._running = False
def health_check(self) -> Dict[str, Any]:
"""
健康检查
Returns:
包含健康状态的字典
"""
return {
'name': self.name,
'running': self.is_running(),
'thread_alive': self._thread is not None and self._thread.is_alive() if self._thread else False,
'stats': self._stats.copy()
}
def send_message(self, topic: str, payload: Dict[str, Any]) -> bool:
"""
通过消息总线发送消息给其他子系统
Args:
topic: 消息主题通常为 subsystem.operation
payload: 消息载荷
Returns:
是否发送成功
"""
try:
# 这里假设 message_bus 支持同步 publish
# 如果是异步的,子系统需要在 process() 中使用 asyncio
with self.message_bus._lock:
self.message_bus.publish(
topic=topic,
payload=payload,
source=self.name
)
return True
except Exception as e:
self.logger.error(f"发送消息失败: {e}")
return False
@classmethod
def get_instance(cls, name: str):
"""获取子系统实例"""
with cls._instance_lock:
return cls._instances.get(name)
# ========================================================================
# 以下方法应在子类中实现
# ========================================================================
@abstractmethod
def _startup(self):
"""
子系统启动时的初始化逻辑
可以在这里进行数据库初始化订阅消息等
"""
pass
@abstractmethod
def _shutdown(self):
"""
子系统停止时的清理逻辑
可以在这里进行数据保存取消订阅等
"""
pass
@abstractmethod
def process(self):
"""
子系统主要业务逻辑
在运行循环中持续调用
建议
- 使用短超时的消息队列轮询
- 定期执行某些任务
- 与消息总线交互
"""
pass