572 lines
21 KiB
Python
572 lines
21 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
网站流量模拟脚本 (真实用户行为版本)
|
||
使用真实用户数据库模拟更真实的访问轨迹
|
||
"""
|
||
|
||
import requests
|
||
import time
|
||
import random
|
||
import json
|
||
import os
|
||
import logging
|
||
from urllib.parse import urlparse, urljoin
|
||
import re
|
||
from real_user_database import RealUserDatabase
|
||
|
||
# Logging setup: INFO and above is written both to a UTF-8 log file and to
# the console, using one shared "time - level - message" format.
logging.basicConfig(
    handlers=[
        logging.FileHandler('traffic_bot_realistic.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
|
||
|
||
class WebTrafficBotRealistic:
    """Run a scripted visit sequence: referrer step -> main site -> game page.

    Targets, stay times, delays and the optional proxy are read from a JSON
    configuration file. Per-visit profile, behaviour, headers and timing
    values are obtained from ``RealUserDatabase``.

    Config keys read by this class:
        proxy (optional): ``host``, ``port`` and optionally ``username`` /
            ``password``.
        targets: ``main_site``, ``game_page``.
        settings: ``main_site_stay_time`` and ``game_page_stay_time`` (each
            unpacked as the two bounds of ``random.uniform``),
            ``default_visits``, ``min_delay``, ``max_delay``.
    """

    def __init__(self, config_file='config.json'):
        """Load the configuration and initialise per-visit state.

        Args:
            config_file: Path to the JSON configuration file.

        Raises:
            FileNotFoundError: If ``config_file`` does not exist.
            json.JSONDecodeError: If ``config_file`` is not valid JSON.
        """
        self.config = self.load_config(config_file)
        self.session = None  # created per visit by setup_session()
        self.user_db = RealUserDatabase()
        self.current_profile = None
        self.current_behavior = None

        # Referrer pages; one is picked at random for each visit.
        self.github_referrers = [
            "https://github.com/trending",
            "https://github.com/trending/javascript",
            "https://github.com/trending/typescript",
            "https://github.com/topics/javascript",
            "https://github.com/topics/game",
            "https://github.com/topics/html5",
            "https://github.com/topics/2048",
            "https://github.com/search?q=2048+game",
            "https://github.com/search?q=html5+games",
            "https://github.com/collections/javascript-game-engines",
            "https://github.com/explore",
            "https://github.com/",
        ]

    def load_config(self, config_file):
        """Read and return the JSON configuration.

        Args:
            config_file: Path to the JSON configuration file.

        Returns:
            The decoded JSON document.

        Raises:
            FileNotFoundError: If the file does not exist (logged, re-raised).
            json.JSONDecodeError: If the file is not valid JSON (logged,
                re-raised).
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            logger.info(f"配置文件加载成功: {config_file}")
            return config
        except FileNotFoundError:
            logger.error(f"配置文件未找到: {config_file}")
            raise
        except json.JSONDecodeError as e:
            logger.error(f"配置文件格式错误: {e}")
            raise

    @staticmethod
    def _build_proxy_url(proxy_config):
        """Return the ``http://`` proxy URL described by ``proxy_config``.

        Credentials are expected raw (not pre-encoded) and are percent-encoded
        here, so characters such as ``/``, ``#``, ``?`` or ``%`` in a username
        or password cannot corrupt the URL. The user-info part is omitted when
        no username is configured.

        Args:
            proxy_config: Mapping with ``host`` and ``port`` and optionally
                ``username`` / ``password``.

        Returns:
            The proxy URL as a string.
        """
        # Local import: the module top only imports urlparse/urljoin.
        from urllib.parse import quote

        endpoint = f"{proxy_config['host']}:{proxy_config['port']}"
        username = proxy_config.get('username')
        if not username:
            return f"http://{endpoint}"

        credentials = quote(str(username), safe='')
        password = proxy_config.get('password')
        if password:
            credentials += ':' + quote(str(password), safe='')
        return f"http://{credentials}@{endpoint}"

    def setup_session(self):
        """Create a fresh ``requests.Session`` configured for one visit.

        Picks a new profile and behaviour from the user database, applies the
        configured proxy (if any) and the generated headers, logs the profile
        fields, and finally prints the outward-facing IP when it can be
        determined.
        """
        # Release a session left over from an earlier call so repeated setup
        # does not leak pooled connections.
        if self.session is not None:
            self.session.close()
        self.session = requests.Session()

        self.current_profile = self.user_db.get_random_user_profile()
        self.current_behavior = self.user_db.get_visit_behavior()

        proxy_config = self.config.get('proxy')
        if proxy_config:
            proxy_url = self._build_proxy_url(proxy_config)
            self.session.proxies = {
                'http': proxy_url,
                'https': proxy_url,
            }
            logger.info(f"已配置代理: {proxy_config['host']}:{proxy_config['port']}")

        realistic_headers = self.user_db.get_realistic_headers(self.current_profile)
        self.session.headers.update(realistic_headers)

        logger.info(f"用户配置:")
        logger.info(f" 操作系统: {self.current_profile['operating_system']}")
        logger.info(f" 屏幕分辨率: {self.current_profile['screen_resolution']}")
        logger.info(f" 语言: {self.current_profile['language']}")
        logger.info(f" 时区: {self.current_profile['timezone']}")
        logger.info(f" 访问模式: {self.current_behavior['pattern_type']}")

        current_ip = self.get_current_ip()
        if current_ip:
            print(f"🌍 当前IP地址: {current_ip}")
            print(f"👤 用户配置: {self.current_profile['operating_system']} | {self.current_profile['screen_resolution']} | {self.current_behavior['pattern_type']}")

    @staticmethod
    def _extract_ip(payload):
        """Pick the address out of a decoded IP-echo response.

        Returns ``payload['origin']`` or ``payload['ip']`` when ``payload`` is
        a mapping containing one of those keys (checked in that order);
        otherwise returns ``str(payload)``.
        """
        if isinstance(payload, dict):
            for key in ('origin', 'ip'):
                if key in payload:
                    return payload[key]
        return str(payload)

    def get_current_ip(self):
        """Return the IP address seen by public echo services, or ``None``.

        The services are tried in order through the current session (and
        therefore through its proxy, if one is set). The first HTTP 200 reply
        wins; its JSON body is inspected, falling back to the raw text when
        the body is not valid JSON. Failures are logged and the next service
        is tried.
        """
        ip_services = [
            'https://httpbin.org/ip',
            'https://api.ipify.org?format=json',
            'https://ipinfo.io/json',
        ]

        for service in ip_services:
            try:
                logger.info(f"正在获取IP地址: {service}")
                response = self.session.get(service, timeout=10)

                if response.status_code != 200:
                    logger.warning(f"从 {service} 获取IP失败: HTTP {response.status_code}")
                    continue

                try:
                    ip = self._extract_ip(response.json())
                except ValueError:
                    # Body was not JSON; requests raises a ValueError subclass.
                    ip = response.text.strip()

                logger.info(f"✅ 当前IP地址: {ip}")
                return ip

            except Exception as e:
                # Best-effort diagnostic: any failure just moves on to the
                # next service.
                logger.warning(f"从 {service} 获取IP失败: {e}")
                continue

        logger.error("❌ 无法获取当前IP地址")
        return None

    def simulate_realistic_github_visit(self):
        """Simulate time spent on a referrer page and set the ``Referer``.

        No request is sent to the referrer page: the method waits for a
        duration obtained from the user database and then stores the chosen
        page as ``Referer`` (with ``Sec-Fetch-Site: cross-site``) on the
        session for the requests that follow.

        Returns:
            ``True`` on success, ``False`` if any step raised.
        """
        github_page = random.choice(self.github_referrers)

        try:
            logger.info(f"🔍 模拟从GitHub访问: {github_page}")

            github_stay_time = self.user_db.get_realistic_timing(
                random.uniform(5, 15),
                self.current_behavior,
            )
            logger.info(f"📚 在GitHub页面停留 {github_stay_time:.1f} 秒")

            self._simulate_github_browsing(github_stay_time)

            self.session.headers.update({
                'Referer': github_page,
                'Sec-Fetch-Site': 'cross-site',
            })
            return True

        except Exception as e:
            logger.error(f"GitHub访问模拟失败: {e}")
            return False

    def _simulate_github_browsing(self, total_time):
        """Log a few random browsing actions, sleeping a share of the time.

        ``total_time`` is split evenly into 3-6 segments. Each action sleeps
        0.4, 0.3 or 0.2 of a segment depending on its label, and every segment
        after the first is preceded by a "reading" delay from the user
        database.

        Args:
            total_time: Overall time budget in seconds.
        """
        actions = [
            "查看项目描述",
            "阅读README",
            "查看代码文件",
            "检查更新时间",
            "查看星标数量",
            "浏览提交历史",
        ]

        segments = random.randint(3, 6)
        segment_time = total_time / segments

        for i in range(segments):
            if i > 0:
                delay = self.user_db.simulate_human_delays("reading")
                time.sleep(delay)

            action = random.choice(actions)
            logger.info(f" GitHub行为: {action}")

            # The fraction of the segment slept depends on the action label.
            if "阅读" in action:
                time.sleep(segment_time * 0.4)
            elif "查看" in action:
                time.sleep(segment_time * 0.3)
            else:
                time.sleep(segment_time * 0.2)

    def visit_main_site_realistic(self):
        """Request the configured main site and simulate browsing it.

        Returns:
            ``True`` if the page was fetched and the simulation completed,
            ``False`` otherwise.
        """
        main_site = self.config['targets']['main_site']

        try:
            logger.info(f"🏠 访问主网站: {main_site}")

            response = self.make_realistic_request(main_site)
            if not response:
                return False

            self._simulate_realistic_browsing(response, is_main_page=True)
            return True

        except Exception as e:
            logger.error(f"主网站访问失败: {e}")
            return False

    def visit_game_page_realistic(self):
        """Request the configured game page and simulate a play session.

        The session ``Referer`` is switched to the main site (with
        ``Sec-Fetch-Site: same-origin``) before the request is sent.

        Returns:
            ``True`` if the page was fetched and the simulation completed,
            ``False`` otherwise.
        """
        game_page = self.config['targets']['game_page']
        main_site = self.config['targets']['main_site']

        try:
            logger.info(f"🎮 访问游戏页面: {game_page}")

            self.session.headers.update({
                'Referer': main_site,
                'Sec-Fetch-Site': 'same-origin',
            })

            response = self.make_realistic_request(game_page)
            if not response:
                return False

            self._simulate_realistic_gaming(response)
            return True

        except Exception as e:
            logger.error(f"游戏页面访问失败: {e}")
            return False

    def make_realistic_request(self, url, timeout=10):
        """GET ``url`` through the session after a short "thinking" delay.

        Args:
            url: Address to fetch.
            timeout: Request timeout in seconds.

        Returns:
            The ``requests.Response`` on success, or ``None`` when the request
            failed or the status code indicates an error.
        """
        try:
            pre_request_delay = self.user_db.simulate_human_delays("thinking")
            time.sleep(pre_request_delay)

            response = self.session.get(url, timeout=timeout, allow_redirects=True)

            logger.info(f"📡 访问 {url}")
            logger.info(f" 状态码: {response.status_code}")
            logger.info(f" 响应大小: {len(response.content)} 字节")
            logger.info(f" 响应时间: {response.elapsed.total_seconds():.2f}秒")

            if response.headers.get('content-type'):
                logger.info(f" 内容类型: {response.headers.get('content-type')}")

            # 4xx/5xx raise HTTPError, which is a RequestException.
            response.raise_for_status()
            return response

        except requests.exceptions.RequestException as e:
            logger.error(f"请求失败 {url}: {e}")
            return None

    def _simulate_realistic_browsing(self, response, is_main_page=False):
        """Derive a stay time for the fetched page and simulate browsing it.

        Args:
            response: Response whose ``text`` is the page markup.
            is_main_page: When true, the base stay time is drawn from
                ``settings.main_site_stay_time``; otherwise the estimated
                reading time is used.
        """
        content = response.text

        # Rough text length: markup with tags stripped. The reading estimate
        # divides by the behaviour's ``reading_speed`` and converts to seconds.
        text_length = len(re.sub(r'<[^>]+>', '', content))
        reading_time = text_length / self.current_behavior['reading_speed'] * 60

        logger.info(f"📖 页面内容长度: {text_length} 字符")
        logger.info(f"📖 预估阅读时间: {reading_time:.1f} 秒")

        if is_main_page:
            base_time = random.uniform(*self.config['settings']['main_site_stay_time'])
        else:
            base_time = reading_time

        stay_time = self.user_db.get_realistic_timing(base_time, self.current_behavior)
        logger.info(f"⏱️ 实际停留时间: {stay_time:.1f} 秒")

        self._simulate_browsing_segments(stay_time, content)

    @staticmethod
    def _is_internal_link(link, site_url):
        """Return whether ``link`` points at a page of ``site_url``'s host.

        Relative links with a path or query count as internal. Non-HTTP
        schemes (``mailto:``, ``javascript:`` ...), bare fragments and links
        to another host do not, even when the other host's URL happens to
        contain ``site_url`` as a substring.
        """
        try:
            parsed = urlparse(link)
            site_host = urlparse(site_url).netloc
        except ValueError:
            # Malformed href (e.g. a broken IPv6 literal).
            return False

        if parsed.scheme not in ('', 'http', 'https'):
            return False
        if not parsed.netloc:
            return bool(parsed.path or parsed.query)
        return parsed.netloc == site_host

    def _simulate_browsing_segments(self, total_time, content):
        """Split the stay into 3-8 segments and act out one action in each.

        Args:
            total_time: Overall stay time in seconds.
            content: Page markup, scanned for ``href`` values.
        """
        main_site = self.config['targets']['main_site']
        links = re.findall(r'href=["\'](.*?)["\']', content)
        internal_links = [
            link for link in links if self._is_internal_link(link, main_site)
        ]

        segments = random.randint(3, 8)
        segment_time = total_time / segments

        browsing_actions = [
            "阅读页面内容",
            "查看导航菜单",
            "滚动浏览",
            "查看页脚信息",
            "检查页面链接",
            "观察页面布局",
        ]

        for i in range(segments):
            action = random.choice(browsing_actions)
            logger.info(f" 浏览行为: {action}")

            if action == "滚动浏览":
                self._simulate_scrolling_behavior(segment_time)
            elif action == "检查页面链接" and internal_links:
                self._simulate_link_hovering(internal_links)
                time.sleep(segment_time * 0.8)
            else:
                # Jitter the segment length by +/-30%.
                actual_segment_time = segment_time * random.uniform(0.7, 1.3)
                time.sleep(actual_segment_time)

    def _simulate_scrolling_behavior(self, duration):
        """Simulate 2-5 scroll sessions that together stay within ``duration``.

        Each session gets an equal share of ``duration``. Its quick-scroll
        ticks and the optional reading pause are clipped to what is left of
        that share, so scrolling never overruns the time allotted by the
        caller.

        Args:
            duration: Time budget in seconds; negative values count as zero.
        """
        scroll_sessions = random.randint(2, 5)
        session_time = max(0.0, duration) / scroll_sessions

        for session in range(scroll_sessions):
            logger.info(f" 滚动会话 {session + 1}")
            remaining = session_time

            # Quick scroll ticks.
            quick_scrolls = random.randint(2, 4)
            for _ in range(quick_scrolls):
                tick = min(random.uniform(0.2, 0.8), remaining)
                time.sleep(tick)
                remaining -= tick

            # 70% chance to pause and read, if any budget is left.
            if random.random() < 0.7:
                pause_time = min(random.uniform(1, 4), remaining)
                if pause_time > 0:
                    logger.info(f" 停顿阅读 {pause_time:.1f}秒")
                    time.sleep(pause_time)

    def _simulate_link_hovering(self, links):
        """Log up to three randomly sampled links with a short pause each.

        Args:
            links: Non-empty list of link strings to sample from.
        """
        hover_count = min(random.randint(1, 3), len(links))
        sample_links = random.sample(links, hover_count)

        for link in sample_links:
            logger.info(f" 查看链接: {link[:50]}...")
            time.sleep(random.uniform(0.5, 2))

    def _simulate_realistic_gaming(self, response):
        """Simulate a play period on the game page.

        Args:
            response: Response for the game page (not inspected here).
        """
        logger.info("🎲 开始模拟2048游戏")

        prep_time = self.user_db.simulate_human_delays("thinking")
        logger.info(f"🤔 游戏准备时间: {prep_time:.1f}秒")
        time.sleep(prep_time)

        base_time = random.uniform(*self.config['settings']['game_page_stay_time'])
        game_time = self.user_db.get_realistic_timing(base_time, self.current_behavior)
        logger.info(f"🎮 游戏总时长: {game_time:.1f}秒")

        self._simulate_game_sessions(game_time)

    def _simulate_game_sessions(self, total_time):
        """Split ``total_time`` into 2-5 play sessions separated by breaks.

        Each session lasts its even share of ``total_time`` scaled by a random
        factor in [0.8, 1.2]; sessions are separated by a 2-8 second break.

        Args:
            total_time: Overall play time in seconds.
        """
        sessions = random.randint(2, 5)

        for session in range(sessions):
            session_time = total_time / sessions * random.uniform(0.8, 1.2)
            logger.info(f"🎯 游戏会话 {session + 1}, 时长: {session_time:.1f}秒")

            self._simulate_single_game_session(session_time)

            if session < sessions - 1:
                break_time = random.uniform(2, 8)
                logger.info(f"⏸️ 会话间休息: {break_time:.1f}秒")
                time.sleep(break_time)

    def _simulate_single_game_session(self, session_time):
        """Log random moves with pauses until ``session_time`` has elapsed.

        Elapsed time is measured with a monotonic clock so system clock
        adjustments cannot shorten or stretch the session. The last pause may
        run past the deadline, since the check happens once per move.

        Args:
            session_time: Target session length in seconds.
        """
        game_moves = ["⬆️上移", "⬇️下移", "⬅️左移", "➡️右移"]

        start_time = time.monotonic()
        move_count = 0

        while time.monotonic() - start_time < session_time:
            move = random.choice(game_moves)
            move_count += 1

            logger.info(f" 第{move_count}步: {move}")

            if move_count % 10 == 0:
                # Every tenth move: a long pause.
                think_time = random.uniform(3, 8)
                logger.info(f" 深度思考: {think_time:.1f}秒")
                time.sleep(think_time)
            elif random.random() < 0.3:
                # 30% chance: a short pause.
                think_time = random.uniform(0.5, 2)
                time.sleep(think_time)
            else:
                # Otherwise: a quick move.
                time.sleep(random.uniform(0.3, 1))

            # 5% chance of a "mistake" followed by a corrective move.
            if random.random() < 0.05:
                logger.info(" 误操作,快速纠正")
                time.sleep(0.2)
                logger.info(f" 纠正: {random.choice(game_moves)}")
                time.sleep(random.uniform(0.3, 0.8))

    def run_single_visit(self):
        """Run one full visit: session setup, referrer, main site, game page.

        Session setup happens inside the guarded region, so a setup failure is
        logged and reported as a failed visit instead of propagating, and the
        session is always closed afterwards.

        Returns:
            ``True`` if the main site and the game page were both visited
            successfully, ``False`` otherwise. A failed referrer step only
            produces a warning.
        """
        logger.info("🚀 开始执行真实用户访问流程")

        try:
            self.setup_session()

            # 1. Referrer step (non-fatal).
            if not self.simulate_realistic_github_visit():
                logger.warning("GitHub访问模拟失败,继续执行")

            # 2. Main site.
            if not self.visit_main_site_realistic():
                logger.error("主网站访问失败")
                return False

            # 3. Game page.
            if not self.visit_game_page_realistic():
                logger.error("游戏页面访问失败")
                return False

            logger.info("✅ 真实访问流程执行成功")
            return True

        except Exception as e:
            # logger.exception keeps the traceback in the log file.
            logger.exception(f"访问流程执行出错: {e}")
            return False

        finally:
            if self.session:
                self.session.close()

    def run_continuous(self, total_visits=None, delay_range=None):
        """Run several visits in a row with a randomised delay between them.

        Args:
            total_visits: Number of visits; defaults to
                ``settings.default_visits``.
            delay_range: ``(min, max)`` delay bounds in seconds; defaults to
                ``(settings.min_delay, settings.max_delay)``.

        Returns:
            The number of successful visits.
        """
        if total_visits is None:
            total_visits = self.config['settings']['default_visits']

        if delay_range is None:
            delay_range = (
                self.config['settings']['min_delay'],
                self.config['settings']['max_delay'],
            )

        success_count = 0

        for i in range(total_visits):
            logger.info(f"🔄 执行第 {i+1}/{total_visits} 次真实访问")

            if self.run_single_visit():
                success_count += 1

            # No delay after the final visit.
            if i < total_visits - 1:
                base_delay = random.uniform(delay_range[0], delay_range[1])
                behavior = self.user_db.get_visit_behavior()

                # Scale the delay by the freshly sampled visit pattern.
                if behavior['pattern_type'] == "工作时间":
                    delay = base_delay * 0.8
                elif behavior['pattern_type'] == "深夜":
                    delay = base_delay * 1.5
                else:
                    delay = base_delay

                # time.sleep rejects negative values; clamp bad bounds.
                delay = max(0.0, delay)

                logger.info(f"⏳ 智能延迟 {delay:.1f} 秒 (模式: {behavior['pattern_type']})")
                time.sleep(delay)

        logger.info(f"🎉 真实访问完成,成功: {success_count}/{total_visits}")
        return success_count
|
||
|
||
def main():
    """Interactive entry point: load the config, show a menu, run a mode.

    Modes:
        1 - a single visit.
        2 - continuous visits using the counts and delays from the config.
        3 - continuous visits using a count and delay bounds typed by the user.

    Returns early with a message when ``config.json`` is missing or the
    mode-3 parameters are invalid. Unexpected errors are logged with their
    traceback to the log file the user is pointed to.
    """
    config_file = 'config.json'

    if not os.path.exists(config_file):
        print(f"❌ 配置文件 {config_file} 不存在!")
        return

    try:
        bot = WebTrafficBotRealistic(config_file)

        print("=== 网站流量模拟脚本 (真实用户行为版) ===")
        print("🎭 使用真实用户数据库,模拟最真实的访问轨迹")
        print("⚠️ 请确保仅用于测试自己的网站!")
        print("目标网站:", bot.config['targets']['main_site'])
        print("游戏页面:", bot.config['targets']['game_page'])
        print()

        print("请选择运行模式:")
        print("1. 单次真实访问测试")
        print("2. 连续真实访问模式 (使用配置文件设置)")
        print("3. 连续真实访问模式 (自定义参数)")

        choice = input("请输入选择 (1/2/3): ").strip()

        if choice == "1":
            logger.info("开始单次真实访问测试")
            success = bot.run_single_visit()
            if success:
                print("✅ 单次真实访问测试成功!")
            else:
                print("❌ 单次真实访问测试失败!")

        elif choice == "2":
            logger.info("开始连续真实访问(配置文件模式)")
            success_count = bot.run_continuous()
            print(f"✅ 连续真实访问完成!成功: {success_count}/{bot.config['settings']['default_visits']}")

        elif choice == "3":
            # Only parsing and validation are guarded here, so a ValueError
            # raised later during the run is not misreported as bad input.
            try:
                visit_count = int(input("请输入访问次数: ").strip())
                min_delay = int(input("请输入最小延迟秒数: ").strip())
                max_delay = int(input("请输入最大延迟秒数: ").strip())
                if visit_count < 1 or min_delay < 0 or max_delay < min_delay:
                    raise ValueError("visit count or delay bounds out of range")
            except ValueError:
                print("❌ 输入参数错误!")
                return

            logger.info(f"开始连续真实访问,总次数: {visit_count}")
            success_count = bot.run_continuous(
                total_visits=visit_count,
                delay_range=(min_delay, max_delay),
            )
            print(f"✅ 连续真实访问完成!成功: {success_count}/{visit_count}")

        else:
            print("❌ 无效选择!")

    except KeyboardInterrupt:
        print("\n⚠️ 用户中断执行")
    except Exception as e:
        # Keep the traceback: the message below sends the user to the log.
        logger.exception(f"程序执行出错: {e}")
        print("❌ 程序执行出错,请检查日志文件 traffic_bot_realistic.log")
|
||
|
||
# Start the interactive menu only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()