288 lines
7.9 KiB
Python
288 lines
7.9 KiB
Python
"""
|
|
消息总线核心模块
|
|
支持发布-订阅模式,异步消息处理
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import threading
|
|
import uuid
|
|
from typing import Callable, Dict, List, Any, Optional
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
message_bus_lock = threading.RLock()
|
|
|
|
class MessageType(Enum):
|
|
"""消息类型枚举"""
|
|
EVENT = "event"
|
|
COMMAND = "command"
|
|
QUERY = "query"
|
|
|
|
|
|
@dataclass
|
|
class Message:
|
|
"""消息数据类"""
|
|
id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
|
message_type: MessageType = MessageType.EVENT
|
|
topic: str = ""
|
|
payload: Dict[str, Any] = field(default_factory=dict)
|
|
source: str = "unknown"
|
|
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
|
|
correlation_id: Optional[str] = None
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""转换为字典"""
|
|
data = asdict(self)
|
|
data['message_type'] = self.message_type.value
|
|
return data
|
|
|
|
def to_json(self) -> str:
|
|
"""转换为JSON字符串"""
|
|
return json.dumps(self.to_dict())
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> 'Message':
|
|
"""从字典创建消息"""
|
|
data['message_type'] = MessageType(data.get('message_type', 'event'))
|
|
return cls(**data)
|
|
|
|
|
|
class MessageBus:
|
|
"""消息总线 - 核心组件"""
|
|
|
|
def __init__(self):
|
|
"""初始化消息总线"""
|
|
# 话题订阅者映射: {topic: [handler1, handler2, ...]}
|
|
self._subscribers: Dict[str, List[Callable]] = {}
|
|
# 中间件列表
|
|
self._middlewares: List[Callable] = []
|
|
# 消息历史(可选)
|
|
self._message_history: List[Message] = []
|
|
# 最大历史消息数
|
|
self._max_history = 1000
|
|
# 死信队列
|
|
self._dead_letter_queue: List[Message] = []
|
|
|
|
logger.info("消息总线已初始化")
|
|
|
|
def subscribe(self, topic: str, handler: Callable) -> str:
|
|
"""
|
|
订阅主题
|
|
|
|
Args:
|
|
topic: 主题名称
|
|
handler: 消息处理函数 (async or sync)
|
|
|
|
Returns:
|
|
订阅ID
|
|
"""
|
|
with message_bus_lock:
|
|
if topic not in self._subscribers:
|
|
self._subscribers[topic] = []
|
|
|
|
self._subscribers[topic].append(handler)
|
|
subscription_id = f"{topic}_{id(handler)}"
|
|
logger.info(f"订阅者已注册: {subscription_id}")
|
|
|
|
return subscription_id
|
|
|
|
def unsubscribe(self, topic: str, handler: Callable) -> bool:
|
|
"""
|
|
取消订阅
|
|
|
|
Args:
|
|
topic: 主题名称
|
|
handler: 处理函数
|
|
|
|
Returns:
|
|
是否成功取消
|
|
"""
|
|
|
|
with message_bus_lock:
|
|
if topic in self._subscribers:
|
|
try:
|
|
self._subscribers[topic].remove(handler)
|
|
logger.info(f"订阅者已移除: {topic}")
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
return False
|
|
|
|
def add_middleware(self, middleware: Callable) -> None:
|
|
"""
|
|
添加中间件
|
|
|
|
Args:
|
|
middleware: 中间件函数
|
|
"""
|
|
with message_bus_lock:
|
|
self._middlewares.append(middleware)
|
|
logger.info(f"中间件已添加: {middleware.__name__}")
|
|
|
|
def publish(self,
|
|
topic: str,
|
|
payload: Dict[str, Any],
|
|
message_type: MessageType = MessageType.EVENT,
|
|
source: str = "system",
|
|
correlation_id: Optional[str] = None) -> Message:
|
|
"""
|
|
发布消息
|
|
|
|
Args:
|
|
topic: 主题名称
|
|
payload: 消息内容
|
|
message_type: 消息类型
|
|
source: 消息源
|
|
correlation_id: 关联ID
|
|
|
|
Returns:
|
|
发布的消息对象
|
|
"""
|
|
with message_bus_lock:
|
|
# 创建消息
|
|
message = Message(
|
|
message_type=message_type,
|
|
topic=topic,
|
|
payload=payload,
|
|
source=source,
|
|
correlation_id=correlation_id or str(uuid.uuid4())
|
|
)
|
|
|
|
# 记录消息历史
|
|
self._record_message(message)
|
|
|
|
# 执行中间件
|
|
for middleware in self._middlewares:
|
|
middleware(message)
|
|
|
|
logger.info(f"消息已发布到主题 '{topic}': {message.id}")
|
|
|
|
# 分发消息给订阅者
|
|
self._dispatch(message)
|
|
return message
|
|
|
|
def _dispatch(self, message: Message) -> None:
|
|
"""
|
|
分发消息给订阅者
|
|
|
|
Args:
|
|
message: 消息对象
|
|
"""
|
|
handlers = self._subscribers.get(message.topic, [])
|
|
|
|
if not handlers:
|
|
logger.warning(f"主题 '{message.topic}' 没有订阅者")
|
|
return
|
|
|
|
tasks = []
|
|
for handler in handlers:
|
|
try:
|
|
handler(message)
|
|
except Exception as e:
|
|
logger.error(f"处理程序出错: {e}")
|
|
self._dead_letter_queue.append(message)
|
|
|
|
|
|
def _record_message(self, message: Message) -> None:
|
|
"""
|
|
记录消息到历史
|
|
|
|
Args:
|
|
message: 消息对象
|
|
"""
|
|
self._message_history.append(message)
|
|
# 保持历史大小
|
|
if len(self._message_history) > self._max_history:
|
|
self._message_history.pop(0)
|
|
|
|
def get_message_history(self, topic: Optional[str] = None, limit: int = 100) -> List[Message]:
|
|
"""
|
|
获取消息历史
|
|
|
|
Args:
|
|
topic: 可选的主题过滤
|
|
limit: 返回消息数量限制
|
|
|
|
Returns:
|
|
消息列表
|
|
"""
|
|
with message_bus_lock:
|
|
history = self._message_history
|
|
if topic:
|
|
history = [msg for msg in history if msg.topic == topic]
|
|
return history[-limit:]
|
|
|
|
def get_dead_letter_queue(self) -> List[Message]:
|
|
"""
|
|
获取死信队列
|
|
|
|
Returns:
|
|
失败消息列表
|
|
"""
|
|
with message_bus_lock:
|
|
return self._dead_letter_queue.copy()
|
|
|
|
def clear_dead_letter_queue(self) -> None:
|
|
"""清空死信队列"""
|
|
with message_bus_lock:
|
|
self._dead_letter_queue.clear()
|
|
logger.info("死信队列已清空")
|
|
|
|
def get_topics(self) -> List[str]:
|
|
"""
|
|
获取所有主题
|
|
|
|
Returns:
|
|
主题列表
|
|
"""
|
|
with message_bus_lock:
|
|
return list(self._subscribers.keys())
|
|
|
|
def get_subscriber_count(self, topic: str) -> int:
|
|
"""
|
|
获取主题订阅者数量
|
|
|
|
Args:
|
|
topic: 主题名称
|
|
|
|
Returns:
|
|
订阅者数量
|
|
"""
|
|
with message_bus_lock:
|
|
return len(self._subscribers.get(topic, []))
|
|
|
|
async def shutdown(self) -> None:
|
|
"""优雅关闭消息总线"""
|
|
logger.info("消息总线正在关闭...")
|
|
self._subscribers.clear()
|
|
self._middlewares.clear()
|
|
logger.info("消息总线已关闭")
|
|
|
|
|
|
# 全局消息总线实例
|
|
_message_bus: Optional[MessageBus] = None
|
|
|
|
|
|
def get_message_bus() -> MessageBus:
|
|
"""
|
|
获取或创建全局消息总线实例
|
|
|
|
Returns:
|
|
消息总线实例
|
|
"""
|
|
global _message_bus
|
|
with message_bus_lock:
|
|
if _message_bus is None:
|
|
_message_bus = MessageBus()
|
|
return _message_bus
|
|
|
|
|
|
def reset_message_bus() -> None:
|
|
"""重置全局消息总线实例(用于测试)"""
|
|
global _message_bus
|
|
_message_bus = None
|