From 4d8ea56a45b4838dfbf8b7eb614d10821f6e7d49 Mon Sep 17 00:00:00 2001 From: huangzhenpc Date: Wed, 26 Mar 2025 11:39:59 +0800 Subject: [PATCH] xx --- Dockerfile | 21 ++ docker-compose.yml | 25 ++ example.py | 109 +++++++++ mail_api.py | 495 ++++++++++++++++++++++++++++++++++++++ outook copy.py | 121 ++++++++++ outook.py | 121 ++++++++++ requirements.txt | 4 + test.py | 39 +++ test_verification_code.py | 47 ++++ 9 files changed, 982 insertions(+) create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 example.py create mode 100644 mail_api.py create mode 100644 outook copy.py create mode 100644 outook.py create mode 100644 requirements.txt create mode 100644 test.py create mode 100644 test_verification_code.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..fb33cf5 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.9-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# 设置环境变量 +ENV PORT=5000 +ENV HOST=0.0.0.0 +ENV DEBUG=False +# Redis连接URL可以在运行容器时通过环境变量传入 +# ENV REDIS_URL=redis://redis:6379/0 +# ENV API_KEY=your_api_key + +EXPOSE 5000 + +# 使用Gunicorn作为生产级WSGI服务器 +CMD gunicorn --workers=4 --bind ${HOST}:${PORT} mail_api:app \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..644b54a --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,25 @@ +version: '3' + +services: + api: + build: . + ports: + - "5000:5000" + environment: + - REDIS_URL=redis://redis:6379/0 + - API_KEY=dev_api_key # 开发环境API密钥,生产环境请更改 + - DEBUG=False + depends_on: + - redis + restart: unless-stopped + + redis: + image: redis:alpine + ports: + - "6379:6379" + volumes: + - redis_data:/data + restart: unless-stopped + +volumes: + redis_data: \ No newline at end of file diff --git a/example.py b/example.py new file mode 100644 index 0000000..8b5ce47 --- /dev/null +++ b/example.py @@ -0,0 +1,109 @@ +import requests +import json + +# API接口地址(假设API服务运行在本地5000端口) +API_URL = "http://localhost:5000" + +client_id = '9e5f94bc-e8a4-4e73-b8be-63364c29d753' +email = 'eedbbfdd186@outlook.com' +refresh_token ='M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$' + + + +def get_emails(): + """获取最新的邮件""" + url = f"{API_URL}/api/emails" + + payload = { + "email": email, + "client_id": client_id, + "refresh_token": refresh_token, + "folder": "INBOX", # 可以改为其他文件夹 + "limit": 10 # 获取最新的10封邮件 + } + + headers = { + "Content-Type": "application/json" + } + + try: + response = requests.post(url, json=payload, headers=headers) + response.raise_for_status() # 如果请求失败,抛出异常 + + result = response.json() + + # 保存新的refresh_token(如果有) + if "refresh_token" in result and result["refresh_token"] != refresh_token: + print(f"新的refresh_token: {result['refresh_token']}") + # 在实际应用中,你应该将新的refresh_token保存到安全的地方 + + # 显示邮件 + print(f"找到 {len(result['emails'])} 封邮件:") + for i, email_data in enumerate(result["emails"], 1): + print(f"\n--- 邮件 {i} ---") + print(f"主题: {email_data.get('subject', 'N/A')}") + print(f"发件人: {email_data.get('from', 'N/A')}") + print(f"日期: {email_data.get('date', 'N/A')}") + + # 如果邮件有附件 + if "attachments" in email_data: + print(f"附件: {', '.join(email_data['attachments'])}") + + # 显示邮件正文(截取前100个字符) + if "body" in email_data: + body_preview = email_data["body"][:100] + "..." if len(email_data["body"]) > 100 else email_data["body"] + print(f"内容预览: {body_preview}") + + return result + + except requests.exceptions.RequestException as e: + print(f"请求失败: {e}") + return None + +def get_folders(): + """获取邮箱文件夹列表""" + url = f"{API_URL}/api/folders" + + payload = { + "email": email, + "client_id": client_id, + "refresh_token": refresh_token + } + + headers = { + "Content-Type": "application/json" + } + + try: + response = requests.post(url, json=payload, headers=headers) + response.raise_for_status() + + result = response.json() + + # 保存新的refresh_token(如果有) + if "refresh_token" in result and result["refresh_token"] != refresh_token: + print(f"新的refresh_token: {result['refresh_token']}") + + # 显示文件夹 + print("邮箱文件夹列表:") + for folder in result["folders"]: + print(f"- {folder}") + + return result + + except requests.exceptions.RequestException as e: + print(f"请求失败: {e}") + return None + +if __name__ == "__main__": + print("1. 获取邮箱文件夹列表") + print("2. 获取最新邮件") + + choice = input("请选择操作 (1/2): ") + + if choice == "1": + get_folders() + elif choice == "2": + get_emails() + else: + print("无效的选择") \ No newline at end of file diff --git a/mail_api.py b/mail_api.py new file mode 100644 index 0000000..558b7bb --- /dev/null +++ b/mail_api.py @@ -0,0 +1,495 @@ +from flask import Flask, request, jsonify +import base64 +import email +import email.header +import requests +import imaplib +import poplib +import json +import os +import re +import redis +import hashlib +from datetime import datetime, timedelta +from functools import wraps + +app = Flask(__name__) + +# 配置参数,生产环境中应通过环境变量配置 +API_KEY = os.environ.get('API_KEY', 'your_default_api_key') +REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') +TOKEN_CACHE_EXPIRES = int(os.environ.get('TOKEN_CACHE_EXPIRES', 300)) # 5分钟缓存 + +# 初始化Redis连接 +try: + redis_client = redis.from_url(REDIS_URL) + # 测试连接 + redis_client.ping() + print("Redis连接成功") +except Exception as e: + print(f"Redis连接错误: {str(e)}") + redis_client = None + +# API密钥认证装饰器 +def require_api_key(f): + @wraps(f) + def decorated_function(*args, **kwargs): + # 如果环境变量中未设置API_KEY或设置为None,则跳过验证 + if not API_KEY or API_KEY == 'your_default_api_key': + return f(*args, **kwargs) + + provided_key = request.headers.get('X-API-Key') + if provided_key and provided_key == API_KEY: + return f(*args, **kwargs) + return jsonify({"error": "Invalid or missing API key"}), 401 + return decorated_function + +# 生成缓存键 +def generate_cache_key(email, client_id): + # 创建唯一值但不直接存储邮箱信息的键 + key_string = f"{email}:{client_id}" + return f"outlook:token:{hashlib.sha256(key_string.encode()).hexdigest()}" + +# 从缓存获取access_token +def get_token_from_cache(email, client_id): + if not redis_client: + return None + + cache_key = generate_cache_key(email, client_id) + token_data = redis_client.get(cache_key) + + if token_data: + try: + return json.loads(token_data.decode('utf-8')) + except Exception: + pass + + return None + +# 将access_token存入缓存 +def save_token_to_cache(email, client_id, access_token, expires_in=TOKEN_CACHE_EXPIRES): + if not redis_client: + return False + + try: + cache_key = generate_cache_key(email, client_id) + token_data = json.dumps({"access_token": access_token}) + + # 设置缓存,有效期5分钟(默认)或传入的expires_in值 + redis_client.setex(cache_key, expires_in, token_data) + return True + except Exception as e: + print(f"缓存错误: {str(e)}") + return False + +# 从refresh_token获取access_token +def get_accesstoken(refresh_token, client_id, email=None): + # 先尝试从缓存获取 + if email: + cached_token = get_token_from_cache(email, client_id) + if cached_token and "access_token" in cached_token: + return cached_token["access_token"], None, None + + # 缓存中没有,则请求新token + try: + data = { + 'client_id': client_id, + 'grant_type': 'refresh_token', + 'refresh_token': refresh_token + } + ret = requests.post('https://login.microsoftonline.com/consumers/oauth2/v2.0/token', data=data) + ret.raise_for_status() + + response_data = ret.json() + if 'access_token' not in response_data: + return None, response_data, "Access token not found in response" + + # 获取到新token后,存入缓存 + if email and "access_token" in response_data: + # 如果有expires_in字段,使用该值作为缓存时间(单位为秒) + expires_in = int(response_data.get('expires_in', TOKEN_CACHE_EXPIRES)) + # 为了安全起见,我们将缓存时间设置比实际过期时间短一些 + cache_expires = min(expires_in - 60, TOKEN_CACHE_EXPIRES) if expires_in > 60 else TOKEN_CACHE_EXPIRES + save_token_to_cache(email, client_id, response_data['access_token'], cache_expires) + + return response_data.get('access_token'), response_data.get('refresh_token'), None + except requests.RequestException as e: + return None, None, f"Error getting access token: {str(e)}" + +def generate_auth_string(user, token): + auth_string = f"user={user}\1auth=Bearer {token}\1\1" + return auth_string + +def fetch_email_body(mail, item): + try: + status, msg_data = mail.fetch(item, "(RFC822)") + if status != 'OK': + return {"error": f"Failed to fetch email: {status}"} + + email_data = {} + + for response_part in msg_data: + if isinstance(response_part, tuple): + # 解码邮件 + msg = email.message_from_bytes(response_part[1]) + + # 获取邮件主题 + subject = "No Subject" + try: + subject_header = msg.get("Subject", "") + if subject_header: + decoded_header = email.header.decode_header(subject_header) + if decoded_header and decoded_header[0]: + subject, encoding = decoded_header[0] + if isinstance(subject, bytes): + subject = subject.decode(encoding if encoding else "utf-8", errors='replace') + except Exception as e: + subject = f"[解析主题错误: {str(e)}]" + + email_data["subject"] = subject + + # 获取发件人 + try: + from_header = msg.get("From", "") + email_data["from"] = from_header + + # 尝试提取发件人的邮箱地址,用于后续筛选 + email_pattern = r'[\w\.-]+@[\w\.-]+' + found_emails = re.findall(email_pattern, from_header) + if found_emails: + email_data["sender_email"] = found_emails[0].lower() + except Exception as e: + email_data["from"] = f"[解析发件人错误: {str(e)}]" + + # 获取日期 + try: + date_str = msg.get("Date", "") + if date_str: + email_data["date"] = date_str + except Exception as e: + email_data["date_error"] = str(e) + + # 获取邮件内容 + email_data["body"] = "" + try: + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition", "")) + + if content_type == "text/plain" and "attachment" not in content_disposition: + try: + body = part.get_payload(decode=True) + if body: + email_data["body"] = body.decode(errors='replace') + break + except Exception as e: + email_data["body_error"] = str(e) + else: + try: + body = msg.get_payload(decode=True) + if body: + email_data["body"] = body.decode(errors='replace') + except Exception as e: + email_data["body_error"] = str(e) + except Exception as e: + email_data["body_error"] = str(e) + + # 处理附件信息 + attachments = [] + try: + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_maintype() == 'multipart': + continue + if part.get('Content-Disposition') is None: + continue + + filename = part.get_filename() + if filename: + attachments.append(filename) + + if attachments: + email_data["attachments"] = attachments + except Exception as e: + email_data["attachments_error"] = str(e) + + return email_data + except Exception as e: + return {"error": str(e)} + +def get_outlook_emails(emailadr, access_token, folder="INBOX", limit=5, sender=None): + try: + mail = imaplib.IMAP4_SSL('outlook.live.com') + mail.authenticate('XOAUTH2', lambda x: generate_auth_string(emailadr, access_token)) + + mail.select(folder) + search_criteria = 'ALL' + + # 如果指定了发件人,可以尝试使用IMAP搜索,但这不总是可靠的 + # 实际实现中我们会在获取邮件后再筛选 + + status, messages = mail.search(None, search_criteria) + if status != 'OK': + return [{"error": f"Failed to search emails: {status}"}] + + email_ids = messages[0].split() + if not email_ids: + return [] + + # 获取最新的N封邮件 + latest_emails = [] + + # 从最新到最旧排序处理所有邮件 + for item in reversed(email_ids): + # 如果已经找到了指定发件人的邮件或者达到了限制数量,则停止 + if sender and len(latest_emails) >= 1: + break + if not sender and len(latest_emails) >= limit: + break + + email_data = fetch_email_body(mail, item) + if not email_data: + continue + + if "error" in email_data: + continue + + email_data["id"] = item.decode() + + # 如果指定了发件人,检查是否匹配 + if sender: + from_field = email_data.get("from", "").lower() + sender_email = email_data.get("sender_email", "").lower() + + # 检查发件人是否匹配(检查显示名称或邮箱地址) + if sender.lower() in from_field or sender.lower() == sender_email: + latest_emails.append(email_data) + else: + latest_emails.append(email_data) + + mail.logout() + return latest_emails + except Exception as e: + return [{"error": str(e)}] + +# 从邮件内容中提取验证码的函数 +def extract_verification_code(email_body): + if not email_body: + return None + + # 方法1: 查找特定格式的验证码 (数字序列通常在单独一行) + # 例如查找 digit_line\r\n\r\nThis code expires + code_pattern = r'(\d{6})\s*(?:\r\n|\n)+\s*(?:This code expires|验证码有效期为)' + code_match = re.search(code_pattern, email_body) + if code_match: + return code_match.group(1) + + # 方法2: 查找6位数字验证码 (最常见格式) + six_digit_pattern = r'(? 0 and "error" in emails[0]: + return jsonify({"error": emails[0]["error"]}), 500 + + # 提取最新邮件中的验证码 + verification_code = None + latest_email = emails[0] if emails else None + + if latest_email and "body" in latest_email: + verification_code = extract_verification_code(latest_email["body"]) + + response = { + "success": True, + "verification_code": verification_code, + "email": latest_email if verification_code is None else None, # 如果找不到验证码,返回完整邮件便于调试 + "refresh_token": new_refresh_token or refresh_token + } + + return jsonify(response) + except Exception as e: + return jsonify({"error": str(e)}), 500 + +@app.route('/api/emails', methods=['POST']) +@require_api_key +def get_emails(): + try: + data = request.get_json() + if not data: + return jsonify({"error": "Invalid JSON data"}), 400 + + # 检查必要参数 + required_fields = ['email', 'client_id', 'refresh_token'] + for field in required_fields: + if field not in data: + return jsonify({"error": f"Missing required field: {field}"}), 400 + + email = data['email'] + client_id = data['client_id'] + refresh_token = data['refresh_token'] + + # 可选参数 + folder = data.get('folder', 'INBOX') # 默认为收件箱 + limit = data.get('limit', 5) # 默认获取最新的5封邮件 + sender = data.get('sender') # 可选的发件人筛选 + latest_only = data.get('latest_only', False) # 是否只返回最新的邮件 + + # 如果只需要最新的一封邮件,调整limit + if latest_only: + limit = 1 + + # 获取访问令牌,传入email使启用缓存 + access_token, new_refresh_token, error = get_accesstoken(refresh_token, client_id, email) + if not access_token: + return jsonify({ + "error": "Failed to get access token", + "details": error or "Unknown error" + }), 401 + + # 获取邮件 + emails = get_outlook_emails(email, access_token, folder, limit, sender) + + # 检查是否有错误 + if emails and isinstance(emails, list) and len(emails) > 0 and "error" in emails[0]: + return jsonify({"error": emails[0]["error"]}), 500 + + response = { + "success": True, + "emails": emails, + "refresh_token": new_refresh_token or refresh_token # 返回新的refresh_token(如果有) + } + + return jsonify(response) + except Exception as e: + return jsonify({"error": str(e)}), 500 + +@app.route('/api/folders', methods=['POST']) +@require_api_key +def get_folders(): + try: + data = request.get_json() + if not data: + return jsonify({"error": "Invalid JSON data"}), 400 + + # 检查必要参数 + required_fields = ['email', 'client_id', 'refresh_token'] + for field in required_fields: + if field not in data: + return jsonify({"error": f"Missing required field: {field}"}), 400 + + email = data['email'] + client_id = data['client_id'] + refresh_token = data['refresh_token'] + + # 获取访问令牌,传入email使启用缓存 + access_token, new_refresh_token, error = get_accesstoken(refresh_token, client_id, email) + if not access_token: + return jsonify({ + "error": "Failed to get access token", + "details": error or "Unknown error" + }), 401 + + try: + # 连接并获取文件夹列表 + mail = imaplib.IMAP4_SSL('outlook.live.com') + mail.authenticate('XOAUTH2', lambda x: generate_auth_string(email, access_token)) + + status, folder_list = mail.list() + if status != 'OK': + return jsonify({"error": f"Failed to list folders: {status}"}), 500 + + folders = [] + + for folder_info in folder_list: + if isinstance(folder_info, bytes): + folder_info = folder_info.decode('utf-8', errors='replace') + # 解析文件夹名称 + folder_name = folder_info.split('"')[-2] if '"' in folder_info else folder_info.split()[-1] + folders.append(folder_name) + + mail.logout() + + response = { + "success": True, + "folders": folders, + "refresh_token": new_refresh_token or refresh_token + } + + return jsonify(response) + except Exception as e: + return jsonify({"error": str(e)}), 500 + except Exception as e: + return jsonify({"error": str(e)}), 500 + +# 健康检查端点 +@app.route('/health', methods=['GET']) +def health_check(): + # 检查Redis连接 + redis_status = "ok" if redis_client and redis_client.ping() else "error" + + return jsonify({ + "status": "ok", + "service": "outlook-email-api", + "redis": redis_status, + "time": datetime.now().isoformat() + }), 200 + +if __name__ == '__main__': + # 生产环境下应移除debug=True,并使用Gunicorn等WSGI服务器 + port = int(os.environ.get('PORT', 5000)) + debug = os.environ.get('DEBUG', 'False').lower() == 'true' + host = os.environ.get('HOST', '0.0.0.0') + + app.run(debug=debug, host=host, port=port) \ No newline at end of file diff --git a/outook copy.py b/outook copy.py new file mode 100644 index 0000000..978e4f9 --- /dev/null +++ b/outook copy.py @@ -0,0 +1,121 @@ +import base64 +import email +import email.header +import requests +import imaplib + +import poplib + + +# 1. 使用refresh_token获取access_token +def get_accesstoken(refresh_token,client_id): + data = { + 'client_id': client_id, + 'grant_type': 'refresh_token', + 'refresh_token': refresh_token + } + ret = requests.post('https://login.microsoftonline.com/consumers/oauth2/v2.0/token', data=data) + print(ret.text) + print(ret.json()['refresh_token']) + return ret.json()['access_token'] + + +def generate_auth_string(user, token): + auth_string = f"user={user}\1auth=Bearer {token}\1\1" + return auth_string + + +def tuple_to_str(tuple_): + """ + 元组转为字符串输出 + :param tuple_: 转换前的元组,QQ邮箱格式为(b'\xcd\xf5\xd4\xc6', 'gbk')或者(b' ', None),163邮箱格式为('', None) + :return: 转换后的字符串 + """ + if tuple_[1]: + out_str = tuple_[0].decode(tuple_[1]) + else: + if isinstance(tuple_[0], bytes): + out_str = tuple_[0].decode('gbk') + else: + out_str = tuple_[0] + return out_str + + +def fetch_email_body(mail, item): + status, msg_data = mail.fetch(item, "(RFC822)") + for response_part in msg_data: + if isinstance(response_part, tuple): + # 解码邮件 + msg = email.message_from_bytes(response_part[1]) + + # 获取邮件主题 + subject, encoding = email.header.decode_header(msg["Subject"])[0] + if isinstance(subject, bytes): + # 如果是字节,则解码 + subject = subject.decode(encoding if encoding else "utf-8") + print("Subject:", subject) + # 获取发件人 + from_ = msg.get("From") + print("From:", from_) + try: + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition")) + + if content_type == "text/plain" and "attachment" not in content_disposition: + body = part.get_payload(decode=True).decode() + return body + return '' + else: + body = msg.get_payload(decode=True).decode() + return body + except Exception as e: + return '' + + +def getmail(sel, mail): + mail.select(sel) + status, messages = mail.search(None, 'ALL') # UNSEEN 为未读邮件 + all_emails = messages[0].split() + print(all_emails) + for item in all_emails: + print(fetch_email_body(mail, item)) + +# 2. 使用访问令牌连接 IMAP +def connect_imap(emailadr, access_token): + mail = imaplib.IMAP4_SSL('outlook.live.com') + mail.authenticate('XOAUTH2', lambda x: generate_auth_string(emailadr, access_token)) + print('读取收件箱') + getmail('INBOX', mail) # 读取收件箱 + print('读取垃圾箱') + getmail('Junk', mail) # 读取垃圾箱 + # 关闭连接 + mail.logout() + + +def connect_pop3(emailadr, access_token): + server = poplib.POP3_SSL('outlook.live.com') + token=generate_auth_string(emailadr, access_token) + encoded_auth_string = base64.b64encode(token.encode("utf-8")).decode("utf-8") + server._shortcmd(f'AUTH XOAUTH2') + server._shortcmd(f'{encoded_auth_string}') + print(server.list()[1]) + server.quit() + + +# 主流程 +if __name__ == '__main__': + client_id = '9e5f94bc-e8a4-4e73-b8be-63364c29d753' + emailadr = 'eedbbfdd186@outlook.com' + refresh_token ='M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$' + print(f'clientID:{client_id}') + print(f'邮箱地址:{emailadr}') + print(f'refresh_token:{refresh_token}') + access_token = get_accesstoken(refresh_token,client_id) + print("获取的访问令牌:", access_token) + # 连接 IMAP + print('IMAP测试') + connect_imap(emailadr, access_token) + print('pop3测试') + connect_pop3(emailadr,access_token) diff --git a/outook.py b/outook.py new file mode 100644 index 0000000..978e4f9 --- /dev/null +++ b/outook.py @@ -0,0 +1,121 @@ +import base64 +import email +import email.header +import requests +import imaplib + +import poplib + + +# 1. 使用refresh_token获取access_token +def get_accesstoken(refresh_token,client_id): + data = { + 'client_id': client_id, + 'grant_type': 'refresh_token', + 'refresh_token': refresh_token + } + ret = requests.post('https://login.microsoftonline.com/consumers/oauth2/v2.0/token', data=data) + print(ret.text) + print(ret.json()['refresh_token']) + return ret.json()['access_token'] + + +def generate_auth_string(user, token): + auth_string = f"user={user}\1auth=Bearer {token}\1\1" + return auth_string + + +def tuple_to_str(tuple_): + """ + 元组转为字符串输出 + :param tuple_: 转换前的元组,QQ邮箱格式为(b'\xcd\xf5\xd4\xc6', 'gbk')或者(b' ', None),163邮箱格式为('', None) + :return: 转换后的字符串 + """ + if tuple_[1]: + out_str = tuple_[0].decode(tuple_[1]) + else: + if isinstance(tuple_[0], bytes): + out_str = tuple_[0].decode('gbk') + else: + out_str = tuple_[0] + return out_str + + +def fetch_email_body(mail, item): + status, msg_data = mail.fetch(item, "(RFC822)") + for response_part in msg_data: + if isinstance(response_part, tuple): + # 解码邮件 + msg = email.message_from_bytes(response_part[1]) + + # 获取邮件主题 + subject, encoding = email.header.decode_header(msg["Subject"])[0] + if isinstance(subject, bytes): + # 如果是字节,则解码 + subject = subject.decode(encoding if encoding else "utf-8") + print("Subject:", subject) + # 获取发件人 + from_ = msg.get("From") + print("From:", from_) + try: + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition")) + + if content_type == "text/plain" and "attachment" not in content_disposition: + body = part.get_payload(decode=True).decode() + return body + return '' + else: + body = msg.get_payload(decode=True).decode() + return body + except Exception as e: + return '' + + +def getmail(sel, mail): + mail.select(sel) + status, messages = mail.search(None, 'ALL') # UNSEEN 为未读邮件 + all_emails = messages[0].split() + print(all_emails) + for item in all_emails: + print(fetch_email_body(mail, item)) + +# 2. 使用访问令牌连接 IMAP +def connect_imap(emailadr, access_token): + mail = imaplib.IMAP4_SSL('outlook.live.com') + mail.authenticate('XOAUTH2', lambda x: generate_auth_string(emailadr, access_token)) + print('读取收件箱') + getmail('INBOX', mail) # 读取收件箱 + print('读取垃圾箱') + getmail('Junk', mail) # 读取垃圾箱 + # 关闭连接 + mail.logout() + + +def connect_pop3(emailadr, access_token): + server = poplib.POP3_SSL('outlook.live.com') + token=generate_auth_string(emailadr, access_token) + encoded_auth_string = base64.b64encode(token.encode("utf-8")).decode("utf-8") + server._shortcmd(f'AUTH XOAUTH2') + server._shortcmd(f'{encoded_auth_string}') + print(server.list()[1]) + server.quit() + + +# 主流程 +if __name__ == '__main__': + client_id = '9e5f94bc-e8a4-4e73-b8be-63364c29d753' + emailadr = 'eedbbfdd186@outlook.com' + refresh_token ='M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$' + print(f'clientID:{client_id}') + print(f'邮箱地址:{emailadr}') + print(f'refresh_token:{refresh_token}') + access_token = get_accesstoken(refresh_token,client_id) + print("获取的访问令牌:", access_token) + # 连接 IMAP + print('IMAP测试') + connect_imap(emailadr, access_token) + print('pop3测试') + connect_pop3(emailadr,access_token) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9a6dbd8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +flask==2.3.3 +requests==2.31.0 +redis==4.5.5 +gunicorn==21.2.0 \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..686ace8 --- /dev/null +++ b/test.py @@ -0,0 +1,39 @@ +import requests +import json + +# API接口地址 +API_URL = "http://localhost:5000/api/emails" + +# 凭据信息 +credentials = { + "email": "eedbbfdd186@outlook.com", + "client_id": "9e5f94bc-e8a4-4e73-b8be-63364c29d753", + "refresh_token": "M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$", + "limit": 5 # 获取最新的5封邮件 +} + +headers = { + "Content-Type": "application/json" +} + +try: + # 发送POST请求 + response = requests.post( + API_URL, + headers=headers, + json=credentials + ) + + # 输出响应内容 + print(f"状态码: {response.status_code}") + print("响应内容:") + print(json.dumps(response.json(), indent=2, ensure_ascii=False)) + + # 如果成功,提取新的refresh_token + if response.status_code == 200: + result = response.json() + if "refresh_token" in result and result["refresh_token"] != credentials["refresh_token"]: + print(f"\n新的refresh_token: {result['refresh_token']}") + +except requests.exceptions.RequestException as e: + print(f"请求失败: {e}") \ No newline at end of file diff --git a/test_verification_code.py b/test_verification_code.py new file mode 100644 index 0000000..6fed3a5 --- /dev/null +++ b/test_verification_code.py @@ -0,0 +1,47 @@ +import requests +import json + +# API接口地址 +API_URL = "http://localhost:5000/api/verification-code" + +# 凭据信息 +credentials = { + "email": "eedbbfdd186@outlook.com", + "client_id": "9e5f94bc-e8a4-4e73-b8be-63364c29d753", + "refresh_token": "M.C544_BL2.0.U.-CmrNXHVufVZdj1*CaDuSw4WSQYVfF7ILMi4XYHeVQ!YJm56uJO5HPG9I2bOJIRrS3c5FgP9sDKB*HjA3O6wVY4Cr7hzNGWjujT*xZ5k4gOjRDVXx9ocaY1bf5J2HZgAoBJYjFq76*3h2xddMEGqp7iFjYDo3B9rcfRGh!G6rJ38vkWBSGw!7hcj21IWdZD!eIZqCx1o2tDrzeH*fRnuf*DoTQEFCDnFpCoulmQHDUBEFiBT8H*TzupejEgWTmXewN9tpcQwFituIbGScsDWdRuB5pcF63p7jazZdeZ8Bpa7pQb5Fc4mYUSwQS4Qx9CNNMYnwYuhiAVEXPcoppWCA7WXF!bgOxa7IuZASnWMiC!jqUu77KnwrHWZD14SDrFfwBQ$$", + "sender": "no-reply@cursor.sh" # 指定发件人为cursor的验证邮件 +} + +headers = { + "Content-Type": "application/json" +} + +try: + # 发送POST请求 + response = requests.post( + API_URL, + headers=headers, + json=credentials + ) + + # 输出响应内容 + print(f"状态码: {response.status_code}") + print("响应内容:") + print(json.dumps(response.json(), indent=2, ensure_ascii=False)) + + # 如果成功,提取验证码和新的refresh_token + if response.status_code == 200: + result = response.json() + + # 打印验证码 + if result.get("verification_code"): + print(f"\n提取的验证码: {result['verification_code']}") + else: + print("\n未能提取到验证码,请检查邮件内容。") + + # 打印refresh_token + if "refresh_token" in result and result["refresh_token"] != credentials["refresh_token"]: + print(f"\n新的refresh_token: {result['refresh_token']}") + +except requests.exceptions.RequestException as e: + print(f"请求失败: {e}") \ No newline at end of file