Merge pull request #91 from MoLeft/main

新增使用imap来代替临时邮箱获取验证码
This commit is contained in:
Journey
2025-01-21 07:38:25 +08:00
committed by GitHub
3 changed files with 144 additions and 6 deletions

View File

@@ -50,6 +50,24 @@ TEMP_MAIL='ccxxxxcxx'
```
那么程序将随机生成 `@wozhangsan.me` 后缀作为注册邮箱。
### 使用 IMAP 来代替 tempmail.plus 邮箱
如果无法使用 tempmail.plus 邮箱,或者想使用一种更安全的方法来获取验证码,你可以像这样设置
```base
DOMAIN='wozhangsan.me'
TEMP_MAIL=null
# IMAP服务器
IMAP_SERVER=imap.xxxxx.com
# IMAP的SSL端口
IMAP_PORT=993
# 使用cf转发到的邮箱
IMAP_USER=xxxxxx@xxxxx.com
# 邮箱授权码
IMAP_PASS=xxxxxxxxxxxxx
# [可选] 默认是收件箱(inbox)
# 你也可以设置成其他的文件夹,只要你可以收到
IMAP_DIR=
```
## 运行方法

View File

@@ -23,15 +23,36 @@ class Config:
# 加载 .env 文件
load_dotenv(dotenv_path)
self.imap = False
self.temp_mail = os.getenv("TEMP_MAIL", "").strip().split("@")[0]
self.domain = os.getenv("DOMAIN", "").strip()
# 如果临时邮箱为null则加载IMAP
if self.temp_mail == 'null':
self.imap = True
self.imap_server = os.getenv("IMAP_SERVER", "").strip()
self.imap_port = os.getenv("IMAP_PORT", "").strip()
self.imap_user = os.getenv("IMAP_USER", "").strip()
self.imap_pass = os.getenv("IMAP_PASS", "").strip()
self.imap_dir = os.getenv("IMAP_DIR", "inbox").strip()
self.check_config()
def get_temp_mail(self):
return self.temp_mail
def get_imap(self):
if not self.imap:
return False
return {
"imap_server": self.imap_server,
"imap_port": self.imap_port,
"imap_user": self.imap_user,
"imap_pass": self.imap_pass,
"imap_dir": self.imap_dir
}
def get_domain(self):
return self.domain
@@ -40,12 +61,30 @@ class Config:
raise ValueError("临时邮箱未配置,请在 .env 文件中设置 TEMP_MAIL")
if not self.check_is_valid(self.domain):
raise ValueError("域名未配置,请在 .env 文件中设置 DOMAIN")
if not self.imap_server == 'null' and not self.check_is_valid(self.imap_server):
raise ValueError("IMAP服务器未配置请在 .env 文件中设置 IMAP_SERVER")
if not self.imap_port == 'null' and not self.check_is_valid(self.imap_port):
raise ValueError("IMAP端口未配置请在 .env 文件中设置 IMAP_PORT")
if not self.imap_user == 'null' and not self.check_is_valid(self.imap_user):
raise ValueError("IMAP用户名未配置请在 .env 文件中设置 IMAP_USER")
if not self.imap_pass == 'null' and not self.check_is_valid(self.imap_pass):
raise ValueError("IMAP密码未配置请在 .env 文件中设置 IMAP_PASS")
if not self.imap_dir == 'null' and not self.check_is_valid(self.imap_dir):
raise ValueError("IMAP收件箱目录未配置请在 .env 文件中设置 IMAP_DIRECTORY")
def check_is_valid(self, str):
return len(str.strip()) > 0
def print_config(self):
logging.info(f"\033[32m临时邮箱: {self.temp_mail}\033[0m")
# logging.info(f"\033[32m临时邮箱: {self.temp_mail}\033[0m")
if self.imap:
logging.info(f"\033[32mIMAP服务器: {self.imap_server}\033[0m")
logging.info(f"\033[32mIMAP端口: {self.imap_port}\033[0m")
logging.info(f"\033[32mIMAP用户名: {self.imap_user}\033[0m")
logging.info(f"\033[32mIMAP密码: {'*' * len(self.imap_pass)}\033[0m")
logging.info(f"\033[32mIMAP收件箱目录: {self.imap_dir}\033[0m")
if self.temp_mail != 'null':
logging.info(f"\033[32m临时邮箱: {self.temp_mail}@{self.domain}\033[0m")
logging.info(f"\033[32m域名: {self.domain}\033[0m")

View File

@@ -1,11 +1,15 @@
import logging
import time
import re
from config import Config
import requests
import email
import imaplib
class EmailVerificationHandler:
def __init__(self):
self.imap = Config().get_imap()
self.username = Config().get_temp_mail()
self.session = requests.Session()
self.emailExtension = "@mailto.plus"
@@ -16,17 +20,94 @@ class EmailVerificationHandler:
try:
print("正在处理...")
# 等待并获取最新邮件
code, first_id = self._get_latest_mail_code()
# 清理邮件
self._cleanup_mail(first_id)
if self.imap is False:
# 等待并获取最新邮件
code, first_id = self._get_latest_mail_code()
# 清理邮件
self._cleanup_mail(first_id)
else:
code = self._get_mail_code_by_imap()
except Exception as e:
print(f"获取验证码失败: {str(e)}")
return code
# 使用imap获取邮件
def _get_mail_code_by_imap(self, retry = 0):
if retry > 0:
time.sleep(3)
if retry >= 20:
raise Exception("获取验证码超时")
try:
# 连接到IMAP服务器
mail = imaplib.IMAP4_SSL(self.imap['imap_server'], self.imap['imap_port'])
mail.login(self.imap['imap_user'], self.imap['imap_pass'])
mail.select(self.imap['imap_dir'])
status, messages = mail.search(None, 'FROM', '"no-reply@cursor.sh"')
if status != 'OK':
return None
mail_ids = messages[0].split()
if not mail_ids:
# 没有获取到,就在获取一次
return self._get_mail_code_by_imap(retry=retry + 1)
latest_mail_id = mail_ids[-1]
# 获取邮件内容
status, msg_data = mail.fetch(latest_mail_id, '(RFC822)')
if status != 'OK':
return None
raw_email = msg_data[0][1]
email_message = email.message_from_bytes(raw_email)
# 提取邮件正文
body = self._extract_imap_body(email_message)
if body:
# 使用正则表达式查找6位数字验证码
code_match = re.search(r"\b\d{6}\b", body)
if code_match:
code = code_match.group()
# 删除邮件
mail.store(latest_mail_id, '+FLAGS', '\\Deleted')
mail.expunge()
mail.logout()
# print(f"找到的验证码: {code}")
return code
# print("未找到验证码")
mail.logout()
return None
except Exception as e:
print(f"发生错误: {e}")
return None
def _extract_imap_body(self, email_message):
# 提取邮件正文
if email_message.is_multipart():
for part in email_message.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
charset = part.get_content_charset() or 'utf-8'
try:
body = part.get_payload(decode=True).decode(charset, errors='ignore')
return body
except Exception as e:
logging.error(f"解码邮件正文失败: {e}")
else:
content_type = email_message.get_content_type()
if content_type == "text/plain":
charset = email_message.get_content_charset() or 'utf-8'
try:
body = email_message.get_payload(decode=True).decode(charset, errors='ignore')
return body
except Exception as e:
logging.error(f"解码邮件正文失败: {e}")
return ""
# 手动输入验证码
def _get_latest_mail_code(self):
# 获取邮件列表