UI_CVS_MIX/message_bus.py

288 lines
7.9 KiB
Python
Raw Normal View History

2025-11-20 11:25:47 +08:00
"""
消息总线核心模块
支持发布-订阅模式异步消息处理
"""
import asyncio
import json
import threading
import uuid
from typing import Callable, Dict, List, Any, Optional
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
import logging
logger = logging.getLogger(__name__)
message_bus_lock = threading.RLock()
class MessageType(Enum):
"""消息类型枚举"""
EVENT = "event"
COMMAND = "command"
QUERY = "query"
@dataclass
class Message:
"""消息数据类"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
message_type: MessageType = MessageType.EVENT
topic: str = ""
payload: Dict[str, Any] = field(default_factory=dict)
source: str = "unknown"
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
correlation_id: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
data = asdict(self)
data['message_type'] = self.message_type.value
return data
def to_json(self) -> str:
"""转换为JSON字符串"""
return json.dumps(self.to_dict())
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Message':
"""从字典创建消息"""
data['message_type'] = MessageType(data.get('message_type', 'event'))
return cls(**data)
class MessageBus:
"""消息总线 - 核心组件"""
def __init__(self):
"""初始化消息总线"""
# 话题订阅者映射: {topic: [handler1, handler2, ...]}
self._subscribers: Dict[str, List[Callable]] = {}
# 中间件列表
self._middlewares: List[Callable] = []
# 消息历史(可选)
self._message_history: List[Message] = []
# 最大历史消息数
self._max_history = 1000
# 死信队列
self._dead_letter_queue: List[Message] = []
logger.info("消息总线已初始化")
def subscribe(self, topic: str, handler: Callable) -> str:
"""
订阅主题
Args:
topic: 主题名称
handler: 消息处理函数 (async or sync)
Returns:
订阅ID
"""
with message_bus_lock:
if topic not in self._subscribers:
self._subscribers[topic] = []
self._subscribers[topic].append(handler)
subscription_id = f"{topic}_{id(handler)}"
logger.info(f"订阅者已注册: {subscription_id}")
return subscription_id
def unsubscribe(self, topic: str, handler: Callable) -> bool:
"""
取消订阅
Args:
topic: 主题名称
handler: 处理函数
Returns:
是否成功取消
"""
with message_bus_lock:
if topic in self._subscribers:
try:
self._subscribers[topic].remove(handler)
logger.info(f"订阅者已移除: {topic}")
return True
except ValueError:
return False
return False
def add_middleware(self, middleware: Callable) -> None:
"""
添加中间件
Args:
middleware: 中间件函数
"""
with message_bus_lock:
self._middlewares.append(middleware)
logger.info(f"中间件已添加: {middleware.__name__}")
def publish(self,
topic: str,
payload: Dict[str, Any],
message_type: MessageType = MessageType.EVENT,
source: str = "system",
correlation_id: Optional[str] = None) -> Message:
"""
发布消息
Args:
topic: 主题名称
payload: 消息内容
message_type: 消息类型
source: 消息源
correlation_id: 关联ID
Returns:
发布的消息对象
"""
with message_bus_lock:
# 创建消息
message = Message(
message_type=message_type,
topic=topic,
payload=payload,
source=source,
correlation_id=correlation_id or str(uuid.uuid4())
)
# 记录消息历史
self._record_message(message)
# 执行中间件
for middleware in self._middlewares:
middleware(message)
logger.info(f"消息已发布到主题 '{topic}': {message.id}")
# 分发消息给订阅者
self._dispatch(message)
return message
def _dispatch(self, message: Message) -> None:
"""
分发消息给订阅者
Args:
message: 消息对象
"""
handlers = self._subscribers.get(message.topic, [])
if not handlers:
logger.warning(f"主题 '{message.topic}' 没有订阅者")
return
tasks = []
for handler in handlers:
try:
handler(message)
except Exception as e:
logger.error(f"处理程序出错: {e}")
self._dead_letter_queue.append(message)
def _record_message(self, message: Message) -> None:
"""
记录消息到历史
Args:
message: 消息对象
"""
self._message_history.append(message)
# 保持历史大小
if len(self._message_history) > self._max_history:
self._message_history.pop(0)
def get_message_history(self, topic: Optional[str] = None, limit: int = 100) -> List[Message]:
"""
获取消息历史
Args:
topic: 可选的主题过滤
limit: 返回消息数量限制
Returns:
消息列表
"""
with message_bus_lock:
history = self._message_history
if topic:
history = [msg for msg in history if msg.topic == topic]
return history[-limit:]
def get_dead_letter_queue(self) -> List[Message]:
"""
获取死信队列
Returns:
失败消息列表
"""
with message_bus_lock:
return self._dead_letter_queue.copy()
def clear_dead_letter_queue(self) -> None:
"""清空死信队列"""
with message_bus_lock:
self._dead_letter_queue.clear()
logger.info("死信队列已清空")
def get_topics(self) -> List[str]:
"""
获取所有主题
Returns:
主题列表
"""
with message_bus_lock:
return list(self._subscribers.keys())
def get_subscriber_count(self, topic: str) -> int:
"""
获取主题订阅者数量
Args:
topic: 主题名称
Returns:
订阅者数量
"""
with message_bus_lock:
return len(self._subscribers.get(topic, []))
async def shutdown(self) -> None:
"""优雅关闭消息总线"""
logger.info("消息总线正在关闭...")
self._subscribers.clear()
self._middlewares.clear()
logger.info("消息总线已关闭")
# 全局消息总线实例
_message_bus: Optional[MessageBus] = None
def get_message_bus() -> MessageBus:
"""
获取或创建全局消息总线实例
Returns:
消息总线实例
"""
global _message_bus
with message_bus_lock:
if _message_bus is None:
_message_bus = MessageBus()
return _message_bus
def reset_message_bus() -> None:
"""重置全局消息总线实例(用于测试)"""
global _message_bus
_message_bus = None