This commit is contained in:
huangzhenpc
2025-03-26 11:39:59 +08:00
parent a0e9bae5a0
commit 4d8ea56a45
9 changed files with 982 additions and 0 deletions

21
Dockerfile Normal file
View File

@@ -0,0 +1,21 @@
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# 设置环境变量
ENV PORT=5000
ENV HOST=0.0.0.0
ENV DEBUG=False
# Redis连接URL可以在运行容器时通过环境变量传入
# ENV REDIS_URL=redis://redis:6379/0
# ENV API_KEY=your_api_key
EXPOSE 5000
# 使用Gunicorn作为生产级WSGI服务器
CMD gunicorn --workers=4 --bind ${HOST}:${PORT} mail_api:app

25
docker-compose.yml Normal file
View File

@@ -0,0 +1,25 @@
version: '3'
services:
api:
build: .
ports:
- "5000:5000"
environment:
- REDIS_URL=redis://redis:6379/0
- API_KEY=dev_api_key # 开发环境API密钥生产环境请更改
- DEBUG=False
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
volumes:
redis_data:

109
example.py Normal file
View File

@@ -0,0 +1,109 @@
import requests
import json
# API接口地址假设API服务运行在本地5000端口
API_URL = "http://localhost:5000"
client_id = '9e5f94bc-e8a4-4e73-b8be-63364c29d753'
email = 'eedbbfdd186@outlook.com'
refresh_token ='M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$'
def get_emails():
"""获取最新的邮件"""
url = f"{API_URL}/api/emails"
payload = {
"email": email,
"client_id": client_id,
"refresh_token": refresh_token,
"folder": "INBOX", # 可以改为其他文件夹
"limit": 10 # 获取最新的10封邮件
}
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status() # 如果请求失败,抛出异常
result = response.json()
# 保存新的refresh_token(如果有)
if "refresh_token" in result and result["refresh_token"] != refresh_token:
print(f"新的refresh_token: {result['refresh_token']}")
# 在实际应用中你应该将新的refresh_token保存到安全的地方
# 显示邮件
print(f"找到 {len(result['emails'])} 封邮件:")
for i, email_data in enumerate(result["emails"], 1):
print(f"\n--- 邮件 {i} ---")
print(f"主题: {email_data.get('subject', 'N/A')}")
print(f"发件人: {email_data.get('from', 'N/A')}")
print(f"日期: {email_data.get('date', 'N/A')}")
# 如果邮件有附件
if "attachments" in email_data:
print(f"附件: {', '.join(email_data['attachments'])}")
# 显示邮件正文截取前100个字符
if "body" in email_data:
body_preview = email_data["body"][:100] + "..." if len(email_data["body"]) > 100 else email_data["body"]
print(f"内容预览: {body_preview}")
return result
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
def get_folders():
"""获取邮箱文件夹列表"""
url = f"{API_URL}/api/folders"
payload = {
"email": email,
"client_id": client_id,
"refresh_token": refresh_token
}
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
result = response.json()
# 保存新的refresh_token(如果有)
if "refresh_token" in result and result["refresh_token"] != refresh_token:
print(f"新的refresh_token: {result['refresh_token']}")
# 显示文件夹
print("邮箱文件夹列表:")
for folder in result["folders"]:
print(f"- {folder}")
return result
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
if __name__ == "__main__":
print("1. 获取邮箱文件夹列表")
print("2. 获取最新邮件")
choice = input("请选择操作 (1/2): ")
if choice == "1":
get_folders()
elif choice == "2":
get_emails()
else:
print("无效的选择")

495
mail_api.py Normal file
View File

@@ -0,0 +1,495 @@
from flask import Flask, request, jsonify
import base64
import email
import email.header
import requests
import imaplib
import poplib
import json
import os
import re
import redis
import hashlib
from datetime import datetime, timedelta
from functools import wraps
app = Flask(__name__)
# 配置参数,生产环境中应通过环境变量配置
API_KEY = os.environ.get('API_KEY', 'your_default_api_key')
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
TOKEN_CACHE_EXPIRES = int(os.environ.get('TOKEN_CACHE_EXPIRES', 300)) # 5分钟缓存
# 初始化Redis连接
try:
redis_client = redis.from_url(REDIS_URL)
# 测试连接
redis_client.ping()
print("Redis连接成功")
except Exception as e:
print(f"Redis连接错误: {str(e)}")
redis_client = None
# API密钥认证装饰器
def require_api_key(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 如果环境变量中未设置API_KEY或设置为None则跳过验证
if not API_KEY or API_KEY == 'your_default_api_key':
return f(*args, **kwargs)
provided_key = request.headers.get('X-API-Key')
if provided_key and provided_key == API_KEY:
return f(*args, **kwargs)
return jsonify({"error": "Invalid or missing API key"}), 401
return decorated_function
# 生成缓存键
def generate_cache_key(email, client_id):
# 创建唯一值但不直接存储邮箱信息的键
key_string = f"{email}:{client_id}"
return f"outlook:token:{hashlib.sha256(key_string.encode()).hexdigest()}"
# 从缓存获取access_token
def get_token_from_cache(email, client_id):
if not redis_client:
return None
cache_key = generate_cache_key(email, client_id)
token_data = redis_client.get(cache_key)
if token_data:
try:
return json.loads(token_data.decode('utf-8'))
except Exception:
pass
return None
# 将access_token存入缓存
def save_token_to_cache(email, client_id, access_token, expires_in=TOKEN_CACHE_EXPIRES):
if not redis_client:
return False
try:
cache_key = generate_cache_key(email, client_id)
token_data = json.dumps({"access_token": access_token})
# 设置缓存有效期5分钟默认或传入的expires_in值
redis_client.setex(cache_key, expires_in, token_data)
return True
except Exception as e:
print(f"缓存错误: {str(e)}")
return False
# 从refresh_token获取access_token
def get_accesstoken(refresh_token, client_id, email=None):
# 先尝试从缓存获取
if email:
cached_token = get_token_from_cache(email, client_id)
if cached_token and "access_token" in cached_token:
return cached_token["access_token"], None, None
# 缓存中没有则请求新token
try:
data = {
'client_id': client_id,
'grant_type': 'refresh_token',
'refresh_token': refresh_token
}
ret = requests.post('https://login.microsoftonline.com/consumers/oauth2/v2.0/token', data=data)
ret.raise_for_status()
response_data = ret.json()
if 'access_token' not in response_data:
return None, response_data, "Access token not found in response"
# 获取到新token后存入缓存
if email and "access_token" in response_data:
# 如果有expires_in字段使用该值作为缓存时间单位为秒
expires_in = int(response_data.get('expires_in', TOKEN_CACHE_EXPIRES))
# 为了安全起见,我们将缓存时间设置比实际过期时间短一些
cache_expires = min(expires_in - 60, TOKEN_CACHE_EXPIRES) if expires_in > 60 else TOKEN_CACHE_EXPIRES
save_token_to_cache(email, client_id, response_data['access_token'], cache_expires)
return response_data.get('access_token'), response_data.get('refresh_token'), None
except requests.RequestException as e:
return None, None, f"Error getting access token: {str(e)}"
def generate_auth_string(user, token):
auth_string = f"user={user}\1auth=Bearer {token}\1\1"
return auth_string
def fetch_email_body(mail, item):
try:
status, msg_data = mail.fetch(item, "(RFC822)")
if status != 'OK':
return {"error": f"Failed to fetch email: {status}"}
email_data = {}
for response_part in msg_data:
if isinstance(response_part, tuple):
# 解码邮件
msg = email.message_from_bytes(response_part[1])
# 获取邮件主题
subject = "No Subject"
try:
subject_header = msg.get("Subject", "")
if subject_header:
decoded_header = email.header.decode_header(subject_header)
if decoded_header and decoded_header[0]:
subject, encoding = decoded_header[0]
if isinstance(subject, bytes):
subject = subject.decode(encoding if encoding else "utf-8", errors='replace')
except Exception as e:
subject = f"[解析主题错误: {str(e)}]"
email_data["subject"] = subject
# 获取发件人
try:
from_header = msg.get("From", "")
email_data["from"] = from_header
# 尝试提取发件人的邮箱地址,用于后续筛选
email_pattern = r'[\w\.-]+@[\w\.-]+'
found_emails = re.findall(email_pattern, from_header)
if found_emails:
email_data["sender_email"] = found_emails[0].lower()
except Exception as e:
email_data["from"] = f"[解析发件人错误: {str(e)}]"
# 获取日期
try:
date_str = msg.get("Date", "")
if date_str:
email_data["date"] = date_str
except Exception as e:
email_data["date_error"] = str(e)
# 获取邮件内容
email_data["body"] = ""
try:
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
if content_type == "text/plain" and "attachment" not in content_disposition:
try:
body = part.get_payload(decode=True)
if body:
email_data["body"] = body.decode(errors='replace')
break
except Exception as e:
email_data["body_error"] = str(e)
else:
try:
body = msg.get_payload(decode=True)
if body:
email_data["body"] = body.decode(errors='replace')
except Exception as e:
email_data["body_error"] = str(e)
except Exception as e:
email_data["body_error"] = str(e)
# 处理附件信息
attachments = []
try:
if msg.is_multipart():
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
if part.get('Content-Disposition') is None:
continue
filename = part.get_filename()
if filename:
attachments.append(filename)
if attachments:
email_data["attachments"] = attachments
except Exception as e:
email_data["attachments_error"] = str(e)
return email_data
except Exception as e:
return {"error": str(e)}
def get_outlook_emails(emailadr, access_token, folder="INBOX", limit=5, sender=None):
try:
mail = imaplib.IMAP4_SSL('outlook.live.com')
mail.authenticate('XOAUTH2', lambda x: generate_auth_string(emailadr, access_token))
mail.select(folder)
search_criteria = 'ALL'
# 如果指定了发件人可以尝试使用IMAP搜索但这不总是可靠的
# 实际实现中我们会在获取邮件后再筛选
status, messages = mail.search(None, search_criteria)
if status != 'OK':
return [{"error": f"Failed to search emails: {status}"}]
email_ids = messages[0].split()
if not email_ids:
return []
# 获取最新的N封邮件
latest_emails = []
# 从最新到最旧排序处理所有邮件
for item in reversed(email_ids):
# 如果已经找到了指定发件人的邮件或者达到了限制数量,则停止
if sender and len(latest_emails) >= 1:
break
if not sender and len(latest_emails) >= limit:
break
email_data = fetch_email_body(mail, item)
if not email_data:
continue
if "error" in email_data:
continue
email_data["id"] = item.decode()
# 如果指定了发件人,检查是否匹配
if sender:
from_field = email_data.get("from", "").lower()
sender_email = email_data.get("sender_email", "").lower()
# 检查发件人是否匹配(检查显示名称或邮箱地址)
if sender.lower() in from_field or sender.lower() == sender_email:
latest_emails.append(email_data)
else:
latest_emails.append(email_data)
mail.logout()
return latest_emails
except Exception as e:
return [{"error": str(e)}]
# 从邮件内容中提取验证码的函数
def extract_verification_code(email_body):
if not email_body:
return None
# 方法1: 查找特定格式的验证码 (数字序列通常在单独一行)
# 例如查找 digit_line\r\n\r\nThis code expires
code_pattern = r'(\d{6})\s*(?:\r\n|\n)+\s*(?:This code expires|验证码有效期为)'
code_match = re.search(code_pattern, email_body)
if code_match:
return code_match.group(1)
# 方法2: 查找6位数字验证码 (最常见格式)
six_digit_pattern = r'(?<![0-9])([0-9]{6})(?![0-9])'
six_digit_match = re.search(six_digit_pattern, email_body)
if six_digit_match:
return six_digit_match.group(1)
# 方法3: 查找4-8位的数字验证码
digit_pattern = r'(?<![0-9])([0-9]{4,8})(?![0-9])'
digit_match = re.search(digit_pattern, email_body)
if digit_match:
return digit_match.group(1)
# 找不到验证码
return None
@app.route('/api/verification-code', methods=['POST'])
@require_api_key
def get_verification_code():
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON data"}), 400
# 检查必要参数
required_fields = ['email', 'client_id', 'refresh_token']
for field in required_fields:
if field not in data:
return jsonify({"error": f"Missing required field: {field}"}), 400
email = data['email']
client_id = data['client_id']
refresh_token = data['refresh_token']
# 可选参数
folder = data.get('folder', 'INBOX') # 默认为收件箱
sender = data.get('sender', 'no-reply@cursor.sh') # 默认为cursor的验证邮件发件人
# 获取访问令牌传入email使启用缓存
access_token, new_refresh_token, error = get_accesstoken(refresh_token, client_id, email)
if not access_token:
return jsonify({
"error": "Failed to get access token",
"details": error or "Unknown error"
}), 401
# 只获取最新的1封匹配发件人的邮件
emails = get_outlook_emails(email, access_token, folder, 1, sender)
# 检查是否有错误
if not emails:
return jsonify({
"success": False,
"error": "No emails found from the specified sender",
"refresh_token": new_refresh_token or refresh_token
}), 404
if emails and isinstance(emails, list) and len(emails) > 0 and "error" in emails[0]:
return jsonify({"error": emails[0]["error"]}), 500
# 提取最新邮件中的验证码
verification_code = None
latest_email = emails[0] if emails else None
if latest_email and "body" in latest_email:
verification_code = extract_verification_code(latest_email["body"])
response = {
"success": True,
"verification_code": verification_code,
"email": latest_email if verification_code is None else None, # 如果找不到验证码,返回完整邮件便于调试
"refresh_token": new_refresh_token or refresh_token
}
return jsonify(response)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/emails', methods=['POST'])
@require_api_key
def get_emails():
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON data"}), 400
# 检查必要参数
required_fields = ['email', 'client_id', 'refresh_token']
for field in required_fields:
if field not in data:
return jsonify({"error": f"Missing required field: {field}"}), 400
email = data['email']
client_id = data['client_id']
refresh_token = data['refresh_token']
# 可选参数
folder = data.get('folder', 'INBOX') # 默认为收件箱
limit = data.get('limit', 5) # 默认获取最新的5封邮件
sender = data.get('sender') # 可选的发件人筛选
latest_only = data.get('latest_only', False) # 是否只返回最新的邮件
# 如果只需要最新的一封邮件调整limit
if latest_only:
limit = 1
# 获取访问令牌传入email使启用缓存
access_token, new_refresh_token, error = get_accesstoken(refresh_token, client_id, email)
if not access_token:
return jsonify({
"error": "Failed to get access token",
"details": error or "Unknown error"
}), 401
# 获取邮件
emails = get_outlook_emails(email, access_token, folder, limit, sender)
# 检查是否有错误
if emails and isinstance(emails, list) and len(emails) > 0 and "error" in emails[0]:
return jsonify({"error": emails[0]["error"]}), 500
response = {
"success": True,
"emails": emails,
"refresh_token": new_refresh_token or refresh_token # 返回新的refresh_token如果有
}
return jsonify(response)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/folders', methods=['POST'])
@require_api_key
def get_folders():
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON data"}), 400
# 检查必要参数
required_fields = ['email', 'client_id', 'refresh_token']
for field in required_fields:
if field not in data:
return jsonify({"error": f"Missing required field: {field}"}), 400
email = data['email']
client_id = data['client_id']
refresh_token = data['refresh_token']
# 获取访问令牌传入email使启用缓存
access_token, new_refresh_token, error = get_accesstoken(refresh_token, client_id, email)
if not access_token:
return jsonify({
"error": "Failed to get access token",
"details": error or "Unknown error"
}), 401
try:
# 连接并获取文件夹列表
mail = imaplib.IMAP4_SSL('outlook.live.com')
mail.authenticate('XOAUTH2', lambda x: generate_auth_string(email, access_token))
status, folder_list = mail.list()
if status != 'OK':
return jsonify({"error": f"Failed to list folders: {status}"}), 500
folders = []
for folder_info in folder_list:
if isinstance(folder_info, bytes):
folder_info = folder_info.decode('utf-8', errors='replace')
# 解析文件夹名称
folder_name = folder_info.split('"')[-2] if '"' in folder_info else folder_info.split()[-1]
folders.append(folder_name)
mail.logout()
response = {
"success": True,
"folders": folders,
"refresh_token": new_refresh_token or refresh_token
}
return jsonify(response)
except Exception as e:
return jsonify({"error": str(e)}), 500
except Exception as e:
return jsonify({"error": str(e)}), 500
# 健康检查端点
@app.route('/health', methods=['GET'])
def health_check():
# 检查Redis连接
redis_status = "ok" if redis_client and redis_client.ping() else "error"
return jsonify({
"status": "ok",
"service": "outlook-email-api",
"redis": redis_status,
"time": datetime.now().isoformat()
}), 200
if __name__ == '__main__':
# 生产环境下应移除debug=True并使用Gunicorn等WSGI服务器
port = int(os.environ.get('PORT', 5000))
debug = os.environ.get('DEBUG', 'False').lower() == 'true'
host = os.environ.get('HOST', '0.0.0.0')
app.run(debug=debug, host=host, port=port)

121
outook copy.py Normal file
View File

@@ -0,0 +1,121 @@
import base64
import email
import email.header
import requests
import imaplib
import poplib
# 1. 使用refresh_token获取access_token
def get_accesstoken(refresh_token,client_id):
data = {
'client_id': client_id,
'grant_type': 'refresh_token',
'refresh_token': refresh_token
}
ret = requests.post('https://login.microsoftonline.com/consumers/oauth2/v2.0/token', data=data)
print(ret.text)
print(ret.json()['refresh_token'])
return ret.json()['access_token']
def generate_auth_string(user, token):
auth_string = f"user={user}\1auth=Bearer {token}\1\1"
return auth_string
def tuple_to_str(tuple_):
"""
元组转为字符串输出
:param tuple_: 转换前的元组QQ邮箱格式为(b'\xcd\xf5\xd4\xc6', 'gbk')或者(b' <XXXX@163.com>', None)163邮箱格式为('<XXXX@163.com>', None)
:return: 转换后的字符串
"""
if tuple_[1]:
out_str = tuple_[0].decode(tuple_[1])
else:
if isinstance(tuple_[0], bytes):
out_str = tuple_[0].decode('gbk')
else:
out_str = tuple_[0]
return out_str
def fetch_email_body(mail, item):
status, msg_data = mail.fetch(item, "(RFC822)")
for response_part in msg_data:
if isinstance(response_part, tuple):
# 解码邮件
msg = email.message_from_bytes(response_part[1])
# 获取邮件主题
subject, encoding = email.header.decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
# 如果是字节,则解码
subject = subject.decode(encoding if encoding else "utf-8")
print("Subject:", subject)
# 获取发件人
from_ = msg.get("From")
print("From:", from_)
try:
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
body = part.get_payload(decode=True).decode()
return body
return ''
else:
body = msg.get_payload(decode=True).decode()
return body
except Exception as e:
return ''
def getmail(sel, mail):
mail.select(sel)
status, messages = mail.search(None, 'ALL') # UNSEEN 为未读邮件
all_emails = messages[0].split()
print(all_emails)
for item in all_emails:
print(fetch_email_body(mail, item))
# 2. 使用访问令牌连接 IMAP
def connect_imap(emailadr, access_token):
mail = imaplib.IMAP4_SSL('outlook.live.com')
mail.authenticate('XOAUTH2', lambda x: generate_auth_string(emailadr, access_token))
print('读取收件箱')
getmail('INBOX', mail) # 读取收件箱
print('读取垃圾箱')
getmail('Junk', mail) # 读取垃圾箱
# 关闭连接
mail.logout()
def connect_pop3(emailadr, access_token):
server = poplib.POP3_SSL('outlook.live.com')
token=generate_auth_string(emailadr, access_token)
encoded_auth_string = base64.b64encode(token.encode("utf-8")).decode("utf-8")
server._shortcmd(f'AUTH XOAUTH2')
server._shortcmd(f'{encoded_auth_string}')
print(server.list()[1])
server.quit()
# 主流程
if __name__ == '__main__':
client_id = '9e5f94bc-e8a4-4e73-b8be-63364c29d753'
emailadr = 'eedbbfdd186@outlook.com'
refresh_token ='M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$'
print(f'clientID:{client_id}')
print(f'邮箱地址:{emailadr}')
print(f'refresh_token:{refresh_token}')
access_token = get_accesstoken(refresh_token,client_id)
print("获取的访问令牌:", access_token)
# 连接 IMAP
print('IMAP测试')
connect_imap(emailadr, access_token)
print('pop3测试')
connect_pop3(emailadr,access_token)

121
outook.py Normal file
View File

@@ -0,0 +1,121 @@
import base64
import email
import email.header
import requests
import imaplib
import poplib
# 1. 使用refresh_token获取access_token
def get_accesstoken(refresh_token,client_id):
data = {
'client_id': client_id,
'grant_type': 'refresh_token',
'refresh_token': refresh_token
}
ret = requests.post('https://login.microsoftonline.com/consumers/oauth2/v2.0/token', data=data)
print(ret.text)
print(ret.json()['refresh_token'])
return ret.json()['access_token']
def generate_auth_string(user, token):
auth_string = f"user={user}\1auth=Bearer {token}\1\1"
return auth_string
def tuple_to_str(tuple_):
"""
元组转为字符串输出
:param tuple_: 转换前的元组QQ邮箱格式为(b'\xcd\xf5\xd4\xc6', 'gbk')或者(b' <XXXX@163.com>', None)163邮箱格式为('<XXXX@163.com>', None)
:return: 转换后的字符串
"""
if tuple_[1]:
out_str = tuple_[0].decode(tuple_[1])
else:
if isinstance(tuple_[0], bytes):
out_str = tuple_[0].decode('gbk')
else:
out_str = tuple_[0]
return out_str
def fetch_email_body(mail, item):
status, msg_data = mail.fetch(item, "(RFC822)")
for response_part in msg_data:
if isinstance(response_part, tuple):
# 解码邮件
msg = email.message_from_bytes(response_part[1])
# 获取邮件主题
subject, encoding = email.header.decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
# 如果是字节,则解码
subject = subject.decode(encoding if encoding else "utf-8")
print("Subject:", subject)
# 获取发件人
from_ = msg.get("From")
print("From:", from_)
try:
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
body = part.get_payload(decode=True).decode()
return body
return ''
else:
body = msg.get_payload(decode=True).decode()
return body
except Exception as e:
return ''
def getmail(sel, mail):
mail.select(sel)
status, messages = mail.search(None, 'ALL') # UNSEEN 为未读邮件
all_emails = messages[0].split()
print(all_emails)
for item in all_emails:
print(fetch_email_body(mail, item))
# 2. 使用访问令牌连接 IMAP
def connect_imap(emailadr, access_token):
mail = imaplib.IMAP4_SSL('outlook.live.com')
mail.authenticate('XOAUTH2', lambda x: generate_auth_string(emailadr, access_token))
print('读取收件箱')
getmail('INBOX', mail) # 读取收件箱
print('读取垃圾箱')
getmail('Junk', mail) # 读取垃圾箱
# 关闭连接
mail.logout()
def connect_pop3(emailadr, access_token):
server = poplib.POP3_SSL('outlook.live.com')
token=generate_auth_string(emailadr, access_token)
encoded_auth_string = base64.b64encode(token.encode("utf-8")).decode("utf-8")
server._shortcmd(f'AUTH XOAUTH2')
server._shortcmd(f'{encoded_auth_string}')
print(server.list()[1])
server.quit()
# 主流程
if __name__ == '__main__':
client_id = '9e5f94bc-e8a4-4e73-b8be-63364c29d753'
emailadr = 'eedbbfdd186@outlook.com'
refresh_token ='M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$'
print(f'clientID:{client_id}')
print(f'邮箱地址:{emailadr}')
print(f'refresh_token:{refresh_token}')
access_token = get_accesstoken(refresh_token,client_id)
print("获取的访问令牌:", access_token)
# 连接 IMAP
print('IMAP测试')
connect_imap(emailadr, access_token)
print('pop3测试')
connect_pop3(emailadr,access_token)

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
flask==2.3.3
requests==2.31.0
redis==4.5.5
gunicorn==21.2.0

39
test.py Normal file
View File

@@ -0,0 +1,39 @@
import requests
import json
# API接口地址
API_URL = "http://localhost:5000/api/emails"
# 凭据信息
credentials = {
"email": "eedbbfdd186@outlook.com",
"client_id": "9e5f94bc-e8a4-4e73-b8be-63364c29d753",
"refresh_token": "M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$",
"limit": 5 # 获取最新的5封邮件
}
headers = {
"Content-Type": "application/json"
}
try:
# 发送POST请求
response = requests.post(
API_URL,
headers=headers,
json=credentials
)
# 输出响应内容
print(f"状态码: {response.status_code}")
print("响应内容:")
print(json.dumps(response.json(), indent=2, ensure_ascii=False))
# 如果成功提取新的refresh_token
if response.status_code == 200:
result = response.json()
if "refresh_token" in result and result["refresh_token"] != credentials["refresh_token"]:
print(f"\n新的refresh_token: {result['refresh_token']}")
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")

47
test_verification_code.py Normal file
View File

@@ -0,0 +1,47 @@
import requests
import json
# API接口地址
API_URL = "http://localhost:5000/api/verification-code"
# 凭据信息
credentials = {
"email": "eedbbfdd186@outlook.com",
"client_id": "9e5f94bc-e8a4-4e73-b8be-63364c29d753",
"refresh_token": "M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$",
"sender": "no-reply@cursor.sh" # 指定发件人为cursor的验证邮件
}
headers = {
"Content-Type": "application/json"
}
try:
# 发送POST请求
response = requests.post(
API_URL,
headers=headers,
json=credentials
)
# 输出响应内容
print(f"状态码: {response.status_code}")
print("响应内容:")
print(json.dumps(response.json(), indent=2, ensure_ascii=False))
# 如果成功提取验证码和新的refresh_token
if response.status_code == 200:
result = response.json()
# 打印验证码
if result.get("verification_code"):
print(f"\n提取的验证码: {result['verification_code']}")
else:
print("\n未能提取到验证码,请检查邮件内容。")
# 打印refresh_token
if "refresh_token" in result and result["refresh_token"] != credentials["refresh_token"]:
print(f"\n新的refresh_token: {result['refresh_token']}")
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")