UI_CVS_MIX/base_subsystem.py
2025-11-20 11:40:03 +08:00

300 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
子系统基类
所有子系统都应继承此类
"""
import logging
import threading
import message_bus
import database_manager
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from datetime import datetime
import os
class BaseSubsystem(ABC):
    """
    Base class that every subsystem should inherit from.

    Characteristics:
    - each subsystem runs its own (non-daemon) worker thread
    - the database manager and the message bus are obtained from shared
      module-level accessors
    - subsystems talk to each other through the message bus
    - fault isolation: an exception raised by ``process()`` is caught and
      logged inside the run loop instead of propagating

    Instances are registered by name: constructing a subsystem with a name
    that is already registered returns the existing object, and that object
    is not initialized a second time.
    """

    # Registry of created instances, keyed by subsystem name only. The dict
    # lives on this base class, so subclasses share it unless they define
    # their own ``_instances``.
    _instances: Dict[str, "BaseSubsystem"] = {}
    _instance_lock = threading.Lock()

    def __new__(cls, name: str, *args, **kwargs):
        """
        Return the instance registered under ``name``, creating it if needed.

        Args:
            name: subsystem name used as the registry key
            *args, **kwargs: accepted and ignored here so that subclasses
                may declare additional ``__init__`` parameters
        """
        with cls._instance_lock:
            if name not in cls._instances:
                instance = super().__new__(cls)
                # Checked by __init__ so the instance is set up only once.
                instance._initialized = False
                cls._instances[name] = instance
            return cls._instances[name]

    def __init__(self, name: str):
        """
        Initialize the subsystem.

        Args:
            name: subsystem name
        """
        # __new__ hands back the registered object for a repeated name and
        # Python then calls __init__ on it again. Without this guard the
        # second call would replace the lock, stop event, thread handle and
        # statistics of a subsystem that may already be running.
        if getattr(self, "_initialized", False):
            return

        self.name = name
        self.db_manager = database_manager.get_db_manager()
        self.message_bus = message_bus.get_message_bus()

        # Run state
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._lock = threading.Lock()

        # Statistics. 'processed_messages' is only initialized here; nothing
        # in this class updates it.
        self._stats = {
            'started_at': None,
            'stopped_at': None,
            'processed_messages': 0,
            'failed_messages': 0,
            'last_error': None,
            'last_error_time': None,
            'restart_count': 0
        }

        # Logging: one logger per subsystem. Handlers are attached only when
        # the logger has none yet, so they are never added twice.
        self.logger = logging.getLogger(f"subsystem.{name}")
        if not self.logger.handlers:
            # Each subsystem writes to its own log file.
            log_file = f"logs/{name}.log"
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.INFO)

            # Console handler
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)

            # Same format for both handlers
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(formatter)
            console_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)

            # Default log level is INFO
            self.logger.setLevel(logging.INFO)

        self.logger.info(f"子系统 '{name}' 已初始化")
        # Set last, so a failed initialization is retried on the next
        # construction with the same name.
        self._initialized = True

    def start(self) -> bool:
        """
        Start the subsystem.

        Calls ``_startup()`` and then launches the worker thread that runs
        ``_run()``.

        Returns:
            True if the subsystem was started; False if it was already
            running or if startup raised an exception.
        """
        with self._lock:
            if self._running:
                self.logger.warning(f"子系统 '{self.name}' 已在运行中")
                return False

            try:
                # Subsystem-specific startup logic
                self._startup()

                # Launch the worker thread
                self._running = True
                self._stop_event.clear()
                self._thread = threading.Thread(
                    target=self._run,
                    name=f"subsystem-{self.name}",
                    daemon=False
                )
                self._thread.start()

                # Update statistics; 'restart_count' counts every successful
                # start, including the first one.
                self._stats['started_at'] = datetime.utcnow().isoformat()
                self._stats['restart_count'] += 1
                self.logger.info(f"子系统 '{self.name}' 已启动 (第 {self._stats['restart_count']} 次)")
                return True
            except Exception as e:
                self.logger.error(f"启动子系统 '{self.name}' 失败: {e}", exc_info=True)
                self._running = False
                return False

    def stop(self, timeout: int = 10) -> bool:
        """
        Stop the subsystem.

        Signals the worker thread, waits for it to finish and then calls
        ``_shutdown()``.

        Args:
            timeout: maximum time to wait for the worker thread, in seconds

        Returns:
            True if the subsystem is stopped (or was not running); False if
            the thread did not finish within ``timeout`` or an exception was
            raised while stopping.
        """
        # NOTE(review): self._lock is a non-reentrant Lock and is held while
        # joining the worker thread. If process() calls a method that takes
        # the same lock (e.g. is_running()), the worker blocks until the join
        # times out.
        with self._lock:
            if not self._running:
                self.logger.warning(f"子系统 '{self.name}' 未运行")
                return True

            try:
                # Tell the worker thread to stop
                self._stop_event.set()

                # Wait for the worker thread to finish
                if self._thread and self._thread.is_alive():
                    self._thread.join(timeout=timeout)
                    if self._thread.is_alive():
                        # The subsystem stays marked as running and
                        # _shutdown() is not called in this case.
                        self.logger.warning(f"子系统 '{self.name}' 线程未在 {timeout}s 内停止")
                        return False

                # Subsystem-specific cleanup logic
                self._shutdown()
                self._running = False
                self._stats['stopped_at'] = datetime.utcnow().isoformat()
                self.logger.info(f"子系统 '{self.name}' 已停止")
                return True
            except Exception as e:
                self.logger.error(f"停止子系统 '{self.name}' 失败: {e}", exc_info=True)
                self._running = False
                return False

    def is_running(self) -> bool:
        """Return True when the subsystem is marked running and its thread is alive."""
        with self._lock:
            return self._running and (self._thread is not None and self._thread.is_alive())

    def _run(self):
        """
        Main loop of the subsystem, executed in the worker thread.

        Calls ``process()`` repeatedly until the stop event is set. An
        exception raised by ``process()`` is recorded in the statistics and
        logged, and the loop continues after a 5 second pause.
        """
        try:
            self.logger.info(f"子系统 '{self.name}' 运行循环已启动")
            while not self._stop_event.is_set():
                try:
                    # Subsystem-specific loop body
                    self.process()
                    # Wait on the event rather than sleeping so a stop
                    # request is noticed quickly; 0.01 s caps the loop at
                    # roughly 100 iterations per second.
                    self._stop_event.wait(timeout=0.01)
                except Exception as e:
                    # Business-logic errors must not end the main loop.
                    self._stats['failed_messages'] += 1
                    self._stats['last_error'] = str(e)
                    self._stats['last_error_time'] = datetime.utcnow().isoformat()
                    self.logger.error(
                        f"子系统 '{self.name}' 处理错误: {e}",
                        exc_info=True
                    )
                    # Pause before retrying to avoid a tight failure loop;
                    # a stop request still ends the wait immediately.
                    self._stop_event.wait(timeout=5)
            self.logger.info(f"子系统 '{self.name}' 运行循环已结束")
        except Exception as e:
            self.logger.error(
                f"子系统 '{self.name}' 运行循环异常: {e}",
                exc_info=True
            )
            self._running = False

    def health_check(self) -> Dict[str, Any]:
        """
        Report the health of the subsystem.

        Returns:
            A dict with the keys 'name', 'running', 'thread_alive' and
            'stats' (a shallow copy of the statistics dict).
        """
        worker = self._thread
        return {
            'name': self.name,
            'running': self.is_running(),
            'thread_alive': worker is not None and worker.is_alive(),
            'stats': self._stats.copy()
        }

    def send_message(self, topic: str, payload: Dict[str, Any]) -> bool:
        """
        Send a message to other subsystems through the message bus.

        Args:
            topic: message topic (usually of the form subsystem.operation)
            payload: message payload

        Returns:
            True if publishing did not raise; False otherwise (the error is
            logged).
        """
        try:
            # This assumes message_bus.publish() is synchronous. If it is
            # asynchronous, the subsystem needs to use asyncio in process().
            # NOTE(review): this takes the bus's private `_lock`; if
            # publish() acquires the same non-reentrant lock internally this
            # would deadlock - confirm against the message bus
            # implementation.
            with self.message_bus._lock:
                self.message_bus.publish(
                    topic=topic,
                    payload=payload,
                    source=self.name
                )
            return True
        except Exception as e:
            self.logger.error(f"发送消息失败: {e}")
            return False

    @classmethod
    def get_instance(cls, name: str):
        """Return the instance registered under ``name``, or None if there is none."""
        with cls._instance_lock:
            return cls._instances.get(name)

    # ========================================================================
    # The methods below must be implemented by subclasses
    # ========================================================================
    @abstractmethod
    def _startup(self):
        """
        Initialization logic run when the subsystem starts.

        Called by ``start()`` before the worker thread is launched; a place
        for things such as database setup or message subscriptions.
        """
        pass

    @abstractmethod
    def _shutdown(self):
        """
        Cleanup logic run when the subsystem stops.

        Called by ``stop()`` after the worker thread has finished; a place
        for things such as saving data or cancelling subscriptions.
        """
        pass

    @abstractmethod
    def process(self):
        """
        Main business logic of the subsystem, called repeatedly by the run loop.

        Suggestions:
        - poll message queues with a short timeout
        - run periodic tasks
        - interact with the message bus
        """
        pass