1120 lines
45 KiB
Python
1120 lines
45 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
网站流量模拟脚本 (最终版本)
|
||
使用真实用户数据库模拟最真实的访问轨迹
|
||
支持Google搜索来源和真实网站跳转
|
||
2024升级:增加多游戏快速选择行为 + 宝塔友好模式 + 移动端偏好
|
||
"""
|
||
|
||
import requests
|
||
import time
|
||
import random
|
||
import json
|
||
import os
|
||
import logging
|
||
from urllib.parse import urlparse, urljoin, parse_qs
|
||
import re
|
||
from real_user_database import RealUserDatabase
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.FileHandler('traffic_bot_final.log', encoding='utf-8'),
|
||
logging.StreamHandler()
|
||
]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class WebTrafficBotFinal:
|
||
def __init__(self, config_file='config.json'):
|
||
"""
|
||
初始化最终版流量机器人
|
||
"""
|
||
self.config = self.load_config(config_file)
|
||
self.session = None
|
||
self.user_db = RealUserDatabase()
|
||
self.current_profile = None
|
||
self.current_behavior = None
|
||
|
||
# 真实的访问来源页面(主要从Google和真实网站)
|
||
self.traffic_sources = [
|
||
# Google搜索来源 (主要)
|
||
"https://www.google.com/search?q=2048+game+online",
|
||
"https://www.google.com/search?q=html5+games",
|
||
"https://www.google.com/search?q=browser+games+2048",
|
||
"https://www.google.com/search?q=free+online+games",
|
||
"https://www.google.com/search?q=puzzle+games+online",
|
||
"https://www.google.com/search?q=数字游戏+2048",
|
||
"https://www.google.com/search?q=在线小游戏",
|
||
|
||
# 用户指定的真实网站
|
||
"https://github.com/chengazhen/cursor-auto-free",
|
||
"https://linux.do/",
|
||
|
||
# 其他真实来源网站
|
||
"https://github.com/trending",
|
||
"https://github.com/topics/game",
|
||
"https://news.ycombinator.com/",
|
||
"https://www.reddit.com/r/WebGames/",
|
||
"https://www.reddit.com/r/incremental_games/",
|
||
"https://www.producthunt.com/",
|
||
"https://stackoverflow.com/",
|
||
"https://www.zhihu.com/",
|
||
"https://v2ex.com/",
|
||
"https://segmentfault.com/",
|
||
"https://juejin.cn/",
|
||
"https://www.csdn.net/",
|
||
"https://www.oschina.net/",
|
||
"https://gitee.com/",
|
||
]
|
||
|
||
# 🎮 多游戏页面列表 - 模拟真实用户的游戏选择行为
|
||
self.game_pages = [
|
||
"/games/2048/index.html",
|
||
"/games/snake/index.html",
|
||
"/games/iframe-games.html?game=diy-doll-factory",
|
||
"/games/iframe-games.html?game=super-sprunki-adventure",
|
||
"/games/iframe-games.html?game=flightbird",
|
||
"/games/iframe-games.html?game=tap-to-color-painting-book",
|
||
"/games/iframe-games.html?game=lazy-gto-6",
|
||
"/games/iframe-games.html?game=something-below-the-sea",
|
||
"/games/iframe-games.html?game=gem-clicker-pro",
|
||
"/games/iframe-games.html?game=guess-tiles",
|
||
"/games/iframe-games.html?game=tic-tac-toe-2025",
|
||
"/games/iframe-games.html?game=bubble-shooter",
|
||
"/games/iframe-games.html?game=tetris-classic",
|
||
"/games/iframe-games.html?game=candy-crush",
|
||
"/games/iframe-games.html?game=puzzle-adventure"
|
||
]
|
||
|
||
def load_config(self, config_file):
|
||
"""加载配置文件"""
|
||
try:
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
logger.info(f"配置文件加载成功: {config_file}")
|
||
return config
|
||
except FileNotFoundError:
|
||
logger.error(f"配置文件未找到: {config_file}")
|
||
raise
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"配置文件格式错误: {e}")
|
||
raise
|
||
|
||
def setup_session(self):
|
||
"""设置请求会话 - 宝塔友好增强版"""
|
||
self.session = requests.Session()
|
||
|
||
# 🍪 启用Cookie支持 - 宝塔检测的重要指标
|
||
self.session.cookies.clear()
|
||
|
||
# 生成真实用户配置
|
||
self.current_profile = self.user_db.get_random_user_profile()
|
||
self.current_behavior = self.user_db.get_visit_behavior()
|
||
|
||
# 设置代理
|
||
proxy_config = self.config.get('proxy')
|
||
if proxy_config:
|
||
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['host']}:{proxy_config['port']}"
|
||
self.session.proxies = {
|
||
'http': proxy_url,
|
||
'https': proxy_url
|
||
}
|
||
logger.info(f"已配置代理: {proxy_config['host']}:{proxy_config['port']}")
|
||
|
||
# 🌟 使用增强的真实浏览器头部 - 专门针对宝塔优化
|
||
enhanced_headers = self._get_baota_friendly_headers()
|
||
self.session.headers.update(enhanced_headers)
|
||
|
||
# 记录用户特征
|
||
logger.info(f"🎭 用户身份配置:")
|
||
logger.info(f" 操作系统: {self.current_profile['operating_system']}")
|
||
logger.info(f" 屏幕分辨率: {self.current_profile['screen_resolution']}")
|
||
logger.info(f" 浏览器语言: {self.current_profile['language']}")
|
||
logger.info(f" 时区: {self.current_profile['timezone']}")
|
||
logger.info(f" 访问时间模式: {self.current_behavior['pattern_type']}")
|
||
logger.info(f" 硬件并发数: {self.current_profile['hardware_concurrency']}")
|
||
logger.info(f" 设备内存: {self.current_profile['device_memory']}GB")
|
||
|
||
# 获取并显示当前IP
|
||
current_ip = self.get_current_ip()
|
||
if current_ip:
|
||
print(f"🌍 当前IP地址: {current_ip}")
|
||
print(f"👤 用户身份: {self.current_profile['operating_system']} | {self.current_profile['screen_resolution']} | {self.current_behavior['pattern_type']}")
|
||
|
||
def _get_baota_friendly_headers(self):
|
||
"""生成宝塔友好的完整HTTP头部 - 移动端偏好版"""
|
||
|
||
# 🎯 增加移动端User-Agent偏好 - 因为测试发现移动端更容易通过防护
|
||
if random.random() < 0.7: # 70%概率使用移动端
|
||
mobile_user_agents = [
|
||
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1",
|
||
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
|
||
"Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6 Mobile/15E148 Safari/604.1",
|
||
"Mozilla/5.0 (Linux; Android 14; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
|
||
"Mozilla/5.0 (Linux; Android 13; SM-A515F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36",
|
||
"Mozilla/5.0 (Linux; Android 12; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Mobile Safari/537.36",
|
||
"Mozilla/5.0 (iPad; CPU OS 17_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1",
|
||
"Mozilla/5.0 (iPad; CPU OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
|
||
]
|
||
user_agent = random.choice(mobile_user_agents)
|
||
is_mobile = True
|
||
else:
|
||
user_agent = self.current_profile["user_agent"]
|
||
is_mobile = "Mobile" in user_agent or "iPhone" in user_agent or "Android" in user_agent
|
||
|
||
# 🎯 宝塔友好的完整头部配置
|
||
headers = {
|
||
"User-Agent": user_agent,
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
|
||
"Accept-Language": self.current_profile["language"],
|
||
"Accept-Encoding": "gzip, deflate, br",
|
||
"Connection": "keep-alive",
|
||
"Upgrade-Insecure-Requests": "1",
|
||
"Sec-Fetch-Dest": "document",
|
||
"Sec-Fetch-Mode": "navigate",
|
||
"Sec-Fetch-Site": "none",
|
||
"Sec-Fetch-User": "?1",
|
||
"Cache-Control": "max-age=0",
|
||
"DNT": str(random.randint(0, 1)),
|
||
"Pragma": "no-cache",
|
||
|
||
# 🎯 关键:宝塔可能检测的额外头部
|
||
"X-Requested-With": "XMLHttpRequest" if random.random() < 0.1 else None,
|
||
"Origin": None,
|
||
"Purpose": "prefetch" if random.random() < 0.05 else None,
|
||
|
||
# 🎯 移动端特有头部
|
||
"X-Forwarded-For": None, # 避免代理检测
|
||
"X-Real-IP": None, # 避免代理检测
|
||
}
|
||
|
||
# 移动端特殊头部
|
||
if is_mobile:
|
||
headers.update({
|
||
"Sec-CH-UA-Mobile": "?1",
|
||
"Sec-CH-UA-Platform": '"Android"' if "Android" in user_agent else '"iOS"',
|
||
"Viewport-Width": str(random.choice([375, 414, 390, 393, 412])),
|
||
"Device-Memory": str(random.choice([4, 6, 8])),
|
||
})
|
||
|
||
# 移除None值
|
||
headers = {k: v for k, v in headers.items() if v is not None}
|
||
|
||
# 根据浏览器类型添加特定头部
|
||
if "Chrome" in user_agent:
|
||
headers.update({
|
||
"sec-ch-ua": self._generate_chrome_sec_ch_ua(user_agent),
|
||
"sec-ch-ua-mobile": "?1" if is_mobile else "?0",
|
||
"sec-ch-ua-platform": f'"{self._get_platform_from_ua(user_agent)}"',
|
||
})
|
||
|
||
logger.info(f"📱 使用User-Agent: {user_agent}")
|
||
logger.info(f"📱 移动端模式: {is_mobile}")
|
||
|
||
return headers
|
||
|
||
def _generate_chrome_sec_ch_ua(self, user_agent):
|
||
"""生成Chrome的sec-ch-ua头部"""
|
||
if "Chrome/" in user_agent:
|
||
version = user_agent.split("Chrome/")[1].split(".")[0]
|
||
return f'"Not_A Brand";v="8", "Chromium";v="{version}", "Google Chrome";v="{version}"'
|
||
return '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"'
|
||
|
||
def _get_platform_version(self):
|
||
"""获取平台版本信息"""
|
||
platform = self.current_profile["platform"]
|
||
if platform == "Windows":
|
||
return random.choice(['"10.0.0"', '"11.0.0"'])
|
||
elif platform == "macOS":
|
||
return random.choice(['"13.6.0"', '"14.1.0"', '"12.7.0"'])
|
||
elif platform == "Linux":
|
||
return '""'
|
||
return '""'
|
||
|
||
def _get_platform_from_ua(self, user_agent):
|
||
"""从User-Agent中提取平台信息"""
|
||
if "iPhone" in user_agent or "iPad" in user_agent:
|
||
return "iOS"
|
||
elif "Android" in user_agent:
|
||
return "Android"
|
||
elif "Windows" in user_agent:
|
||
return "Windows"
|
||
elif "Macintosh" in user_agent:
|
||
return "macOS"
|
||
elif "Linux" in user_agent:
|
||
return "Linux"
|
||
else:
|
||
return "Unknown"
|
||
|
||
def get_current_ip(self):
|
||
"""获取当前IP地址"""
|
||
ip_services = [
|
||
'https://httpbin.org/ip',
|
||
'https://api.ipify.org?format=json',
|
||
'https://ipinfo.io/json',
|
||
]
|
||
|
||
for service in ip_services:
|
||
try:
|
||
logger.info(f"🔍 正在获取IP地址: {service}")
|
||
response = self.session.get(service, timeout=10)
|
||
|
||
if response.status_code == 200:
|
||
if 'json' in service or 'httpbin' in service or 'ipinfo' in service:
|
||
try:
|
||
data = response.json()
|
||
if 'origin' in data:
|
||
ip = data['origin']
|
||
elif 'ip' in data:
|
||
ip = data['ip']
|
||
else:
|
||
ip = str(data)
|
||
except:
|
||
ip = response.text.strip()
|
||
else:
|
||
ip = response.text.strip()
|
||
|
||
logger.info(f"✅ 当前IP地址: {ip}")
|
||
return ip
|
||
|
||
except Exception as e:
|
||
logger.warning(f"从 {service} 获取IP失败: {e}")
|
||
continue
|
||
|
||
logger.error("❌ 无法获取当前IP地址")
|
||
return None
|
||
|
||
def simulate_realistic_source_visit(self):
|
||
"""模拟真实的来源网站访问"""
|
||
source_page = random.choice(self.traffic_sources)
|
||
|
||
try:
|
||
logger.info(f"🔗 模拟从来源访问: {source_page}")
|
||
|
||
# 根据来源类型调整访问行为
|
||
if "google.com" in source_page:
|
||
stay_time = self.user_db.get_realistic_timing(
|
||
random.uniform(3, 12), # Google搜索停留时间较短
|
||
self.current_behavior
|
||
)
|
||
logger.info(f"🔍 Google搜索停留 {stay_time:.1f} 秒")
|
||
self._simulate_google_search_behavior(stay_time)
|
||
|
||
elif "github.com" in source_page:
|
||
stay_time = self.user_db.get_realistic_timing(
|
||
random.uniform(8, 25), # GitHub停留时间中等
|
||
self.current_behavior
|
||
)
|
||
logger.info(f"🐙 GitHub页面停留 {stay_time:.1f} 秒")
|
||
self._simulate_github_behavior(stay_time, source_page)
|
||
|
||
elif "linux.do" in source_page:
|
||
stay_time = self.user_db.get_realistic_timing(
|
||
random.uniform(10, 30), # 技术社区停留时间较长
|
||
self.current_behavior
|
||
)
|
||
logger.info(f"💻 Linux.do社区停留 {stay_time:.1f} 秒")
|
||
self._simulate_community_behavior(stay_time)
|
||
|
||
else:
|
||
stay_time = self.user_db.get_realistic_timing(
|
||
random.uniform(5, 20), # 其他网站
|
||
self.current_behavior
|
||
)
|
||
logger.info(f"🌐 其他网站停留 {stay_time:.1f} 秒")
|
||
self._simulate_general_browsing(stay_time)
|
||
|
||
# 设置referrer用于后续访问
|
||
self.session.headers.update({
|
||
'Referer': source_page,
|
||
'Sec-Fetch-Site': 'cross-site'
|
||
})
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"来源访问模拟失败: {e}")
|
||
return False
|
||
|
||
def _simulate_google_search_behavior(self, total_time):
|
||
"""模拟Google搜索行为"""
|
||
actions = [
|
||
"输入搜索关键词",
|
||
"查看搜索结果",
|
||
"滚动浏览结果页面",
|
||
"点击相关搜索建议",
|
||
"查看图片搜索结果"
|
||
]
|
||
|
||
segments = random.randint(2, 4) # Google搜索动作较少
|
||
segment_time = total_time / segments
|
||
|
||
for i, action in enumerate(random.sample(actions, segments)):
|
||
if i > 0:
|
||
delay = self.user_db.simulate_human_delays("thinking")
|
||
time.sleep(delay)
|
||
|
||
logger.info(f" 🔍 Google行为: {action}")
|
||
|
||
if "输入" in action:
|
||
# 模拟输入搜索词的时间
|
||
typing_time = random.uniform(2, 5)
|
||
time.sleep(typing_time)
|
||
elif "滚动" in action:
|
||
# 模拟快速滚动查看结果
|
||
time.sleep(segment_time * 0.6)
|
||
else:
|
||
time.sleep(segment_time * 0.8)
|
||
|
||
def _simulate_github_behavior(self, total_time, github_url):
|
||
"""模拟GitHub页面行为"""
|
||
if "cursor-auto-free" in github_url:
|
||
actions = [
|
||
"查看项目README",
|
||
"阅读项目描述",
|
||
"查看Stars和Forks数量",
|
||
"浏览代码文件",
|
||
"查看Issues讨论",
|
||
"阅读使用说明"
|
||
]
|
||
else:
|
||
actions = [
|
||
"查看trending项目",
|
||
"浏览热门仓库",
|
||
"查看项目描述",
|
||
"阅读README文件",
|
||
"查看代码示例"
|
||
]
|
||
|
||
segments = random.randint(3, 6)
|
||
segment_time = total_time / segments
|
||
|
||
for i in range(segments):
|
||
if i > 0:
|
||
delay = self.user_db.simulate_human_delays("reading")
|
||
time.sleep(delay)
|
||
|
||
action = random.choice(actions)
|
||
logger.info(f" 🐙 GitHub行为: {action}")
|
||
|
||
if "阅读" in action:
|
||
time.sleep(segment_time * 0.7)
|
||
elif "查看" in action:
|
||
time.sleep(segment_time * 0.5)
|
||
else:
|
||
time.sleep(segment_time * 0.4)
|
||
|
||
def _simulate_community_behavior(self, total_time):
|
||
"""模拟技术社区行为"""
|
||
actions = [
|
||
"浏览热门帖子",
|
||
"阅读技术讨论",
|
||
"查看最新话题",
|
||
"搜索相关内容",
|
||
"查看用户资料",
|
||
"阅读精华帖子"
|
||
]
|
||
|
||
segments = random.randint(4, 7) # 社区停留时间较长,动作较多
|
||
segment_time = total_time / segments
|
||
|
||
for i in range(segments):
|
||
if i > 0:
|
||
delay = self.user_db.simulate_human_delays("reading")
|
||
time.sleep(delay)
|
||
|
||
action = random.choice(actions)
|
||
logger.info(f" 💻 社区行为: {action}")
|
||
|
||
if "阅读" in action:
|
||
time.sleep(segment_time * 0.8) # 阅读时间较长
|
||
else:
|
||
time.sleep(segment_time * 0.6)
|
||
|
||
def _simulate_general_browsing(self, total_time):
|
||
"""模拟一般网站浏览行为"""
|
||
actions = [
|
||
"浏览页面内容",
|
||
"查看导航菜单",
|
||
"滚动阅读文章",
|
||
"点击相关链接",
|
||
"查看评论区"
|
||
]
|
||
|
||
segments = random.randint(3, 5)
|
||
segment_time = total_time / segments
|
||
|
||
for i in range(segments):
|
||
if i > 0:
|
||
delay = self.user_db.simulate_human_delays("normal")
|
||
time.sleep(delay)
|
||
|
||
action = random.choice(actions)
|
||
logger.info(f" 🌐 浏览行为: {action}")
|
||
time.sleep(segment_time * random.uniform(0.7, 1.2))
|
||
|
||
def visit_main_site_realistic(self):
|
||
"""真实模拟访问主网站"""
|
||
main_site = self.config['targets']['main_site']
|
||
|
||
try:
|
||
logger.info(f"🏠 访问目标网站: {main_site}")
|
||
|
||
# 发起请求
|
||
response = self.make_realistic_request(main_site)
|
||
if not response:
|
||
return False
|
||
|
||
# 模拟真实的页面浏览行为 - 缩短主页停留时间
|
||
self._simulate_realistic_browsing(response, is_main_page=True)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"主网站访问失败: {e}")
|
||
return False
|
||
|
||
def visit_multiple_games_realistic(self):
|
||
"""🎮 模拟真实用户快速浏览多个游戏的行为"""
|
||
main_site = self.config['targets']['main_site']
|
||
base_url = main_site.rstrip('/')
|
||
|
||
# 模拟真实用户行为:快速浏览3-8个游戏页面选择
|
||
game_count = random.randint(3, 8)
|
||
selected_games = random.sample(self.game_pages, min(game_count, len(self.game_pages)))
|
||
|
||
logger.info(f"🎯 开始快速游戏选择行为,将浏览 {game_count} 个游戏")
|
||
|
||
total_start_time = time.time()
|
||
success_count = 0
|
||
|
||
for i, game_path in enumerate(selected_games):
|
||
game_url = base_url + game_path
|
||
|
||
try:
|
||
logger.info(f"🎮 第{i+1}/{game_count}个游戏: {game_path}")
|
||
|
||
# 更新referrer
|
||
if i == 0:
|
||
# 第一个游戏从主页进入
|
||
self.session.headers.update({
|
||
'Referer': main_site,
|
||
'Sec-Fetch-Site': 'same-origin'
|
||
})
|
||
else:
|
||
# 后续游戏从前一个游戏页面进入
|
||
prev_game_url = base_url + selected_games[i-1]
|
||
self.session.headers.update({
|
||
'Referer': prev_game_url,
|
||
'Sec-Fetch-Site': 'same-origin'
|
||
})
|
||
|
||
# 发起请求
|
||
response = self.make_realistic_request(game_url)
|
||
if not response:
|
||
logger.warning(f"⚠️ 游戏页面访问失败: {game_path}")
|
||
continue
|
||
|
||
# 模拟快速浏览行为(真实用户模式:1-4秒快速查看)
|
||
self._simulate_quick_game_browsing(response, game_path, i+1, game_count)
|
||
|
||
success_count += 1
|
||
|
||
# 游戏间的快速切换延迟(模拟点击下一个游戏)
|
||
if i < len(selected_games) - 1:
|
||
switch_delay = random.uniform(0.5, 2.0)
|
||
logger.info(f" ⚡ 快速切换到下个游戏: {switch_delay:.1f}秒")
|
||
time.sleep(switch_delay)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 游戏访问出错 {game_path}: {e}")
|
||
continue
|
||
|
||
total_time = time.time() - total_start_time
|
||
logger.info(f"🏁 游戏选择完成!访问了 {success_count}/{game_count} 个游戏,总耗时: {total_time:.1f}秒")
|
||
|
||
# 模拟最终选择一个游戏进行较长时间游玩
|
||
if success_count > 0:
|
||
self._simulate_final_game_selection(selected_games, base_url)
|
||
|
||
return success_count > 0
|
||
|
||
def _simulate_quick_game_browsing(self, response, game_path, current_index, total_count):
|
||
"""模拟快速游戏浏览行为 - 防护绕过版"""
|
||
# 🎯 增加游戏页面停留时间:3-8秒
|
||
browse_time = random.uniform(3.0, 8.0)
|
||
|
||
game_name = self._extract_game_name(game_path)
|
||
logger.info(f" 👀 快速查看 {game_name}: {browse_time:.1f}秒")
|
||
|
||
# 🎯 减少Javascript交互频率
|
||
if random.random() < 0.2: # 降低到20%概率
|
||
self._simulate_javascript_behavior(response.url)
|
||
|
||
# 模拟快速扫视页面
|
||
quick_actions = [
|
||
"查看游戏截图",
|
||
"阅读游戏标题",
|
||
"扫视游戏介绍",
|
||
"检查加载状态"
|
||
]
|
||
|
||
action_count = random.randint(1, 3)
|
||
action_time = browse_time / action_count
|
||
|
||
for i in range(action_count):
|
||
if i > 0:
|
||
time.sleep(random.uniform(0.5, 1.2)) # 增加间隔
|
||
|
||
action = random.choice(quick_actions)
|
||
logger.info(f" ⚡ {action}")
|
||
time.sleep(action_time * random.uniform(0.8, 1.6))
|
||
|
||
# 检查是否需要登录
|
||
if self._check_login_required(response):
|
||
logger.info(f" ⚠️ 检测到登录页面,快速跳过")
|
||
|
||
def _simulate_final_game_selection(self, selected_games, base_url):
|
||
"""模拟最终选择一个游戏进行深度游玩"""
|
||
# 有30%概率选择一个游戏进行深度体验
|
||
if random.random() < 0.3:
|
||
final_game = random.choice(selected_games)
|
||
final_game_url = base_url + final_game
|
||
|
||
logger.info(f"🎯 最终选择深度体验: {final_game}")
|
||
|
||
try:
|
||
# 重新访问选定的游戏
|
||
self.session.headers.update({
|
||
'Referer': base_url + selected_games[-1], # 从最后浏览的游戏页面返回
|
||
'Sec-Fetch-Site': 'same-origin'
|
||
})
|
||
|
||
response = self.make_realistic_request(final_game_url)
|
||
if response:
|
||
# 模拟正常游戏行为(较长时间)
|
||
self._simulate_realistic_gaming(response, final_game)
|
||
|
||
except Exception as e:
|
||
logger.error(f"深度游戏体验失败: {e}")
|
||
|
||
def _extract_game_name(self, game_path):
|
||
"""从游戏路径提取游戏名称"""
|
||
if "2048" in game_path:
|
||
return "2048数字游戏"
|
||
elif "snake" in game_path:
|
||
return "贪吃蛇"
|
||
elif "diy-doll-factory" in game_path:
|
||
return "DIY娃娃工厂"
|
||
elif "super-sprunki-adventure" in game_path:
|
||
return "超级冒险"
|
||
elif "flightbird" in game_path:
|
||
return "飞行小鸟"
|
||
elif "tap-to-color" in game_path:
|
||
return "点击涂色"
|
||
elif "lazy-gto" in game_path:
|
||
return "懒惰GTO"
|
||
elif "something-below-the-sea" in game_path:
|
||
return "海底探险"
|
||
elif "gem-clicker" in game_path:
|
||
return "宝石点击"
|
||
elif "guess-tiles" in game_path:
|
||
return "猜瓷砖"
|
||
elif "tic-tac-toe" in game_path:
|
||
return "井字棋2025"
|
||
else:
|
||
return "未知游戏"
|
||
|
||
def _check_login_required(self, response):
|
||
"""检查是否需要登录"""
|
||
if not response:
|
||
return False
|
||
|
||
content = response.text.lower()
|
||
login_indicators = [
|
||
'登录', 'login', '用户名', 'username', 'password', '密码',
|
||
'signin', 'sign in', '账号', 'account', '验证码'
|
||
]
|
||
|
||
login_count = sum(1 for indicator in login_indicators if indicator in content)
|
||
|
||
if login_count >= 3:
|
||
logger.warning("⚠️ 检测到可能的登录页面")
|
||
logger.info("💡 建议:检查网站是否需要登录访问")
|
||
return True
|
||
|
||
return False
|
||
|
||
def make_realistic_request(self, url, timeout=20):
|
||
"""发起宝塔友好的真实HTTP请求 - 防护绕过增强版"""
|
||
try:
|
||
# 🎯 增加更长的延迟以避免被识别为爬虫
|
||
pre_request_delay = random.uniform(3, 8)
|
||
logger.info(f"🕐 人工延迟: {pre_request_delay:.1f}秒")
|
||
time.sleep(pre_request_delay)
|
||
|
||
# 🎯 分多次尝试请求,模拟网络重试
|
||
max_retries = 3
|
||
for attempt in range(max_retries):
|
||
try:
|
||
response = self.session.get(url, timeout=timeout, allow_redirects=True)
|
||
break
|
||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||
if attempt < max_retries - 1:
|
||
retry_delay = random.uniform(5, 12)
|
||
logger.warning(f"⚠️ 连接失败,{retry_delay:.1f}秒后重试 (尝试 {attempt + 1}/{max_retries})")
|
||
time.sleep(retry_delay)
|
||
continue
|
||
else:
|
||
raise e
|
||
|
||
# 记录详细信息
|
||
logger.info(f"📡 HTTP请求详情:")
|
||
logger.info(f" 📍 访问URL: {url}")
|
||
logger.info(f" 📊 状态码: {response.status_code}")
|
||
logger.info(f" 📦 响应大小: {len(response.content)} 字节")
|
||
logger.info(f" ⏱️ 响应时间: {response.elapsed.total_seconds():.2f}秒")
|
||
|
||
# 🍪 记录Cookie信息
|
||
if response.cookies:
|
||
logger.info(f" 🍪 接收到Cookies: {len(response.cookies)} 个")
|
||
for cookie in response.cookies:
|
||
logger.info(f" 🍪 {cookie.name}={cookie.value[:20]}...")
|
||
|
||
if response.headers.get('content-type'):
|
||
logger.info(f" 📄 内容类型: {response.headers.get('content-type')}")
|
||
|
||
if response.headers.get('server'):
|
||
logger.info(f" 🖥️ 服务器: {response.headers.get('server')}")
|
||
|
||
# 🎯 检查是否被重定向或拦截
|
||
if response.url != url:
|
||
logger.warning(f"🔄 发生重定向: {url} -> {response.url}")
|
||
|
||
# 🎯 检查响应内容是否正常
|
||
if len(response.content) < 1000:
|
||
logger.warning(f"⚠️ 响应内容过小,可能被拦截")
|
||
|
||
response.raise_for_status()
|
||
|
||
# 🎯 模拟浏览器的自动行为,但减少频率避免被检测
|
||
if random.random() < 0.3: # 降低到30%概率
|
||
self._simulate_browser_auto_requests(url, response)
|
||
|
||
return response
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"❌ 请求失败 {url}: {e}")
|
||
return None
|
||
|
||
def _simulate_browser_auto_requests(self, base_url, response):
|
||
"""模拟浏览器自动请求静态资源 - 防护绕过版"""
|
||
if random.random() < 0.5: # 50%概率模拟资源请求
|
||
parsed_url = urlparse(base_url)
|
||
base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
|
||
|
||
# 🎯 减少资源请求,只请求关键资源
|
||
essential_resources = [
|
||
"/favicon.ico",
|
||
"/css/style.css",
|
||
"/js/main.js",
|
||
]
|
||
|
||
# 随机选择1个资源请求
|
||
resource = random.choice(essential_resources)
|
||
|
||
try:
|
||
# 🎯 更长的延迟
|
||
time.sleep(random.uniform(2, 5))
|
||
|
||
resource_headers = self.session.headers.copy()
|
||
resource_headers.update({
|
||
"Referer": base_url,
|
||
"Sec-Fetch-Dest": self._get_resource_dest(resource),
|
||
"Sec-Fetch-Mode": "no-cors",
|
||
"Sec-Fetch-Site": "same-origin",
|
||
})
|
||
|
||
resource_url = base_domain + resource
|
||
|
||
resource_response = self.session.get(
|
||
resource_url,
|
||
headers=resource_headers,
|
||
timeout=10,
|
||
allow_redirects=True
|
||
)
|
||
|
||
if resource_response.status_code == 200:
|
||
logger.info(f" 📄 成功请求资源: {resource}")
|
||
|
||
except Exception as e:
|
||
logger.debug(f" ⚠️ 资源请求失败 {resource}: {e}")
|
||
|
||
def _get_resource_dest(self, resource_path):
|
||
"""根据资源路径确定Sec-Fetch-Dest"""
|
||
if resource_path.endswith(('.css',)):
|
||
return "style"
|
||
elif resource_path.endswith(('.js',)):
|
||
return "script"
|
||
elif resource_path.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico')):
|
||
return "image"
|
||
elif resource_path.endswith('.json'):
|
||
return "manifest"
|
||
else:
|
||
return "empty"
|
||
|
||
def _simulate_javascript_behavior(self, url):
|
||
"""模拟Javascript行为 - 防护绕过版"""
|
||
if random.random() < 0.2: # 降低到20%概率
|
||
parsed_url = urlparse(url)
|
||
base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
|
||
|
||
# 🎯 只模拟最基本的AJAX请求
|
||
basic_endpoints = [
|
||
"/api/ping",
|
||
"/heartbeat",
|
||
"/api/status"
|
||
]
|
||
|
||
endpoint = random.choice(basic_endpoints)
|
||
ajax_url = base_domain + endpoint
|
||
|
||
try:
|
||
ajax_headers = self.session.headers.copy()
|
||
ajax_headers.update({
|
||
"X-Requested-With": "XMLHttpRequest",
|
||
"Content-Type": "application/json",
|
||
"Referer": url,
|
||
"Sec-Fetch-Dest": "empty",
|
||
"Sec-Fetch-Mode": "cors",
|
||
"Sec-Fetch-Site": "same-origin",
|
||
})
|
||
|
||
# 🎯 更长的AJAX延迟
|
||
time.sleep(random.uniform(5, 15))
|
||
|
||
ajax_response = self.session.get(
|
||
ajax_url,
|
||
headers=ajax_headers,
|
||
timeout=8
|
||
)
|
||
|
||
if ajax_response.status_code == 200:
|
||
logger.info(f" 🔄 AJAX请求成功: {endpoint}")
|
||
|
||
except Exception as e:
|
||
logger.debug(f" ⚠️ AJAX请求失败 {endpoint}: {e}")
|
||
|
||
def _simulate_realistic_browsing(self, response, is_main_page=False):
|
||
"""模拟真实的页面浏览行为 - 防护绕过增强版"""
|
||
content = response.text
|
||
|
||
# 估算页面内容长度和阅读时间
|
||
text_length = len(re.sub(r'<[^>]+>', '', content))
|
||
reading_time = text_length / self.current_behavior['reading_speed'] * 60
|
||
|
||
logger.info(f"📖 页面分析:")
|
||
logger.info(f" 📝 内容长度: {text_length} 字符")
|
||
logger.info(f" ⏱️ 预估阅读时间: {reading_time:.1f} 秒")
|
||
|
||
# 🎯 增加停留时间以避免被识别为爬虫
|
||
if is_main_page:
|
||
# 主页停留时间:5-15秒(增加以通过防护检测)
|
||
base_time = random.uniform(5, 15)
|
||
else:
|
||
base_time = min(reading_time, 60)
|
||
|
||
stay_time = self.user_db.get_realistic_timing(base_time, self.current_behavior)
|
||
|
||
logger.info(f" 🕐 实际停留时间: {stay_time:.1f} 秒")
|
||
|
||
# 🎯 减少Javascript行为频率
|
||
if random.random() < 0.3: # 降低到30%概率
|
||
self._simulate_javascript_behavior(response.url)
|
||
|
||
# 模拟分段浏览
|
||
self._simulate_browsing_segments(stay_time, content)
|
||
|
||
def _simulate_browsing_segments(self, total_time, content):
|
||
"""模拟分段浏览行为"""
|
||
# 查找页面中的链接
|
||
links = re.findall(r'href=["\'](.*?)["\']', content)
|
||
internal_links = [link for link in links if not link.startswith('http') or self.config['targets']['main_site'] in link]
|
||
|
||
segments = random.randint(3, 8)
|
||
segment_time = total_time / segments
|
||
|
||
browsing_actions = [
|
||
"📖 阅读页面内容",
|
||
"🧭 查看导航菜单",
|
||
"📜 滚动浏览页面",
|
||
"🔗 检查页面链接",
|
||
"👁️ 观察页面布局",
|
||
"📱 查看页脚信息"
|
||
]
|
||
|
||
for i in range(segments):
|
||
action = random.choice(browsing_actions)
|
||
logger.info(f" {action}")
|
||
|
||
# 根据行为类型调整时间
|
||
if "滚动" in action:
|
||
self._simulate_scrolling_behavior(segment_time)
|
||
elif "检查链接" in action and internal_links:
|
||
self._simulate_link_hovering(internal_links)
|
||
time.sleep(segment_time * 0.8)
|
||
else:
|
||
actual_segment_time = segment_time * random.uniform(0.7, 1.3)
|
||
time.sleep(actual_segment_time)
|
||
|
||
def _simulate_scrolling_behavior(self, duration):
|
||
"""模拟真实的滚动行为"""
|
||
scroll_sessions = random.randint(2, 5)
|
||
session_time = duration / scroll_sessions
|
||
|
||
for session in range(scroll_sessions):
|
||
logger.info(f" 📜 滚动会话 {session + 1}")
|
||
|
||
# 模拟快速滚动
|
||
quick_scrolls = random.randint(2, 4)
|
||
for _ in range(quick_scrolls):
|
||
time.sleep(random.uniform(0.3, 1.0))
|
||
|
||
# 模拟停顿阅读
|
||
if random.random() < 0.7:
|
||
pause_time = random.uniform(1, 4)
|
||
logger.info(f" ⏸️ 停顿阅读 {pause_time:.1f}秒")
|
||
time.sleep(pause_time)
|
||
|
||
def _simulate_link_hovering(self, links):
|
||
"""模拟鼠标悬停在链接上"""
|
||
hover_count = min(random.randint(1, 3), len(links))
|
||
sample_links = random.sample(links, hover_count)
|
||
|
||
for link in sample_links:
|
||
logger.info(f" 🔗 查看链接: {link[:50]}...")
|
||
time.sleep(random.uniform(0.5, 2.5))
|
||
|
||
def _simulate_realistic_gaming(self, response, game_path=None):
|
||
"""模拟真实的游戏行为"""
|
||
if game_path:
|
||
game_name = self._extract_game_name(game_path)
|
||
logger.info(f"🎲 开始深度体验 {game_name}")
|
||
else:
|
||
logger.info("🎲 开始游戏模拟")
|
||
|
||
# 游戏前的准备时间
|
||
prep_time = self.user_db.simulate_human_delays("thinking")
|
||
logger.info(f"🤔 游戏加载和准备: {prep_time:.1f}秒")
|
||
time.sleep(prep_time)
|
||
|
||
# 获取游戏停留时间
|
||
base_time = random.uniform(*self.config['settings']['game_page_stay_time'])
|
||
game_time = self.user_db.get_realistic_timing(base_time, self.current_behavior)
|
||
|
||
logger.info(f"🎮 游戏总时长: {game_time:.1f}秒")
|
||
|
||
# 模拟游戏过程
|
||
self._simulate_game_sessions(game_time)
|
||
|
||
def _simulate_game_sessions(self, total_time):
|
||
"""模拟游戏会话"""
|
||
sessions = random.randint(2, 4)
|
||
|
||
for session in range(sessions):
|
||
session_time = total_time / sessions * random.uniform(0.8, 1.2)
|
||
logger.info(f"🎯 游戏会话 {session + 1}/{sessions}, 时长: {session_time:.1f}秒")
|
||
|
||
self._simulate_single_game_session(session_time)
|
||
|
||
# 会话间休息
|
||
if session < sessions - 1:
|
||
break_time = random.uniform(3, 10)
|
||
logger.info(f"⏸️ 休息思考: {break_time:.1f}秒")
|
||
time.sleep(break_time)
|
||
|
||
def _simulate_single_game_session(self, session_time):
|
||
"""模拟单个游戏会话"""
|
||
game_moves = ["⬆️上", "⬇️下", "⬅️左", "➡️右"]
|
||
|
||
start_time = time.time()
|
||
move_count = 0
|
||
|
||
while time.time() - start_time < session_time:
|
||
move = random.choice(game_moves)
|
||
move_count += 1
|
||
|
||
logger.info(f" 🎮 第{move_count}步: {move}")
|
||
|
||
# 模拟不同难度的思考时间
|
||
if move_count % 8 == 0: # 每8步深度思考
|
||
think_time = random.uniform(4, 10)
|
||
logger.info(f" 🧠 策略思考: {think_time:.1f}秒")
|
||
time.sleep(think_time)
|
||
elif random.random() < 0.4: # 40%概率短暂思考
|
||
think_time = random.uniform(0.8, 3)
|
||
time.sleep(think_time)
|
||
else: # 快速移动
|
||
time.sleep(random.uniform(0.4, 1.2))
|
||
|
||
# 模拟偶尔的错误操作和纠正
|
||
if random.random() < 0.06: # 6%概率误操作
|
||
logger.info(" ❌ 误操作,立即纠正")
|
||
time.sleep(0.3)
|
||
corrective_move = random.choice(game_moves)
|
||
logger.info(f" 🔄 纠正: {corrective_move}")
|
||
time.sleep(random.uniform(0.5, 1.0))
|
||
|
||
def run_single_visit(self):
|
||
"""执行一次完整的真实访问流程"""
|
||
logger.info("🚀 开始执行真实访问流程")
|
||
|
||
# 设置会话
|
||
self.setup_session()
|
||
|
||
try:
|
||
# 1. 模拟来源网站访问
|
||
if not self.simulate_realistic_source_visit():
|
||
logger.warning("⚠️ 来源访问模拟失败,继续执行")
|
||
|
||
# 2. 访问主网站
|
||
if not self.visit_main_site_realistic():
|
||
logger.error("❌ 主网站访问失败")
|
||
return False
|
||
|
||
# 3. 🎮 NEW: 模拟真实的多游戏快速选择行为
|
||
if not self.visit_multiple_games_realistic():
|
||
logger.error("❌ 多游戏访问失败")
|
||
return False
|
||
|
||
logger.info("✅ 访问流程完美执行成功!")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 访问流程执行出错: {e}")
|
||
return False
|
||
|
||
finally:
|
||
if self.session:
|
||
self.session.close()
|
||
|
||
def run_continuous(self, total_visits=None, delay_range=None):
|
||
"""连续执行多次真实访问"""
|
||
if total_visits is None:
|
||
total_visits = self.config['settings']['default_visits']
|
||
|
||
if delay_range is None:
|
||
delay_range = (
|
||
self.config['settings']['min_delay'],
|
||
self.config['settings']['max_delay']
|
||
)
|
||
|
||
success_count = 0
|
||
|
||
logger.info(f"🎯 开始连续访问,目标: {total_visits} 次")
|
||
|
||
for i in range(total_visits):
|
||
logger.info(f"{'='*60}")
|
||
logger.info(f"🔄 执行第 {i+1}/{total_visits} 次访问")
|
||
logger.info(f"{'='*60}")
|
||
|
||
if self.run_single_visit():
|
||
success_count += 1
|
||
logger.info(f"✅ 第 {i+1} 次访问成功!累计成功: {success_count}")
|
||
else:
|
||
logger.error(f"❌ 第 {i+1} 次访问失败!")
|
||
|
||
# 智能延迟
|
||
if i < total_visits - 1:
|
||
base_delay = random.uniform(delay_range[0], delay_range[1])
|
||
behavior = self.user_db.get_visit_behavior()
|
||
|
||
# 根据访问模式调整延迟
|
||
if behavior['pattern_type'] == "工作时间":
|
||
delay = base_delay * 0.7
|
||
elif behavior['pattern_type'] == "深夜":
|
||
delay = base_delay * 1.8
|
||
else:
|
||
delay = base_delay
|
||
|
||
logger.info(f"⏳ 智能等待 {delay:.1f} 秒 (当前时段: {behavior['pattern_type']})")
|
||
time.sleep(delay)
|
||
|
||
success_rate = (success_count / total_visits) * 100
|
||
logger.info(f"🎉 访问任务完成!")
|
||
logger.info(f"📊 成功率: {success_count}/{total_visits} ({success_rate:.1f}%)")
|
||
return success_count
|
||
|
||
def main():
|
||
"""主函数"""
|
||
config_file = 'config.json'
|
||
|
||
if not os.path.exists(config_file):
|
||
print(f"❌ 配置文件 {config_file} 不存在!")
|
||
return
|
||
|
||
try:
|
||
bot = WebTrafficBotFinal(config_file)
|
||
|
||
print("=" * 60)
|
||
print("🎭 网站流量模拟脚本 (最终升级版)")
|
||
print("=" * 60)
|
||
print("🌟 特性:真实用户行为 + Google/GitHub来源 + 完整浏览器指纹")
|
||
print("⚠️ 请确保仅用于测试自己的网站!")
|
||
print()
|
||
print(f"🎯 目标网站: {bot.config['targets']['main_site']}")
|
||
print(f"🎮 游戏页面: {bot.config['targets']['game_page']}")
|
||
print()
|
||
|
||
print("请选择运行模式:")
|
||
print("1. 💎 单次完整访问测试")
|
||
print("2. 🚀 连续访问模式 (使用配置参数)")
|
||
print("3. ⚙️ 自定义连续访问")
|
||
|
||
choice = input("请输入选择 (1/2/3): ").strip()
|
||
|
||
if choice == "1":
|
||
logger.info("🎬 开始单次完整访问测试")
|
||
success = bot.run_single_visit()
|
||
if success:
|
||
print("🎉 单次访问测试完美成功!")
|
||
else:
|
||
print("😞 单次访问测试失败!")
|
||
|
||
elif choice == "2":
|
||
logger.info("🎬 开始连续访问模式")
|
||
success_count = bot.run_continuous()
|
||
total = bot.config['settings']['default_visits']
|
||
print(f"🎉 连续访问完成!成功率: {success_count}/{total} ({(success_count/total)*100:.1f}%)")
|
||
|
||
elif choice == "3":
|
||
try:
|
||
visit_count = int(input("请输入访问次数: ").strip())
|
||
min_delay = int(input("请输入最小延迟秒数: ").strip())
|
||
max_delay = int(input("请输入最大延迟秒数: ").strip())
|
||
|
||
logger.info(f"🎬 开始自定义连续访问,总次数: {visit_count}")
|
||
success_count = bot.run_continuous(
|
||
total_visits=visit_count,
|
||
delay_range=(min_delay, max_delay)
|
||
)
|
||
|
||
print(f"🎉 自定义访问完成!成功率: {success_count}/{visit_count} ({(success_count/visit_count)*100:.1f}%)")
|
||
|
||
except ValueError:
|
||
print("❌ 输入参数错误!")
|
||
else:
|
||
print("❌ 无效选择!")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n⚠️ 用户中断执行")
|
||
except Exception as e:
|
||
logger.error(f"程序执行出错: {e}")
|
||
print("❌ 程序执行出错,请检查日志文件 traffic_bot_final.log")
|
||
|
||
if __name__ == "__main__":
|
||
main() |