import asyncio import sys from typing import List, Dict, Any import aiohttp from loguru import logger from core.config import Config from core.database import DatabaseManager from core.logger import setup_logger class ExtractedResetter: def __init__(self): self.config = Config.from_yaml() self.logger = setup_logger(self.config) self.db_manager = DatabaseManager(self.config) async def initialize(self): """初始化数据库""" await self.db_manager.initialize() async def cleanup(self): """清理资源""" await self.db_manager.cleanup() async def reset_extracted(self, include_pattern: str = None) -> int: """将账号的extracted字段重置为0""" try: query = """ UPDATE email_accounts SET extracted = 0, updated_at = CURRENT_TIMESTAMP WHERE status = 'success' AND sold = 1 """ params = [] # 添加条件过滤 if include_pattern: query += " AND email LIKE ?" params.append(f"%{include_pattern}%") async with self.db_manager.get_connection() as conn: cursor = await conn.execute(query, tuple(params)) updated_count = cursor.rowcount await conn.commit() self.logger.success(f"成功重置 {updated_count} 个账号的extracted状态") return updated_count except Exception as e: self.logger.error(f"重置extracted状态时出错: {e}") return 0 async def count_success_accounts(self) -> Dict[str, int]: """统计成功账号的提取状态""" try: async with self.db_manager.get_connection() as conn: # 统计已提取的账号 cursor1 = await conn.execute( "SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 1" ) extracted_count = (await cursor1.fetchone())[0] # 统计未提取的账号 cursor2 = await conn.execute( "SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 0" ) not_extracted_count = (await cursor2.fetchone())[0] # 统计总成功账号 cursor3 = await conn.execute( "SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1" ) total_count = (await cursor3.fetchone())[0] return { "extracted": extracted_count, "not_extracted": not_extracted_count, "total": total_count } except Exception as e: self.logger.error(f"统计账号时出错: {e}") return { "extracted": 0, "not_extracted": 0, "total": 0 } async def main(): # 解析命令行参数 import argparse parser = argparse.ArgumentParser(description="重置账号的已提取状态") parser.add_argument("--pattern", type=str, help="只重置匹配此模式的邮箱") parser.add_argument("--dry-run", action="store_true", help="仅统计,不重置") args = parser.parse_args() # 初始化 resetter = ExtractedResetter() await resetter.initialize() try: # 统计当前状态 stats = await resetter.count_success_accounts() logger.info(f"当前状态: 总成功账号: {stats['total']}, 已提取: {stats['extracted']}, 未提取: {stats['not_extracted']}") # 预览模式 if args.dry_run: logger.info("预览模式,不执行重置操作") return # 执行重置 if args.pattern: logger.info(f"将重置邮箱包含 '{args.pattern}' 的账号的extracted状态") else: logger.info("将重置所有成功账号的extracted状态") updated = await resetter.reset_extracted(args.pattern) # 统计重置后的状态 new_stats = await resetter.count_success_accounts() logger.info(f"重置后状态: 总成功账号: {new_stats['total']}, 已提取: {new_stats['extracted']}, 未提取: {new_stats['not_extracted']}") except Exception as e: logger.error(f"程序执行出错: {str(e)}") finally: await resetter.cleanup() if __name__ == "__main__": asyncio.run(main())