UI_CVS_MIX/database_manager.py
2025-11-20 11:40:03 +08:00

424 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
数据模型和 ORM 定义
使用 SQLAlchemy 定义数据库表结构
"""
import threading
from datetime import datetime
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Boolean, JSON, Enum, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from typing import Callable, Dict, List, Any, Optional
import enum
import json
# 创建基类
Base = declarative_base()
db_lock = threading.RLock()
class WorkflowStatus(enum.Enum):
"""工作流状态"""
READY = "ready" # 就绪
RUNNING = "running" # 运行中
PAUSED = "paused" # 暂停
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
CANCELLED = "cancelled" # 已取消
class TaskStatus(enum.Enum):
"""任务状态"""
PENDING = "pending" # 待处理
RUNNING = "running" # 运行中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
SKIPPED = "skipped" # 已跳过
CANCELLED = "cancelled" # 已取消
# ═══════════════════════════════════════════════════════════════════════════
# 系统配置表
# ═══════════════════════════════════════════════════════════════════════════
class SystemConfig(Base):
"""系统配置"""
__tablename__ = "system_config"
id = Column(Integer, primary_key=True)
key = Column(String(100), unique=True, nullable=False, index=True)
value = Column(Text, nullable=False)
description = Column(String(500))
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<SystemConfig {self.key}={self.value[:50]}>"
def to_dict(self):
return {
"id": self.id,
"key": self.key,
"value": self.value,
"description": self.description,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
class Project(Base):
"""项目表"""
__tablename__ = "project_table"
id = Column(Integer, primary_key=True)
#任务名称
name = Column(String(500),default="")
#任务创建者
creator= Column(String(500),default="")
#任务描述
description= Column(String(1000),default="")
#工作流+草稿+作者组成一个项目。此处保存工作流的UUID和草稿的UUID
corr_workflow= Column(String(500),default="")
corr_draft= Column(String(500),default="")
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<Project {self.name}>"
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"creator": self.creator,
"description": self.description,
"corr_workflow": self.corr_workflow,
"corr_draft": self.corr_draft,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
class WorkFlow(Base):
"""任务表"""
__tablename__ = "workflow_table"
id = Column(Integer, primary_key=True)
#工作流UUID
uuid= Column(String(500),default="")
#工作流名称
name = Column(String(500),default="")
#任务创建者
creator= Column(String(500),default="")
#工作流描述
description= Column(String(1000),default="")
#具体工作流内容JSON格式保存
content= Column(JSON, default=dict)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<WorkFlow {self.name}>"
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"creator": self.creator,
"description": self.description,
"content": self.content,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
class Draft(Base):
"""草稿表"""
__tablename__ = "draft_table"
id = Column(Integer, primary_key=True)
#草稿UUID
uuid= Column(String(500),default="")
#草稿名称
name = Column(String(500),default="")
#草稿创建者
creator= Column(String(500),default="")
#草稿描述
description= Column(String(1000),default="")
#具体草稿内容JSON格式保存
content= Column(JSON, default=dict)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<Draft {self.name}>"
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"creator": self.creator,
"discription": self.discription,
"content": self.content,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
class User(Base):
"""用户表"""
__tablename__ = "user_table"
id = Column(Integer, primary_key=True)
username = Column(String(500),default="")
password= Column(String(500),default="")
role= Column(String(500),default="") #admin/user/guest
discription= Column(String(1000),default="")
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<User {self.username}>"
def to_dict(self):
return {
"id": self.id,
"username": self.username,
"password": self.password,
"role": self.role,
"discription": self.discription,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
# ═══════════════════════════════════════════════════════════════════════════
# 工作流执行实例表
# ═══════════════════════════════════════════════════════════════════════════
class WorkflowInstance(Base):
"""工作流执行实例"""
__tablename__ = "workflow_instances"
id = Column(Integer, primary_key=True)
workflow_uuid = Column(String(500))
workflow_name = Column(String(500))
status = Column(Enum(WorkflowStatus), default=WorkflowStatus.READY)
# 执行参数
parameters = Column(JSON, default=dict)
# 执行结果和输出
result = Column(JSON, default=dict)
#临时数据区
temp_data=Column(JSON, default=dict)
# 当前步骤
current_step_params = Column(JSON, default=dict)
current_step_index = Column(Integer, default=0)
current_step_content = Column(JSON, default=dict)
current_step_result = Column(JSON, default=dict)
# 已完成的步骤
completed_steps = Column(JSON, default=list)
# 失败信息
error_message = Column(Text)
error_traceback = Column(Text)
# 重试次数
retry_count = Column(Integer, default=0)
max_retries = Column(Integer, default=3)
created_at = Column(DateTime, default=datetime.utcnow)
started_at = Column(DateTime)
completed_at = Column(DateTime)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<WorkflowInstance {self.workflow_name} #{self.id}>"
def to_dict(self):
return {
"id": self.id,
"workflow_uuid": self.workflow_uuid,
"workflow_name": self.workflow_name,
"status": self.status.value if self.status else None,
"parameters": self.parameters,
"result": self.result,
"temp_data": self.temp_data,
"current_step_params": self.current_step_params,
"current_step_index": self.current_step_index,
"current_step_content": self.current_step_content,
"current_step_result": self.current_step_result,
"completed_steps": self.completed_steps,
"error_message": self.error_message,
"error_traceback": self.error_traceback,
"retry_count": self.retry_count,
"max_retries": self.max_retries,
"created_at": self.created_at.isoformat() if self.created_at else None,
"started_at": self.started_at.isoformat() if self.started_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
# ═══════════════════════════════════════════════════════════════════════════
# 操作日志表
# ═══════════════════════════════════════════════════════════════════════════
class OperationLog(Base):
"""操作日志"""
__tablename__ = "operation_logs"
id = Column(Integer, primary_key=True)
# 操作类型: "create", "update", "delete", "execute", etc.
operation_type = Column(String(50), nullable=False, index=True)
# 操作对象: "workflow", "task", etc.
object_type = Column(String(50), nullable=False)
object_id = Column(Integer)
object_name = Column(String(100))
# 操作详情
details = Column(JSON, default=dict)
# 操作者 (用户或系统)
operator = Column(String(100), default="system")
# 操作结果
status = Column(String(50)) # "success", "failure"
error_message = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow, index=True)
def __repr__(self):
return f"<OperationLog {self.operation_type} {self.object_type} #{self.object_id}>"
def to_dict(self):
return {
"id": self.id,
"operation_type": self.operation_type,
"object_type": self.object_type,
"object_id": self.object_id,
"object_name": self.object_name,
"details": self.details,
"operator": self.operator,
"status": self.status,
"error_message": self.error_message,
"created_at": self.created_at.isoformat() if self.created_at else None,
}
# ═══════════════════════════════════════════════════════════════════════════
# 测试表
# ═══════════════════════════════════════════════════════════════════════════
class Test(Base):
"""测试表"""
__tablename__ = "test_tb"
id = Column(Integer, primary_key=True)
name = Column(String(500),default="")
value = Column(String(500),default="")
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<Workflow {self.value}>"
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"value": self.value,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
# ═══════════════════════════════════════════════════════════════════════════
# 数据库初始化和会话管理
# ═══════════════════════════════════════════════════════════════════════════
class DatabaseManager:
"""数据库管理器"""
def __init__(self, database_url: str = "sqlite:///hy02d_system.db"):
"""
初始化数据库管理器
Args:
database_url: 数据库连接字符串
- SQLite: "sqlite:///path/to/db.db"
- MySQL: "mysql+pymysql://user:password@host:port/dbname"
- PostgreSQL: "postgresql://user:password@host:port/dbname"
"""
self.database_url = database_url
self.engine = None
self.Session = None
self._lock=threading.RLock()
def initialize(self):
"""初始化数据库"""
self.engine = create_engine(
self.database_url,
echo=False, # 设置为 True 查看 SQL 语句
pool_pre_ping=True # 测试连接有效性
)
# 创建所有表
Base.metadata.create_all(self.engine)
# 创建会话工厂
self.Session = sessionmaker(bind=self.engine)
print(f"✓ 数据库已初始化: {self.database_url}")
def get_session(self):
"""获取数据库会话"""
if self.Session is None:
raise RuntimeError("数据库尚未初始化,请先调用 initialize()")
return self.Session()
def close(self):
"""关闭数据库连接"""
if self.engine:
self.engine.dispose()
print("✓ 数据库连接已关闭")
# 全局数据库管理器实例
_db_manager:Optional[DatabaseManager] = None
def get_db_manager() -> DatabaseManager:
"""
获取或创建全局数据库管理器实例
Returns:
数据库管理器实例
"""
global _db_manager
with db_lock:
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager
def reset_db_manager() -> None:
"""重置全局数据库管理器实例(用于测试)"""
global _db_manager
_db_manager = None