更新IMAP邮件获取功能和账户保存格式
This commit is contained in:
203
test_imap.py
Normal file
203
test_imap.py
Normal file
@@ -0,0 +1,203 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
测试IMAP邮件获取功能
|
||||
单独测试邮箱取件功能
|
||||
"""
|
||||
|
||||
import imaplib
|
||||
import email
|
||||
import email.header
|
||||
import sys
|
||||
from email.parser import BytesParser
|
||||
from email.policy import default
|
||||
import re
|
||||
import time
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
# 邮箱配置
|
||||
EMAIL_ADDRESS = "5ulxb79e@evnmail.com"
|
||||
EMAIL_PASSWORD = "dr!mueh15ahG"
|
||||
IMAP_SERVER = "mail.evnmail.com"
|
||||
USE_SSL = True
|
||||
|
||||
def decode_mime_words(text):
|
||||
"""解码MIME编码的文本"""
|
||||
if not text:
|
||||
return ""
|
||||
try:
|
||||
decoded = email.header.decode_header(text)
|
||||
return " ".join([
|
||||
(str(t[0], t[1] or 'utf-8') if isinstance(t[0], bytes) else str(t[0]))
|
||||
for t in decoded
|
||||
])
|
||||
except:
|
||||
return text
|
||||
|
||||
def get_body_text(msg):
|
||||
"""获取邮件正文文本内容"""
|
||||
if msg.is_multipart():
|
||||
# 如果邮件包含多个部分,递归处理
|
||||
text_parts = []
|
||||
for part in msg.get_payload():
|
||||
if part.get_content_type() == 'text/plain':
|
||||
charset = part.get_content_charset() or 'utf-8'
|
||||
try:
|
||||
text_parts.append(part.get_payload(decode=True).decode(charset))
|
||||
except:
|
||||
text_parts.append("[无法解码的内容]")
|
||||
elif part.is_multipart():
|
||||
# 递归处理多部分
|
||||
text_parts.append(get_body_text(part))
|
||||
return "\n".join(text_parts)
|
||||
else:
|
||||
# 单部分邮件
|
||||
content_type = msg.get_content_type()
|
||||
if content_type == 'text/plain':
|
||||
charset = msg.get_content_charset() or 'utf-8'
|
||||
try:
|
||||
return msg.get_payload(decode=True).decode(charset)
|
||||
except:
|
||||
return "[无法解码的内容]"
|
||||
else:
|
||||
return f"[内容类型: {content_type}]"
|
||||
|
||||
def extract_urls_from_body(body, pattern):
|
||||
"""从邮件正文中提取URL"""
|
||||
found_urls = re.findall(pattern, body)
|
||||
return [url.replace('&', '&').replace("=3D", "=").replace("=\r\n", "")
|
||||
.replace("\r\n", "").replace("=\n", "").replace("\n", "")
|
||||
for url in found_urls]
|
||||
|
||||
def test_imap_connection():
|
||||
"""测试IMAP连接"""
|
||||
print(f"正在测试连接到 {IMAP_SERVER}...")
|
||||
try:
|
||||
# 连接到IMAP服务器
|
||||
if USE_SSL:
|
||||
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
|
||||
else:
|
||||
mail = imaplib.IMAP4(IMAP_SERVER)
|
||||
|
||||
# 登录
|
||||
print(f"正在登录邮箱 {EMAIL_ADDRESS}...")
|
||||
mail.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
|
||||
print("登录成功!")
|
||||
|
||||
# 关闭连接
|
||||
mail.logout()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"连接失败: {str(e)}")
|
||||
return False
|
||||
|
||||
def fetch_emails():
|
||||
"""获取邮件"""
|
||||
urls = []
|
||||
try:
|
||||
# 连接到IMAP服务器
|
||||
if USE_SSL:
|
||||
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
|
||||
else:
|
||||
mail = imaplib.IMAP4(IMAP_SERVER)
|
||||
|
||||
# 登录
|
||||
print(f"正在登录邮箱 {EMAIL_ADDRESS}...")
|
||||
mail.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
|
||||
print("登录成功!")
|
||||
|
||||
# 只检查INBOX文件夹
|
||||
steam_verification_pattern = r'https://store\.steampowered\.com/account/newaccountverification\?[^\s"\'<>]+'
|
||||
|
||||
try:
|
||||
print("\n检查文件夹: INBOX")
|
||||
mail.select("INBOX")
|
||||
status, messages = mail.search(None, "ALL")
|
||||
if status != "OK" or not messages[0]:
|
||||
print(" INBOX为空或无法访问")
|
||||
else:
|
||||
# 获取邮件ID列表,从最新的开始
|
||||
message_ids = messages[0].split()
|
||||
message_count = len(message_ids)
|
||||
print(f" 找到 {message_count} 封邮件")
|
||||
|
||||
# 最多检查10封最新邮件
|
||||
check_count = min(10, message_count)
|
||||
message_ids = message_ids[-check_count:]
|
||||
message_ids.reverse() # 最新的邮件放在前面
|
||||
|
||||
for i, msg_id in enumerate(message_ids):
|
||||
print(f"\n 正在检查邮件 {i+1}/{check_count}, ID: {msg_id.decode()}...")
|
||||
status, msg_data = mail.fetch(msg_id, "(RFC822)")
|
||||
if status != "OK":
|
||||
print(" 无法获取邮件内容")
|
||||
continue
|
||||
|
||||
# 解析邮件内容
|
||||
raw_email = msg_data[0][1]
|
||||
msg = email.message_from_bytes(raw_email, policy=default)
|
||||
|
||||
# 提取信息
|
||||
subject = decode_mime_words(msg["Subject"])
|
||||
from_addr = decode_mime_words(msg["From"])
|
||||
date = decode_mime_words(msg["Date"])
|
||||
|
||||
print(f" 主题: {subject}")
|
||||
print(f" 发件人: {from_addr}")
|
||||
print(f" 日期: {date}")
|
||||
|
||||
# 获取正文
|
||||
body = get_body_text(msg)
|
||||
body_preview = body[:100] + "..." if len(body) > 100 else body
|
||||
print(f" 正文预览: {body_preview}")
|
||||
|
||||
# 查找Steam验证链接
|
||||
found_urls = extract_urls_from_body(body, steam_verification_pattern)
|
||||
if found_urls:
|
||||
print(f" 发现 {len(found_urls)} 个可能的Steam验证链接:")
|
||||
for url in found_urls:
|
||||
print(f" {url[:80]}...")
|
||||
urls.append(url)
|
||||
else:
|
||||
print(" 未发现Steam验证链接")
|
||||
except Exception as e:
|
||||
print(f"处理INBOX出错: {str(e)}")
|
||||
|
||||
# 尝试关闭连接
|
||||
try:
|
||||
mail.close()
|
||||
except:
|
||||
pass
|
||||
mail.logout()
|
||||
|
||||
except Exception as e:
|
||||
print(f"IMAP连接错误: {str(e)}")
|
||||
|
||||
return urls
|
||||
|
||||
def main():
|
||||
print("=" * 60)
|
||||
print("IMAP邮件获取测试")
|
||||
print("=" * 60)
|
||||
|
||||
# 测试连接
|
||||
if not test_imap_connection():
|
||||
print("IMAP连接测试失败,程序退出")
|
||||
return 1
|
||||
|
||||
print("\n开始获取邮件...")
|
||||
urls = fetch_emails()
|
||||
|
||||
if urls:
|
||||
print("\n" + "=" * 60)
|
||||
print(f"共找到 {len(urls)} 个可能的Steam验证链接:")
|
||||
for i, url in enumerate(urls):
|
||||
print(f"{i+1}. {url}")
|
||||
else:
|
||||
print("\n没有找到任何Steam验证链接")
|
||||
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user