495 lines
19 KiB
Python
495 lines
19 KiB
Python
from flask import Flask, request, jsonify
|
||
import base64
|
||
import email
|
||
import email.header
|
||
import requests
|
||
import imaplib
|
||
import poplib
|
||
import json
|
||
import os
|
||
import re
|
||
import redis
|
||
import hashlib
|
||
from datetime import datetime, timedelta
|
||
from functools import wraps
|
||
|
||
app = Flask(__name__)
|
||
|
||
# 配置参数,生产环境中应通过环境变量配置
|
||
API_KEY = os.environ.get('API_KEY', 'your_default_api_key')
|
||
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
|
||
TOKEN_CACHE_EXPIRES = int(os.environ.get('TOKEN_CACHE_EXPIRES', 300)) # 5分钟缓存
|
||
|
||
# 初始化Redis连接
|
||
try:
|
||
redis_client = redis.from_url(REDIS_URL)
|
||
# 测试连接
|
||
redis_client.ping()
|
||
print("Redis连接成功")
|
||
except Exception as e:
|
||
print(f"Redis连接错误: {str(e)}")
|
||
redis_client = None
|
||
|
||
# API密钥认证装饰器
|
||
def require_api_key(f):
|
||
@wraps(f)
|
||
def decorated_function(*args, **kwargs):
|
||
# 如果环境变量中未设置API_KEY或设置为None,则跳过验证
|
||
if not API_KEY or API_KEY == 'your_default_api_key':
|
||
return f(*args, **kwargs)
|
||
|
||
provided_key = request.headers.get('X-API-Key')
|
||
if provided_key and provided_key == API_KEY:
|
||
return f(*args, **kwargs)
|
||
return jsonify({"error": "Invalid or missing API key"}), 401
|
||
return decorated_function
|
||
|
||
# 生成缓存键
|
||
def generate_cache_key(email, client_id):
|
||
# 创建唯一值但不直接存储邮箱信息的键
|
||
key_string = f"{email}:{client_id}"
|
||
return f"outlook:token:{hashlib.sha256(key_string.encode()).hexdigest()}"
|
||
|
||
# 从缓存获取access_token
|
||
def get_token_from_cache(email, client_id):
|
||
if not redis_client:
|
||
return None
|
||
|
||
cache_key = generate_cache_key(email, client_id)
|
||
token_data = redis_client.get(cache_key)
|
||
|
||
if token_data:
|
||
try:
|
||
return json.loads(token_data.decode('utf-8'))
|
||
except Exception:
|
||
pass
|
||
|
||
return None
|
||
|
||
# 将access_token存入缓存
|
||
def save_token_to_cache(email, client_id, access_token, expires_in=TOKEN_CACHE_EXPIRES):
|
||
if not redis_client:
|
||
return False
|
||
|
||
try:
|
||
cache_key = generate_cache_key(email, client_id)
|
||
token_data = json.dumps({"access_token": access_token})
|
||
|
||
# 设置缓存,有效期5分钟(默认)或传入的expires_in值
|
||
redis_client.setex(cache_key, expires_in, token_data)
|
||
return True
|
||
except Exception as e:
|
||
print(f"缓存错误: {str(e)}")
|
||
return False
|
||
|
||
# 从refresh_token获取access_token
|
||
def get_accesstoken(refresh_token, client_id, email=None):
|
||
# 先尝试从缓存获取
|
||
if email:
|
||
cached_token = get_token_from_cache(email, client_id)
|
||
if cached_token and "access_token" in cached_token:
|
||
return cached_token["access_token"], None, None
|
||
|
||
# 缓存中没有,则请求新token
|
||
try:
|
||
data = {
|
||
'client_id': client_id,
|
||
'grant_type': 'refresh_token',
|
||
'refresh_token': refresh_token
|
||
}
|
||
ret = requests.post('https://login.microsoftonline.com/consumers/oauth2/v2.0/token', data=data)
|
||
ret.raise_for_status()
|
||
|
||
response_data = ret.json()
|
||
if 'access_token' not in response_data:
|
||
return None, response_data, "Access token not found in response"
|
||
|
||
# 获取到新token后,存入缓存
|
||
if email and "access_token" in response_data:
|
||
# 如果有expires_in字段,使用该值作为缓存时间(单位为秒)
|
||
expires_in = int(response_data.get('expires_in', TOKEN_CACHE_EXPIRES))
|
||
# 为了安全起见,我们将缓存时间设置比实际过期时间短一些
|
||
cache_expires = min(expires_in - 60, TOKEN_CACHE_EXPIRES) if expires_in > 60 else TOKEN_CACHE_EXPIRES
|
||
save_token_to_cache(email, client_id, response_data['access_token'], cache_expires)
|
||
|
||
return response_data.get('access_token'), response_data.get('refresh_token'), None
|
||
except requests.RequestException as e:
|
||
return None, None, f"Error getting access token: {str(e)}"
|
||
|
||
def generate_auth_string(user, token):
|
||
auth_string = f"user={user}\1auth=Bearer {token}\1\1"
|
||
return auth_string
|
||
|
||
def fetch_email_body(mail, item):
|
||
try:
|
||
status, msg_data = mail.fetch(item, "(RFC822)")
|
||
if status != 'OK':
|
||
return {"error": f"Failed to fetch email: {status}"}
|
||
|
||
email_data = {}
|
||
|
||
for response_part in msg_data:
|
||
if isinstance(response_part, tuple):
|
||
# 解码邮件
|
||
msg = email.message_from_bytes(response_part[1])
|
||
|
||
# 获取邮件主题
|
||
subject = "No Subject"
|
||
try:
|
||
subject_header = msg.get("Subject", "")
|
||
if subject_header:
|
||
decoded_header = email.header.decode_header(subject_header)
|
||
if decoded_header and decoded_header[0]:
|
||
subject, encoding = decoded_header[0]
|
||
if isinstance(subject, bytes):
|
||
subject = subject.decode(encoding if encoding else "utf-8", errors='replace')
|
||
except Exception as e:
|
||
subject = f"[解析主题错误: {str(e)}]"
|
||
|
||
email_data["subject"] = subject
|
||
|
||
# 获取发件人
|
||
try:
|
||
from_header = msg.get("From", "")
|
||
email_data["from"] = from_header
|
||
|
||
# 尝试提取发件人的邮箱地址,用于后续筛选
|
||
email_pattern = r'[\w\.-]+@[\w\.-]+'
|
||
found_emails = re.findall(email_pattern, from_header)
|
||
if found_emails:
|
||
email_data["sender_email"] = found_emails[0].lower()
|
||
except Exception as e:
|
||
email_data["from"] = f"[解析发件人错误: {str(e)}]"
|
||
|
||
# 获取日期
|
||
try:
|
||
date_str = msg.get("Date", "")
|
||
if date_str:
|
||
email_data["date"] = date_str
|
||
except Exception as e:
|
||
email_data["date_error"] = str(e)
|
||
|
||
# 获取邮件内容
|
||
email_data["body"] = ""
|
||
try:
|
||
if msg.is_multipart():
|
||
for part in msg.walk():
|
||
content_type = part.get_content_type()
|
||
content_disposition = str(part.get("Content-Disposition", ""))
|
||
|
||
if content_type == "text/plain" and "attachment" not in content_disposition:
|
||
try:
|
||
body = part.get_payload(decode=True)
|
||
if body:
|
||
email_data["body"] = body.decode(errors='replace')
|
||
break
|
||
except Exception as e:
|
||
email_data["body_error"] = str(e)
|
||
else:
|
||
try:
|
||
body = msg.get_payload(decode=True)
|
||
if body:
|
||
email_data["body"] = body.decode(errors='replace')
|
||
except Exception as e:
|
||
email_data["body_error"] = str(e)
|
||
except Exception as e:
|
||
email_data["body_error"] = str(e)
|
||
|
||
# 处理附件信息
|
||
attachments = []
|
||
try:
|
||
if msg.is_multipart():
|
||
for part in msg.walk():
|
||
if part.get_content_maintype() == 'multipart':
|
||
continue
|
||
if part.get('Content-Disposition') is None:
|
||
continue
|
||
|
||
filename = part.get_filename()
|
||
if filename:
|
||
attachments.append(filename)
|
||
|
||
if attachments:
|
||
email_data["attachments"] = attachments
|
||
except Exception as e:
|
||
email_data["attachments_error"] = str(e)
|
||
|
||
return email_data
|
||
except Exception as e:
|
||
return {"error": str(e)}
|
||
|
||
def get_outlook_emails(emailadr, access_token, folder="INBOX", limit=5, sender=None):
|
||
try:
|
||
mail = imaplib.IMAP4_SSL('outlook.live.com')
|
||
mail.authenticate('XOAUTH2', lambda x: generate_auth_string(emailadr, access_token))
|
||
|
||
mail.select(folder)
|
||
search_criteria = 'ALL'
|
||
|
||
# 如果指定了发件人,可以尝试使用IMAP搜索,但这不总是可靠的
|
||
# 实际实现中我们会在获取邮件后再筛选
|
||
|
||
status, messages = mail.search(None, search_criteria)
|
||
if status != 'OK':
|
||
return [{"error": f"Failed to search emails: {status}"}]
|
||
|
||
email_ids = messages[0].split()
|
||
if not email_ids:
|
||
return []
|
||
|
||
# 获取最新的N封邮件
|
||
latest_emails = []
|
||
|
||
# 从最新到最旧排序处理所有邮件
|
||
for item in reversed(email_ids):
|
||
# 如果已经找到了指定发件人的邮件或者达到了限制数量,则停止
|
||
if sender and len(latest_emails) >= 1:
|
||
break
|
||
if not sender and len(latest_emails) >= limit:
|
||
break
|
||
|
||
email_data = fetch_email_body(mail, item)
|
||
if not email_data:
|
||
continue
|
||
|
||
if "error" in email_data:
|
||
continue
|
||
|
||
email_data["id"] = item.decode()
|
||
|
||
# 如果指定了发件人,检查是否匹配
|
||
if sender:
|
||
from_field = email_data.get("from", "").lower()
|
||
sender_email = email_data.get("sender_email", "").lower()
|
||
|
||
# 检查发件人是否匹配(检查显示名称或邮箱地址)
|
||
if sender.lower() in from_field or sender.lower() == sender_email:
|
||
latest_emails.append(email_data)
|
||
else:
|
||
latest_emails.append(email_data)
|
||
|
||
mail.logout()
|
||
return latest_emails
|
||
except Exception as e:
|
||
return [{"error": str(e)}]
|
||
|
||
# 从邮件内容中提取验证码的函数
|
||
def extract_verification_code(email_body):
|
||
if not email_body:
|
||
return None
|
||
|
||
# 方法1: 查找特定格式的验证码 (数字序列通常在单独一行)
|
||
# 例如查找 digit_line\r\n\r\nThis code expires
|
||
code_pattern = r'(\d{6})\s*(?:\r\n|\n)+\s*(?:This code expires|验证码有效期为)'
|
||
code_match = re.search(code_pattern, email_body)
|
||
if code_match:
|
||
return code_match.group(1)
|
||
|
||
# 方法2: 查找6位数字验证码 (最常见格式)
|
||
six_digit_pattern = r'(?<![0-9])([0-9]{6})(?![0-9])'
|
||
six_digit_match = re.search(six_digit_pattern, email_body)
|
||
if six_digit_match:
|
||
return six_digit_match.group(1)
|
||
|
||
# 方法3: 查找4-8位的数字验证码
|
||
digit_pattern = r'(?<![0-9])([0-9]{4,8})(?![0-9])'
|
||
digit_match = re.search(digit_pattern, email_body)
|
||
if digit_match:
|
||
return digit_match.group(1)
|
||
|
||
# 找不到验证码
|
||
return None
|
||
|
||
@app.route('/api/verification-code', methods=['POST'])
|
||
@require_api_key
|
||
def get_verification_code():
|
||
try:
|
||
data = request.get_json()
|
||
if not data:
|
||
return jsonify({"error": "Invalid JSON data"}), 400
|
||
|
||
# 检查必要参数
|
||
required_fields = ['email', 'client_id', 'refresh_token']
|
||
for field in required_fields:
|
||
if field not in data:
|
||
return jsonify({"error": f"Missing required field: {field}"}), 400
|
||
|
||
email = data['email']
|
||
client_id = data['client_id']
|
||
refresh_token = data['refresh_token']
|
||
|
||
# 可选参数
|
||
folder = data.get('folder', 'INBOX') # 默认为收件箱
|
||
sender = data.get('sender', 'no-reply@cursor.sh') # 默认为cursor的验证邮件发件人
|
||
|
||
# 获取访问令牌,传入email使启用缓存
|
||
access_token, new_refresh_token, error = get_accesstoken(refresh_token, client_id, email)
|
||
if not access_token:
|
||
return jsonify({
|
||
"error": "Failed to get access token",
|
||
"details": error or "Unknown error"
|
||
}), 401
|
||
|
||
# 只获取最新的1封匹配发件人的邮件
|
||
emails = get_outlook_emails(email, access_token, folder, 1, sender)
|
||
|
||
# 检查是否有错误
|
||
if not emails:
|
||
return jsonify({
|
||
"success": False,
|
||
"error": "No emails found from the specified sender",
|
||
"refresh_token": new_refresh_token or refresh_token
|
||
}), 404
|
||
|
||
if emails and isinstance(emails, list) and len(emails) > 0 and "error" in emails[0]:
|
||
return jsonify({"error": emails[0]["error"]}), 500
|
||
|
||
# 提取最新邮件中的验证码
|
||
verification_code = None
|
||
latest_email = emails[0] if emails else None
|
||
|
||
if latest_email and "body" in latest_email:
|
||
verification_code = extract_verification_code(latest_email["body"])
|
||
|
||
response = {
|
||
"success": True,
|
||
"verification_code": verification_code,
|
||
"email": latest_email if verification_code is None else None, # 如果找不到验证码,返回完整邮件便于调试
|
||
"refresh_token": new_refresh_token or refresh_token
|
||
}
|
||
|
||
return jsonify(response)
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
@app.route('/api/emails', methods=['POST'])
|
||
@require_api_key
|
||
def get_emails():
|
||
try:
|
||
data = request.get_json()
|
||
if not data:
|
||
return jsonify({"error": "Invalid JSON data"}), 400
|
||
|
||
# 检查必要参数
|
||
required_fields = ['email', 'client_id', 'refresh_token']
|
||
for field in required_fields:
|
||
if field not in data:
|
||
return jsonify({"error": f"Missing required field: {field}"}), 400
|
||
|
||
email = data['email']
|
||
client_id = data['client_id']
|
||
refresh_token = data['refresh_token']
|
||
|
||
# 可选参数
|
||
folder = data.get('folder', 'INBOX') # 默认为收件箱
|
||
limit = data.get('limit', 5) # 默认获取最新的5封邮件
|
||
sender = data.get('sender') # 可选的发件人筛选
|
||
latest_only = data.get('latest_only', False) # 是否只返回最新的邮件
|
||
|
||
# 如果只需要最新的一封邮件,调整limit
|
||
if latest_only:
|
||
limit = 1
|
||
|
||
# 获取访问令牌,传入email使启用缓存
|
||
access_token, new_refresh_token, error = get_accesstoken(refresh_token, client_id, email)
|
||
if not access_token:
|
||
return jsonify({
|
||
"error": "Failed to get access token",
|
||
"details": error or "Unknown error"
|
||
}), 401
|
||
|
||
# 获取邮件
|
||
emails = get_outlook_emails(email, access_token, folder, limit, sender)
|
||
|
||
# 检查是否有错误
|
||
if emails and isinstance(emails, list) and len(emails) > 0 and "error" in emails[0]:
|
||
return jsonify({"error": emails[0]["error"]}), 500
|
||
|
||
response = {
|
||
"success": True,
|
||
"emails": emails,
|
||
"refresh_token": new_refresh_token or refresh_token # 返回新的refresh_token(如果有)
|
||
}
|
||
|
||
return jsonify(response)
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
@app.route('/api/folders', methods=['POST'])
|
||
@require_api_key
|
||
def get_folders():
|
||
try:
|
||
data = request.get_json()
|
||
if not data:
|
||
return jsonify({"error": "Invalid JSON data"}), 400
|
||
|
||
# 检查必要参数
|
||
required_fields = ['email', 'client_id', 'refresh_token']
|
||
for field in required_fields:
|
||
if field not in data:
|
||
return jsonify({"error": f"Missing required field: {field}"}), 400
|
||
|
||
email = data['email']
|
||
client_id = data['client_id']
|
||
refresh_token = data['refresh_token']
|
||
|
||
# 获取访问令牌,传入email使启用缓存
|
||
access_token, new_refresh_token, error = get_accesstoken(refresh_token, client_id, email)
|
||
if not access_token:
|
||
return jsonify({
|
||
"error": "Failed to get access token",
|
||
"details": error or "Unknown error"
|
||
}), 401
|
||
|
||
try:
|
||
# 连接并获取文件夹列表
|
||
mail = imaplib.IMAP4_SSL('outlook.live.com')
|
||
mail.authenticate('XOAUTH2', lambda x: generate_auth_string(email, access_token))
|
||
|
||
status, folder_list = mail.list()
|
||
if status != 'OK':
|
||
return jsonify({"error": f"Failed to list folders: {status}"}), 500
|
||
|
||
folders = []
|
||
|
||
for folder_info in folder_list:
|
||
if isinstance(folder_info, bytes):
|
||
folder_info = folder_info.decode('utf-8', errors='replace')
|
||
# 解析文件夹名称
|
||
folder_name = folder_info.split('"')[-2] if '"' in folder_info else folder_info.split()[-1]
|
||
folders.append(folder_name)
|
||
|
||
mail.logout()
|
||
|
||
response = {
|
||
"success": True,
|
||
"folders": folders,
|
||
"refresh_token": new_refresh_token or refresh_token
|
||
}
|
||
|
||
return jsonify(response)
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
|
||
# 健康检查端点
|
||
@app.route('/health', methods=['GET'])
|
||
def health_check():
|
||
# 检查Redis连接
|
||
redis_status = "ok" if redis_client and redis_client.ping() else "error"
|
||
|
||
return jsonify({
|
||
"status": "ok",
|
||
"service": "outlook-email-api",
|
||
"redis": redis_status,
|
||
"time": datetime.now().isoformat()
|
||
}), 200
|
||
|
||
if __name__ == '__main__':
|
||
# 生产环境下应移除debug=True,并使用Gunicorn等WSGI服务器
|
||
port = int(os.environ.get('PORT', 5000))
|
||
debug = os.environ.get('DEBUG', 'False').lower() == 'true'
|
||
host = os.environ.get('HOST', '0.0.0.0')
|
||
|
||
app.run(debug=debug, host=host, port=port) |