Files
shualiangv1/website_traffic_bot_final.py
huangzhenpc fdac72a040 正式22
2025-07-18 11:24:38 +08:00

1120 lines
45 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网站流量模拟脚本 (最终版本)
使用真实用户数据库模拟最真实的访问轨迹
支持Google搜索来源和真实网站跳转
2024升级增加多游戏快速选择行为 + 宝塔友好模式 + 移动端偏好
"""
import requests
import time
import random
import json
import os
import logging
from urllib.parse import urlparse, urljoin, parse_qs
import re
from real_user_database import RealUserDatabase
# Configure logging: INFO level, written both to a UTF-8 log file
# (traffic_bot_final.log in the working directory) and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('traffic_bot_final.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
class WebTrafficBotFinal:
def __init__(self, config_file='config.json'):
    """
    Build the bot from a JSON configuration file.

    Args:
        config_file: Path handed to ``load_config``.

    Raises:
        FileNotFoundError, json.JSONDecodeError: propagated from ``load_config``.
    """
    self.config = self.load_config(config_file)
    self.session = None  # requests.Session, created by setup_session()
    self.user_db = RealUserDatabase()
    self.current_profile = None  # filled in by setup_session()
    self.current_behavior = None  # filled in by setup_session()
    # Candidate source pages; simulate_realistic_source_visit() picks one at
    # random and stores it as the session's Referer header.
    self.traffic_sources = [
        # Google search result pages
        "https://www.google.com/search?q=2048+game+online",
        "https://www.google.com/search?q=html5+games",
        "https://www.google.com/search?q=browser+games+2048",
        "https://www.google.com/search?q=free+online+games",
        "https://www.google.com/search?q=puzzle+games+online",
        "https://www.google.com/search?q=数字游戏+2048",
        "https://www.google.com/search?q=在线小游戏",
        # Sites specified by the user
        "https://github.com/chengazhen/cursor-auto-free",
        "https://linux.do/",
        # Other source sites
        "https://github.com/trending",
        "https://github.com/topics/game",
        "https://news.ycombinator.com/",
        "https://www.reddit.com/r/WebGames/",
        "https://www.reddit.com/r/incremental_games/",
        "https://www.producthunt.com/",
        "https://stackoverflow.com/",
        "https://www.zhihu.com/",
        "https://v2ex.com/",
        "https://segmentfault.com/",
        "https://juejin.cn/",
        "https://www.csdn.net/",
        "https://www.oschina.net/",
        "https://gitee.com/",
    ]
    # Game page paths, appended to the configured main site URL by
    # visit_multiple_games_realistic().
    self.game_pages = [
        "/games/2048/index.html",
        "/games/snake/index.html",
        "/games/iframe-games.html?game=diy-doll-factory",
        "/games/iframe-games.html?game=super-sprunki-adventure",
        "/games/iframe-games.html?game=flightbird",
        "/games/iframe-games.html?game=tap-to-color-painting-book",
        "/games/iframe-games.html?game=lazy-gto-6",
        "/games/iframe-games.html?game=something-below-the-sea",
        "/games/iframe-games.html?game=gem-clicker-pro",
        "/games/iframe-games.html?game=guess-tiles",
        "/games/iframe-games.html?game=tic-tac-toe-2025",
        "/games/iframe-games.html?game=bubble-shooter",
        "/games/iframe-games.html?game=tetris-classic",
        "/games/iframe-games.html?game=candy-crush",
        "/games/iframe-games.html?game=puzzle-adventure"
    ]
def load_config(self, config_file):
    """Read and parse the JSON configuration file.

    Args:
        config_file: Path to a UTF-8 encoded JSON file.

    Returns:
        The decoded JSON document.

    Raises:
        FileNotFoundError: the file does not exist (logged, then re-raised).
        json.JSONDecodeError: the file is not valid JSON (logged, then re-raised).
    """
    try:
        with open(config_file, 'r', encoding='utf-8') as handle:
            parsed = json.load(handle)
    except FileNotFoundError:
        logger.error(f"配置文件未找到: {config_file}")
        raise
    except json.JSONDecodeError as exc:
        logger.error(f"配置文件格式错误: {exc}")
        raise
    else:
        logger.info(f"配置文件加载成功: {config_file}")
        return parsed
def setup_session(self):
    """Create a fresh requests session and configure it for one visit.

    Picks a user profile and visit behaviour from ``self.user_db``, applies
    the optional ``proxy`` section of the config, installs the headers built
    by ``_get_baota_friendly_headers`` and finally looks up the outgoing IP.
    """
    self.session = requests.Session()
    # Start from an empty cookie jar.
    self.session.cookies.clear()
    # Pick the user profile / behaviour used for the whole visit.
    self.current_profile = self.user_db.get_random_user_profile()
    self.current_behavior = self.user_db.get_visit_behavior()
    # Optional proxy; expects username, password, host and port keys.
    proxy_config = self.config.get('proxy')
    if proxy_config:
        # NOTE(review): credentials are interpolated without URL-escaping.
        proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['host']}:{proxy_config['port']}"
        self.session.proxies = {
            'http': proxy_url,
            'https': proxy_url
        }
        logger.info(f"已配置代理: {proxy_config['host']}:{proxy_config['port']}")
    # Install the request headers for this session.
    enhanced_headers = self._get_baota_friendly_headers()
    self.session.headers.update(enhanced_headers)
    # Log the chosen profile.
    logger.info(f"🎭 用户身份配置:")
    logger.info(f" 操作系统: {self.current_profile['operating_system']}")
    logger.info(f" 屏幕分辨率: {self.current_profile['screen_resolution']}")
    logger.info(f" 浏览器语言: {self.current_profile['language']}")
    logger.info(f" 时区: {self.current_profile['timezone']}")
    logger.info(f" 访问时间模式: {self.current_behavior['pattern_type']}")
    logger.info(f" 硬件并发数: {self.current_profile['hardware_concurrency']}")
    logger.info(f" 设备内存: {self.current_profile['device_memory']}GB")
    # Look up and display the current outgoing IP.
    current_ip = self.get_current_ip()
    # NOTE(review): the original indentation was lost; both prints are assumed
    # to belong to this `if` — confirm against the upstream file.
    if current_ip:
        print(f"🌍 当前IP地址: {current_ip}")
        print(f"👤 用户身份: {self.current_profile['operating_system']} | {self.current_profile['screen_resolution']} | {self.current_behavior['pattern_type']}")
def _get_baota_friendly_headers(self):
    """Build the HTTP header dict installed on the session.

    Returns:
        dict mapping header names to string values. Entries whose value is
        ``None`` are dropped before Chrome-specific headers are added.
    """
    # 70% of the time pick one of the built-in mobile User-Agents; otherwise
    # use the User-Agent from the current profile.
    if random.random() < 0.7:  # 70% chance of a mobile UA
        mobile_user_agents = [
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Linux; Android 14; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 13; SM-A515F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 12; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Mobile Safari/537.36",
            "Mozilla/5.0 (iPad; CPU OS 17_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (iPad; CPU OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
        ]
        user_agent = random.choice(mobile_user_agents)
        is_mobile = True
    else:
        user_agent = self.current_profile["user_agent"]
        is_mobile = "Mobile" in user_agent or "iPhone" in user_agent or "Android" in user_agent
    # Base header set.
    headers = {
        "User-Agent": user_agent,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": self.current_profile["language"],
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "Cache-Control": "max-age=0",
        "DNT": str(random.randint(0, 1)),
        "Pragma": "no-cache",
        # Occasionally-present extras; None means "omit" (filtered out below).
        "X-Requested-With": "XMLHttpRequest" if random.random() < 0.1 else None,
        "Origin": None,
        "Purpose": "prefetch" if random.random() < 0.05 else None,
        # Always None, hence never sent.
        "X-Forwarded-For": None,
        "X-Real-IP": None,
    }
    # Extra headers for mobile User-Agents.
    if is_mobile:
        headers.update({
            "Sec-CH-UA-Mobile": "?1",
            "Sec-CH-UA-Platform": '"Android"' if "Android" in user_agent else '"iOS"',
            "Viewport-Width": str(random.choice([375, 414, 390, 393, 412])),
            "Device-Memory": str(random.choice([4, 6, 8])),
        })
    # Drop the placeholder entries whose value is None.
    headers = {k: v for k, v in headers.items() if v is not None}
    # Chrome User-Agents additionally get lower-case sec-ch-ua* client hints.
    if "Chrome" in user_agent:
        headers.update({
            "sec-ch-ua": self._generate_chrome_sec_ch_ua(user_agent),
            "sec-ch-ua-mobile": "?1" if is_mobile else "?0",
            "sec-ch-ua-platform": f'"{self._get_platform_from_ua(user_agent)}"',
        })
    logger.info(f"📱 使用User-Agent: {user_agent}")
    logger.info(f"📱 移动端模式: {is_mobile}")
    return headers
def _generate_chrome_sec_ch_ua(self, user_agent):
"""生成Chrome的sec-ch-ua头部"""
if "Chrome/" in user_agent:
version = user_agent.split("Chrome/")[1].split(".")[0]
return f'"Not_A Brand";v="8", "Chromium";v="{version}", "Google Chrome";v="{version}"'
return '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"'
def _get_platform_version(self):
"""获取平台版本信息"""
platform = self.current_profile["platform"]
if platform == "Windows":
return random.choice(['"10.0.0"', '"11.0.0"'])
elif platform == "macOS":
return random.choice(['"13.6.0"', '"14.1.0"', '"12.7.0"'])
elif platform == "Linux":
return '""'
return '""'
def _get_platform_from_ua(self, user_agent):
"""从User-Agent中提取平台信息"""
if "iPhone" in user_agent or "iPad" in user_agent:
return "iOS"
elif "Android" in user_agent:
return "Android"
elif "Windows" in user_agent:
return "Windows"
elif "Macintosh" in user_agent:
return "macOS"
elif "Linux" in user_agent:
return "Linux"
else:
return "Unknown"
def get_current_ip(self):
    """Return the public IP address as seen through the current session.

    Each lookup service is tried in turn; the first one answering with
    HTTP 200 wins. JSON bodies are searched for an ``origin`` or ``ip``
    key; a body that is not JSON is returned as stripped text.

    Returns:
        The IP as a string, or ``None`` when every service failed.
    """
    ip_services = [
        'https://httpbin.org/ip',
        'https://api.ipify.org?format=json',
        'https://ipinfo.io/json',
    ]
    for service in ip_services:
        try:
            logger.info(f"🔍 正在获取IP地址: {service}")
            response = self.session.get(service, timeout=10)
            if response.status_code != 200:
                continue
            try:
                data = response.json()
            except ValueError:
                # Body is not JSON (requests' JSON errors derive from
                # ValueError); use the raw text instead. Catching only this
                # keeps KeyboardInterrupt/SystemExit from being swallowed.
                ip = response.text.strip()
            else:
                if not isinstance(data, dict):
                    ip = response.text.strip()
                elif 'origin' in data:
                    ip = data['origin']
                elif 'ip' in data:
                    ip = data['ip']
                else:
                    ip = str(data)
            logger.info(f"✅ 当前IP地址: {ip}")
            return ip
        except Exception as e:
            # Best effort: log and move on to the next service.
            logger.warning(f"{service} 获取IP失败: {e}")
            continue
    logger.error("❌ 无法获取当前IP地址")
    return None
def simulate_realistic_source_visit(self):
    """Simulate time spent on a randomly chosen source page.

    The dwell-time range and the simulated activity depend on the kind of
    source (Google, GitHub, linux.do, anything else). Afterwards the
    session's ``Referer`` is set to the source page.

    Returns:
        True on success, False if anything raised.
    """
    source_page = random.choice(self.traffic_sources)
    try:
        logger.info(f"🔗 模拟从来源访问: {source_page}")
        # Choose dwell bounds (seconds), log label and activity per source type.
        if "google.com" in source_page:
            bounds, label = (3, 12), "🔍 Google搜索停留"
            activity = self._simulate_google_search_behavior
        elif "github.com" in source_page:
            bounds, label = (8, 25), "🐙 GitHub页面停留"
            activity = lambda seconds: self._simulate_github_behavior(seconds, source_page)
        elif "linux.do" in source_page:
            bounds, label = (10, 30), "💻 Linux.do社区停留"
            activity = self._simulate_community_behavior
        else:
            bounds, label = (5, 20), "🌐 其他网站停留"
            activity = self._simulate_general_browsing
        stay_time = self.user_db.get_realistic_timing(
            random.uniform(*bounds),
            self.current_behavior
        )
        logger.info(f"{label} {stay_time:.1f}")
        activity(stay_time)
        # Later requests present the source page as their referrer.
        self.session.headers.update({
            'Referer': source_page,
            'Sec-Fetch-Site': 'cross-site'
        })
        return True
    except Exception as e:
        logger.error(f"来源访问模拟失败: {e}")
        return False
def _simulate_google_search_behavior(self, total_time):
    """Log and sleep through 2-4 distinct simulated search actions.

    Args:
        total_time: Dwell budget in seconds, split evenly across actions.
    """
    actions = [
        "输入搜索关键词",
        "查看搜索结果",
        "滚动浏览结果页面",
        "点击相关搜索建议",
        "查看图片搜索结果"
    ]
    segments = random.randint(2, 4)
    segment_time = total_time / segments
    chosen = random.sample(actions, segments)
    for position, action in enumerate(chosen):
        if position:
            # Pause between actions (not before the first one).
            time.sleep(self.user_db.simulate_human_delays("thinking"))
        logger.info(f" 🔍 Google行为: {action}")
        if "输入" in action:
            pause = random.uniform(2, 5)  # typing the query
        elif "滚动" in action:
            pause = segment_time * 0.6
        else:
            pause = segment_time * 0.8
        time.sleep(pause)
def _simulate_github_behavior(self, total_time, github_url):
    """Log and sleep through 3-6 simulated GitHub page actions.

    Args:
        total_time: Dwell budget in seconds, split evenly across actions.
        github_url: Source URL; selects which action list is used.
    """
    project_actions = [
        "查看项目README",
        "阅读项目描述",
        "查看Stars和Forks数量",
        "浏览代码文件",
        "查看Issues讨论",
        "阅读使用说明"
    ]
    listing_actions = [
        "查看trending项目",
        "浏览热门仓库",
        "查看项目描述",
        "阅读README文件",
        "查看代码示例"
    ]
    actions = project_actions if "cursor-auto-free" in github_url else listing_actions
    segments = random.randint(3, 6)
    segment_time = total_time / segments
    for step in range(segments):
        if step > 0:
            time.sleep(self.user_db.simulate_human_delays("reading"))
        action = random.choice(actions)
        logger.info(f" 🐙 GitHub行为: {action}")
        # Reading actions take the largest share of the segment.
        if "阅读" in action:
            share = 0.7
        elif "查看" in action:
            share = 0.5
        else:
            share = 0.4
        time.sleep(segment_time * share)
def _simulate_community_behavior(self, total_time):
    """Log and sleep through 4-7 simulated forum actions.

    Args:
        total_time: Dwell budget in seconds, split evenly across actions.
    """
    actions = [
        "浏览热门帖子",
        "阅读技术讨论",
        "查看最新话题",
        "搜索相关内容",
        "查看用户资料",
        "阅读精华帖子"
    ]
    segments = random.randint(4, 7)
    segment_time = total_time / segments
    for step in range(segments):
        if step > 0:
            time.sleep(self.user_db.simulate_human_delays("reading"))
        action = random.choice(actions)
        logger.info(f" 💻 社区行为: {action}")
        # Reading actions last longer than the others.
        share = 0.8 if "阅读" in action else 0.6
        time.sleep(segment_time * share)
def _simulate_general_browsing(self, total_time):
    """Log and sleep through 3-5 simulated generic browsing actions.

    Args:
        total_time: Dwell budget in seconds, split evenly across actions.
    """
    actions = [
        "浏览页面内容",
        "查看导航菜单",
        "滚动阅读文章",
        "点击相关链接",
        "查看评论区"
    ]
    segments = random.randint(3, 5)
    segment_time = total_time / segments
    for step in range(segments):
        if step > 0:
            time.sleep(self.user_db.simulate_human_delays("normal"))
        action = random.choice(actions)
        logger.info(f" 🌐 浏览行为: {action}")
        jitter = random.uniform(0.7, 1.2)
        time.sleep(segment_time * jitter)
def visit_main_site_realistic(self):
    """Request the configured main site and simulate browsing it.

    Returns:
        True when the page was fetched and browsed; False when the request
        yielded no usable response or an exception occurred.
    """
    main_site = self.config['targets']['main_site']
    try:
        logger.info(f"🏠 访问目标网站: {main_site}")
        response = self.make_realistic_request(main_site)
        if response:
            # Main-page mode uses the short fixed dwell range.
            self._simulate_realistic_browsing(response, is_main_page=True)
            return True
        return False
    except Exception as e:
        logger.error(f"主网站访问失败: {e}")
        return False
def visit_multiple_games_realistic(self):
    """Visit a random selection of game pages in quick succession.

    Picks 3-8 entries from ``self.game_pages``, requests each with a
    same-origin Referer, briefly "browses" it and, if at least one visit
    succeeded, hands over to ``_simulate_final_game_selection``.

    Returns:
        True if at least one game page was fetched successfully.
    """
    main_site = self.config['targets']['main_site']
    base_url = main_site.rstrip('/')
    # Number of game pages to look at (capped by the size of the list).
    game_count = random.randint(3, 8)
    selected_games = random.sample(self.game_pages, min(game_count, len(self.game_pages)))
    logger.info(f"🎯 开始快速游戏选择行为,将浏览 {game_count} 个游戏")
    total_start_time = time.time()
    success_count = 0
    for i, game_path in enumerate(selected_games):
        game_url = base_url + game_path
        try:
            logger.info(f"🎮 第{i+1}/{game_count}个游戏: {game_path}")
            # Update the referrer.
            if i == 0:
                # The first game is entered from the main page.
                self.session.headers.update({
                    'Referer': main_site,
                    'Sec-Fetch-Site': 'same-origin'
                })
            else:
                # Later games are entered from the previously selected game
                # page (regardless of whether that request succeeded).
                prev_game_url = base_url + selected_games[i-1]
                self.session.headers.update({
                    'Referer': prev_game_url,
                    'Sec-Fetch-Site': 'same-origin'
                })
            # Issue the request.
            response = self.make_realistic_request(game_url)
            if not response:
                logger.warning(f"⚠️ 游戏页面访问失败: {game_path}")
                continue
            # Short look at the page.
            self._simulate_quick_game_browsing(response, game_path, i+1, game_count)
            success_count += 1
            # Short delay before switching to the next game (skipped after the last).
            if i < len(selected_games) - 1:
                switch_delay = random.uniform(0.5, 2.0)
                logger.info(f" ⚡ 快速切换到下个游戏: {switch_delay:.1f}")
                time.sleep(switch_delay)
        except Exception as e:
            logger.error(f"❌ 游戏访问出错 {game_path}: {e}")
            continue
    total_time = time.time() - total_start_time
    logger.info(f"🏁 游戏选择完成!访问了 {success_count}/{game_count} 个游戏,总耗时: {total_time:.1f}")
    # Possibly settle on one game for a longer session.
    if success_count > 0:
        self._simulate_final_game_selection(selected_games, base_url)
    return success_count > 0
def _simulate_quick_game_browsing(self, response, game_path, current_index, total_count):
    """Simulate a short look at one game page.

    Args:
        response: Response of the game page request.
        game_path: Path of the game, used to derive a display name.
        current_index: 1-based position of this game (not used in the body).
        total_count: Number of games in the run (not used in the body).
    """
    # Time budget for this page: 3-8 seconds.
    browse_time = random.uniform(3.0, 8.0)
    game_name = self._extract_game_name(game_path)
    logger.info(f" 👀 快速查看 {game_name}: {browse_time:.1f}")
    # 20% chance of an additional simulated script request.
    if random.random() < 0.2:
        self._simulate_javascript_behavior(response.url)
    # Quick scanning actions.
    quick_actions = [
        "查看游戏截图",
        "阅读游戏标题",
        "扫视游戏介绍",
        "检查加载状态"
    ]
    action_count = random.randint(1, 3)
    action_time = browse_time / action_count
    for i in range(action_count):
        if i > 0:
            time.sleep(random.uniform(0.5, 1.2))  # gap between actions
        action = random.choice(quick_actions)
        logger.info(f"{action}")
        # Factor range 0.8-1.6 means the total can exceed browse_time.
        time.sleep(action_time * random.uniform(0.8, 1.6))
    # A detected login page is only logged; no other action is taken.
    if self._check_login_required(response):
        logger.info(f" ⚠️ 检测到登录页面,快速跳过")
def _simulate_final_game_selection(self, selected_games, base_url):
    """With 30% probability revisit one browsed game for a long session.

    Args:
        selected_games: Game paths browsed in this run.
        base_url: Site root (no trailing slash) the paths are appended to.
    """
    if random.random() >= 0.3:
        return
    final_game = random.choice(selected_games)
    final_game_url = base_url + final_game
    logger.info(f"🎯 最终选择深度体验: {final_game}")
    try:
        # Referrer is the last game page that was browsed.
        last_browsed = base_url + selected_games[-1]
        self.session.headers.update({
            'Referer': last_browsed,
            'Sec-Fetch-Site': 'same-origin'
        })
        response = self.make_realistic_request(final_game_url)
        if response:
            self._simulate_realistic_gaming(response, final_game)
    except Exception as e:
        logger.error(f"深度游戏体验失败: {e}")
def _extract_game_name(self, game_path):
"""从游戏路径提取游戏名称"""
if "2048" in game_path:
return "2048数字游戏"
elif "snake" in game_path:
return "贪吃蛇"
elif "diy-doll-factory" in game_path:
return "DIY娃娃工厂"
elif "super-sprunki-adventure" in game_path:
return "超级冒险"
elif "flightbird" in game_path:
return "飞行小鸟"
elif "tap-to-color" in game_path:
return "点击涂色"
elif "lazy-gto" in game_path:
return "懒惰GTO"
elif "something-below-the-sea" in game_path:
return "海底探险"
elif "gem-clicker" in game_path:
return "宝石点击"
elif "guess-tiles" in game_path:
return "猜瓷砖"
elif "tic-tac-toe" in game_path:
return "井字棋2025"
else:
return "未知游戏"
def _check_login_required(self, response):
"""检查是否需要登录"""
if not response:
return False
content = response.text.lower()
login_indicators = [
'登录', 'login', '用户名', 'username', 'password', '密码',
'signin', 'sign in', '账号', 'account', '验证码'
]
login_count = sum(1 for indicator in login_indicators if indicator in content)
if login_count >= 3:
logger.warning("⚠️ 检测到可能的登录页面")
logger.info("💡 建议:检查网站是否需要登录访问")
return True
return False
def make_realistic_request(self, url, timeout=20):
    """GET ``url`` through the session after a random delay, with retries.

    Args:
        url: Absolute URL to fetch.
        timeout: Per-attempt timeout in seconds passed to ``session.get``.

    Returns:
        The response on success, or ``None`` when a
        ``requests.exceptions.RequestException`` occurred (including the
        HTTP error raised by ``raise_for_status``).
    """
    try:
        # Random 3-8 second wait before sending the request.
        pre_request_delay = random.uniform(3, 8)
        logger.info(f"🕐 人工延迟: {pre_request_delay:.1f}")
        time.sleep(pre_request_delay)
        # Up to three attempts; only connection errors and timeouts are retried.
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = self.session.get(url, timeout=timeout, allow_redirects=True)
                break
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                if attempt < max_retries - 1:
                    retry_delay = random.uniform(5, 12)
                    logger.warning(f"⚠️ 连接失败,{retry_delay:.1f}秒后重试 (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(retry_delay)
                    continue
                else:
                    # Last attempt failed: let the outer handler log it.
                    raise e
        # Log request details.
        logger.info(f"📡 HTTP请求详情:")
        logger.info(f" 📍 访问URL: {url}")
        logger.info(f" 📊 状态码: {response.status_code}")
        logger.info(f" 📦 响应大小: {len(response.content)} 字节")
        logger.info(f" ⏱️ 响应时间: {response.elapsed.total_seconds():.2f}")
        # Log received cookies (values truncated to 20 characters).
        if response.cookies:
            logger.info(f" 🍪 接收到Cookies: {len(response.cookies)}")
            for cookie in response.cookies:
                logger.info(f" 🍪 {cookie.name}={cookie.value[:20]}...")
        if response.headers.get('content-type'):
            logger.info(f" 📄 内容类型: {response.headers.get('content-type')}")
        if response.headers.get('server'):
            logger.info(f" 🖥️ 服务器: {response.headers.get('server')}")
        # Warn when the final URL differs from the requested one.
        if response.url != url:
            logger.warning(f"🔄 发生重定向: {url} -> {response.url}")
        # Warn on very small bodies (< 1000 bytes).
        if len(response.content) < 1000:
            logger.warning(f"⚠️ 响应内容过小,可能被拦截")
        # HTTP error statuses become exceptions and thus a None return.
        response.raise_for_status()
        # 30% chance of an additional simulated static-resource request.
        if random.random() < 0.3:
            self._simulate_browser_auto_requests(url, response)
        return response
    except requests.exceptions.RequestException as e:
        logger.error(f"❌ 请求失败 {url}: {e}")
        return None
def _simulate_browser_auto_requests(self, base_url, response):
    """With 50% probability request one static resource of the site.

    Args:
        base_url: URL of the page just loaded; used for the origin and Referer.
        response: Response of that page (not used in the body).
    """
    if random.random() >= 0.5:
        return
    parts = urlparse(base_url)
    origin = f"{parts.scheme}://{parts.netloc}"
    essential_resources = [
        "/favicon.ico",
        "/css/style.css",
        "/js/main.js",
    ]
    # Exactly one resource is requested.
    resource = random.choice(essential_resources)
    try:
        time.sleep(random.uniform(2, 5))
        # Per-request headers: a copy of the session headers with overrides.
        resource_headers = self.session.headers.copy()
        resource_headers.update({
            "Referer": base_url,
            "Sec-Fetch-Dest": self._get_resource_dest(resource),
            "Sec-Fetch-Mode": "no-cors",
            "Sec-Fetch-Site": "same-origin",
        })
        resource_response = self.session.get(
            origin + resource,
            headers=resource_headers,
            timeout=10,
            allow_redirects=True
        )
        if resource_response.status_code == 200:
            logger.info(f" 📄 成功请求资源: {resource}")
    except Exception as e:
        # Best effort: failures are only logged at debug level.
        logger.debug(f" ⚠️ 资源请求失败 {resource}: {e}")
def _get_resource_dest(self, resource_path):
"""根据资源路径确定Sec-Fetch-Dest"""
if resource_path.endswith(('.css',)):
return "style"
elif resource_path.endswith(('.js',)):
return "script"
elif resource_path.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.ico')):
return "image"
elif resource_path.endswith('.json'):
return "manifest"
else:
return "empty"
def _simulate_javascript_behavior(self, url):
    """With 20% probability send one XHR-style GET to a fixed endpoint.

    Args:
        url: Page URL; supplies the origin and the Referer header.
    """
    if random.random() >= 0.2:
        return
    parts = urlparse(url)
    origin = f"{parts.scheme}://{parts.netloc}"
    basic_endpoints = [
        "/api/ping",
        "/heartbeat",
        "/api/status"
    ]
    endpoint = random.choice(basic_endpoints)
    ajax_url = origin + endpoint
    try:
        # Per-request headers: a copy of the session headers with overrides.
        ajax_headers = self.session.headers.copy()
        ajax_headers.update({
            "X-Requested-With": "XMLHttpRequest",
            "Content-Type": "application/json",
            "Referer": url,
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
        })
        time.sleep(random.uniform(5, 15))
        ajax_response = self.session.get(ajax_url, headers=ajax_headers, timeout=8)
        if ajax_response.status_code == 200:
            logger.info(f" 🔄 AJAX请求成功: {endpoint}")
    except Exception as e:
        # Best effort: failures are only logged at debug level.
        logger.debug(f" ⚠️ AJAX请求失败 {endpoint}: {e}")
def _simulate_realistic_browsing(self, response, is_main_page=False):
    """Simulate reading a page for a duration derived from its content.

    Args:
        response: Response whose ``text`` is analysed.
        is_main_page: When True a fixed 5-15 s base time is used instead of
            the estimated reading time (capped at 60 s).
    """
    content = response.text
    # Estimate reading time from the tag-stripped text length.
    text_length = len(re.sub(r'<[^>]+>', '', content))
    reading_time = text_length / self.current_behavior['reading_speed'] * 60
    logger.info(f"📖 页面分析:")
    logger.info(f" 📝 内容长度: {text_length} 字符")
    logger.info(f" ⏱️ 预估阅读时间: {reading_time:.1f}")
    base_time = random.uniform(5, 15) if is_main_page else min(reading_time, 60)
    stay_time = self.user_db.get_realistic_timing(base_time, self.current_behavior)
    logger.info(f" 🕐 实际停留时间: {stay_time:.1f}")
    # 30% chance of an additional simulated script request.
    if random.random() < 0.3:
        self._simulate_javascript_behavior(response.url)
    self._simulate_browsing_segments(stay_time, content)
def _simulate_browsing_segments(self, total_time, content):
    """Split a page visit into 3-8 logged browsing actions with sleeps.

    Args:
        total_time: Overall dwell time in seconds, split evenly per segment.
        content: Page HTML; scanned for ``href`` values used by the
            link-inspection action.
    """
    # Collect href targets that are relative or contain the main site URL.
    links = re.findall(r'href=["\'](.*?)["\']', content)
    main_site = self.config['targets']['main_site']
    internal_links = [
        link for link in links
        if not link.startswith('http') or main_site in link
    ]
    segments = random.randint(3, 8)
    segment_time = total_time / segments
    browsing_actions = [
        "📖 阅读页面内容",
        "🧭 查看导航菜单",
        "📜 滚动浏览页面",
        "🔗 检查页面链接",
        "👁️ 观察页面布局",
        "📱 查看页脚信息"
    ]
    for _ in range(segments):
        action = random.choice(browsing_actions)
        logger.info(f" {action}")
        if "滚动" in action:
            self._simulate_scrolling_behavior(segment_time)
        elif "链接" in action and internal_links:
            # The original tested for "检查链接", which never occurs in
            # "🔗 检查页面链接", so this branch was unreachable.
            self._simulate_link_hovering(internal_links)
            time.sleep(segment_time * 0.8)
        else:
            actual_segment_time = segment_time * random.uniform(0.7, 1.3)
            time.sleep(actual_segment_time)
def _simulate_scrolling_behavior(self, duration):
    """Simulate 2-5 scroll sessions of quick scrolls and optional pauses.

    Args:
        duration: Accepted for interface compatibility. The sleeps below use
            fixed random ranges, so the time actually spent is not bounded
            by this value (the original computed a per-session share of it
            but never used the result; that dead local has been removed).
    """
    scroll_sessions = random.randint(2, 5)
    for session in range(scroll_sessions):
        logger.info(f" 📜 滚动会话 {session + 1}")
        # A burst of 2-4 quick scrolls.
        quick_scrolls = random.randint(2, 4)
        for _ in range(quick_scrolls):
            time.sleep(random.uniform(0.3, 1.0))
        # 70% chance of a reading pause after the burst.
        if random.random() < 0.7:
            pause_time = random.uniform(1, 4)
            logger.info(f" ⏸️ 停顿阅读 {pause_time:.1f}")
            time.sleep(pause_time)
def _simulate_link_hovering(self, links):
    """Log 1-3 randomly chosen links, pausing briefly after each.

    Args:
        links: Candidate link strings; at most ``len(links)`` are sampled.
    """
    how_many = min(random.randint(1, 3), len(links))
    for target in random.sample(links, how_many):
        # Only the first 50 characters of the link are logged.
        logger.info(f" 🔗 查看链接: {target[:50]}...")
        time.sleep(random.uniform(0.5, 2.5))
def _simulate_realistic_gaming(self, response, game_path=None):
    """Simulate a longer play session on a game page.

    Args:
        response: Response of the game page (not used in the body).
        game_path: Optional game path, used only for the log message.
    """
    if game_path:
        intro = f"🎲 开始深度体验 {self._extract_game_name(game_path)}"
    else:
        intro = "🎲 开始游戏模拟"
    logger.info(intro)
    # Preparation time before play starts.
    prep_time = self.user_db.simulate_human_delays("thinking")
    logger.info(f"🤔 游戏加载和准备: {prep_time:.1f}")
    time.sleep(prep_time)
    # Base play time is drawn from the configured range.
    stay_bounds = self.config['settings']['game_page_stay_time']
    base_time = random.uniform(*stay_bounds)
    game_time = self.user_db.get_realistic_timing(base_time, self.current_behavior)
    logger.info(f"🎮 游戏总时长: {game_time:.1f}")
    self._simulate_game_sessions(game_time)
def _simulate_game_sessions(self, total_time):
    """Run 2-4 play sessions separated by short breaks.

    Args:
        total_time: Overall play time in seconds; each session gets an equal
            share scaled by a random factor between 0.8 and 1.2.
    """
    sessions = random.randint(2, 4)
    final_index = sessions - 1
    for index in range(sessions):
        session_time = total_time / sessions * random.uniform(0.8, 1.2)
        logger.info(f"🎯 游戏会话 {index + 1}/{sessions}, 时长: {session_time:.1f}")
        self._simulate_single_game_session(session_time)
        if index == final_index:
            continue
        # Break between sessions (none after the last one).
        break_time = random.uniform(3, 10)
        logger.info(f"⏸️ 休息思考: {break_time:.1f}")
        time.sleep(break_time)
def _simulate_single_game_session(self, session_time):
    """Log random directional moves until ``session_time`` seconds elapsed.

    Args:
        session_time: Target length in seconds. The check happens only at the
            top of the loop, so the session can overrun by one iteration.
    """
    game_moves = ["⬆️上", "⬇️下", "⬅️左", "➡️右"]
    start_time = time.time()
    move_count = 0
    while time.time() - start_time < session_time:
        move = random.choice(game_moves)
        move_count += 1
        logger.info(f" 🎮 第{move_count}步: {move}")
        # Vary the pause after each move.
        if move_count % 8 == 0:  # every 8th move: long pause
            think_time = random.uniform(4, 10)
            logger.info(f" 🧠 策略思考: {think_time:.1f}")
            time.sleep(think_time)
        elif random.random() < 0.4:  # 40% chance: short pause
            think_time = random.uniform(0.8, 3)
            time.sleep(think_time)
        else:  # quick move
            time.sleep(random.uniform(0.4, 1.2))
        # Occasional mistake followed by a corrective move.
        if random.random() < 0.06:  # 6% chance
            logger.info(" ❌ 误操作,立即纠正")
            time.sleep(0.3)
            corrective_move = random.choice(game_moves)
            logger.info(f" 🔄 纠正: {corrective_move}")
            time.sleep(random.uniform(0.5, 1.0))
def run_single_visit(self):
    """Run one complete visit: source page, main site, then game pages.

    A failed source-page simulation is only a warning; a failure of either
    later step aborts the visit. The session is closed in every case.

    Returns:
        True when both mandatory steps succeeded, otherwise False.
    """
    logger.info("🚀 开始执行真实访问流程")
    self.setup_session()
    try:
        # Step 1 is optional: log and carry on if it fails.
        if not self.simulate_realistic_source_visit():
            logger.warning("⚠️ 来源访问模拟失败,继续执行")
        # Steps 2 and 3 are mandatory and run in order.
        mandatory_steps = (
            (self.visit_main_site_realistic, "❌ 主网站访问失败"),
            (self.visit_multiple_games_realistic, "❌ 多游戏访问失败"),
        )
        for step, failure_message in mandatory_steps:
            if not step():
                logger.error(failure_message)
                return False
        logger.info("✅ 访问流程完美执行成功!")
        return True
    except Exception as e:
        logger.error(f"❌ 访问流程执行出错: {e}")
        return False
    finally:
        if self.session:
            self.session.close()
def run_continuous(self, total_visits=None, delay_range=None):
    """Run several visits in a row with a randomised pause between them.

    Args:
        total_visits: Number of visits; defaults to
            ``settings.default_visits`` from the config.
        delay_range: ``(min, max)`` pause in seconds; defaults to
            ``settings.min_delay`` / ``settings.max_delay``.

    Returns:
        Number of successful visits.
    """
    if total_visits is None:
        total_visits = self.config['settings']['default_visits']
    if delay_range is None:
        delay_range = (
            self.config['settings']['min_delay'],
            self.config['settings']['max_delay']
        )
    # Pause multiplier per behaviour pattern; other patterns use 1.0.
    delay_factor_by_pattern = {"工作时间": 0.7, "深夜": 1.8}
    success_count = 0
    logger.info(f"🎯 开始连续访问,目标: {total_visits}")
    for i in range(total_visits):
        logger.info(f"{'='*60}")
        logger.info(f"🔄 执行第 {i+1}/{total_visits} 次访问")
        logger.info(f"{'='*60}")
        if self.run_single_visit():
            success_count += 1
            logger.info(f"✅ 第 {i+1} 次访问成功!累计成功: {success_count}")
        else:
            logger.error(f"❌ 第 {i+1} 次访问失败!")
        # Pause before the next visit (none after the last one).
        if i < total_visits - 1:
            base_delay = random.uniform(delay_range[0], delay_range[1])
            behavior = self.user_db.get_visit_behavior()
            delay = base_delay * delay_factor_by_pattern.get(behavior['pattern_type'], 1.0)
            logger.info(f"⏳ 智能等待 {delay:.1f} 秒 (当前时段: {behavior['pattern_type']})")
            time.sleep(delay)
    # With zero requested visits there is no rate to compute; the original
    # raised ZeroDivisionError here.
    success_rate = (success_count / total_visits) * 100 if total_visits > 0 else 0.0
    logger.info(f"🎉 访问任务完成!")
    logger.info(f"📊 成功率: {success_count}/{total_visits} ({success_rate:.1f}%)")
    return success_count
def main():
    """Interactive entry point: load the config and run the chosen mode.

    Mode 1 runs a single visit, mode 2 runs the configured number of visits
    and mode 3 asks for the visit count and delay range.
    """
    config_file = 'config.json'
    if not os.path.exists(config_file):
        print(f"❌ 配置文件 {config_file} 不存在!")
        return

    def success_percent(succeeded, attempted):
        # Zero attempts would otherwise raise ZeroDivisionError.
        return (succeeded / attempted) * 100 if attempted > 0 else 0.0

    try:
        bot = WebTrafficBotFinal(config_file)
        print("=" * 60)
        print("🎭 网站流量模拟脚本 (最终升级版)")
        print("=" * 60)
        print("🌟 特性:真实用户行为 + Google/GitHub来源 + 完整浏览器指纹")
        print("⚠️ 请确保仅用于测试自己的网站!")
        print()
        print(f"🎯 目标网站: {bot.config['targets']['main_site']}")
        # 'game_page' is display-only (the bot itself never reads it), so a
        # config without it must not abort start-up with a KeyError.
        print(f"🎮 游戏页面: {bot.config['targets'].get('game_page', 'N/A')}")
        print()
        print("请选择运行模式:")
        print("1. 💎 单次完整访问测试")
        print("2. 🚀 连续访问模式 (使用配置参数)")
        print("3. ⚙️ 自定义连续访问")
        choice = input("请输入选择 (1/2/3): ").strip()
        if choice == "1":
            logger.info("🎬 开始单次完整访问测试")
            success = bot.run_single_visit()
            if success:
                print("🎉 单次访问测试完美成功!")
            else:
                print("😞 单次访问测试失败!")
        elif choice == "2":
            logger.info("🎬 开始连续访问模式")
            success_count = bot.run_continuous()
            total = bot.config['settings']['default_visits']
            print(f"🎉 连续访问完成!成功率: {success_count}/{total} ({success_percent(success_count, total):.1f}%)")
        elif choice == "3":
            try:
                visit_count = int(input("请输入访问次数: ").strip())
                min_delay = int(input("请输入最小延迟秒数: ").strip())
                max_delay = int(input("请输入最大延迟秒数: ").strip())
                # Reject values that cannot describe a run: no visits, or a
                # negative pause (time.sleep raises on negative values).
                if visit_count <= 0 or min_delay < 0 or max_delay < 0:
                    raise ValueError("visit count must be positive and delays non-negative")
            except ValueError:
                print("❌ 输入参数错误!")
            else:
                logger.info(f"🎬 开始自定义连续访问,总次数: {visit_count}")
                success_count = bot.run_continuous(
                    total_visits=visit_count,
                    delay_range=(min_delay, max_delay)
                )
                print(f"🎉 自定义访问完成!成功率: {success_count}/{visit_count} ({success_percent(success_count, visit_count):.1f}%)")
        else:
            print("❌ 无效选择!")
    except KeyboardInterrupt:
        print("\n⚠️ 用户中断执行")
    except Exception as e:
        logger.error(f"程序执行出错: {e}")
        print("❌ 程序执行出错,请检查日志文件 traffic_bot_final.log")


if __name__ == "__main__":
    main()