From 4f42c1a1b5b6570c298d421177195c5097b5757f Mon Sep 17 00:00:00 2001 From: MoLeft <41848811+MoLeft@users.noreply.github.com> Date: Mon, 20 Jan 2025 05:46:18 +0000 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BD=BF=E7=94=A8imap?= =?UTF-8?q?=E6=9D=A5=E4=BB=A3=E6=9B=BF=E4=B8=B4=E6=97=B6=E9=82=AE=E7=AE=B1?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E9=AA=8C=E8=AF=81=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 18 ++++++++++ config.py | 41 ++++++++++++++++++++- get_email_code.py | 91 ++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 144 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 8c47e7f..18ab322 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,24 @@ TEMP_MAIL='ccxxxxcxx' ``` 那么程序将随机生成 `@wozhangsan.me` 后缀作为注册邮箱。 +### 使用 IMAP 来代替 tempmail.plus 邮箱 +如果无法使用 tempmail.plus 邮箱,或者想使用一种更安全的方法来获取验证码,你可以像这样设置 +```base +DOMAIN='wozhangsan.me' +TEMP_MAIL=null + +# IMAP服务器 +IMAP_SERVER=imap.xxxxx.com +# IMAP的SSL端口 +IMAP_PORT=993 +# 使用cf转发到的邮箱 +IMAP_USER=xxxxxx@xxxxx.com +# 邮箱授权码 +IMAP_PASS=xxxxxxxxxxxxx +# [可选] 默认是收件箱(inbox) +# 你也可以设置成其他的文件夹,只要你可以收到 +IMAP_DIR= +``` ## 运行方法 diff --git a/config.py b/config.py index f66a777..982d7eb 100644 --- a/config.py +++ b/config.py @@ -23,15 +23,36 @@ class Config: # 加载 .env 文件 load_dotenv(dotenv_path) + self.imap = False self.temp_mail = os.getenv("TEMP_MAIL", "").strip().split("@")[0] self.domain = os.getenv("DOMAIN", "").strip() + # 如果临时邮箱为null则加载IMAP + if self.temp_mail == 'null': + self.imap = True + self.imap_server = os.getenv("IMAP_SERVER", "").strip() + self.imap_port = os.getenv("IMAP_PORT", "").strip() + self.imap_user = os.getenv("IMAP_USER", "").strip() + self.imap_pass = os.getenv("IMAP_PASS", "").strip() + self.imap_dir = os.getenv("IMAP_DIR", "inbox").strip() + self.check_config() def get_temp_mail(self): return self.temp_mail + def get_imap(self): + if not self.imap: + return False + return { + "imap_server": self.imap_server, + "imap_port": self.imap_port, + "imap_user": self.imap_user, + "imap_pass": self.imap_pass, + "imap_dir": self.imap_dir + } + def get_domain(self): return self.domain @@ -40,12 +61,30 @@ class Config: raise ValueError("临时邮箱未配置,请在 .env 文件中设置 TEMP_MAIL") if not self.check_is_valid(self.domain): raise ValueError("域名未配置,请在 .env 文件中设置 DOMAIN") + if not self.imap_server == 'null' and not self.check_is_valid(self.imap_server): + raise ValueError("IMAP服务器未配置,请在 .env 文件中设置 IMAP_SERVER") + if not self.imap_port == 'null' and not self.check_is_valid(self.imap_port): + raise ValueError("IMAP端口未配置,请在 .env 文件中设置 IMAP_PORT") + if not self.imap_user == 'null' and not self.check_is_valid(self.imap_user): + raise ValueError("IMAP用户名未配置,请在 .env 文件中设置 IMAP_USER") + if not self.imap_pass == 'null' and not self.check_is_valid(self.imap_pass): + raise ValueError("IMAP密码未配置,请在 .env 文件中设置 IMAP_PASS") + if not self.imap_dir == 'null' and not self.check_is_valid(self.imap_dir): + raise ValueError("IMAP收件箱目录未配置,请在 .env 文件中设置 IMAP_DIRECTORY") def check_is_valid(self, str): return len(str.strip()) > 0 def print_config(self): - logging.info(f"\033[32m临时邮箱: {self.temp_mail}\033[0m") + # logging.info(f"\033[32m临时邮箱: {self.temp_mail}\033[0m") + if self.imap: + logging.info(f"\033[32mIMAP服务器: {self.imap_server}\033[0m") + logging.info(f"\033[32mIMAP端口: {self.imap_port}\033[0m") + logging.info(f"\033[32mIMAP用户名: {self.imap_user}\033[0m") + logging.info(f"\033[32mIMAP密码: {'*' * len(self.imap_pass)}\033[0m") + logging.info(f"\033[32mIMAP收件箱目录: {self.imap_dir}\033[0m") + if self.temp_mail != 'null': + logging.info(f"\033[32m临时邮箱: {self.temp_mail}@{self.domain}\033[0m") logging.info(f"\033[32m域名: {self.domain}\033[0m") diff --git a/get_email_code.py b/get_email_code.py index 12d711d..e1e6d05 100644 --- a/get_email_code.py +++ b/get_email_code.py @@ -1,11 +1,15 @@ +import logging import time import re from config import Config import requests +import email +import imaplib class EmailVerificationHandler: def __init__(self): + self.imap = Config().get_imap() self.username = Config().get_temp_mail() self.session = requests.Session() self.emailExtension = "@mailto.plus" @@ -16,17 +20,94 @@ class EmailVerificationHandler: try: print("正在处理...") - # 等待并获取最新邮件 - code, first_id = self._get_latest_mail_code() - - # 清理邮件 - self._cleanup_mail(first_id) + if self.imap is False: + # 等待并获取最新邮件 + code, first_id = self._get_latest_mail_code() + # 清理邮件 + self._cleanup_mail(first_id) + else: + code = self._get_mail_code_by_imap() except Exception as e: print(f"获取验证码失败: {str(e)}") return code + # 使用imap获取邮件 + def _get_mail_code_by_imap(self, retry = 0): + if retry > 0: + time.sleep(3) + if retry >= 20: + raise Exception("获取验证码超时") + try: + # 连接到IMAP服务器 + mail = imaplib.IMAP4_SSL(self.imap['imap_server'], self.imap['imap_port']) + mail.login(self.imap['imap_user'], self.imap['imap_pass']) + mail.select(self.imap['imap_dir']) + + status, messages = mail.search(None, 'FROM', '"no-reply@cursor.sh"') + if status != 'OK': + return None + + mail_ids = messages[0].split() + if not mail_ids: + # 没有获取到,就在获取一次 + return self._get_mail_code_by_imap(retry=retry + 1) + + latest_mail_id = mail_ids[-1] + + # 获取邮件内容 + status, msg_data = mail.fetch(latest_mail_id, '(RFC822)') + if status != 'OK': + return None + + raw_email = msg_data[0][1] + email_message = email.message_from_bytes(raw_email) + + # 提取邮件正文 + body = self._extract_imap_body(email_message) + if body: + # 使用正则表达式查找6位数字验证码 + code_match = re.search(r"\b\d{6}\b", body) + if code_match: + code = code_match.group() + # 删除邮件 + mail.store(latest_mail_id, '+FLAGS', '\\Deleted') + mail.expunge() + mail.logout() + # print(f"找到的验证码: {code}") + return code + # print("未找到验证码") + mail.logout() + return None + except Exception as e: + print(f"发生错误: {e}") + return None + + def _extract_imap_body(self, email_message): + # 提取邮件正文 + if email_message.is_multipart(): + for part in email_message.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition")) + if content_type == "text/plain" and "attachment" not in content_disposition: + charset = part.get_content_charset() or 'utf-8' + try: + body = part.get_payload(decode=True).decode(charset, errors='ignore') + return body + except Exception as e: + logging.error(f"解码邮件正文失败: {e}") + else: + content_type = email_message.get_content_type() + if content_type == "text/plain": + charset = email_message.get_content_charset() or 'utf-8' + try: + body = email_message.get_payload(decode=True).decode(charset, errors='ignore') + return body + except Exception as e: + logging.error(f"解码邮件正文失败: {e}") + return "" + # 手动输入验证码 def _get_latest_mail_code(self): # 获取邮件列表