import sqlite3
import sys
from contextlib import closing
from datetime import datetime

# SQLite database file opened by default (relative to the working directory).
DEFAULT_DB_PATH = 'cursor.db'

# Text printed in place of a NULL or empty column value.
EMPTY_PLACEHOLDER = "无"

# Aggregate counts shown in the summary line of read_accounts, in print order:
# total, registered successfully, in use, sold.
_COUNT_QUERIES = (
    "SELECT COUNT(*) FROM email_accounts",
    "SELECT COUNT(*) FROM email_accounts WHERE status = 'success'",
    "SELECT COUNT(*) FROM email_accounts WHERE in_use = 1",
    "SELECT COUNT(*) FROM email_accounts WHERE sold = 1",
)


def format_timestamp(timestamp):
    """Format an ISO-8601 timestamp string as ``YYYY-MM-DD``.

    Args:
        timestamp: Timestamp text; a trailing ``Z`` is accepted as UTC.

    Returns:
        The date part as ``YYYY-MM-DD``; the placeholder "无" when the value
        is falsy; otherwise, when the value cannot be parsed, the value itself
        converted to ``str``.
    """
    if not timestamp:
        return EMPTY_PLACEHOLDER
    try:
        # Rewrite 'Z' as an explicit offset because fromisoformat() only
        # accepts the 'Z' suffix on newer Python versions.
        parsed = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        # Not a string, or not ISO formatted: show the raw value instead.
        return str(timestamp)
    return parsed.strftime('%Y-%m-%d')


def _open_db(db_path):
    """Open the database at *db_path* with rows addressable by column name."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn


def _fetch_rows(db_path, sql, params=()):
    """Run one query against *db_path* and return all rows; always closes the connection."""
    with closing(_open_db(db_path)) as conn:
        return conn.execute(sql, params).fetchall()


def _display(value):
    """Return *value* as text, substituting the placeholder for NULL/empty values."""
    if value is None or value == '':
        return EMPTY_PLACEHOLDER
    return str(value)


def _truncate(value, width):
    """Return *value* as text cut to *width* characters, ending in '..' when shortened."""
    text = _display(value)
    if len(text) > width:
        return text[:width - 2] + '..'
    return text


def _percent(part, whole):
    """Return *part* as a percentage of *whole*, or 0 when *whole* is not positive."""
    return (part / whole * 100) if whole > 0 else 0


def read_accounts(limit=10, offset=0, db_path=DEFAULT_DB_PATH):
    """Print summary counts and one page of rows from ``email_accounts``.

    Rows are listed newest id first. Database errors and any other error are
    reported on stdout instead of being raised.

    Args:
        limit: Maximum number of rows to list.
        offset: Number of rows to skip before listing.
        db_path: Path of the SQLite database file.
    """
    try:
        with closing(_open_db(db_path)) as conn:
            cursor = conn.cursor()

            total_count, success_count, in_use_count, sold_count = [
                cursor.execute(sql).fetchone()[0] for sql in _COUNT_QUERIES
            ]

            # limit/offset are bound as parameters rather than formatted into
            # the SQL text.
            cursor.execute(
                """
                SELECT id, email, password, cursor_password, status,
                       in_use, sold, created_at, updated_at
                FROM email_accounts
                ORDER BY id DESC
                LIMIT ? OFFSET ?
                """,
                (limit, offset),
            )
            accounts = cursor.fetchall()

        # Summary statistics.
        print("-" * 80)
        print(f"总账号数: {total_count} | 注册成功: {success_count} | 使用中: {in_use_count} | 已售出: {sold_count}")
        success_rate = _percent(success_count, total_count)
        print(f"注册成功率: {success_rate:.2f}%")
        print("-" * 80)

        # Table header.
        print(f"{'ID':<5} {'邮箱':<25} {'密码':<15} {'Cursor密码':<15} {'状态':<8} {'使用中':<4} {'已售':<4} {'创建时间':<10}")
        print("-" * 80)

        # One line per account; long fields are shortened to their column width.
        for account in accounts:
            in_use = "是" if account['in_use'] else "否"
            sold = "是" if account['sold'] else "否"
            created_at = format_timestamp(account['created_at'])
            email = _truncate(account['email'], 25)
            password = _truncate(account['password'], 15)
            cursor_pwd = _truncate(account['cursor_password'], 15)
            status = _display(account['status'])

            print(f"{account['id']:<5} {email:<25} {password:<15} "
                  f"{cursor_pwd:<15} {status:<8} "
                  f"{in_use:<4} {sold:<4} {created_at:<10}")

        print("-" * 80)
        print(f"显示 {len(accounts)} 条记录,总共 {total_count} 条记录")

    except sqlite3.Error as e:
        print(f"数据库错误: {e}")
    except Exception as e:
        # Best-effort CLI report: print the problem rather than a traceback.
        print(f"发生错误: {e}")


def analyze_domains(db_path=DEFAULT_DB_PATH):
    """Print account and success counts grouped by e-mail domain.

    The domain is the text after the first '@' of ``email``. Database errors
    are reported on stdout instead of being raised.

    Args:
        db_path: Path of the SQLite database file.
    """
    try:
        domains = _fetch_rows(db_path, """
            SELECT
                substr(email, instr(email, '@') + 1) as domain,
                COUNT(*) as count,
                SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_count
            FROM email_accounts
            GROUP BY domain
            ORDER BY count DESC
        """)
    except sqlite3.Error as e:
        print(f"数据库错误: {e}")
        return

    print("\n----- 邮箱域名分布 -----")
    print(f"{'域名':<20} {'账号数':<10} {'成功数':<10} {'成功率':<10}")
    print("-" * 50)

    for domain in domains:
        success_rate = _percent(domain['success_count'], domain['count'])
        # A NULL email yields a NULL domain group; show it as the placeholder.
        name = _display(domain['domain'])
        print(f"{name:<20} {domain['count']:<10} {domain['success_count']:<10} {success_rate:.2f}%")


def analyze_time(db_path=DEFAULT_DB_PATH):
    """Print registration and success counts grouped by creation date.

    Groups on the first 10 characters of ``created_at`` and shows the 30
    latest groups. Database errors are reported on stdout instead of being
    raised.

    Args:
        db_path: Path of the SQLite database file.
    """
    try:
        dates = _fetch_rows(db_path, """
            SELECT
                substr(created_at, 1, 10) as date,
                COUNT(*) as total,
                SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success
            FROM email_accounts
            GROUP BY date
            ORDER BY date DESC
            LIMIT 30
        """)
    except sqlite3.Error as e:
        print(f"数据库错误: {e}")
        return

    print("\n----- 注册时间分布 (最近30天) -----")
    print(f"{'日期':<15} {'注册数':<10} {'成功数':<10} {'成功率':<10}")
    print("-" * 50)

    for date in dates:
        success_rate = _percent(date['success'], date['total'])
        # Rows without created_at form a NULL group; show it as the placeholder.
        day = _display(date['date'])
        print(f"{day:<15} {date['total']:<10} {date['success']:<10} {success_rate:.2f}%")


def analyze_status(db_path=DEFAULT_DB_PATH):
    """Print the number and share of accounts for each ``status`` value.

    Database errors are reported on stdout instead of being raised.

    Args:
        db_path: Path of the SQLite database file.
    """
    try:
        statuses = _fetch_rows(db_path, """
            SELECT
                status,
                COUNT(*) as count,
                (COUNT(*) * 100.0 / (SELECT COUNT(*) FROM email_accounts)) as percentage
            FROM email_accounts
            GROUP BY status
            ORDER BY count DESC
        """)
    except sqlite3.Error as e:
        print(f"数据库错误: {e}")
        return

    print("\n----- 注册状态分布 -----")
    print(f"{'状态':<20} {'数量':<10} {'百分比':<10}")
    print("-" * 40)

    for status in statuses:
        # A NULL status cannot take a '<20' format spec; show the placeholder.
        label = _display(status['status'])
        print(f"{label:<20} {status['count']:<10} {status['percentage']:.2f}%")


def main(argv=None):
    """Run the command-line interface.

    Args:
        argv: Arguments after the program name; defaults to ``sys.argv[1:]``.

    A first argument of ``domains``, ``time``, ``status`` or ``all`` prints the
    matching report(s) and exits with status 0. Otherwise the arguments are
    read as ``[limit] [offset]`` for the account listing (defaults 20 and 0);
    a non-integer value exits with status 1.
    """
    args = sys.argv[1:] if argv is None else list(argv)

    # Insertion order is also the order used by the 'all' command.
    reports = {
        'domains': analyze_domains,
        'time': analyze_time,
        'status': analyze_status,
    }

    limit = 20   # rows listed by default
    offset = 0   # start from the first row by default

    if args:
        command = args[0]
        if command == 'all':
            read_accounts(10, 0)
            for report in reports.values():
                report()
            sys.exit(0)
        if command in reports:
            reports[command]()
            sys.exit(0)

        try:
            limit = int(command)
        except ValueError:
            print("参数错误: limit必须是整数")
            sys.exit(1)

    if len(args) > 1:
        try:
            offset = int(args[1])
        except ValueError:
            print("参数错误: offset必须是整数")
            sys.exit(1)

    read_accounts(limit, offset)

    print("\n使用方法:")
    print("python read_db.py [limit] [offset] - 显示账号列表")
    print("python read_db.py domains - 显示邮箱域名分布")
    print("python read_db.py time - 显示注册时间分布")
    print("python read_db.py status - 显示注册状态分布")
    print("python read_db.py all - 显示所有统计信息")


if __name__ == "__main__":
    main()