"""
Subsystem base class.

Every subsystem should inherit from this class.
"""
|
||
|
||
import logging
import os
import threading
import time
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, Any, Optional

import database_manager
import message_bus
|
||
|
||
|
||
class BaseSubsystem(ABC):
    """Abstract base class for subsystems.

    Characteristics:
    - Each subsystem runs its own worker thread.
    - Subsystems share the database manager and the message bus.
    - Subsystems communicate through the message bus.
    - Fault isolation: an exception raised by ``process()`` is logged and
      counted, and the run loop keeps going.

    Instances are singletons per subsystem *name*: constructing a subsystem
    with a name that already exists returns the existing instance and does
    not initialise it again.
    """

    # Registry of every created subsystem, keyed by name. It lives on the
    # base class, so the name space is shared by all subclasses.
    _instances: Dict[str, "BaseSubsystem"] = {}
    _instance_lock = threading.Lock()

    def __new__(cls, name: str):
        """Return the instance registered under ``name``, creating it once."""
        with cls._instance_lock:
            if name not in cls._instances:
                instance = super().__new__(cls)
                instance._initialized = False
                cls._instances[name] = instance
            return cls._instances[name]

    def __init__(self, name: str):
        """Initialise the subsystem.

        Python calls ``__init__`` every time ``__new__`` hands back an
        instance, including the cached one. Without the guard below, a second
        ``Subsystem(name)`` call would replace the locks, the stop event, the
        thread handle and the statistics of a subsystem that may be running.

        Args:
            name: Subsystem name; also used for the logger name and log file.
        """
        if getattr(self, "_initialized", False):
            return

        self.name = name
        self.db_manager = database_manager.get_db_manager()
        self.message_bus = message_bus.get_message_bus()

        self._init_runtime_state()

        self.logger = self._create_logger(name)
        self.logger.info(f"子系统 '{name}' 已初始化")

        # Set last, so a failed initialisation is retried on the next
        # construction attempt.
        self._initialized = True

    def _init_runtime_state(self) -> None:
        """Create the run-state fields, locks and statistics counters."""
        # Run state
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # Guards short reads/writes of the run state. It is never held while
        # calling subclass hooks or joining the worker thread.
        self._lock = threading.Lock()
        # Serialises start()/stop() against each other.
        self._lifecycle_lock = threading.Lock()

        # Statistics
        self._stats = {
            'started_at': None,
            'stopped_at': None,
            'processed_messages': 0,
            'failed_messages': 0,
            'last_error': None,
            'last_error_time': None,
            # Incremented on every successful start(), the first one included.
            'restart_count': 0,
        }

    @staticmethod
    def _create_logger(name: str) -> logging.Logger:
        """Return the ``subsystem.<name>`` logger, configuring it on first use.

        When the logger has no handlers yet, a UTF-8 file handler writing to
        ``logs/<name>.log`` and a console handler are attached (both at INFO)
        and the logger level is set to INFO. A logger that already has
        handlers is returned untouched, so handlers are never duplicated.
        """
        logger = logging.getLogger(f"subsystem.{name}")
        if not logger.handlers:
            # One log file per subsystem.
            log_file = f"logs/{name}.log"
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.INFO)

            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)

            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(formatter)
            console_handler.setFormatter(formatter)

            logger.addHandler(file_handler)
            logger.addHandler(console_handler)
            logger.setLevel(logging.INFO)
        return logger

    def start(self) -> bool:
        """Start the subsystem.

        Runs the subclass ``_startup()`` hook and then launches the worker
        thread executing ``_run()``.

        Returns:
            True when the subsystem was started; False when it was already
            running or when start-up raised (the error is logged).
        """
        with self._lifecycle_lock:
            with self._lock:
                if self._running:
                    self.logger.warning(f"子系统 '{self.name}' 已在运行中")
                    return False

            try:
                # Subclass hook. The state lock is not held here, so the hook
                # may call is_running() / health_check() without deadlocking.
                self._startup()

                worker = threading.Thread(
                    target=self._run,
                    name=f"subsystem-{self.name}",
                    daemon=False,
                )
                with self._lock:
                    self._running = True
                    self._stop_event.clear()
                    self._thread = worker
                worker.start()

                with self._lock:
                    self._stats['started_at'] = datetime.utcnow().isoformat()
                    self._stats['restart_count'] += 1
                    start_number = self._stats['restart_count']

                self.logger.info(f"子系统 '{self.name}' 已启动 (第 {start_number} 次)")
                return True

            except Exception as e:
                self.logger.error(f"启动子系统 '{self.name}' 失败: {e}", exc_info=True)
                with self._lock:
                    self._running = False
                return False

    def stop(self, timeout: int = 10) -> bool:
        """Stop the subsystem.

        Signals the run loop, waits for the worker thread, then runs the
        subclass ``_shutdown()`` hook.

        Args:
            timeout: Seconds to wait for the worker thread to finish.

        Returns:
            True when the subsystem is stopped (or was not running); False
            when the worker did not finish within ``timeout`` or when
            stopping raised (the error is logged).
        """
        with self._lifecycle_lock:
            with self._lock:
                if not self._running:
                    self.logger.warning(f"子系统 '{self.name}' 未运行")
                    return True
                # Ask the run loop to exit.
                self._stop_event.set()
                worker = self._thread

            try:
                # Join WITHOUT the state lock: the worker may be inside
                # process() calling is_running(), which needs that lock to
                # return before the loop can observe the stop event.
                if worker is not None and worker.is_alive():
                    worker.join(timeout=timeout)

                    if worker.is_alive():
                        self.logger.warning(f"子系统 '{self.name}' 线程未在 {timeout}s 内停止")
                        return False

                # Subclass clean-up hook.
                self._shutdown()

                with self._lock:
                    self._running = False
                    self._stats['stopped_at'] = datetime.utcnow().isoformat()

                self.logger.info(f"子系统 '{self.name}' 已停止")
                return True

            except Exception as e:
                self.logger.error(f"停止子系统 '{self.name}' 失败: {e}", exc_info=True)
                with self._lock:
                    self._running = False
                return False

    def is_running(self) -> bool:
        """Return True when the subsystem is marked running and its thread is alive."""
        with self._lock:
            return self._running and (self._thread is not None and self._thread.is_alive())

    def _run(self):
        """Worker-thread main loop.

        Calls ``process()`` repeatedly until the stop event is set. An
        exception raised by ``process()`` is recorded in the statistics and
        logged; the loop then waits up to 5 seconds before continuing.
        """
        try:
            self.logger.info(f"子系统 '{self.name}' 运行循环已启动")

            while not self._stop_event.is_set():
                try:
                    # Subclass business logic.
                    self.process()

                    # Wait on the event rather than sleeping so a stop request
                    # is noticed immediately; 10 ms gives at most ~100 Hz.
                    self._stop_event.wait(timeout=0.01)

                except Exception as e:
                    # Business-logic errors must not end the main loop.
                    self._stats['failed_messages'] += 1
                    self._stats['last_error'] = str(e)
                    self._stats['last_error_time'] = datetime.utcnow().isoformat()

                    self.logger.error(
                        f"子系统 '{self.name}' 处理错误: {e}",
                        exc_info=True
                    )

                    # Back off before retrying to avoid a tight failure loop.
                    self._stop_event.wait(timeout=5)

            self.logger.info(f"子系统 '{self.name}' 运行循环已结束")

        except Exception as e:
            self.logger.error(
                f"子系统 '{self.name}' 运行循环异常: {e}",
                exc_info=True
            )
            self._running = False

    def health_check(self) -> Dict[str, Any]:
        """Return a health snapshot.

        Returns:
            A dict with ``name``, ``running`` (result of ``is_running()``),
            ``thread_alive`` and ``stats`` (a shallow copy of the counters).
        """
        thread = self._thread
        return {
            'name': self.name,
            'running': self.is_running(),
            'thread_alive': thread is not None and thread.is_alive(),
            'stats': self._stats.copy()
        }

    def send_message(self, topic: str, payload: Dict[str, Any]) -> bool:
        """Publish a message on the message bus with this subsystem as source.

        Args:
            topic: Message topic (typically ``subsystem.operation``).
            payload: Message payload.

        Returns:
            True when ``publish`` returned without raising; False otherwise
            (the error is logged).
        """
        try:
            # Assumes the bus offers a synchronous publish(); an asynchronous
            # bus would have to be driven with asyncio from process().
            # NOTE(review): this takes the bus's private ``_lock``. If
            # publish() acquires the same non-reentrant lock internally this
            # will deadlock -- confirm against the message_bus implementation.
            with self.message_bus._lock:
                self.message_bus.publish(
                    topic=topic,
                    payload=payload,
                    source=self.name
                )
            return True
        except Exception as e:
            self.logger.error(f"发送消息失败: {e}")
            return False

    @classmethod
    def get_instance(cls, name: str):
        """Return the subsystem registered under ``name``, or None."""
        with cls._instance_lock:
            return cls._instances.get(name)

    # ========================================================================
    # The methods below must be implemented by subclasses
    # ========================================================================

    @abstractmethod
    def _startup(self):
        """Subsystem-specific start-up logic, called by ``start()``.

        A place for e.g. database initialisation or message subscriptions.
        """
        pass

    @abstractmethod
    def _shutdown(self):
        """Subsystem-specific clean-up logic, called by ``stop()``.

        A place for e.g. persisting data or cancelling subscriptions.
        """
        pass

    @abstractmethod
    def process(self):
        """Main business logic, called repeatedly by the run loop.

        Suggestions:
        - poll message queues with a short timeout
        - run periodic tasks
        - interact with the message bus
        """
        pass
|