Files
auto_cursor_online/reset_extracted.py
2025-03-31 09:55:54 +08:00

136 lines
4.7 KiB
Python

import asyncio
import sys
from typing import List, Dict, Any
import aiohttp
from loguru import logger
from core.config import Config
from core.database import DatabaseManager
from core.logger import setup_logger
class ExtractedResetter:
def __init__(self):
self.config = Config.from_yaml()
self.logger = setup_logger(self.config)
self.db_manager = DatabaseManager(self.config)
async def initialize(self):
"""初始化数据库"""
await self.db_manager.initialize()
async def cleanup(self):
"""清理资源"""
await self.db_manager.cleanup()
async def reset_extracted(self, include_pattern: str = None) -> int:
"""将账号的extracted字段重置为0"""
try:
query = """
UPDATE email_accounts
SET
extracted = 0,
updated_at = CURRENT_TIMESTAMP
WHERE status = 'success' AND sold = 1
"""
params = []
# 添加条件过滤
if include_pattern:
query += " AND email LIKE ?"
params.append(f"%{include_pattern}%")
async with self.db_manager.get_connection() as conn:
cursor = await conn.execute(query, tuple(params))
updated_count = cursor.rowcount
await conn.commit()
self.logger.success(f"成功重置 {updated_count} 个账号的extracted状态")
return updated_count
except Exception as e:
self.logger.error(f"重置extracted状态时出错: {e}")
return 0
async def count_success_accounts(self) -> Dict[str, int]:
"""统计成功账号的提取状态"""
try:
async with self.db_manager.get_connection() as conn:
# 统计已提取的账号
cursor1 = await conn.execute(
"SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 1"
)
extracted_count = (await cursor1.fetchone())[0]
# 统计未提取的账号
cursor2 = await conn.execute(
"SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1 AND extracted = 0"
)
not_extracted_count = (await cursor2.fetchone())[0]
# 统计总成功账号
cursor3 = await conn.execute(
"SELECT COUNT(*) FROM email_accounts WHERE status = 'success' AND sold = 1"
)
total_count = (await cursor3.fetchone())[0]
return {
"extracted": extracted_count,
"not_extracted": not_extracted_count,
"total": total_count
}
except Exception as e:
self.logger.error(f"统计账号时出错: {e}")
return {
"extracted": 0,
"not_extracted": 0,
"total": 0
}
async def main():
# 解析命令行参数
import argparse
parser = argparse.ArgumentParser(description="重置账号的已提取状态")
parser.add_argument("--pattern", type=str, help="只重置匹配此模式的邮箱")
parser.add_argument("--dry-run", action="store_true", help="仅统计,不重置")
args = parser.parse_args()
# 初始化
resetter = ExtractedResetter()
await resetter.initialize()
try:
# 统计当前状态
stats = await resetter.count_success_accounts()
logger.info(f"当前状态: 总成功账号: {stats['total']}, 已提取: {stats['extracted']}, 未提取: {stats['not_extracted']}")
# 预览模式
if args.dry_run:
logger.info("预览模式,不执行重置操作")
return
# 执行重置
if args.pattern:
logger.info(f"将重置邮箱包含 '{args.pattern}' 的账号的extracted状态")
else:
logger.info("将重置所有成功账号的extracted状态")
updated = await resetter.reset_extracted(args.pattern)
# 统计重置后的状态
new_stats = await resetter.count_success_accounts()
logger.info(f"重置后状态: 总成功账号: {new_stats['total']}, 已提取: {new_stats['extracted']}, 未提取: {new_stats['not_extracted']}")
except Exception as e:
logger.error(f"程序执行出错: {str(e)}")
finally:
await resetter.cleanup()
if __name__ == "__main__":
asyncio.run(main())