#!/usr/bin/env python3 """ 数据库升级脚本 在已有的数据库中添加新字段和索引 """ import asyncio import sys import yaml import traceback from loguru import logger # Windows平台特殊处理,强制使用SelectorEventLoop if sys.platform.startswith('win'): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) from core.config import Config from core.database import DatabaseManager async def add_extracted_field(db_manager): """向email_accounts表添加extracted字段和索引""" logger.info("正在检查extracted字段...") # 检查extracted字段是否存在 check_query = """ SELECT COUNT(*) as field_exists FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'email_accounts' AND COLUMN_NAME = 'extracted' """ result = await db_manager.fetch_one(check_query) field_exists = result['field_exists'] > 0 if result else False if field_exists: logger.info("extracted字段已存在,无需添加") else: logger.info("添加extracted字段...") add_field_query = """ ALTER TABLE email_accounts ADD COLUMN extracted BOOLEAN DEFAULT 0 """ await db_manager.execute(add_field_query) logger.success("成功添加extracted字段") # 检查索引是否存在 check_index_query = """ SELECT COUNT(*) as index_exists FROM information_schema.STATISTICS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'email_accounts' AND INDEX_NAME = 'idx_extracted' """ result = await db_manager.fetch_one(check_index_query) index_exists = result['index_exists'] > 0 if result else False if index_exists: logger.info("idx_extracted索引已存在,无需添加") else: logger.info("添加idx_extracted索引...") add_index_query = """ ALTER TABLE email_accounts ADD INDEX idx_extracted (extracted, status, sold) """ await db_manager.execute(add_index_query) logger.success("成功添加idx_extracted索引") logger.info("数据库升级完成") # 设置所有注册成功且已售出的账号的extracted状态 update_query = """ UPDATE email_accounts SET extracted = 0 WHERE status = 'success' AND sold = 1 AND extracted IS NULL """ affected = await db_manager.execute(update_query) logger.info(f"已更新 {affected} 条记录的extracted状态") async def upgrade_database(): """升级数据库结构""" logger.info("开始数据库升级...") try: # 加载配置 config = Config.from_yaml() # 创建数据库管理器并初始化 db_manager = DatabaseManager(config) await db_manager.initialize() try: # 添加extracted字段和索引 await add_extracted_field(db_manager) logger.success("数据库升级成功完成") finally: # 清理资源 await db_manager.cleanup() except Exception as e: logger.error(f"数据库升级失败: {str(e)}") logger.error(traceback.format_exc()) if __name__ == "__main__": # 设置日志 logger.remove() logger.add(sys.stderr, level="INFO") logger.add("upgrade_database.log", rotation="1 MB", level="DEBUG") # 执行升级 logger.info("启动数据库升级") asyncio.run(upgrade_database()) logger.info("数据库升级脚本执行完毕")