#!/usr/bin/env python3 """ 数据模型和 ORM 定义 使用 SQLAlchemy 定义数据库表结构 """ import threading from datetime import datetime from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Boolean, JSON, Enum, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from typing import Callable, Dict, List, Any, Optional import enum import json # 创建基类 Base = declarative_base() db_lock = threading.RLock() class WorkflowStatus(enum.Enum): """工作流状态""" READY = "ready" # 就绪 RUNNING = "running" # 运行中 PAUSED = "paused" # 暂停 COMPLETED = "completed" # 已完成 FAILED = "failed" # 失败 CANCELLED = "cancelled" # 已取消 class TaskStatus(enum.Enum): """任务状态""" PENDING = "pending" # 待处理 RUNNING = "running" # 运行中 COMPLETED = "completed" # 已完成 FAILED = "failed" # 失败 SKIPPED = "skipped" # 已跳过 CANCELLED = "cancelled" # 已取消 # ═══════════════════════════════════════════════════════════════════════════ # 系统配置表 # ═══════════════════════════════════════════════════════════════════════════ class SystemConfig(Base): """系统配置""" __tablename__ = "system_config" id = Column(Integer, primary_key=True) key = Column(String(100), unique=True, nullable=False, index=True) value = Column(Text, nullable=False) description = Column(String(500)) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" def to_dict(self): return { "id": self.id, "key": self.key, "value": self.value, "description": self.description, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } class Project(Base): """项目表""" __tablename__ = "project_table" id = Column(Integer, primary_key=True) #任务名称 name = Column(String(500),default="") #任务创建者 creator= Column(String(500),default="") #任务描述 description= Column(String(1000),default="") #工作流+草稿+作者,组成一个项目。此处保存工作流的UUID和草稿的UUID corr_workflow= Column(String(500),default="") corr_draft= Column(String(500),default="") created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" def to_dict(self): return { "id": self.id, "name": self.name, "creator": self.creator, "description": self.description, "corr_workflow": self.corr_workflow, "corr_draft": self.corr_draft, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } class WorkFlow(Base): """任务表""" __tablename__ = "workflow_table" id = Column(Integer, primary_key=True) #工作流UUID uuid= Column(String(500),default="") #工作流名称 name = Column(String(500),default="") #任务创建者 creator= Column(String(500),default="") #工作流描述 description= Column(String(1000),default="") #具体工作流内容,JSON格式保存 content= Column(JSON, default=dict) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" def to_dict(self): return { "id": self.id, "name": self.name, "creator": self.creator, "description": self.description, "content": self.content, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } class Draft(Base): """草稿表""" __tablename__ = "draft_table" id = Column(Integer, primary_key=True) #草稿UUID uuid= Column(String(500),default="") #草稿名称 name = Column(String(500),default="") #草稿创建者 creator= Column(String(500),default="") #草稿描述 description= Column(String(1000),default="") #具体草稿内容,JSON格式保存 content= Column(JSON, default=dict) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" def to_dict(self): return { "id": self.id, "name": self.name, "creator": self.creator, "discription": self.discription, "content": self.content, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } class User(Base): """用户表""" __tablename__ = "user_table" id = Column(Integer, primary_key=True) username = Column(String(500),default="") password= Column(String(500),default="") role= Column(String(500),default="") #admin/user/guest discription= Column(String(1000),default="") created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" def to_dict(self): return { "id": self.id, "username": self.username, "password": self.password, "role": self.role, "discription": self.discription, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } # ═══════════════════════════════════════════════════════════════════════════ # 工作流执行实例表 # ═══════════════════════════════════════════════════════════════════════════ class WorkflowInstance(Base): """工作流执行实例""" __tablename__ = "workflow_instances" id = Column(Integer, primary_key=True) workflow_uuid = Column(String(500)) workflow_name = Column(String(500)) status = Column(Enum(WorkflowStatus), default=WorkflowStatus.READY) # 执行参数 parameters = Column(JSON, default=dict) # 执行结果和输出 result = Column(JSON, default=dict) #临时数据区 temp_data=Column(JSON, default=dict) # 当前步骤 current_step_params = Column(JSON, default=dict) current_step_index = Column(Integer, default=0) current_step_content = Column(JSON, default=dict) current_step_result = Column(JSON, default=dict) # 已完成的步骤 completed_steps = Column(JSON, default=list) # 失败信息 error_message = Column(Text) error_traceback = Column(Text) # 重试次数 retry_count = Column(Integer, default=0) max_retries = Column(Integer, default=3) created_at = Column(DateTime, default=datetime.utcnow) started_at = Column(DateTime) completed_at = Column(DateTime) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" def to_dict(self): return { "id": self.id, "workflow_uuid": self.workflow_uuid, "workflow_name": self.workflow_name, "status": self.status.value if self.status else None, "parameters": self.parameters, "result": self.result, "temp_data": self.temp_data, "current_step_params": self.current_step_params, "current_step_index": self.current_step_index, "current_step_content": self.current_step_content, "current_step_result": self.current_step_result, "completed_steps": self.completed_steps, "error_message": self.error_message, "error_traceback": self.error_traceback, "retry_count": self.retry_count, "max_retries": self.max_retries, "created_at": self.created_at.isoformat() if self.created_at else None, "started_at": self.started_at.isoformat() if self.started_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } # ═══════════════════════════════════════════════════════════════════════════ # 操作日志表 # ═══════════════════════════════════════════════════════════════════════════ class OperationLog(Base): """操作日志""" __tablename__ = "operation_logs" id = Column(Integer, primary_key=True) # 操作类型: "create", "update", "delete", "execute", etc. operation_type = Column(String(50), nullable=False, index=True) # 操作对象: "workflow", "task", etc. object_type = Column(String(50), nullable=False) object_id = Column(Integer) object_name = Column(String(100)) # 操作详情 details = Column(JSON, default=dict) # 操作者 (用户或系统) operator = Column(String(100), default="system") # 操作结果 status = Column(String(50)) # "success", "failure" error_message = Column(Text) created_at = Column(DateTime, default=datetime.utcnow, index=True) def __repr__(self): return f"" def to_dict(self): return { "id": self.id, "operation_type": self.operation_type, "object_type": self.object_type, "object_id": self.object_id, "object_name": self.object_name, "details": self.details, "operator": self.operator, "status": self.status, "error_message": self.error_message, "created_at": self.created_at.isoformat() if self.created_at else None, } # ═══════════════════════════════════════════════════════════════════════════ # 测试表 # ═══════════════════════════════════════════════════════════════════════════ class Test(Base): """测试表""" __tablename__ = "test_tb" id = Column(Integer, primary_key=True) name = Column(String(500),default="") value = Column(String(500),default="") created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" def to_dict(self): return { "id": self.id, "name": self.name, "value": self.value, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } # ═══════════════════════════════════════════════════════════════════════════ # 数据库初始化和会话管理 # ═══════════════════════════════════════════════════════════════════════════ class DatabaseManager: """数据库管理器""" def __init__(self, database_url: str = "sqlite:///hy02d_system.db"): """ 初始化数据库管理器 Args: database_url: 数据库连接字符串 - SQLite: "sqlite:///path/to/db.db" - MySQL: "mysql+pymysql://user:password@host:port/dbname" - PostgreSQL: "postgresql://user:password@host:port/dbname" """ self.database_url = database_url self.engine = None self.Session = None self._lock=threading.RLock() def initialize(self): """初始化数据库""" self.engine = create_engine( self.database_url, echo=False, # 设置为 True 查看 SQL 语句 pool_pre_ping=True # 测试连接有效性 ) # 创建所有表 Base.metadata.create_all(self.engine) # 创建会话工厂 self.Session = sessionmaker(bind=self.engine) print(f"✓ 数据库已初始化: {self.database_url}") def get_session(self): """获取数据库会话""" if self.Session is None: raise RuntimeError("数据库尚未初始化,请先调用 initialize()") return self.Session() def close(self): """关闭数据库连接""" if self.engine: self.engine.dispose() print("✓ 数据库连接已关闭") # 全局数据库管理器实例 _db_manager:Optional[DatabaseManager] = None def get_db_manager() -> DatabaseManager: """ 获取或创建全局数据库管理器实例 Returns: 数据库管理器实例 """ global _db_manager with db_lock: if _db_manager is None: _db_manager = DatabaseManager() return _db_manager def reset_db_manager() -> None: """重置全局数据库管理器实例(用于测试)""" global _db_manager _db_manager = None