196 lines
6.9 KiB
Python
196 lines
6.9 KiB
Python
import os
|
||
import sys
|
||
import requests
|
||
import json
|
||
import time
|
||
from logger import logging
|
||
import urllib3
|
||
from requests.adapters import HTTPAdapter
|
||
from urllib3.util.retry import Retry
|
||
|
||
# 禁用 SSL 警告
|
||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
||
class Config:
    """Mail-related runtime settings (tempmail.plus mode or IMAP mode).

    On construction the settings are requested from a remote HTTP API. If the
    request fails, the API reports a non-zero ``code``, or the payload is not
    a mapping, the built-in defaults are used instead. When ``TEMP_MAIL`` is
    the literal string ``"null"`` the instance switches to IMAP mode and also
    reads the ``IMAP_*`` settings.

    Raises:
        ValueError: from ``__init__`` (via ``check_config``) when a required
            setting is missing or empty.
    """

    # Endpoint queried for the configuration payload.
    CONFIG_API_URL = "https://cursorapi.nosqli.com/admin/api.mail/getRandom"

    # Values used when the API is unreachable or a key is absent.
    DEFAULT_CONFIG = {
        "TEMP_MAIL": "demo",
        "TEMP_MAIL_EXT": "@mailto.plus",
        "TEMP_MAIL_EPIN": "",
        "DOMAIN": "mailto.plus",
        "BROWSER_USER_AGENT": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        ),
        "MAIL_SERVER": "https://tempmail.plus",
    }

    def __init__(self):
        # Per-instance copy so callers mutating it do not affect the class.
        self.default_config = dict(self.DEFAULT_CONFIG)

        # Shared session, kept on the instance as in the original code.
        self.session = self._build_session()

        self._apply_config(self._fetch_remote_config())
        self.check_config()

    @staticmethod
    def _build_session():
        """Create the HTTP session used to fetch the remote configuration."""
        # Few retries and a short backoff keep start-up fast when the API is down.
        retry_strategy = Retry(
            total=2,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
        )

        session = requests.Session()
        session.trust_env = False  # ignore system proxy settings
        # NOTE(review): TLS certificate verification is disabled, so the
        # remotely fetched configuration is not authenticated.
        session.verify = False
        session.proxies = {}  # no proxies at all

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def _fetch_remote_config(self):
        """Return the settings mapping from the API, or the defaults on failure."""
        try:
            response = self.session.get(
                self.CONFIG_API_URL,
                timeout=10,
                allow_redirects=True,
            )
            data = response.json()

            if data["code"] == 0:
                env = data["data"]["env"]
                if isinstance(env, dict):
                    return env
                # A non-mapping payload cannot be queried with .get(); fall back.
                logging.warning("API返回的配置格式无效")
            else:
                logging.warning(f"API返回错误: {data.get('msg', '未知错误')}")
        except Exception as e:
            # Deliberately broad: any network, decoding or schema problem
            # must degrade to the defaults instead of aborting start-up.
            logging.warning(f"从API获取配置失败: {e}")

        logging.info("使用默认配置继续运行...")
        return self.default_config

    @staticmethod
    def _get_str(config, key, default):
        """Read ``key`` from ``config`` as a stripped string.

        A missing key or a ``None`` value yields ``default``; any other
        non-string value (for example a numeric port) is converted with
        ``str()`` so that it can be stripped and validated like the rest.
        """
        value = config.get(key)
        if value is None:
            value = default
        return str(value).strip()

    def _apply_config(self, config):
        """Populate the instance attributes from the ``config`` mapping."""
        defaults = self.default_config

        self.imap = False
        self.temp_mail = self._get_str(config, "TEMP_MAIL", defaults["TEMP_MAIL"])
        self.temp_mail_ext = self._get_str(
            config, "TEMP_MAIL_EXT", defaults["TEMP_MAIL_EXT"]
        )
        self.temp_mail_epin = self._get_str(
            config, "TEMP_MAIL_EPIN", defaults["TEMP_MAIL_EPIN"]
        )
        self.domain = self._get_str(config, "DOMAIN", defaults["DOMAIN"])
        self.browser_user_agent = self._get_str(
            config, "BROWSER_USER_AGENT", defaults["BROWSER_USER_AGENT"]
        )
        self.mail_server = self._get_str(
            config, "MAIL_SERVER", defaults["MAIL_SERVER"]
        )

        # A temp mailbox of "null" means: use IMAP instead.
        if self.temp_mail == "null":
            self.imap = True
            self.imap_server = self._get_str(config, "IMAP_SERVER", "")
            self.imap_port = self._get_str(config, "IMAP_PORT", "")
            self.imap_user = self._get_str(config, "IMAP_USER", "")
            self.imap_pass = self._get_str(config, "IMAP_PASS", "")
            self.imap_dir = self._get_str(config, "IMAP_DIR", "inbox")

    def get_temp_mail(self):
        """Return the temp mailbox name, without any ``@domain`` suffix."""
        return self.temp_mail.split("@")[0] if "@" in self.temp_mail else self.temp_mail

    def get_temp_mail_ext(self):
        """Return the configured temp-mail suffix (``TEMP_MAIL_EXT``)."""
        return self.temp_mail_ext

    def get_temp_mail_epin(self):
        """Return the configured ``TEMP_MAIL_EPIN`` value."""
        return self.temp_mail_epin

    def get_browser_user_agent(self):
        """Return the configured browser User-Agent string."""
        return self.browser_user_agent

    def get_mail_server(self):
        """Return the configured mail server URL."""
        return self.mail_server

    def get_imap(self):
        """Return the IMAP settings as a dict, or ``False`` outside IMAP mode."""
        if not self.imap:
            return False
        return {
            "imap_server": self.imap_server,
            "imap_port": self.imap_port,
            "imap_user": self.imap_user,
            "imap_pass": self.imap_pass,
            "imap_dir": self.imap_dir,
        }

    def get_domain(self):
        """Return the configured domain."""
        return self.domain

    def check_config(self):
        """Validate the loaded settings.

        Rules:
            1. The domain is always required.
            2. In tempmail.plus mode, ``TEMP_MAIL`` is required.
            3. In IMAP mode, ``IMAP_SERVER``, ``IMAP_PORT``, ``IMAP_USER`` and
               ``IMAP_PASS`` are required; ``IMAP_DIR`` is optional.

        Raises:
            ValueError: if a required setting is missing, empty or ``"null"``.
        """
        # Settings required in every mode: attribute name -> label used in errors.
        required_configs = {
            "domain": "域名",
        }
        for key, name in required_configs.items():
            if not self.check_is_valid(getattr(self, key)):
                raise ValueError(f"{name}未配置")

        if self.temp_mail != "null":
            # tempmail.plus mode
            if not self.check_is_valid(self.temp_mail):
                raise ValueError("临时邮箱未配置")
            return

        # IMAP mode
        imap_configs = {
            "imap_server": "IMAP服务器",
            "imap_port": "IMAP端口",
            "imap_user": "IMAP用户名",
            "imap_pass": "IMAP密码",
        }
        for key, name in imap_configs.items():
            value = getattr(self, key)
            if value == "null" or not self.check_is_valid(value):
                raise ValueError(f"{name}未配置")

        # IMAP_DIR is optional; "null" is accepted, any other value must be non-empty.
        if self.imap_dir != "null" and not self.check_is_valid(self.imap_dir):
            raise ValueError("IMAP收件箱目录配置无效")

    def check_is_valid(self, value):
        """Return whether ``value`` is a usable setting.

        Args:
            value: The setting value to check.

        Returns:
            bool: ``True`` only for a string that is non-empty after stripping.
        """
        return isinstance(value, str) and bool(value.strip())

    def print_config(self):
        """Log the active settings in green, masking the IMAP password."""
        if self.imap:
            logging.info(f"\033[32mIMAP服务器: {self.imap_server}\033[0m")
            logging.info(f"\033[32mIMAP端口: {self.imap_port}\033[0m")
            logging.info(f"\033[32mIMAP用户名: {self.imap_user}\033[0m")
            logging.info(f"\033[32mIMAP密码: {'*' * len(self.imap_pass)}\033[0m")
            logging.info(f"\033[32mIMAP收件箱目录: {self.imap_dir}\033[0m")
        if self.temp_mail != "null":
            logging.info(
                f"\033[32m临时邮箱: {self.temp_mail}{self.temp_mail_ext}\033[0m"
            )
        logging.info(f"\033[32m域名: {self.domain}\033[0m")
|
||
|
||
|
||
# 使用示例
|
||
if __name__ == "__main__":
    # Manual smoke test: build the configuration and log what was loaded.
    # Only validation failures (ValueError) are reported as a friendly message.
    try:
        cfg = Config()
        print("环境变量加载成功!")
        cfg.print_config()
    except ValueError as err:
        print(f"错误: {err}")
|