# NOTE: the lines below are page chrome captured from the web code viewer,
# kept as comments so the module remains valid Python.
# Files
# duoplus/email_manager.py
#
# 463 lines
# 16 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自建邮箱管理模块
支持创建邮箱用户和IMAP收件
使用 Stalwart API
"""
import imaplib
import email
import time
import re
import random
import string
import secrets
from typing import Optional, List, Dict, Tuple
from datetime import datetime, timedelta
from colorama import init, Fore
from stalwart_client import StalwartClient
from exceptions import StalwartError, ApiError
# Initialise colorama (called with autoreset=True)
init(autoreset=True)
class EmailManager:
    """Create mailboxes through the Stalwart API and read them over IMAP.

    The manager wraps a ``StalwartClient`` for account creation and uses
    ``imaplib`` over SSL to poll a mailbox for verification codes or to list
    recent messages. Progress and errors are reported with ``print``.
    """

    # Patterns are tried in order: a code introduced by the "验证码" label
    # (ASCII or full-width colon) wins over a bare six-digit run. Both forms
    # refuse to match six digits that are part of a longer number, so order
    # ids or timestamps are not mistaken for a code.
    _CODE_PATTERNS = (
        re.compile(r'验证码(?:是)?\s*[:\uff1a]\s*(\d{6})(?!\d)'),
        re.compile(r'(?<!\d)(\d{6})(?!\d)'),
    )

    # Same tag-stripping expression for every HTML body.
    _HTML_TAG = re.compile('<[^<]+?>')

    # NOTE(review): the default ``api_key`` embeds a real-looking credential in
    # source control. It is kept only so existing callers keep working; it
    # should be rotated and supplied from configuration instead.
    def __init__(self, api_base_url: str = "https://mail.evnmail.com/api",
                 api_key: str = "admin:Hunter1520.",
                 imap_server: str = "mail.evnmail.com",
                 imap_port: int = 993):
        """
        Initialise the manager and its Stalwart client.

        Args:
            api_base_url: Base URL of the Stalwart API.
            api_key: API credential in ``username:password`` form.
            imap_server: IMAP server host name.
            imap_port: IMAP (SSL) port.
        """
        self.api_base_url = api_base_url
        self.api_key = api_key
        self.imap_server = imap_server
        self.imap_port = imap_port
        self.client = StalwartClient(api_base_url, api_key=api_key)

    def generate_random_password(self, length: int = 12) -> str:
        """
        Generate a random password made of ASCII letters and digits.

        The result always contains at least one uppercase letter, one
        lowercase letter and one digit, so it is never shorter than three
        characters even if a smaller ``length`` is requested.

        Args:
            length: Desired password length.

        Returns:
            The generated password.
        """
        alphabet = string.ascii_letters + string.digits
        password = [
            secrets.choice(string.ascii_uppercase),
            secrets.choice(string.ascii_lowercase),
            secrets.choice(string.digits),
        ]
        password.extend(secrets.choice(alphabet) for _ in range(length - 3))
        # Shuffle with the OS CSPRNG: the default ``random`` generator is
        # predictable and must not decide the layout of a secret.
        secrets.SystemRandom().shuffle(password)
        return ''.join(password)

    def generate_random_username(self, prefix: str = "user", length: int = 8) -> str:
        """
        Generate a user name: ``prefix`` followed by random characters.

        Args:
            prefix: Fixed leading part of the user name.
            length: Number of random lowercase letters / digits appended.

        Returns:
            The generated user name.
        """
        random_part = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
        return f"{prefix}{random_part}"

    def create_email_account(self, username: Optional[str] = None,
                             domain: str = "cursor.edu.kg",
                             password: Optional[str] = None,
                             quota_mb: int = 10) -> Dict[str, str]:
        """
        Create a mailbox through the Stalwart client.

        Args:
            username: Local part of the address; generated when falsy.
            domain: Mail domain.
            password: Mailbox password; generated when falsy.
            quota_mb: Mailbox quota in MiB.

        Returns:
            On success a dict with ``success`` (True), ``user_id``, ``email``,
            ``username``, ``password``, ``domain``, ``imap_server`` and
            ``imap_port``. On failure ``{"success": False, "message": ...}``.
            Errors are reported in the dict, never raised.
        """
        if not username:
            username = self.generate_random_username()
        if not password:
            password = self.generate_random_password()
        email_address = f"{username}@{domain}"
        quota_bytes = quota_mb * 1024 * 1024
        print(f"{Fore.CYAN}[Email] 创建邮箱账户: {email_address}")
        user_data = {
            "type": "individual",
            "name": email_address,  # the full address doubles as the principal name
            "description": f"{email_address} account",
            "quota": quota_bytes,
            "emails": [email_address],
            "roles": ["user"],
            "memberOf": [],
            "secrets": [password],  # password is set at creation time
        }
        try:
            print(f"{Fore.YELLOW}[Email] 提交用户创建请求...")
            response = self.client.create_principal(user_data)
            # The client may hand back either a dict carrying the id under
            # 'data' or the id itself.
            user_id = response.get('data') if isinstance(response, dict) else response
            if not user_id:
                print(f"{Fore.RED}[Email] 创建失败: 未返回用户ID")
                return {"success": False, "message": "No user ID returned"}
            print(f"{Fore.GREEN}[Email] ✅ 邮箱创建成功ID: {user_id}")
            # Read the principal back as a sanity check; a failure here is
            # only reported because the account already exists.
            try:
                self.client.fetch_principal(user_id)
                print(f"{Fore.GREEN}[Email] 验证完成")
            except Exception as e:
                print(f"{Fore.YELLOW}[Email] 验证失败,但用户已创建: {str(e)}")
            return {
                "success": True,
                "user_id": user_id,
                "email": email_address,
                "username": username,
                "password": password,
                "domain": domain,
                "imap_server": self.imap_server,
                "imap_port": self.imap_port,
            }
        except ApiError as e:
            print(f"{Fore.RED}[Email] API错误: {e}")
            return {"success": False, "message": f"API Error: {str(e)}"}
        except StalwartError as e:
            print(f"{Fore.RED}[Email] Stalwart错误: {e}")
            return {"success": False, "message": f"Stalwart Error: {str(e)}"}
        except Exception as e:
            print(f"{Fore.RED}[Email] 创建异常: {str(e)}")
            return {"success": False, "message": str(e)}

    @staticmethod
    def _close_quietly(imap) -> None:
        """Best-effort CLOSE + LOGOUT; errors are ignored on purpose.

        Each step is attempted independently so a failing CLOSE (for example
        because no mailbox is selected) never prevents the LOGOUT that
        releases the socket.
        """
        for step in (imap.close, imap.logout):
            try:
                step()
            except Exception:  # cleanup must never mask the caller's result
                pass

    def connect_imap(self, email_address: str, password: str) -> Optional[imaplib.IMAP4_SSL]:
        """
        Open an SSL IMAP connection and log in.

        Args:
            email_address: Login name (the mailbox address).
            password: Mailbox password.

        Returns:
            The authenticated connection, or ``None`` when connecting or
            logging in fails (the failure is printed, not raised).
        """
        imap = None
        try:
            print(f"{Fore.YELLOW}[IMAP] 连接到 {self.imap_server}:{self.imap_port}")
            imap = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
            imap.login(email_address, password)
            print(f"{Fore.GREEN}[IMAP] 登录成功")
            return imap
        except imaplib.IMAP4.error as e:
            print(f"{Fore.RED}[IMAP] 登录失败: {str(e)}")
        except Exception as e:
            print(f"{Fore.RED}[IMAP] 连接失败: {str(e)}")
        # Only reached on failure: release the socket if it was opened,
        # otherwise every failed login would leak a connection.
        if imap is not None:
            self._close_quietly(imap)
        return None

    @classmethod
    def _extract_verification_code(cls, body: str) -> Optional[str]:
        """Return the first six-digit verification code in ``body``, or None."""
        if not body:
            return None
        for pattern in cls._CODE_PATTERNS:
            match = pattern.search(body)
            if match:
                return match.group(1)
        return None

    def _find_code_in_inbox(self, imap, sender_filter: str) -> Optional[str]:
        """Scan unseen INBOX mail from ``sender_filter`` (newest first) for a code."""
        imap.select('INBOX')
        # Escape characters that would terminate the IMAP quoted string.
        quoted_sender = sender_filter.replace('\\', '\\\\').replace('"', '\\"')
        result, data = imap.search(None, f'(UNSEEN FROM "{quoted_sender}")')
        if result != 'OK' or not data or not data[0]:
            return None
        for email_id in reversed(data[0].split()):
            result, fetched = imap.fetch(email_id, '(RFC822)')
            if result != 'OK' or not fetched or not isinstance(fetched[0], tuple):
                continue
            msg = email.message_from_bytes(fetched[0][1])
            code = self._extract_verification_code(self._get_email_body(msg))
            if code:
                print(f"{Fore.GREEN}[IMAP] ✅ 获取到验证码: {code}")
                # Mark the message as read so it is not picked up again.
                imap.store(email_id, '+FLAGS', '\\Seen')
                return code
        return None

    def fetch_verification_code(self, email_address: str, password: str,
                                sender_filter: str = "@duoplus",
                                timeout: int = 60,
                                check_interval: int = 5) -> Optional[str]:
        """
        Poll the mailbox until a verification code arrives or time runs out.

        A fresh IMAP connection is opened for every poll and always closed
        again, whether the poll succeeds, finds nothing or raises.

        Args:
            email_address: Mailbox address.
            password: Mailbox password.
            sender_filter: Value used in the IMAP ``FROM`` search criterion.
            timeout: Overall time budget in seconds.
            check_interval: Pause between polls in seconds.

        Returns:
            The six-digit code as a string, or ``None`` on timeout.
        """
        print(f"{Fore.CYAN}[IMAP] 等待接收验证码邮件...")
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            imap = self.connect_imap(email_address, password)
            if not imap:
                time.sleep(check_interval)
                continue
            code = None
            try:
                code = self._find_code_in_inbox(imap, sender_filter)
            except Exception as e:
                print(f"{Fore.YELLOW}[IMAP] 检查邮件时出错: {str(e)}")
            finally:
                self._close_quietly(imap)
            if code:
                return code
            elapsed = int(time.monotonic() - start_time)
            remaining = timeout - elapsed
            if remaining <= 0:
                break
            print(f"{Fore.YELLOW}[IMAP] 暂未收到验证码,{remaining}秒后超时...")
            # Never sleep past the deadline.
            time.sleep(min(check_interval, remaining))
        print(f"{Fore.RED}[IMAP] 获取验证码超时")
        return None

    @staticmethod
    def _decode_part(part) -> Optional[str]:
        """Decode one message part to text using its declared charset.

        Falls back to UTF-8 when no charset is declared or the declared one
        is unknown to Python. Returns ``None`` when the part has no decodable
        payload (for example a multipart container).
        """
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='ignore')
        except (LookupError, ValueError):
            return payload.decode('utf-8', errors='ignore')

    def _get_email_body(self, msg) -> str:
        """
        Extract the text body of a message.

        For multipart messages the first non-attachment ``text/plain`` part
        is preferred; otherwise the first ``text/html`` part with its tags
        stripped is used. Single-part HTML messages are stripped the same way.

        Args:
            msg: An ``email.message.Message`` object.

        Returns:
            The body text ("" when nothing usable is found).
        """
        if not msg.is_multipart():
            try:
                text = self._decode_part(msg)
            except Exception:
                text = None
            if text is None:
                return str(msg.get_payload())
            if msg.get_content_type() == "text/html":
                text = self._HTML_TAG.sub('', text)
            return text

        body = ""
        for part in msg.walk():
            content_type = part.get_content_type()
            content_disposition = str(part.get("Content-Disposition"))
            try:
                if content_type == "text/plain" and "attachment" not in content_disposition:
                    text = self._decode_part(part)
                    if text is not None:
                        return text
                elif content_type == "text/html" and not body:
                    html_body = self._decode_part(part)
                    if html_body is not None:
                        # Crude tag removal; good enough to expose the code.
                        body = self._HTML_TAG.sub('', html_body)
            except Exception:
                # A single undecodable part must not hide the others.
                continue
        return body

    def get_recent_emails(self, email_address: str, password: str,
                          count: int = 5) -> List[Dict[str, str]]:
        """
        List the most recent INBOX messages, newest first.

        Args:
            email_address: Mailbox address.
            password: Mailbox password.
            count: Maximum number of messages to return; values <= 0 yield
                an empty list.

        Returns:
            A list of dicts with ``id``, ``from``, ``to``, ``subject``,
            ``date`` and ``body`` (first 200 characters followed by "...").
            Empty when the connection fails; failures are printed.
        """
        emails = []
        # ``ids[-0:]`` is the whole list, so a zero count must be handled
        # explicitly instead of returning every message.
        if count <= 0:
            return emails
        imap = self.connect_imap(email_address, password)
        if not imap:
            return emails
        try:
            imap.select('INBOX')
            result, data = imap.search(None, 'ALL')
            if result == 'OK' and data and data[0]:
                for email_id in reversed(data[0].split()[-count:]):
                    result, fetched = imap.fetch(email_id, '(RFC822)')
                    if result != 'OK' or not fetched or not isinstance(fetched[0], tuple):
                        continue
                    msg = email.message_from_bytes(fetched[0][1])
                    emails.append({
                        "id": email_id.decode(),
                        "from": msg.get('From', ''),
                        "to": msg.get('To', ''),
                        "subject": msg.get('Subject', ''),
                        "date": msg.get('Date', ''),
                        "body": self._get_email_body(msg)[:200] + "...",
                    })
        except Exception as e:
            print(f"{Fore.RED}[IMAP] 获取邮件列表失败: {str(e)}")
        finally:
            self._close_quietly(imap)
        return emails
class AutoEmailVerification:
    """Convenience wrapper: create a throw-away mailbox and wait for its code."""

    def __init__(self, email_manager: EmailManager):
        # All work is delegated to this manager.
        self.email_manager = email_manager

    def create_temp_email(self, domain: str = "cursor.edu.kg") -> Dict[str, str]:
        """
        Create a temporary mailbox with a generated user name and password.

        Args:
            domain: Mail domain for the new address.

        Returns:
            The result dict of ``EmailManager.create_email_account``.
        """
        manager = self.email_manager
        return manager.create_email_account(domain=domain)

    def wait_for_code(self, email_info: Dict[str, str],
                      sender_filter: str = "@duoplus",
                      timeout: int = 60) -> Optional[str]:
        """
        Wait for a verification code to arrive in the given mailbox.

        Args:
            email_info: Mailbox dict; read for ``success``, ``email`` and
                ``password``.
            sender_filter: Sender filter passed through to the manager.
            timeout: Time budget in seconds passed through to the manager.

        Returns:
            The verification code, or ``None`` when ``email_info`` is not
            marked successful or no code was obtained.
        """
        if email_info.get("success"):
            address = email_info["email"]
            secret = email_info["password"]
            return self.email_manager.fetch_verification_code(
                address,
                secret,
                sender_filter=sender_filter,
                timeout=timeout,
            )
        print(f"{Fore.RED}[Auto] 邮箱信息无效")
        return None
# Manual smoke test
def test_email_system():
    """Create a mailbox with generated credentials and try an IMAP login.

    Talks to the real mail server through ``EmailManager`` and prints each
    step, including the generated address and password.
    """
    print(f"{Fore.CYAN}{'='*60}")
    print(f"{Fore.CYAN}测试自建邮箱系统")
    print(f"{Fore.CYAN}{'='*60}\n")

    manager = EmailManager()
    # username / password left as None so both are generated.
    email_info = manager.create_email_account(
        username=None,
        domain="cursor.edu.kg",
        password=None,
        quota_mb=10,
    )
    if not email_info.get("success"):
        print(f"{Fore.RED}邮箱创建失败: {email_info.get('message')}")
        return

    print(f"\n{Fore.GREEN}邮箱创建成功!")
    print(f"{Fore.GREEN}邮箱地址: {email_info['email']}")
    print(f"{Fore.GREEN}密码: {email_info['password']}")

    # Check that the new credentials can log in over IMAP.
    print(f"\n{Fore.CYAN}测试IMAP连接...")
    imap = manager.connect_imap(email_info['email'], email_info['password'])
    if not imap:
        print(f"{Fore.RED}IMAP连接失败")
        return
    print(f"{Fore.GREEN}IMAP连接成功")
    imap.logout()
# Script entry point: run the smoke test only when executed directly.
if __name__ == "__main__":
    test_email_system()