import os import sys import requests import json import time from logger import logging import urllib3 from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry # 禁用 SSL 警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class Config: def __init__(self): # 默认配置 self.default_config = { "TEMP_MAIL": "", # 设置为空字符串,表示可选 "TEMP_MAIL_EXT": "@mailto.plus", "TEMP_MAIL_EPIN": "", "DOMAIN": "mailto.plus", "BROWSER_USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "MAIL_SERVER": "https://tempmail.plus", # IMAP 相关配置 "USE_IMAP": False, # 是否使用 IMAP "IMAP_HOST": "", "IMAP_PORT": 993, "IMAP_USERNAME": "", "IMAP_PASSWORD": "", "IMAP_USE_SSL": True } # 获取应用程序的根目录路径 if getattr(sys, "frozen", False): # 如果是打包后的可执行文件 application_path = os.path.dirname(sys.executable) else: # 如果是开发环境 application_path = os.path.dirname(os.path.abspath(__file__)) # 配置重试策略 retry_strategy = Retry( total=2, # 减少重试次数 backoff_factor=0.5, # 减少等待时间 status_forcelist=[429, 500, 502, 503, 504], ) # 创建共享session self.session = requests.Session() self.session.trust_env = False # 不使用系统代理设置 self.session.verify = False self.session.proxies = {} # 完全禁用代理 # 配置adapter adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount("https://", adapter) self.session.mount("http://", adapter) # 从API获取配置 try: # 使用session发送请求,减少超时时间 response = self.session.get( "https://cursorapi.nosqli.com/admin/api.mail/gettestapi", timeout=10, allow_redirects=True ) data = response.json() if data['code'] == 0: config = data['data']['env'] else: logging.warning(f"API返回错误: {data.get('msg', '未知错误')}") logging.info("使用默认配置继续运行...") config = self.default_config except Exception as e: logging.warning(f"从API获取配置失败: {str(e)}") logging.info("使用默认配置继续运行...") config = self.default_config # 设置配置项 self.imap = False # 处理 TEMP_MAIL,如果为 null 则启用 IMAP temp_mail = config.get("TEMP_MAIL") if temp_mail is None or temp_mail == "null": self.imap = True self.temp_mail = "null" else: self.temp_mail = str(temp_mail).strip() # 设置其他基础配置 self.temp_mail_ext = str(config.get("TEMP_MAIL_EXT", self.default_config["TEMP_MAIL_EXT"])).strip() self.temp_mail_epin = str(config.get("TEMP_MAIL_EPIN", self.default_config["TEMP_MAIL_EPIN"])).strip() self.domain = str(config.get("DOMAIN", self.default_config["DOMAIN"])).strip() self.browser_user_agent = str(config.get("BROWSER_USER_AGENT", self.default_config["BROWSER_USER_AGENT"])).strip() self.mail_server = str(config.get("MAIL_SERVER", self.default_config["MAIL_SERVER"])).strip() # 如果启用了 IMAP,设置 IMAP 配置 if self.imap: self.imap_server = str(config.get("IMAP_SERVER", "")).strip() self.imap_port = str(config.get("IMAP_PORT", "")).strip() self.imap_user = str(config.get("IMAP_USER", "")).strip() self.imap_pass = str(config.get("IMAP_PASS", "")).strip() self.imap_dir = str(config.get("IMAP_DIR", "inbox")).strip() self.check_config() def get_temp_mail(self): return self.temp_mail.split("@")[0] if "@" in self.temp_mail else self.temp_mail def get_temp_mail_ext(self): return self.temp_mail_ext def get_temp_mail_epin(self): return self.temp_mail_epin def get_browser_user_agent(self): return self.browser_user_agent def get_mail_server(self): return self.mail_server def get_imap(self): if not self.imap: return False return { "imap_server": self.imap_server, "imap_port": self.imap_port, "imap_user": self.imap_user, "imap_pass": self.imap_pass, "imap_dir": self.imap_dir, } def get_domain(self): return self.domain def check_config(self): """检查配置项是否有效 检查规则: 1. 如果使用 tempmail.plus,需要配置 TEMP_MAIL 和 DOMAIN 2. 如果使用 IMAP,需要配置 IMAP_SERVER、IMAP_PORT、IMAP_USER、IMAP_PASS """ # 定义必需的配置项 required_configs = { "domain": "域名", } # 检查基础配置 for key, name in required_configs.items(): if not self.check_is_valid(getattr(self, key)): raise ValueError(f"{name}未配置") # 检查邮箱配置 if self.temp_mail != "null": # tempmail.plus 模式 if not self.check_is_valid(self.temp_mail): raise ValueError("临时邮箱未配置") else: # IMAP 模式 imap_configs = { "imap_server": "IMAP服务器", "imap_port": "IMAP端口", "imap_user": "IMAP用户名", "imap_pass": "IMAP密码", } for key, name in imap_configs.items(): value = getattr(self, key) if value == "null" or not self.check_is_valid(value): raise ValueError(f"{name}未配置") # IMAP_DIR 是可选的,如果设置了就检查其有效性 if self.imap_dir != "null" and not self.check_is_valid(self.imap_dir): raise ValueError("IMAP收件箱目录配置无效") def check_is_valid(self, value): """检查配置项是否有效 Args: value: 配置项的值 Returns: bool: 配置项是否有效 """ return isinstance(value, str) and len(str(value).strip()) > 0 def print_config(self): if self.imap: logging.info(f"\033[32mIMAP服务器: {self.imap_server}\033[0m") logging.info(f"\033[32mIMAP端口: {self.imap_port}\033[0m") logging.info(f"\033[32mIMAP用户名: {self.imap_user}\033[0m") logging.info(f"\033[32mIMAP密码: {'*' * len(self.imap_pass)}\033[0m") logging.info(f"\033[32mIMAP收件箱目录: {self.imap_dir}\033[0m") if self.temp_mail != "null": logging.info( f"\033[32m临时邮箱: {self.temp_mail}{self.temp_mail_ext}\033[0m" ) logging.info(f"\033[32m域名: {self.domain}\033[0m") # 使用示例 if __name__ == "__main__": try: config = Config() print("环境变量加载成功!") config.print_config() except ValueError as e: print(f"错误: {e}")