Files
auto_cursor_online/read_db.py
2025-03-31 09:55:54 +08:00

224 lines
7.8 KiB
Python

import sqlite3
import sys
from datetime import datetime
def format_timestamp(timestamp):
"""格式化时间戳为可读格式"""
if not timestamp:
return ""
try:
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
return dt.strftime('%Y-%m-%d')
except:
return timestamp
def read_accounts(limit=10, offset=0):
"""读取数据库中的账号信息"""
try:
# 连接到数据库
conn = sqlite3.connect('cursor.db')
# 设置行工厂,让查询结果以字典形式返回
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 获取总账号数
cursor.execute("SELECT COUNT(*) FROM email_accounts")
total_count = cursor.fetchone()[0]
# 按各种状态统计账号数量
cursor.execute("SELECT COUNT(*) FROM email_accounts WHERE status = 'success'")
success_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM email_accounts WHERE in_use = 1")
in_use_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM email_accounts WHERE sold = 1")
sold_count = cursor.fetchone()[0]
# 获取账号详情
cursor.execute(f"""
SELECT id, email, password, cursor_password, status, in_use, sold, created_at, updated_at
FROM email_accounts
ORDER BY id DESC
LIMIT {limit} OFFSET {offset}
""")
accounts = cursor.fetchall()
# 打印统计信息
print("-" * 80)
print(f"总账号数: {total_count} | 注册成功: {success_count} | 使用中: {in_use_count} | 已售出: {sold_count}")
success_rate = (success_count / total_count * 100) if total_count > 0 else 0
print(f"注册成功率: {success_rate:.2f}%")
print("-" * 80)
# 打印表头
print(f"{'ID':<5} {'邮箱':<25} {'密码':<15} {'Cursor密码':<15} {'状态':<8} {'使用中':<4} {'已售':<4} {'创建时间':<10}")
print("-" * 80)
# 打印账号信息
for account in accounts:
in_use = "" if account['in_use'] else ""
sold = "" if account['sold'] else ""
created_at = format_timestamp(account['created_at'])
# 截断过长的字段
email = account['email'][:23] + '..' if len(account['email']) > 25 else account['email']
password = account['password'][:13] + '..' if len(account['password']) > 15 else account['password']
cursor_pwd = (account['cursor_password'] or '')[:13] + '..' if account['cursor_password'] and len(account['cursor_password']) > 15 else (account['cursor_password'] or '')
print(f"{account['id']:<5} {email:<25} {password:<15} "
f"{cursor_pwd:<15} {account['status']:<8} "
f"{in_use:<4} {sold:<4} {created_at:<10}")
print("-" * 80)
print(f"显示 {len(accounts)} 条记录,总共 {total_count} 条记录")
except sqlite3.Error as e:
print(f"数据库错误: {e}")
except Exception as e:
print(f"发生错误: {e}")
finally:
if conn:
conn.close()
def analyze_domains():
"""分析邮箱域名分布"""
try:
conn = sqlite3.connect('cursor.db')
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 提取域名并计数
cursor.execute("""
SELECT
substr(email, instr(email, '@') + 1) as domain,
COUNT(*) as count,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_count
FROM email_accounts
GROUP BY domain
ORDER BY count DESC
""")
domains = cursor.fetchall()
print("\n----- 邮箱域名分布 -----")
print(f"{'域名':<20} {'账号数':<10} {'成功数':<10} {'成功率':<10}")
print("-" * 50)
for domain in domains:
success_rate = (domain['success_count'] / domain['count'] * 100) if domain['count'] > 0 else 0
print(f"{domain['domain']:<20} {domain['count']:<10} {domain['success_count']:<10} {success_rate:.2f}%")
except sqlite3.Error as e:
print(f"数据库错误: {e}")
finally:
if conn:
conn.close()
def analyze_time():
"""分析注册时间分布"""
try:
conn = sqlite3.connect('cursor.db')
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 按日期分组统计
cursor.execute("""
SELECT
substr(created_at, 1, 10) as date,
COUNT(*) as total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success
FROM email_accounts
GROUP BY date
ORDER BY date DESC
LIMIT 30
""")
dates = cursor.fetchall()
print("\n----- 注册时间分布 (最近30天) -----")
print(f"{'日期':<15} {'注册数':<10} {'成功数':<10} {'成功率':<10}")
print("-" * 50)
for date in dates:
success_rate = (date['success'] / date['total'] * 100) if date['total'] > 0 else 0
print(f"{date['date']:<15} {date['total']:<10} {date['success']:<10} {success_rate:.2f}%")
except sqlite3.Error as e:
print(f"数据库错误: {e}")
finally:
if conn:
conn.close()
def analyze_status():
"""分析注册状态分布"""
try:
conn = sqlite3.connect('cursor.db')
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 按状态分组统计
cursor.execute("""
SELECT
status,
COUNT(*) as count,
(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM email_accounts)) as percentage
FROM email_accounts
GROUP BY status
ORDER BY count DESC
""")
statuses = cursor.fetchall()
print("\n----- 注册状态分布 -----")
print(f"{'状态':<20} {'数量':<10} {'百分比':<10}")
print("-" * 40)
for status in statuses:
print(f"{status['status']:<20} {status['count']:<10} {status['percentage']:.2f}%")
except sqlite3.Error as e:
print(f"数据库错误: {e}")
finally:
if conn:
conn.close()
if __name__ == "__main__":
# 解析命令行参数
limit = 20 # 默认显示20条
offset = 0 # 默认从第一条开始
if len(sys.argv) > 1:
if sys.argv[1] == 'domains':
analyze_domains()
sys.exit(0)
elif sys.argv[1] == 'time':
analyze_time()
sys.exit(0)
elif sys.argv[1] == 'status':
analyze_status()
sys.exit(0)
elif sys.argv[1] == 'all':
read_accounts(10, 0)
analyze_domains()
analyze_time()
analyze_status()
sys.exit(0)
try:
limit = int(sys.argv[1])
except ValueError:
print("参数错误: limit必须是整数")
sys.exit(1)
if len(sys.argv) > 2:
try:
offset = int(sys.argv[2])
except ValueError:
print("参数错误: offset必须是整数")
sys.exit(1)
read_accounts(limit, offset)
print("\n使用方法:")
print("python read_db.py [limit] [offset] - 显示账号列表")
print("python read_db.py domains - 显示邮箱域名分布")
print("python read_db.py time - 显示注册时间分布")
print("python read_db.py status - 显示注册状态分布")
print("python read_db.py all - 显示所有统计信息")