""" 工作流核心模块 """ import asyncio import json import uuid from typing import Callable, Dict, List, Any, Optional from dataclasses import dataclass, field, asdict from datetime import datetime from enum import Enum import logging logger = logging.getLogger(__name__) class WorkType(Enum): """工作枚举""" EVENT = "event" @dataclass class Work: """消息数据类""" id: str = field(default_factory=lambda: str(uuid.uuid4())) work_type: WorkType = WorkType.EVENT topic: str = "" payload: Dict[str, Any] = field(default_factory=dict) source: str = "unknown" timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) correlation_id: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """转换为字典""" data = asdict(self) data['work_type'] = self.work_type.value return data def to_json(self) -> str: """转换为JSON字符串""" return json.dumps(self.to_dict()) @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'Work': """从字典创建数据""" data['work_type'] = WorkType(data.get('work_type', 'event')) return cls(**data) class WorkflowEngine: """工作流引擎 - 核心组件""" def __init__(self): """初始化工作流引擎""" # 锁用于线程安全 self._lock = asyncio.Lock() logger.info("工作流引擎已初始化") _work_flow_engine: Optional[WorkflowEngine] = None def get_work_flow_engine() -> WorkflowEngine: """ 获取或创建全局工作流引擎实例 Returns: 工作流引擎实例 """ global _work_flow_engine if _work_flow_engine is None: _work_flow_engine = WorkflowEngine() return _work_flow_engine def reset_work_flow_engine() -> None: """重置全局工作流引擎实例(用于测试)""" global _work_flow_engine _work_flow_engine = None