From 348c233194551f5f2adec2de6074eea6267056a3 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sun, 9 Feb 2025 22:11:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=9D=E5=A7=8B=E5=8C=96=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=EF=BC=8C=E6=B7=BB=E5=8A=A0=E9=87=8D=E7=BD=AE=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- get_email_code copy.py | 220 ----------------------------------------- 1 file changed, 220 deletions(-) delete mode 100644 get_email_code copy.py diff --git a/get_email_code copy.py b/get_email_code copy.py deleted file mode 100644 index f83cc82..0000000 --- a/get_email_code copy.py +++ /dev/null @@ -1,220 +0,0 @@ -import logging -import time -import re -from config import Config -import requests -import email -import imaplib -import json - - -class EmailVerificationHandler: - def __init__(self): - self.imap = Config().get_imap() - self.username = Config().get_temp_mail() - self.epin = Config().get_temp_mail_epin() - self.session = requests.Session() - self.emailExtension = Config().get_temp_mail_ext() - - def get_verification_code(self): - """获取验证码,带重试机制""" - max_retries = 5 - retry_delay = 3 - code = None - - for attempt in range(max_retries): - try: - print(f"\n第 {attempt + 1} 次尝试获取验证码...") - - if self.imap is False: - # 等待并获取最新邮件 - code, first_id = self._get_latest_mail_code() - if code: - print(f"成功获取验证码: {code}") - return code - elif first_id: # 有邮件但没有验证码 - self._cleanup_mail(first_id) - else: - code = self._get_mail_code_by_imap() - if code: - print(f"成功获取验证码: {code}") - return code - - if attempt < max_retries - 1: - print(f"等待 {retry_delay} 秒后重试...") - time.sleep(retry_delay) - - except Exception as e: - print(f"获取验证码时出错: {str(e)}") - if attempt < max_retries - 1: - print(f"等待 {retry_delay} 秒后重试...") - time.sleep(retry_delay) - - print("获取验证码失败,已达到最大重试次数") - return None - - # 使用imap获取邮件 - def _get_mail_code_by_imap(self, retry = 0): - if retry > 0: - time.sleep(3) - if retry >= 20: - raise Exception("获取验证码超时") - try: - # 连接到IMAP服务器 - mail = imaplib.IMAP4_SSL(self.imap['imap_server'], self.imap['imap_port']) - mail.login(self.imap['imap_user'], self.imap['imap_pass']) - mail.select(self.imap['imap_dir']) - - status, messages = mail.search(None, 'FROM', '"no-reply@cursor.sh"') - if status != 'OK': - return None - - mail_ids = messages[0].split() - if not mail_ids: - # 没有获取到,就在获取一次 - return self._get_mail_code_by_imap(retry=retry + 1) - - latest_mail_id = mail_ids[-1] - - # 获取邮件内容 - status, msg_data = mail.fetch(latest_mail_id, '(RFC822)') - if status != 'OK': - return None - - raw_email = msg_data[0][1] - email_message = email.message_from_bytes(raw_email) - - # 提取邮件正文 - body = self._extract_imap_body(email_message) - if body: - # 使用正则表达式查找6位数字验证码 - code_match = re.search(r"\b\d{6}\b", body) - if code_match: - code = code_match.group() - # 删除邮件 - mail.store(latest_mail_id, '+FLAGS', '\\Deleted') - mail.expunge() - mail.logout() - # print(f"找到的验证码: {code}") - return code - # print("未找到验证码") - mail.logout() - return None - except Exception as e: - print(f"发生错误: {e}") - return None - - def _extract_imap_body(self, email_message): - # 提取邮件正文 - if email_message.is_multipart(): - for part in email_message.walk(): - content_type = part.get_content_type() - content_disposition = str(part.get("Content-Disposition")) - if content_type == "text/plain" and "attachment" not in content_disposition: - charset = part.get_content_charset() or 'utf-8' - try: - body = part.get_payload(decode=True).decode(charset, errors='ignore') - return body - except Exception as e: - logging.error(f"解码邮件正文失败: {e}") - else: - content_type = email_message.get_content_type() - if content_type == "text/plain": - charset = email_message.get_content_charset() or 'utf-8' - try: - body = email_message.get_payload(decode=True).decode(charset, errors='ignore') - return body - except Exception as e: - logging.error(f"解码邮件正文失败: {e}") - return "" - - def _get_latest_mail_code(self): - """获取最新邮件的验证码""" - try: - # 配置请求会话 - self.session.verify = False - self.session.proxies = {"http": None, "https": None} - self.session.timeout = 30 - - # 获取邮件列表 - mail_list_url = f"https://tempmail.plus/api/mails?email={self.username}{self.emailExtension}&limit=20" - if self.epin: - mail_list_url += f"&epin={self.epin}" - - print(f"正在请求邮件列表: {mail_list_url}") - mail_list_response = self.session.get(mail_list_url) - mail_list_data = mail_list_response.json() - - if not mail_list_data.get("result"): - print("获取邮件列表失败") - return None, None - - # 获取最新邮件的ID - first_id = mail_list_data.get("first_id") - if not first_id: - print("未找到新邮件") - return None, None - - # 获取具体邮件内容 - mail_detail_url = f"https://tempmail.plus/api/mails/{first_id}?email={self.username}{self.emailExtension}" - if self.epin: - mail_detail_url += f"&epin={self.epin}" - - print(f"正在获取邮件内容: {mail_detail_url}") - mail_detail_response = self.session.get(mail_detail_url) - mail_detail_data = mail_detail_response.json() - - if not mail_detail_data.get("result"): - print("获取邮件内容失败") - return None, None - - # 从邮件文本中提取6位数字验证码 - mail_text = mail_detail_data.get("text", "") - code_match = re.search(r"(?