110 lines
3.7 KiB
Python
110 lines
3.7 KiB
Python
import asyncio
|
||
import sys
|
||
|
||
# Windows平台特殊处理,强制使用SelectorEventLoop
|
||
if sys.platform.startswith('win'):
|
||
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
||
|
||
from loguru import logger
|
||
|
||
from core.config import Config
|
||
from core.database import DatabaseManager
|
||
|
||
|
||
async def import_emails(config: Config, db_manager: DatabaseManager, file_path: str):
|
||
"""导入邮箱账号到MySQL数据库"""
|
||
DEFAULT_CLIENT_ID = "9e5f94bc-e8a4-4e73-b8be-63364c29d753"
|
||
|
||
# 确保数据库连接已初始化
|
||
if not db_manager._pool:
|
||
await db_manager.initialize()
|
||
|
||
# 读取文件并导入数据
|
||
count = 0
|
||
duplicate_count = 0
|
||
error_count = 0
|
||
|
||
logger.info(f"开始从 {file_path} 导入邮箱账号")
|
||
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
for line_num, line in enumerate(f, 1):
|
||
if not line.strip():
|
||
continue
|
||
|
||
try:
|
||
# 解析数据行
|
||
parts = line.strip().split('----')
|
||
if len(parts) < 4:
|
||
logger.error(f"行 {line_num}: 格式不正确,期望 'email----password----client_id----refresh_token'")
|
||
error_count += 1
|
||
continue
|
||
|
||
email, password, client_id, refresh_token = parts
|
||
|
||
# 插入数据库
|
||
insert_query = '''
|
||
INSERT INTO email_accounts
|
||
(email, password, client_id, refresh_token, status)
|
||
VALUES (%s, %s, %s, %s, 'pending')
|
||
'''
|
||
|
||
try:
|
||
await db_manager.execute(insert_query, (email, password, client_id, refresh_token))
|
||
count += 1
|
||
|
||
if count % 100 == 0:
|
||
logger.info(f"已导入 {count} 个邮箱账号")
|
||
|
||
except Exception as e:
|
||
if "Duplicate entry" in str(e):
|
||
logger.warning(f"行 {line_num}: 重复的邮箱: {email}")
|
||
duplicate_count += 1
|
||
else:
|
||
logger.error(f"行 {line_num}: 导入失败: {str(e)}")
|
||
error_count += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"行 {line_num}: 处理时出错: {str(e)}")
|
||
error_count += 1
|
||
|
||
# 如果启用了Redis缓存,清除相关缓存
|
||
if db_manager.redis:
|
||
cleared = await db_manager.clear_cache("db:*")
|
||
logger.info(f"已清除 {cleared} 个Redis缓存键")
|
||
|
||
logger.success(f"导入完成: 成功 {count} 个, 重复 {duplicate_count} 个, 失败 {error_count} 个")
|
||
return count
|
||
|
||
|
||
async def main():
|
||
try:
|
||
# 加载配置
|
||
config = Config.from_yaml()
|
||
|
||
# 初始化数据库管理器
|
||
db_manager = DatabaseManager(config)
|
||
await db_manager.initialize()
|
||
|
||
# 从配置中获取邮箱文件路径,或使用默认值
|
||
file_path = config.email_config.file_path if hasattr(config, 'email_config') and config.email_config else "email.txt"
|
||
|
||
# 导入邮箱
|
||
await import_emails(config, db_manager, file_path)
|
||
|
||
except Exception as e:
|
||
logger.error(f"程序执行出错: {str(e)}")
|
||
finally:
|
||
# 清理资源
|
||
if 'db_manager' in locals():
|
||
await db_manager.cleanup()
|
||
logger.info("程序执行完毕")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 设置日志
|
||
logger.remove()
|
||
logger.add(sys.stderr, level="INFO")
|
||
logger.add("import_emails.log", rotation="1 MB", level="DEBUG")
|
||
|
||
# 执行导入
|
||
asyncio.run(main()) |