feat: 初始化项目,添加重置功能

This commit is contained in:
Your Name
2025-02-09 22:11:39 +08:00
parent 12a73d4496
commit 348c233194

View File

@@ -1,220 +0,0 @@
import logging
import time
import re
from config import Config
import requests
import email
import imaplib
import json
class EmailVerificationHandler:
def __init__(self):
self.imap = Config().get_imap()
self.username = Config().get_temp_mail()
self.epin = Config().get_temp_mail_epin()
self.session = requests.Session()
self.emailExtension = Config().get_temp_mail_ext()
def get_verification_code(self):
"""获取验证码,带重试机制"""
max_retries = 5
retry_delay = 3
code = None
for attempt in range(max_retries):
try:
print(f"\n{attempt + 1} 次尝试获取验证码...")
if self.imap is False:
# 等待并获取最新邮件
code, first_id = self._get_latest_mail_code()
if code:
print(f"成功获取验证码: {code}")
return code
elif first_id: # 有邮件但没有验证码
self._cleanup_mail(first_id)
else:
code = self._get_mail_code_by_imap()
if code:
print(f"成功获取验证码: {code}")
return code
if attempt < max_retries - 1:
print(f"等待 {retry_delay} 秒后重试...")
time.sleep(retry_delay)
except Exception as e:
print(f"获取验证码时出错: {str(e)}")
if attempt < max_retries - 1:
print(f"等待 {retry_delay} 秒后重试...")
time.sleep(retry_delay)
print("获取验证码失败,已达到最大重试次数")
return None
# 使用imap获取邮件
def _get_mail_code_by_imap(self, retry = 0):
if retry > 0:
time.sleep(3)
if retry >= 20:
raise Exception("获取验证码超时")
try:
# 连接到IMAP服务器
mail = imaplib.IMAP4_SSL(self.imap['imap_server'], self.imap['imap_port'])
mail.login(self.imap['imap_user'], self.imap['imap_pass'])
mail.select(self.imap['imap_dir'])
status, messages = mail.search(None, 'FROM', '"no-reply@cursor.sh"')
if status != 'OK':
return None
mail_ids = messages[0].split()
if not mail_ids:
# 没有获取到,就在获取一次
return self._get_mail_code_by_imap(retry=retry + 1)
latest_mail_id = mail_ids[-1]
# 获取邮件内容
status, msg_data = mail.fetch(latest_mail_id, '(RFC822)')
if status != 'OK':
return None
raw_email = msg_data[0][1]
email_message = email.message_from_bytes(raw_email)
# 提取邮件正文
body = self._extract_imap_body(email_message)
if body:
# 使用正则表达式查找6位数字验证码
code_match = re.search(r"\b\d{6}\b", body)
if code_match:
code = code_match.group()
# 删除邮件
mail.store(latest_mail_id, '+FLAGS', '\\Deleted')
mail.expunge()
mail.logout()
# print(f"找到的验证码: {code}")
return code
# print("未找到验证码")
mail.logout()
return None
except Exception as e:
print(f"发生错误: {e}")
return None
def _extract_imap_body(self, email_message):
# 提取邮件正文
if email_message.is_multipart():
for part in email_message.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
charset = part.get_content_charset() or 'utf-8'
try:
body = part.get_payload(decode=True).decode(charset, errors='ignore')
return body
except Exception as e:
logging.error(f"解码邮件正文失败: {e}")
else:
content_type = email_message.get_content_type()
if content_type == "text/plain":
charset = email_message.get_content_charset() or 'utf-8'
try:
body = email_message.get_payload(decode=True).decode(charset, errors='ignore')
return body
except Exception as e:
logging.error(f"解码邮件正文失败: {e}")
return ""
def _get_latest_mail_code(self):
"""获取最新邮件的验证码"""
try:
# 配置请求会话
self.session.verify = False
self.session.proxies = {"http": None, "https": None}
self.session.timeout = 30
# 获取邮件列表
mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20"
if self.epin:
mail_list_url += f"&epin={self.epin}"
print(f"正在请求邮件列表: {mail_list_url}")
mail_list_response = self.session.get(mail_list_url)
mail_list_data = mail_list_response.json()
if not mail_list_data.get("result"):
print("获取邮件列表失败")
return None, None
# 获取最新邮件的ID
first_id = mail_list_data.get("first_id")
if not first_id:
print("未找到新邮件")
return None, None
# 获取具体邮件内容
mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}{self.emailExtension}"
if self.epin:
mail_detail_url += f"&epin={self.epin}"
print(f"正在获取邮件内容: {mail_detail_url}")
mail_detail_response = self.session.get(mail_detail_url)
mail_detail_data = mail_detail_response.json()
if not mail_detail_data.get("result"):
print("获取邮件内容失败")
return None, None
# 从邮件文本中提取6位数字验证码
mail_text = mail_detail_data.get("text", "")
code_match = re.search(r"(?<![a-zA-Z@.])\b\d{6}\b", mail_text)
if code_match:
return code_match.group(), first_id
print("邮件中未找到验证码")
return None, None
except requests.exceptions.RequestException as e:
print(f"网络请求错误: {str(e)}")
return None, None
except json.JSONDecodeError as e:
print(f"JSON解析错误: {str(e)}")
return None, None
except Exception as e:
print(f"处理邮件时出错: {str(e)}")
return None, None
def _cleanup_mail(self, first_id):
# 构造删除请求的URL和数据
delete_url = "https://tempmail.plus/api/mails/"
payload = {
"email": f"{self.username}{self.emailExtension}",
"first_id": first_id
}
if self.epin:
payload["epin"] = self.epin
# 最多尝试5次
for _ in range(5):
response = self.session.delete(delete_url, data=payload, verify=False, proxies={"http": None, "https": None})
try:
result = response.json().get("result")
if result is True:
return True
except:
pass
# 如果失败,等待0.5秒后重试
time.sleep(0.5)
return False
if __name__ == "__main__":
email_handler = EmailVerificationHandler()
code = email_handler.get_verification_code()
print(code)