82 lines
2.0 KiB
Python
82 lines
2.0 KiB
Python
|
|
"""Workflow core module."""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import json
|
||
|
|
import uuid
|
||
|
|
from typing import Callable, Dict, List, Any, Optional
|
||
|
|
from dataclasses import dataclass, field, asdict
|
||
|
|
from datetime import datetime
|
||
|
|
from enum import Enum
|
||
|
|
import logging
|
||
|
|
|
||
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class WorkType(Enum):
    """Enumeration of work types."""

    # Currently the only member; its value is the string "event".
    EVENT = "event"
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class Work:
    """Message data class describing one work item.

    Every field has a default, so ``Work()`` is valid. ``to_dict`` and
    ``from_dict`` convert to and from a plain dictionary in which
    ``work_type`` is stored as the enum's string value.
    """

    # Identifier; a fresh UUID4 string unless the caller supplies one.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Category of the work item.
    work_type: WorkType = WorkType.EVENT
    # Topic name; empty string by default.
    topic: str = ""
    # Free-form body; default_factory gives each instance its own dict.
    payload: Dict[str, Any] = field(default_factory=dict)
    # Label of the originator.
    source: str = "unknown"
    # Creation time as a naive ISO-8601 string read from the UTC clock.
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; an
    # aware replacement would change the string format, so it is kept as is.
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    # Optional identifier, None when unset.
    # NOTE(review): presumably links related work items - confirm with callers.
    correlation_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dictionary.

        Returns:
            A new dictionary keyed by field name, with ``work_type``
            replaced by the enum member's string value.
        """
        data = asdict(self)
        # asdict() keeps the enum member; store its plain value instead.
        data['work_type'] = self.work_type.value
        return data

    def to_json(self) -> str:
        """Return ``to_dict()`` serialised as a JSON string.

        Raises:
            TypeError: if ``payload`` holds values ``json.dumps`` cannot encode.
        """
        return json.dumps(self.to_dict())

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Work':
        """Create a ``Work`` from a dictionary such as ``to_dict`` produces.

        The caller's dictionary is not modified. A missing ``work_type``
        key falls back to ``'event'``.

        Args:
            data: Mapping of field names to values.

        Returns:
            A new ``Work`` instance.

        Raises:
            ValueError: if ``work_type`` is not a valid ``WorkType`` value.
            TypeError: if ``data`` has keys that are not ``Work`` fields.
        """
        # Work on a shallow copy: converting work_type in place would leave
        # an enum member inside the caller's dictionary.
        kwargs = dict(data)
        kwargs['work_type'] = WorkType(kwargs.get('work_type', 'event'))
        return cls(**kwargs)
|
||
|
|
|
||
|
|
|
||
|
|
class WorkflowEngine:
    """Workflow engine - the core component."""

    def __init__(self):
        """Create the engine's lock and log that the engine was initialised."""
        # asyncio.Lock provides mutual exclusion between asyncio tasks; the
        # asyncio documentation states it is not thread-safe, so it does not
        # guard against concurrent OS threads.
        self._lock = asyncio.Lock()
        logger.info("工作流引擎已初始化")
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
# Module-level holder for the shared engine instance; None until
# get_work_flow_engine() creates one, and again after reset_work_flow_engine().
_work_flow_engine: Optional[WorkflowEngine] = None
|
||
|
|
|
||
|
|
|
||
|
|
def get_work_flow_engine() -> WorkflowEngine:
    """Return the global workflow engine, creating it on first use.

    Returns:
        The module-wide ``WorkflowEngine`` instance.
    """
    global _work_flow_engine
    # Fast path: an instance already exists.
    if _work_flow_engine is not None:
        return _work_flow_engine
    _work_flow_engine = WorkflowEngine()
    return _work_flow_engine
|
||
|
|
|
||
|
|
|
||
|
|
def reset_work_flow_engine() -> None:
    """Clear the global workflow engine instance (intended for tests).

    The module-level reference is set back to ``None``.
    """
    global _work_flow_engine
    _work_flow_engine = None
|