import logging import time import re from config import Config import requests import email import imaplib import json class EmailVerificationHandler: def __init__(self): config = Config() self.imap = config.get_imap() self.username = config.get_temp_mail() self.epin = config.get_temp_mail_epin() # 创建新的session并设置基本配置 self.session = requests.Session() self.session.verify = False self.session.trust_env = False # 设置超时和重试 self.session.timeout = 30 adapter = requests.adapters.HTTPAdapter(max_retries=3) self.session.mount('http://', adapter) self.session.mount('https://', adapter) self.emailExtension = config.get_temp_mail_ext() def get_verification_code(self): code = None try: if self.imap is False: code, first_id = self._get_latest_mail_code() if first_id: self._cleanup_mail(first_id) else: code = self._get_mail_code_by_imap() except Exception as e: logging.error(f"获取验证码失败: {str(e)}") return code # 使用imap获取邮件 def _get_mail_code_by_imap(self, retry = 0): if retry > 0: time.sleep(3) if retry >= 20: raise Exception("获取验证码超时") try: # 连接到IMAP服务器 mail = imaplib.IMAP4_SSL(self.imap['imap_server'], self.imap['imap_port']) mail.login(self.imap['imap_user'], self.imap['imap_pass']) mail.select(self.imap['imap_dir']) status, messages = mail.search(None, 'FROM', '"no-reply@cursor.sh"') if status != 'OK': return None mail_ids = messages[0].split() if not mail_ids: # 没有获取到,就在获取一次 return self._get_mail_code_by_imap(retry=retry + 1) latest_mail_id = mail_ids[-1] # 获取邮件内容 status, msg_data = mail.fetch(latest_mail_id, '(RFC822)') if status != 'OK': return None raw_email = msg_data[0][1] email_message = email.message_from_bytes(raw_email) # 提取邮件正文 body = self._extract_imap_body(email_message) if body: # 使用正则表达式查找6位数字验证码 code_match = re.search(r"\b\d{6}\b", body) if code_match: code = code_match.group() # 删除邮件 mail.store(latest_mail_id, '+FLAGS', '\\Deleted') mail.expunge() mail.logout() # print(f"找到的验证码: {code}") return code # print("未找到验证码") mail.logout() return None except Exception as e: print(f"发生错误: {e}") return None def _extract_imap_body(self, email_message): # 提取邮件正文 if email_message.is_multipart(): for part in email_message.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition")) if content_type == "text/plain" and "attachment" not in content_disposition: charset = part.get_content_charset() or 'utf-8' try: body = part.get_payload(decode=True).decode(charset, errors='ignore') return body except Exception as e: logging.error(f"解码邮件正文失败: {e}") else: content_type = email_message.get_content_type() if content_type == "text/plain": charset = email_message.get_content_charset() or 'utf-8' try: body = email_message.get_payload(decode=True).decode(charset, errors='ignore') return body except Exception as e: logging.error(f"解码邮件正文失败: {e}") return "" def _get_latest_mail_code(self): # 获取邮件列表 mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20&epin={self.epin}" mail_list_response = self.session.get(mail_list_url) mail_list_data = mail_list_response.json() time.sleep(0.5) if not mail_list_data.get("result"): return None, None # 获取最新邮件的ID first_id = mail_list_data.get("first_id") if not first_id: return None, None # 获取具体邮件内容 mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}{self.emailExtension}&epin={self.epin}" mail_detail_response = self.session.get(mail_detail_url) mail_detail_data = mail_detail_response.json() time.sleep(0.5) if not mail_detail_data.get("result"): return None, None # 从邮件文本中提取6位数字验证码 mail_text = mail_detail_data.get("text", "") code_match = re.search(r"(?