import asyncio import sys from datetime import datetime, timedelta import argparse from loguru import logger from core.config import Config from core.database import DatabaseManager from core.logger import setup_logger class DatabaseCleaner: def __init__(self): self.config = Config.from_yaml() self.logger = setup_logger(self.config) self.db_manager = DatabaseManager(self.config) async def initialize(self): """初始化数据库连接""" await self.db_manager.initialize() async def cleanup(self): """清理资源""" await self.db_manager.cleanup() async def get_status_stats(self) -> dict: """获取所有状态的统计信息""" try: async with self.db_manager.get_connection() as conn: cursor = await conn.execute( """ SELECT status, COUNT(*) as count FROM email_accounts GROUP BY status ORDER BY count DESC """ ) rows = await cursor.fetchall() stats = {} total = 0 for row in rows: status, count = row stats[status] = count total += count stats['total'] = total return stats except Exception as e: self.logger.error(f"获取状态统计时出错: {e}") return {} async def delete_all(self, is_dry_run: bool = True) -> int: """删除所有记录""" try: async with self.db_manager.get_connection() as conn: if is_dry_run: # 在预览模式下,只统计数量 cursor = await conn.execute("SELECT COUNT(*) FROM email_accounts") count = (await cursor.fetchone())[0] self.logger.info(f"预览模式: 将删除 {count} 条记录") return count else: # 实际删除 await conn.execute("DELETE FROM email_accounts") await conn.commit() self.logger.success("已删除所有记录") return 0 except Exception as e: self.logger.error(f"删除所有记录时出错: {e}") return -1 async def delete_by_conditions(self, status: str = None, before_date: str = None, email_domain: str = None, is_dry_run: bool = True) -> int: """根据条件删除记录""" try: conditions = [] params = [] if status: conditions.append("status = ?") params.append(status) if before_date: conditions.append("created_at < ?") params.append(before_date) if email_domain: conditions.append("email LIKE ?") params.append(f"%@{email_domain}") if not conditions: self.logger.error("未设置任何删除条件") return -1 where_clause = " AND ".join(conditions) async with self.db_manager.get_connection() as conn: if is_dry_run: # 在预览模式下,只统计数量 cursor = await conn.execute( f"SELECT COUNT(*) FROM email_accounts WHERE {where_clause}", params ) count = (await cursor.fetchone())[0] self.logger.info(f"预览模式: 将删除 {count} 条记录") return count else: # 实际删除 await conn.execute( f"DELETE FROM email_accounts WHERE {where_clause}", params ) await conn.commit() self.logger.success("已删除符合条件的记录") return 0 except Exception as e: self.logger.error(f"删除记录时出错: {e}") return -1 async def main(): # 解析命令行参数 parser = argparse.ArgumentParser(description="删除数据库中的记录") parser.add_argument("--status", help="账号状态 (success/failed/pending/unavailable/retrying)") parser.add_argument("--before", help="删除指定日期之前的记录 (YYYY-MM-DD)") parser.add_argument("--domain", help="删除指定邮箱域名的记录") parser.add_argument("--dry-run", action="store_true", help="预览模式,不实际删除") parser.add_argument("--confirm", action="store_true", help="确认删除") parser.add_argument("--all", action="store_true", help="删除所有记录") parser.add_argument("--interactive", action="store_true", help="交互式删除") args = parser.parse_args() # 初始化 cleaner = DatabaseCleaner() await cleaner.initialize() try: if args.interactive: # 获取状态统计 stats = await cleaner.get_status_stats() if not stats: print("获取状态统计失败") return print("\n当前数据库状态统计:") print("-" * 40) for status, count in stats.items(): if status != 'total': print(f"{status:15} : {count:5} 条记录") print("-" * 40) print(f"总计: {stats['total']} 条记录") # 交互式选择要删除的状态 print("\n请选择要删除的状态 (输入状态名称,多个状态用逗号分隔):") status_input = input().strip() if not status_input: print("未选择任何状态") return # 解析输入的状态 statuses = [s.strip() for s in status_input.split(',')] # 验证状态是否有效 valid_statuses = [s for s in statuses if s in stats] if not valid_statuses: print("没有有效的状态") return # 显示将要删除的记录数量 total_to_delete = sum(stats[s] for s in valid_statuses) print(f"\n将要删除 {total_to_delete} 条记录") print("确定要删除吗?(y/n)", end=" ") if input().strip().lower() != 'y': print("操作已取消") return # 执行删除 for status in valid_statuses: await cleaner.delete_by_conditions(status=status, is_dry_run=False) return # 检查是否设置了任何条件 if not any([args.status, args.before, args.domain, args.all]): print("错误: 必须设置至少一个删除条件") return # 检查是否确认删除 if not args.dry_run and not args.confirm: print("错误: 实际删除操作需要添加 --confirm 参数") return if args.all: # 删除所有记录 await cleaner.delete_all(args.dry_run) else: # 按条件删除 await cleaner.delete_by_conditions( status=args.status, before_date=args.before, email_domain=args.domain, is_dry_run=args.dry_run ) finally: await cleaner.cleanup() if __name__ == "__main__": if len(sys.argv) == 1: print("\n删除数据库账号工具使用说明:") print(" 交互式删除:python delete_db.py --interactive") print(" 删除指定状态的账号:python delete_db.py --status unavailable") print(" 删除指定日期前的账号:python delete_db.py --before 2025-03-20") print(" 删除指定域名的账号:python delete_db.py --domain outlook.com") print(" 组合条件:python delete_db.py --status unavailable --domain outlook.com") print(" 预览模式(不实际删除):python delete_db.py --status unavailable --dry-run") print(" 自动确认删除(不提示):python delete_db.py --status unavailable --confirm") print(" 删除所有记录:python delete_db.py --all --confirm") print("\n注意:必须指定至少一个过滤条件或使用交互式模式\n") asyncio.run(main())