Files
auto_cursor/services/self_hosted_email.py
2025-04-07 14:55:33 +08:00

205 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
import time
import asyncio
from typing import Optional, Dict, Any
from loguru import logger
from core.exceptions import EmailError
from services.fetch_manager import FetchManager
class SelfHostedEmail:
"""自建邮箱服务类负责从API获取邮箱和验证码"""
def __init__(self, fetch_manager: FetchManager, api_base_url: str, api_key: Optional[str] = None):
"""初始化自建邮箱服务
Args:
fetch_manager: HTTP请求管理器
api_base_url: API基础URL例如 "https://cursorapi3.nosqli.com/"
api_key: API密钥可选
"""
self.fetch_manager = fetch_manager
self.api_base_url = "https://cursorapi3.nosqli.com/"
self.api_key = "1234567890"
# API端点
self.get_email_endpoint = "/admin/api.email/getEmail"
self.get_code_endpoint = "/admin/api.email/getVerificationCode"
# 新的邮件获取接口
self.new_email_api = "https://rnemail.nosqli.com/latest_email"
async def _make_api_request(self, endpoint: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""向API发送请求
Args:
endpoint: API端点例如 "/admin/api.email/getEmail"
params: 请求参数
Returns:
解析后的JSON响应
Raises:
EmailError: 当API请求失败或响应无法解析时
"""
url = f"{self.api_base_url}{endpoint}"
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
logger.debug(f"正在请求: {url}")
response = await self.fetch_manager.request(
"GET",
url,
headers=headers,
params=params
)
if 'error' in response:
raise EmailError(f"API请求失败: {response['error']}")
try:
result = json.loads(response['body'].decode())
return result
except Exception as e:
raise EmailError(f"解析API响应失败: {str(e)}")
async def get_email(self) -> Optional[str]:
"""从API获取一个可用邮箱
Returns:
邮箱地址失败时返回None
"""
try:
result = await self._make_api_request(self.get_email_endpoint)
if result.get('code') != 0:
logger.error(f"获取邮箱失败: {result.get('msg')}")
return None
email = result.get('data', {}).get('email')
if not email:
logger.error("API未返回有效邮箱")
return None
logger.info(f"获取到自建邮箱: {email}")
return email
except Exception as e:
logger.error(f"获取邮箱失败: {str(e)}")
return None
async def get_verification_code(self, email: str) -> Optional[str]:
"""从API获取验证码
Args:
email: 邮箱地址
Returns:
验证码失败时返回None
"""
# 首先尝试使用新接口获取
code = await self._get_code_from_email_api(email)
if code:
return code
# 如果新接口失败,尝试旧接口
logger.debug("新接口获取验证码失败,尝试使用旧接口")
try:
result = await self._make_api_request(
self.get_code_endpoint,
params={"email": email}
)
if result.get('code') != 0:
logger.error(f"获取验证码失败: {result.get('msg')}")
return None
code = result.get('data', {}).get('code')
if not code:
logger.error("API未返回有效验证码")
return None
logger.info(f"获取到验证码: {code} (邮箱: {email})")
return code
except Exception as e:
logger.error(f"获取验证码失败: {str(e)}")
return None
async def _get_code_from_email_api(self, email: str, max_retries: int = 5, retry_delay: int = 3) -> Optional[str]:
"""从新的邮件API获取验证码
Args:
email: 邮箱地址
max_retries: 最大重试次数
retry_delay: 重试间隔(秒)
Returns:
验证码失败时返回None
"""
url = f"{self.new_email_api}?recipient={email}"
logger.debug(f"使用新接口获取验证码: {url}")
for attempt in range(max_retries):
try:
response = await self.fetch_manager.request("GET", url)
if 'error' in response:
logger.warning(f"获取邮件失败 (尝试 {attempt + 1}/{max_retries}): {response['error']}")
else:
email_data = json.loads(response['body'].decode())
logger.debug(f"获取到邮件数据: {str(email_data)[:200]}...")
# 解析邮件内容,提取验证码
if email_data and isinstance(email_data, dict):
body = email_data.get('body', '')
if 'code' in email_data:
# 如果API直接返回code字段
code = email_data.get('code')
if code:
logger.info(f"从邮件API直接获取到验证码: {code}")
return code
# 从邮件主体中提取验证码
if body:
# 正则表达式匹配6位数字验证码
code_match = re.search(r'code["\s:]*["\s]*(\d{6})["\s]*', body, re.IGNORECASE)
if code_match:
code = code_match.group(1)
logger.info(f"从邮件内容中提取到验证码: {code}")
return code
# 匹配"验证码"后的6位数字
code_match = re.search(r'[\u4e00-\u9fa5]*验证码[\u4e00-\u9fa5]*[:]*\s*(\d{6})\b', body)
if code_match:
code = code_match.group(1)
logger.info(f"从邮件内容中提取到验证码: {code}")
return code
# 如果是Cursor邮件尝试直接提取验证码格式
if "Cursor" in body and "verify" in body:
code_match = re.search(r'\b(\d{6})\b', body)
if code_match:
code = code_match.group(1)
logger.info(f"从Cursor邮件中提取到验证码: {code}")
return code
logger.warning(f"未能从邮件中提取到验证码 (尝试 {attempt + 1}/{max_retries})")
# 如果没有找到验证码,等待一段时间后重试
if attempt < max_retries - 1:
logger.debug(f"等待 {retry_delay} 秒后重试获取验证码...")
await asyncio.sleep(retry_delay)
except Exception as e:
logger.error(f"获取验证码出错 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
logger.error(f"{max_retries} 次尝试后未能获取到验证码")
return None