Files
auto_cursor_online/delete_db.py
2025-03-31 09:55:54 +08:00

227 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import sys
from datetime import datetime, timedelta
import argparse
from loguru import logger
from core.config import Config
from core.database import DatabaseManager
from core.logger import setup_logger
class DatabaseCleaner:
def __init__(self):
self.config = Config.from_yaml()
self.logger = setup_logger(self.config)
self.db_manager = DatabaseManager(self.config)
async def initialize(self):
"""初始化数据库连接"""
await self.db_manager.initialize()
async def cleanup(self):
"""清理资源"""
await self.db_manager.cleanup()
async def get_status_stats(self) -> dict:
"""获取所有状态的统计信息"""
try:
async with self.db_manager.get_connection() as conn:
cursor = await conn.execute(
"""
SELECT status, COUNT(*) as count
FROM email_accounts
GROUP BY status
ORDER BY count DESC
"""
)
rows = await cursor.fetchall()
stats = {}
total = 0
for row in rows:
status, count = row
stats[status] = count
total += count
stats['total'] = total
return stats
except Exception as e:
self.logger.error(f"获取状态统计时出错: {e}")
return {}
async def delete_all(self, is_dry_run: bool = True) -> int:
"""删除所有记录"""
try:
async with self.db_manager.get_connection() as conn:
if is_dry_run:
# 在预览模式下,只统计数量
cursor = await conn.execute("SELECT COUNT(*) FROM email_accounts")
count = (await cursor.fetchone())[0]
self.logger.info(f"预览模式: 将删除 {count} 条记录")
return count
else:
# 实际删除
await conn.execute("DELETE FROM email_accounts")
await conn.commit()
self.logger.success("已删除所有记录")
return 0
except Exception as e:
self.logger.error(f"删除所有记录时出错: {e}")
return -1
async def delete_by_conditions(self, status: str = None, before_date: str = None,
email_domain: str = None, is_dry_run: bool = True) -> int:
"""根据条件删除记录"""
try:
conditions = []
params = []
if status:
conditions.append("status = ?")
params.append(status)
if before_date:
conditions.append("created_at < ?")
params.append(before_date)
if email_domain:
conditions.append("email LIKE ?")
params.append(f"%@{email_domain}")
if not conditions:
self.logger.error("未设置任何删除条件")
return -1
where_clause = " AND ".join(conditions)
async with self.db_manager.get_connection() as conn:
if is_dry_run:
# 在预览模式下,只统计数量
cursor = await conn.execute(
f"SELECT COUNT(*) FROM email_accounts WHERE {where_clause}",
params
)
count = (await cursor.fetchone())[0]
self.logger.info(f"预览模式: 将删除 {count} 条记录")
return count
else:
# 实际删除
await conn.execute(
f"DELETE FROM email_accounts WHERE {where_clause}",
params
)
await conn.commit()
self.logger.success("已删除符合条件的记录")
return 0
except Exception as e:
self.logger.error(f"删除记录时出错: {e}")
return -1
async def main():
# 解析命令行参数
parser = argparse.ArgumentParser(description="删除数据库中的记录")
parser.add_argument("--status", help="账号状态 (success/failed/pending/unavailable/retrying)")
parser.add_argument("--before", help="删除指定日期之前的记录 (YYYY-MM-DD)")
parser.add_argument("--domain", help="删除指定邮箱域名的记录")
parser.add_argument("--dry-run", action="store_true", help="预览模式,不实际删除")
parser.add_argument("--confirm", action="store_true", help="确认删除")
parser.add_argument("--all", action="store_true", help="删除所有记录")
parser.add_argument("--interactive", action="store_true", help="交互式删除")
args = parser.parse_args()
# 初始化
cleaner = DatabaseCleaner()
await cleaner.initialize()
try:
if args.interactive:
# 获取状态统计
stats = await cleaner.get_status_stats()
if not stats:
print("获取状态统计失败")
return
print("\n当前数据库状态统计:")
print("-" * 40)
for status, count in stats.items():
if status != 'total':
print(f"{status:15} : {count:5} 条记录")
print("-" * 40)
print(f"总计: {stats['total']} 条记录")
# 交互式选择要删除的状态
print("\n请选择要删除的状态 (输入状态名称,多个状态用逗号分隔):")
status_input = input().strip()
if not status_input:
print("未选择任何状态")
return
# 解析输入的状态
statuses = [s.strip() for s in status_input.split(',')]
# 验证状态是否有效
valid_statuses = [s for s in statuses if s in stats]
if not valid_statuses:
print("没有有效的状态")
return
# 显示将要删除的记录数量
total_to_delete = sum(stats[s] for s in valid_statuses)
print(f"\n将要删除 {total_to_delete} 条记录")
print("确定要删除吗?(y/n)", end=" ")
if input().strip().lower() != 'y':
print("操作已取消")
return
# 执行删除
for status in valid_statuses:
await cleaner.delete_by_conditions(status=status, is_dry_run=False)
return
# 检查是否设置了任何条件
if not any([args.status, args.before, args.domain, args.all]):
print("错误: 必须设置至少一个删除条件")
return
# 检查是否确认删除
if not args.dry_run and not args.confirm:
print("错误: 实际删除操作需要添加 --confirm 参数")
return
if args.all:
# 删除所有记录
await cleaner.delete_all(args.dry_run)
else:
# 按条件删除
await cleaner.delete_by_conditions(
status=args.status,
before_date=args.before,
email_domain=args.domain,
is_dry_run=args.dry_run
)
finally:
await cleaner.cleanup()
if __name__ == "__main__":
if len(sys.argv) == 1:
print("\n删除数据库账号工具使用说明:")
print(" 交互式删除python delete_db.py --interactive")
print(" 删除指定状态的账号python delete_db.py --status unavailable")
print(" 删除指定日期前的账号python delete_db.py --before 2025-03-20")
print(" 删除指定域名的账号python delete_db.py --domain outlook.com")
print(" 组合条件python delete_db.py --status unavailable --domain outlook.com")
print(" 预览模式不实际删除python delete_db.py --status unavailable --dry-run")
print(" 自动确认删除不提示python delete_db.py --status unavailable --confirm")
print(" 删除所有记录python delete_db.py --all --confirm")
print("\n注意:必须指定至少一个过滤条件或使用交互式模式\n")
asyncio.run(main())