196 lines
7.3 KiB
Python
196 lines
7.3 KiB
Python
import logging
|
|
import time
|
|
import re
|
|
from config import Config
|
|
import requests
|
|
import email
|
|
import imaplib
|
|
|
|
|
|
class EmailVerificationHandler:
|
|
def __init__(self):
|
|
self.imap = Config().get_imap()
|
|
self.username = Config().get_temp_mail()
|
|
self.epin = Config().get_temp_mail_epin()
|
|
self.session = requests.Session()
|
|
self.emailExtension = Config().get_temp_mail_ext()
|
|
|
|
def get_verification_code(self, max_retries=5, retry_interval=60):
|
|
"""
|
|
获取验证码,带有重试机制。
|
|
|
|
Args:
|
|
max_retries: 最大重试次数。
|
|
retry_interval: 重试间隔时间(秒)。
|
|
|
|
Returns:
|
|
验证码 (字符串或 None)。
|
|
"""
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
logging.info(f"尝试获取验证码 (第 {attempt + 1}/{max_retries} 次)...")
|
|
|
|
if not self.imap:
|
|
verify_code, first_id = self._get_latest_mail_code()
|
|
if verify_code is not None and first_id is not None:
|
|
self._cleanup_mail(first_id)
|
|
return verify_code
|
|
else:
|
|
verify_code = self._get_mail_code_by_imap()
|
|
if verify_code is not None:
|
|
return verify_code
|
|
|
|
if attempt < max_retries - 1: # 除了最后一次尝试,都等待
|
|
logging.warning(f"未获取到验证码,{retry_interval} 秒后重试...")
|
|
time.sleep(retry_interval)
|
|
|
|
except Exception as e:
|
|
logging.error(f"获取验证码失败: {e}") # 记录更一般的异常
|
|
if attempt < max_retries - 1:
|
|
logging.error(f"发生错误,{retry_interval} 秒后重试...")
|
|
time.sleep(retry_interval)
|
|
else:
|
|
raise Exception(f"获取验证码失败且已达最大重试次数: {e}") from e
|
|
|
|
raise Exception(f"经过 {max_retries} 次尝试后仍未获取到验证码。")
|
|
|
|
# 使用imap获取邮件
|
|
def _get_mail_code_by_imap(self, retry = 0):
|
|
if retry > 0:
|
|
time.sleep(3)
|
|
if retry >= 20:
|
|
raise Exception("获取验证码超时")
|
|
try:
|
|
# 连接到IMAP服务器
|
|
mail = imaplib.IMAP4_SSL(self.imap['imap_server'], self.imap['imap_port'])
|
|
mail.login(self.imap['imap_user'], self.imap['imap_pass'])
|
|
mail.select(self.imap['imap_dir'])
|
|
|
|
status, messages = mail.search(None, 'FROM', '"no-reply@cursor.sh"')
|
|
if status != 'OK':
|
|
return None
|
|
|
|
mail_ids = messages[0].split()
|
|
if not mail_ids:
|
|
# 没有获取到,就在获取一次
|
|
return self._get_mail_code_by_imap(retry=retry + 1)
|
|
|
|
latest_mail_id = mail_ids[-1]
|
|
|
|
# 获取邮件内容
|
|
status, msg_data = mail.fetch(latest_mail_id, '(RFC822)')
|
|
if status != 'OK':
|
|
return None
|
|
|
|
raw_email = msg_data[0][1]
|
|
email_message = email.message_from_bytes(raw_email)
|
|
|
|
# 提取邮件正文
|
|
body = self._extract_imap_body(email_message)
|
|
if body:
|
|
# 使用正则表达式查找6位数字验证码
|
|
code_match = re.search(r"\b\d{6}\b", body)
|
|
if code_match:
|
|
code = code_match.group()
|
|
# 删除邮件
|
|
mail.store(latest_mail_id, '+FLAGS', '\\Deleted')
|
|
mail.expunge()
|
|
mail.logout()
|
|
# print(f"找到的验证码: {code}")
|
|
return code
|
|
# print("未找到验证码")
|
|
mail.logout()
|
|
return None
|
|
except Exception as e:
|
|
print(f"发生错误: {e}")
|
|
return None
|
|
|
|
def _extract_imap_body(self, email_message):
|
|
# 提取邮件正文
|
|
if email_message.is_multipart():
|
|
for part in email_message.walk():
|
|
content_type = part.get_content_type()
|
|
content_disposition = str(part.get("Content-Disposition"))
|
|
if content_type == "text/plain" and "attachment" not in content_disposition:
|
|
charset = part.get_content_charset() or 'utf-8'
|
|
try:
|
|
body = part.get_payload(decode=True).decode(charset, errors='ignore')
|
|
return body
|
|
except Exception as e:
|
|
logging.error(f"解码邮件正文失败: {e}")
|
|
else:
|
|
content_type = email_message.get_content_type()
|
|
if content_type == "text/plain":
|
|
charset = email_message.get_content_charset() or 'utf-8'
|
|
try:
|
|
body = email_message.get_payload(decode=True).decode(charset, errors='ignore')
|
|
return body
|
|
except Exception as e:
|
|
logging.error(f"解码邮件正文失败: {e}")
|
|
return ""
|
|
|
|
# 手动输入验证码
|
|
def _get_latest_mail_code(self):
|
|
# 获取邮件列表
|
|
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20&epin={self.epin}"
|
|
mail_list_response = self.session.get(mail_list_url)
|
|
mail_list_data = mail_list_response.json()
|
|
time.sleep(0.5)
|
|
if not mail_list_data.get("result"):
|
|
return None, None
|
|
|
|
# 获取最新邮件的ID
|
|
first_id = mail_list_data.get("first_id")
|
|
if not first_id:
|
|
return None, None
|
|
|
|
# 获取具体邮件内容
|
|
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}{self.emailExtension}&epin={self.epin}"
|
|
mail_detail_response = self.session.get(mail_detail_url)
|
|
mail_detail_data = mail_detail_response.json()
|
|
time.sleep(0.5)
|
|
if not mail_detail_data.get("result"):
|
|
return None, None
|
|
|
|
# 从邮件文本中提取6位数字验证码
|
|
mail_text = mail_detail_data.get("text", "")
|
|
mail_subject = mail_detail_data.get("subject", "")
|
|
logging.info(f"找到邮件主题: {mail_subject}")
|
|
# 修改正则表达式,确保 6 位数字不紧跟在字母或域名相关符号后面
|
|
code_match = re.search(r"(?<![a-zA-Z@.])\b\d{6}\b", mail_text)
|
|
|
|
if code_match:
|
|
return code_match.group(), first_id
|
|
return None, None
|
|
|
|
def _cleanup_mail(self, first_id):
|
|
# 构造删除请求的URL和数据
|
|
delete_url = "https://tempmail.plus/api/mails/"
|
|
payload = {
|
|
"email": f"{self.username}{self.emailExtension}",
|
|
"first_id": first_id,
|
|
"epin": f"{self.epin}",
|
|
}
|
|
|
|
# 最多尝试5次
|
|
for _ in range(5):
|
|
response = self.session.delete(delete_url, data=payload)
|
|
try:
|
|
result = response.json().get("result")
|
|
if result is True:
|
|
return True
|
|
except:
|
|
pass
|
|
|
|
# 如果失败,等待0.5秒后重试
|
|
time.sleep(0.5)
|
|
|
|
return False
|
|
|
|
|
|
if __name__ == "__main__":
|
|
email_handler = EmailVerificationHandler()
|
|
code = email_handler.get_verification_code()
|
|
print(code)
|