import asyncio import sys # Windows平台特殊处理,强制使用SelectorEventLoop if sys.platform.startswith('win'): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) from loguru import logger from core.config import Config from core.database import DatabaseManager async def import_emails(config: Config, db_manager: DatabaseManager, file_path: str): """导入邮箱账号到MySQL数据库""" DEFAULT_CLIENT_ID = "9e5f94bc-e8a4-4e73-b8be-63364c29d753" # 确保数据库连接已初始化 if not db_manager._pool: await db_manager.initialize() # 读取文件并导入数据 count = 0 duplicate_count = 0 error_count = 0 logger.info(f"开始从 {file_path} 导入邮箱账号") with open(file_path, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): if not line.strip(): continue try: # 解析数据行 parts = line.strip().split('----') if len(parts) < 4: logger.error(f"行 {line_num}: 格式不正确,期望 'email----password----client_id----refresh_token'") error_count += 1 continue email, password, client_id, refresh_token = parts # 插入数据库 insert_query = ''' INSERT INTO email_accounts (email, password, client_id, refresh_token, status) VALUES (%s, %s, %s, %s, 'pending') ''' try: await db_manager.execute(insert_query, (email, password, client_id, refresh_token)) count += 1 if count % 100 == 0: logger.info(f"已导入 {count} 个邮箱账号") except Exception as e: if "Duplicate entry" in str(e): logger.warning(f"行 {line_num}: 重复的邮箱: {email}") duplicate_count += 1 else: logger.error(f"行 {line_num}: 导入失败: {str(e)}") error_count += 1 except Exception as e: logger.error(f"行 {line_num}: 处理时出错: {str(e)}") error_count += 1 # 如果启用了Redis缓存,清除相关缓存 if db_manager.redis: cleared = await db_manager.clear_cache("db:*") logger.info(f"已清除 {cleared} 个Redis缓存键") logger.success(f"导入完成: 成功 {count} 个, 重复 {duplicate_count} 个, 失败 {error_count} 个") return count async def main(): try: # 加载配置 config = Config.from_yaml() # 初始化数据库管理器 db_manager = DatabaseManager(config) await db_manager.initialize() # 从配置中获取邮箱文件路径,或使用默认值 file_path = config.email_config.file_path if hasattr(config, 'email_config') and config.email_config else "email.txt" # 导入邮箱 await import_emails(config, db_manager, file_path) except Exception as e: logger.error(f"程序执行出错: {str(e)}") finally: # 清理资源 if 'db_manager' in locals(): await db_manager.cleanup() logger.info("程序执行完毕") if __name__ == "__main__": # 设置日志 logger.remove() logger.add(sys.stderr, level="INFO") logger.add("import_emails.log", rotation="1 MB", level="DEBUG") # 执行导入 asyncio.run(main())