463 lines
16 KiB
Python
463 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
自建邮箱管理模块
|
||
支持创建邮箱用户和IMAP收件
|
||
使用 Stalwart API
|
||
"""
|
||
|
||
import imaplib
|
||
import email
|
||
import time
|
||
import re
|
||
import random
|
||
import string
|
||
import secrets
|
||
from typing import Optional, List, Dict, Tuple
|
||
from datetime import datetime, timedelta
|
||
from colorama import init, Fore
|
||
from stalwart_client import StalwartClient
|
||
from exceptions import StalwartError, ApiError
|
||
|
||
# 初始化 colorama
|
||
init(autoreset=True)
|
||
|
||
class EmailManager:
|
||
"""邮箱管理器"""
|
||
|
||
def __init__(self, api_base_url: str = "https://mail.evnmail.com/api",
|
||
api_key: str = "admin:Hunter1520.",
|
||
imap_server: str = "mail.evnmail.com",
|
||
imap_port: int = 993):
|
||
"""
|
||
初始化邮箱管理器
|
||
|
||
Args:
|
||
api_base_url: Stalwart API基础URL
|
||
api_key: API认证密钥 (格式: username:password)
|
||
imap_server: IMAP服务器地址
|
||
imap_port: IMAP端口
|
||
"""
|
||
self.api_base_url = api_base_url
|
||
self.api_key = api_key
|
||
self.imap_server = imap_server
|
||
self.imap_port = imap_port
|
||
|
||
# 初始化 Stalwart 客户端
|
||
self.client = StalwartClient(api_base_url, api_key=api_key)
|
||
|
||
def generate_random_password(self, length: int = 12) -> str:
|
||
"""
|
||
生成随机安全密码(只包含大小写字母和数字)
|
||
|
||
Args:
|
||
length: 密码长度
|
||
|
||
Returns:
|
||
随机密码
|
||
"""
|
||
characters = string.ascii_letters + string.digits
|
||
password = [
|
||
secrets.choice(string.ascii_uppercase),
|
||
secrets.choice(string.ascii_lowercase),
|
||
secrets.choice(string.digits),
|
||
]
|
||
password.extend(secrets.choice(characters) for _ in range(length - 3))
|
||
random.shuffle(password)
|
||
return ''.join(password)
|
||
|
||
def generate_random_username(self, prefix: str = "user", length: int = 8) -> str:
|
||
"""
|
||
生成随机用户名
|
||
|
||
Args:
|
||
prefix: 用户名前缀
|
||
length: 随机部分长度
|
||
|
||
Returns:
|
||
随机用户名
|
||
"""
|
||
random_part = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
|
||
return f"{prefix}{random_part}"
|
||
|
||
def create_email_account(self, username: Optional[str] = None,
|
||
domain: str = "cursor.edu.kg",
|
||
password: Optional[str] = None,
|
||
quota_mb: int = 10) -> Dict[str, str]:
|
||
"""
|
||
创建邮箱账户
|
||
|
||
Args:
|
||
username: 用户名,如果为None则随机生成
|
||
domain: 邮箱域名
|
||
password: 密码,如果为None则随机生成
|
||
quota_mb: 邮箱配额(MB)
|
||
|
||
Returns:
|
||
包含邮箱信息的字典
|
||
"""
|
||
# 生成用户名和密码
|
||
if not username:
|
||
username = self.generate_random_username()
|
||
if not password:
|
||
password = self.generate_random_password()
|
||
|
||
email_address = f"{username}@{domain}"
|
||
quota_bytes = quota_mb * 1024 * 1024
|
||
|
||
print(f"{Fore.CYAN}[Email] 创建邮箱账户: {email_address}")
|
||
|
||
# 准备用户数据
|
||
user_data = {
|
||
"type": "individual",
|
||
"name": email_address, # 使用完整邮箱地址作为name
|
||
"description": f"{email_address} account",
|
||
"quota": quota_bytes,
|
||
"emails": [email_address],
|
||
"roles": ["user"],
|
||
"memberOf": [],
|
||
"secrets": [password] # 直接在创建时设置密码
|
||
}
|
||
|
||
try:
|
||
# 使用 Stalwart 客户端创建用户
|
||
print(f"{Fore.YELLOW}[Email] 提交用户创建请求...")
|
||
response = self.client.create_principal(user_data)
|
||
|
||
# 获取用户ID
|
||
user_id = response.get('data') if isinstance(response, dict) else response
|
||
|
||
if user_id:
|
||
print(f"{Fore.GREEN}[Email] ✅ 邮箱创建成功!ID: {user_id}")
|
||
|
||
# 验证用户信息
|
||
try:
|
||
user_info = self.client.fetch_principal(user_id)
|
||
print(f"{Fore.GREEN}[Email] 验证完成")
|
||
except Exception as e:
|
||
print(f"{Fore.YELLOW}[Email] 验证失败,但用户已创建: {str(e)}")
|
||
|
||
return {
|
||
"success": True,
|
||
"user_id": user_id,
|
||
"email": email_address,
|
||
"username": username,
|
||
"password": password,
|
||
"domain": domain,
|
||
"imap_server": self.imap_server,
|
||
"imap_port": self.imap_port
|
||
}
|
||
else:
|
||
print(f"{Fore.RED}[Email] 创建失败: 未返回用户ID")
|
||
return {"success": False, "message": "No user ID returned"}
|
||
|
||
except ApiError as e:
|
||
print(f"{Fore.RED}[Email] API错误: {e}")
|
||
return {"success": False, "message": f"API Error: {str(e)}"}
|
||
except StalwartError as e:
|
||
print(f"{Fore.RED}[Email] Stalwart错误: {e}")
|
||
return {"success": False, "message": f"Stalwart Error: {str(e)}"}
|
||
except Exception as e:
|
||
print(f"{Fore.RED}[Email] 创建异常: {str(e)}")
|
||
return {"success": False, "message": str(e)}
|
||
|
||
def connect_imap(self, email_address: str, password: str) -> Optional[imaplib.IMAP4_SSL]:
|
||
"""
|
||
连接到IMAP服务器
|
||
|
||
Args:
|
||
email_address: 邮箱地址
|
||
password: 密码
|
||
|
||
Returns:
|
||
IMAP连接对象
|
||
"""
|
||
try:
|
||
print(f"{Fore.YELLOW}[IMAP] 连接到 {self.imap_server}:{self.imap_port}")
|
||
|
||
# 连接到IMAP服务器
|
||
imap = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
|
||
|
||
# 登录
|
||
imap.login(email_address, password)
|
||
print(f"{Fore.GREEN}[IMAP] 登录成功")
|
||
|
||
return imap
|
||
|
||
except imaplib.IMAP4.error as e:
|
||
print(f"{Fore.RED}[IMAP] 登录失败: {str(e)}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"{Fore.RED}[IMAP] 连接失败: {str(e)}")
|
||
return None
|
||
|
||
def fetch_verification_code(self, email_address: str, password: str,
|
||
sender_filter: str = "@duoplus",
|
||
timeout: int = 60,
|
||
check_interval: int = 5) -> Optional[str]:
|
||
"""
|
||
从邮箱获取验证码
|
||
|
||
Args:
|
||
email_address: 邮箱地址
|
||
password: 密码
|
||
sender_filter: 发件人过滤条件
|
||
timeout: 超时时间(秒)
|
||
check_interval: 检查间隔(秒)
|
||
|
||
Returns:
|
||
验证码字符串
|
||
"""
|
||
print(f"{Fore.CYAN}[IMAP] 等待接收验证码邮件...")
|
||
|
||
start_time = time.time()
|
||
|
||
while time.time() - start_time < timeout:
|
||
imap = self.connect_imap(email_address, password)
|
||
if not imap:
|
||
time.sleep(check_interval)
|
||
continue
|
||
|
||
try:
|
||
# 选择收件箱
|
||
imap.select('INBOX')
|
||
|
||
# 搜索未读邮件
|
||
search_criteria = f'(UNSEEN FROM "{sender_filter}")'
|
||
result, data = imap.search(None, search_criteria)
|
||
|
||
if result == 'OK' and data[0]:
|
||
email_ids = data[0].split()
|
||
|
||
# 获取最新的邮件
|
||
for email_id in reversed(email_ids):
|
||
result, data = imap.fetch(email_id, '(RFC822)')
|
||
|
||
if result == 'OK':
|
||
raw_email = data[0][1]
|
||
msg = email.message_from_bytes(raw_email)
|
||
|
||
# 获取邮件内容
|
||
body = self._get_email_body(msg)
|
||
|
||
# 提取验证码(通常是6位数字)
|
||
code_patterns = [
|
||
r'验证码[::]\s*(\d{6})',
|
||
r'验证码是[::]\s*(\d{6})',
|
||
r'您的验证码是[::]\s*(\d{6})',
|
||
r'(\d{6})', # 直接匹配6位数字
|
||
]
|
||
|
||
for pattern in code_patterns:
|
||
match = re.search(pattern, body)
|
||
if match:
|
||
code = match.group(1)
|
||
print(f"{Fore.GREEN}[IMAP] ✅ 获取到验证码: {code}")
|
||
|
||
# 标记邮件为已读
|
||
imap.store(email_id, '+FLAGS', '\\Seen')
|
||
|
||
imap.close()
|
||
imap.logout()
|
||
return code
|
||
|
||
imap.close()
|
||
imap.logout()
|
||
|
||
except Exception as e:
|
||
print(f"{Fore.YELLOW}[IMAP] 检查邮件时出错: {str(e)}")
|
||
if imap:
|
||
try:
|
||
imap.logout()
|
||
except:
|
||
pass
|
||
|
||
# 等待下次检查
|
||
elapsed = int(time.time() - start_time)
|
||
remaining = timeout - elapsed
|
||
print(f"{Fore.YELLOW}[IMAP] 暂未收到验证码,{remaining}秒后超时...")
|
||
time.sleep(check_interval)
|
||
|
||
print(f"{Fore.RED}[IMAP] 获取验证码超时")
|
||
return None
|
||
|
||
def _get_email_body(self, msg) -> str:
|
||
"""
|
||
提取邮件正文
|
||
|
||
Args:
|
||
msg: email.message对象
|
||
|
||
Returns:
|
||
邮件正文文本
|
||
"""
|
||
body = ""
|
||
|
||
if msg.is_multipart():
|
||
for part in msg.walk():
|
||
content_type = part.get_content_type()
|
||
content_disposition = str(part.get("Content-Disposition"))
|
||
|
||
if content_type == "text/plain" and "attachment" not in content_disposition:
|
||
try:
|
||
body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
|
||
break
|
||
except:
|
||
pass
|
||
elif content_type == "text/html" and not body:
|
||
try:
|
||
html_body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
|
||
# 简单移除HTML标签
|
||
body = re.sub('<[^<]+?>', '', html_body)
|
||
except:
|
||
pass
|
||
else:
|
||
try:
|
||
body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
|
||
except:
|
||
body = str(msg.get_payload())
|
||
|
||
return body
|
||
|
||
def get_recent_emails(self, email_address: str, password: str,
|
||
count: int = 5) -> List[Dict[str, str]]:
|
||
"""
|
||
获取最近的邮件列表
|
||
|
||
Args:
|
||
email_address: 邮箱地址
|
||
password: 密码
|
||
count: 获取邮件数量
|
||
|
||
Returns:
|
||
邮件列表
|
||
"""
|
||
emails = []
|
||
|
||
imap = self.connect_imap(email_address, password)
|
||
if not imap:
|
||
return emails
|
||
|
||
try:
|
||
# 选择收件箱
|
||
imap.select('INBOX')
|
||
|
||
# 搜索所有邮件
|
||
result, data = imap.search(None, 'ALL')
|
||
|
||
if result == 'OK' and data[0]:
|
||
email_ids = data[0].split()
|
||
|
||
# 获取最近的几封邮件
|
||
for email_id in reversed(email_ids[-count:]):
|
||
result, data = imap.fetch(email_id, '(RFC822)')
|
||
|
||
if result == 'OK':
|
||
raw_email = data[0][1]
|
||
msg = email.message_from_bytes(raw_email)
|
||
|
||
emails.append({
|
||
"id": email_id.decode(),
|
||
"from": msg.get('From', ''),
|
||
"to": msg.get('To', ''),
|
||
"subject": msg.get('Subject', ''),
|
||
"date": msg.get('Date', ''),
|
||
"body": self._get_email_body(msg)[:200] + "..."
|
||
})
|
||
|
||
imap.close()
|
||
imap.logout()
|
||
|
||
except Exception as e:
|
||
print(f"{Fore.RED}[IMAP] 获取邮件列表失败: {str(e)}")
|
||
if imap:
|
||
try:
|
||
imap.logout()
|
||
except:
|
||
pass
|
||
|
||
return emails
|
||
|
||
|
||
class AutoEmailVerification:
|
||
"""自动邮箱验证码获取"""
|
||
|
||
def __init__(self, email_manager: EmailManager):
|
||
self.email_manager = email_manager
|
||
|
||
def create_temp_email(self, domain: str = "cursor.edu.kg") -> Dict[str, str]:
|
||
"""
|
||
创建临时邮箱
|
||
|
||
Args:
|
||
domain: 邮箱域名
|
||
|
||
Returns:
|
||
邮箱信息
|
||
"""
|
||
return self.email_manager.create_email_account(domain=domain)
|
||
|
||
def wait_for_code(self, email_info: Dict[str, str],
|
||
sender_filter: str = "@duoplus",
|
||
timeout: int = 60) -> Optional[str]:
|
||
"""
|
||
等待并获取验证码
|
||
|
||
Args:
|
||
email_info: 邮箱信息字典
|
||
sender_filter: 发件人过滤
|
||
timeout: 超时时间
|
||
|
||
Returns:
|
||
验证码
|
||
"""
|
||
if not email_info.get("success"):
|
||
print(f"{Fore.RED}[Auto] 邮箱信息无效")
|
||
return None
|
||
|
||
return self.email_manager.fetch_verification_code(
|
||
email_info["email"],
|
||
email_info["password"],
|
||
sender_filter=sender_filter,
|
||
timeout=timeout
|
||
)
|
||
|
||
|
||
# 测试函数
|
||
def test_email_system():
|
||
"""测试邮箱系统"""
|
||
print(f"{Fore.CYAN}{'='*60}")
|
||
print(f"{Fore.CYAN}测试自建邮箱系统")
|
||
print(f"{Fore.CYAN}{'='*60}\n")
|
||
|
||
# 创建邮箱管理器
|
||
manager = EmailManager()
|
||
|
||
# 创建测试邮箱
|
||
email_info = manager.create_email_account(
|
||
username=None, # 自动生成
|
||
domain="cursor.edu.kg",
|
||
password=None, # 自动生成
|
||
quota_mb=10
|
||
)
|
||
|
||
if email_info.get("success"):
|
||
print(f"\n{Fore.GREEN}邮箱创建成功!")
|
||
print(f"{Fore.GREEN}邮箱地址: {email_info['email']}")
|
||
print(f"{Fore.GREEN}密码: {email_info['password']}")
|
||
|
||
# 测试IMAP连接
|
||
print(f"\n{Fore.CYAN}测试IMAP连接...")
|
||
imap = manager.connect_imap(email_info['email'], email_info['password'])
|
||
if imap:
|
||
print(f"{Fore.GREEN}IMAP连接成功!")
|
||
imap.logout()
|
||
else:
|
||
print(f"{Fore.RED}IMAP连接失败")
|
||
else:
|
||
print(f"{Fore.RED}邮箱创建失败: {email_info.get('message')}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
test_email_system() |