Files
shualiangv1/website_traffic_bot_final.py
huangzhenpc 9f9f44ecc7 正式2
2025-07-18 10:08:38 +08:00

733 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网站流量模拟脚本 (最终版本)
使用真实用户数据库模拟最真实的访问轨迹
支持Google搜索来源和真实网站跳转
"""
import requests
import time
import random
import json
import os
import logging
from urllib.parse import urlparse, urljoin
import re
from real_user_database import RealUserDatabase
# Configure logging once at import time: INFO level, with every record
# mirrored to a UTF-8 log file and to the console stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler('traffic_bot_final.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class WebTrafficBotFinal:
    """Drive a scripted visit: source page -> main site -> game page.

    Each visit builds a fresh ``requests.Session`` whose headers come from
    ``RealUserDatabase``, waits for a simulated stay on a randomly chosen
    source page (no request is sent to that page; only the ``Referer``
    header is set), then issues GET requests to the configured main site and
    game page with sleeps in between.

    The script's own banner states it must only be used against sites the
    operator owns.
    """

    def __init__(self, config_file='config.json'):
        """Load the configuration and initialise per-visit state.

        Args:
            config_file: Path to the JSON configuration file.

        Raises:
            FileNotFoundError: If ``config_file`` does not exist.
            json.JSONDecodeError: If ``config_file`` is not valid JSON.
        """
        self.config = self.load_config(config_file)
        self.session = None
        self.user_db = RealUserDatabase()
        self.current_profile = None
        self.current_behavior = None
        # Candidate source pages. One is chosen per visit and written into
        # the Referer header by simulate_realistic_source_visit().
        self.traffic_sources = [
            # Google search result pages (the majority of entries)
            "https://www.google.com/search?q=2048+game+online",
            "https://www.google.com/search?q=html5+games",
            "https://www.google.com/search?q=browser+games+2048",
            "https://www.google.com/search?q=free+online+games",
            "https://www.google.com/search?q=puzzle+games+online",
            "https://www.google.com/search?q=数字游戏+2048",
            "https://www.google.com/search?q=在线小游戏",
            # Sites specified by the script's author
            "https://github.com/chengazhen/cursor-auto-free",
            "https://linux.do/",
            # Other source sites
            "https://github.com/trending",
            "https://github.com/topics/game",
            "https://news.ycombinator.com/",
            "https://www.reddit.com/r/WebGames/",
            "https://www.reddit.com/r/incremental_games/",
            "https://www.producthunt.com/",
            "https://stackoverflow.com/",
            "https://www.zhihu.com/",
            "https://v2ex.com/",
            "https://segmentfault.com/",
            "https://juejin.cn/",
            "https://www.csdn.net/",
            "https://www.oschina.net/",
            "https://gitee.com/",
        ]

    def load_config(self, config_file):
        """Read and return the JSON configuration.

        Args:
            config_file: Path to the JSON configuration file.

        Returns:
            The parsed configuration object.

        Raises:
            FileNotFoundError: If the file is missing (logged, then re-raised).
            json.JSONDecodeError: If the file is not valid JSON (logged, then
                re-raised).
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            logger.info(f"配置文件加载成功: {config_file}")
            return config
        except FileNotFoundError:
            logger.error(f"配置文件未找到: {config_file}")
            raise
        except json.JSONDecodeError as e:
            logger.error(f"配置文件格式错误: {e}")
            raise

    def setup_session(self):
        """Create a new HTTP session with a freshly drawn user profile.

        Replaces ``self.session``, ``self.current_profile`` and
        ``self.current_behavior``; applies the optional ``proxy`` section of
        the configuration; installs the headers produced by the user
        database; logs the chosen profile and prints the outbound IP.
        """
        self.session = requests.Session()
        # Draw a new profile and behaviour pattern for this visit.
        self.current_profile = self.user_db.get_random_user_profile()
        self.current_behavior = self.user_db.get_visit_behavior()
        # Optional proxy: the same URL is used for both schemes.
        proxy_config = self.config.get('proxy')
        if proxy_config:
            proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['host']}:{proxy_config['port']}"
            self.session.proxies = {
                'http': proxy_url,
                'https': proxy_url
            }
            logger.info(f"已配置代理: {proxy_config['host']}:{proxy_config['port']}")
        # Headers are generated by the user database from the drawn profile.
        realistic_headers = self.user_db.get_realistic_headers(self.current_profile)
        self.session.headers.update(realistic_headers)
        # Log the profile characteristics.
        logger.info("🎭 用户身份配置:")
        logger.info(f" 操作系统: {self.current_profile['operating_system']}")
        logger.info(f" 屏幕分辨率: {self.current_profile['screen_resolution']}")
        logger.info(f" 浏览器语言: {self.current_profile['language']}")
        logger.info(f" 时区: {self.current_profile['timezone']}")
        logger.info(f" 访问时间模式: {self.current_behavior['pattern_type']}")
        logger.info(f" 硬件并发数: {self.current_profile['hardware_concurrency']}")
        logger.info(f" 设备内存: {self.current_profile['device_memory']}GB")
        # Look up and display the current outbound IP.
        current_ip = self.get_current_ip()
        if current_ip:
            print(f"🌍 当前IP地址: {current_ip}")
        print(f"👤 用户身份: {self.current_profile['operating_system']} | {self.current_profile['screen_resolution']} | {self.current_behavior['pattern_type']}")

    @staticmethod
    def _extract_ip(response):
        """Pull the IP string out of an IP-echo service response.

        Args:
            response: Object exposing ``json()`` and ``text`` like a
                ``requests.Response``.

        Returns:
            The ``origin`` or ``ip`` field of a JSON object body, the
            stringified JSON value when neither field exists, or the stripped
            body text when the body is not JSON.
        """
        try:
            data = response.json()
        except ValueError:
            # requests' JSON decode errors derive from ValueError.
            return response.text.strip()
        if isinstance(data, dict):
            for key in ('origin', 'ip'):
                if key in data:
                    return data[key]
        return str(data)

    def get_current_ip(self):
        """Return the outbound IP as seen by a public echo service.

        Services are tried in order; the first one that answers with HTTP
        200 wins. Failures are logged and the next service is tried.

        Returns:
            The IP string, or ``None`` when every service failed.
        """
        ip_services = [
            'https://httpbin.org/ip',
            'https://api.ipify.org?format=json',
            'https://ipinfo.io/json',
        ]
        for service in ip_services:
            try:
                logger.info(f"🔍 正在获取IP地址: {service}")
                response = self.session.get(service, timeout=10)
                if response.status_code == 200:
                    ip = self._extract_ip(response)
                    logger.info(f"✅ 当前IP地址: {ip}")
                    return ip
            except Exception as e:
                # Best effort: any failure just moves on to the next service.
                logger.warning(f"{service} 获取IP失败: {e}")
        logger.error("❌ 无法获取当前IP地址")
        return None

    def simulate_realistic_source_visit(self):
        """Simulate a stay on a random source page and set the Referer.

        No HTTP request is made to the source page. The method sleeps
        through a type-specific action sequence and then writes the chosen
        URL into the session's ``Referer`` header.

        Returns:
            ``True`` on success, ``False`` if any step raised.
        """
        source_page = random.choice(self.traffic_sources)
        try:
            logger.info(f"🔗 模拟从来源访问: {source_page}")
            # Stay time and action set depend on the kind of source.
            if "google.com" in source_page:
                stay_time = self.user_db.get_realistic_timing(
                    random.uniform(3, 12),  # short stay on search results
                    self.current_behavior
                )
                logger.info(f"🔍 Google搜索停留 {stay_time:.1f}")
                self._simulate_google_search_behavior(stay_time)
            elif "github.com" in source_page:
                stay_time = self.user_db.get_realistic_timing(
                    random.uniform(8, 25),  # medium stay
                    self.current_behavior
                )
                logger.info(f"🐙 GitHub页面停留 {stay_time:.1f}")
                self._simulate_github_behavior(stay_time, source_page)
            elif "linux.do" in source_page:
                stay_time = self.user_db.get_realistic_timing(
                    random.uniform(10, 30),  # longer stay on the forum
                    self.current_behavior
                )
                logger.info(f"💻 Linux.do社区停留 {stay_time:.1f}")
                self._simulate_community_behavior(stay_time)
            else:
                stay_time = self.user_db.get_realistic_timing(
                    random.uniform(5, 20),  # any other site
                    self.current_behavior
                )
                logger.info(f"🌐 其他网站停留 {stay_time:.1f}")
                self._simulate_general_browsing(stay_time)
            # Referer used by the subsequent request to the main site.
            self.session.headers.update({
                'Referer': source_page,
                'Sec-Fetch-Site': 'cross-site'
            })
            return True
        except Exception as e:
            logger.error(f"来源访问模拟失败: {e}")
            return False

    def _simulate_google_search_behavior(self, total_time):
        """Sleep through 2-4 distinct search-page actions.

        Args:
            total_time: Nominal stay time in seconds, split evenly across
                the chosen actions.
        """
        actions = [
            "输入搜索关键词",
            "查看搜索结果",
            "滚动浏览结果页面",
            "点击相关搜索建议",
            "查看图片搜索结果"
        ]
        segments = random.randint(2, 4)  # never more than len(actions)
        segment_time = total_time / segments
        for i, action in enumerate(random.sample(actions, segments)):
            if i > 0:
                delay = self.user_db.simulate_human_delays("thinking")
                time.sleep(delay)
            logger.info(f" 🔍 Google行为: {action}")
            if "输入" in action:
                # Typing the query takes a fixed random time.
                typing_time = random.uniform(2, 5)
                time.sleep(typing_time)
            elif "滚动" in action:
                # Scrolling through results is quicker than reading.
                time.sleep(segment_time * 0.6)
            else:
                time.sleep(segment_time * 0.8)

    def _simulate_github_behavior(self, total_time, github_url):
        """Sleep through 3-6 randomly chosen GitHub page actions.

        Args:
            total_time: Nominal stay time in seconds.
            github_url: The source URL; selects the repository-specific
                action list when it contains ``cursor-auto-free``.
        """
        if "cursor-auto-free" in github_url:
            actions = [
                "查看项目README",
                "阅读项目描述",
                "查看Stars和Forks数量",
                "浏览代码文件",
                "查看Issues讨论",
                "阅读使用说明"
            ]
        else:
            actions = [
                "查看trending项目",
                "浏览热门仓库",
                "查看项目描述",
                "阅读README文件",
                "查看代码示例"
            ]
        segments = random.randint(3, 6)
        segment_time = total_time / segments
        for i in range(segments):
            if i > 0:
                delay = self.user_db.simulate_human_delays("reading")
                time.sleep(delay)
            action = random.choice(actions)
            logger.info(f" 🐙 GitHub行为: {action}")
            # The weight applied to the segment depends on the action verb.
            if "阅读" in action:
                time.sleep(segment_time * 0.7)
            elif "查看" in action:
                time.sleep(segment_time * 0.5)
            else:
                time.sleep(segment_time * 0.4)

    def _simulate_community_behavior(self, total_time):
        """Sleep through 4-7 randomly chosen forum actions.

        Args:
            total_time: Nominal stay time in seconds.
        """
        actions = [
            "浏览热门帖子",
            "阅读技术讨论",
            "查看最新话题",
            "搜索相关内容",
            "查看用户资料",
            "阅读精华帖子"
        ]
        segments = random.randint(4, 7)  # longer stay, more actions
        segment_time = total_time / segments
        for i in range(segments):
            if i > 0:
                delay = self.user_db.simulate_human_delays("reading")
                time.sleep(delay)
            action = random.choice(actions)
            logger.info(f" 💻 社区行为: {action}")
            if "阅读" in action:
                time.sleep(segment_time * 0.8)  # reading takes longer
            else:
                time.sleep(segment_time * 0.6)

    def _simulate_general_browsing(self, total_time):
        """Sleep through 3-5 randomly chosen generic browsing actions.

        Args:
            total_time: Nominal stay time in seconds.
        """
        actions = [
            "浏览页面内容",
            "查看导航菜单",
            "滚动阅读文章",
            "点击相关链接",
            "查看评论区"
        ]
        segments = random.randint(3, 5)
        segment_time = total_time / segments
        for i in range(segments):
            if i > 0:
                delay = self.user_db.simulate_human_delays("normal")
                time.sleep(delay)
            action = random.choice(actions)
            logger.info(f" 🌐 浏览行为: {action}")
            time.sleep(segment_time * random.uniform(0.7, 1.2))

    def visit_main_site_realistic(self):
        """Request the configured main site and simulate browsing it.

        Returns:
            ``True`` when the request succeeded and browsing completed,
            ``False`` otherwise.

        Raises:
            KeyError: If ``targets.main_site`` is missing from the config.
        """
        main_site = self.config['targets']['main_site']
        try:
            logger.info(f"🏠 访问目标网站: {main_site}")
            response = self.make_realistic_request(main_site)
            if not response:
                return False
            self._simulate_realistic_browsing(response, is_main_page=True)
            return True
        except Exception as e:
            logger.error(f"主网站访问失败: {e}")
            return False

    def visit_game_page_realistic(self):
        """Request the configured game page and simulate playing.

        The Referer is switched to the main site before the request.

        Returns:
            ``True`` when the request succeeded and the simulation
            completed, ``False`` otherwise.

        Raises:
            KeyError: If ``targets.game_page`` or ``targets.main_site`` is
                missing from the config.
        """
        game_page = self.config['targets']['game_page']
        main_site = self.config['targets']['main_site']
        try:
            logger.info(f"🎮 访问游戏页面: {game_page}")
            # The game page is reached from the main site.
            self.session.headers.update({
                'Referer': main_site,
                'Sec-Fetch-Site': 'same-origin'
            })
            response = self.make_realistic_request(game_page)
            if not response:
                return False
            # A login wall is only reported; the simulation continues anyway.
            if self._check_login_required(response):
                logger.warning("⚠️ 检测到登录页面,继续模拟访问")
            self._simulate_realistic_gaming(response)
            return True
        except Exception as e:
            logger.error(f"游戏页面访问失败: {e}")
            return False

    def _check_login_required(self, response):
        """Heuristically decide whether the response is a login page.

        Args:
            response: Response object with a ``text`` attribute, or a falsy
                value.

        Returns:
            ``True`` when at least three login-related keywords occur in the
            lower-cased body, ``False`` otherwise (including for a falsy
            ``response``).
        """
        if not response:
            return False
        content = response.text.lower()
        login_indicators = [
            '登录', 'login', '用户名', 'username', 'password', '密码',
            'signin', 'sign in', '账号', 'account', '验证码'
        ]
        login_count = sum(1 for indicator in login_indicators if indicator in content)
        if login_count >= 3:
            logger.warning("⚠️ 检测到可能的登录页面")
            logger.info("💡 建议:检查网站是否需要登录访问")
            return True
        return False

    def make_realistic_request(self, url, timeout=15):
        """GET ``url`` through the current session after a short delay.

        Args:
            url: Address to request.
            timeout: Request timeout in seconds.

        Returns:
            The response on success, or ``None`` when the request failed or
            the status code indicates an error.
        """
        try:
            # Pause before the request, as drawn from the user database.
            pre_request_delay = self.user_db.simulate_human_delays("thinking")
            time.sleep(pre_request_delay)
            response = self.session.get(url, timeout=timeout, allow_redirects=True)
            # Log request details before checking the status, so that
            # failing responses are described too.
            logger.info("📡 HTTP请求详情:")
            logger.info(f" 📍 访问URL: {url}")
            logger.info(f" 📊 状态码: {response.status_code}")
            logger.info(f" 📦 响应大小: {len(response.content)} 字节")
            logger.info(f" ⏱️ 响应时间: {response.elapsed.total_seconds():.2f}")
            if response.headers.get('content-type'):
                logger.info(f" 📄 内容类型: {response.headers.get('content-type')}")
            if response.headers.get('server'):
                logger.info(f" 🖥️ 服务器: {response.headers.get('server')}")
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            logger.error(f"❌ 请求失败 {url}: {e}")
            return None

    def _simulate_realistic_browsing(self, response, is_main_page=False):
        """Derive a stay time for the page and sleep through it in segments.

        Args:
            response: Response whose ``text`` is the page body.
            is_main_page: When true the base stay time is drawn from
                ``settings.main_site_stay_time``; otherwise it is the
                estimated reading time capped at 60 seconds.
        """
        content = response.text
        # Rough visible-text length: strip anything that looks like a tag.
        text_length = len(re.sub(r'<[^>]+>', '', content))
        # NOTE(review): presumably reading_speed is characters per minute,
        # making this a value in seconds - confirm against RealUserDatabase.
        reading_time = text_length / self.current_behavior['reading_speed'] * 60
        logger.info("📖 页面分析:")
        logger.info(f" 📝 内容长度: {text_length} 字符")
        logger.info(f" ⏱️ 预估阅读时间: {reading_time:.1f}")
        if is_main_page:
            base_time = random.uniform(*self.config['settings']['main_site_stay_time'])
        else:
            base_time = min(reading_time, 60)  # cap reading time at 60 s
        stay_time = self.user_db.get_realistic_timing(base_time, self.current_behavior)
        logger.info(f" 🕐 实际停留时间: {stay_time:.1f}")
        self._simulate_browsing_segments(stay_time, content)

    def _simulate_browsing_segments(self, total_time, content):
        """Split the stay into 3-8 randomly chosen browsing actions.

        Args:
            total_time: Nominal stay time in seconds.
            content: Page HTML, scanned for ``href`` values.
        """
        # Collect href targets; relative links and links containing the
        # main-site URL are treated as internal.
        links = re.findall(r'href=["\'](.*?)["\']', content)
        internal_links = [link for link in links if not link.startswith('http') or self.config['targets']['main_site'] in link]
        segments = random.randint(3, 8)
        segment_time = total_time / segments
        browsing_actions = [
            "📖 阅读页面内容",
            "🧭 查看导航菜单",
            "📜 滚动浏览页面",
            "🔗 检查页面链接",
            "👁️ 观察页面布局",
            "📱 查看页脚信息"
        ]
        for i in range(segments):
            action = random.choice(browsing_actions)
            logger.info(f" {action}")
            if "滚动" in action:
                self._simulate_scrolling_behavior(segment_time)
            elif "链接" in action and internal_links:
                # Matches the "检查页面链接" label. The previous test for
                # "检查链接" never matched, leaving this branch unreachable.
                self._simulate_link_hovering(internal_links)
                time.sleep(segment_time * 0.8)
            else:
                actual_segment_time = segment_time * random.uniform(0.7, 1.3)
                time.sleep(actual_segment_time)

    def _simulate_scrolling_behavior(self, duration):
        """Sleep through 2-5 scroll sessions with optional reading pauses.

        Args:
            duration: Nominal time budget in seconds. It is accepted for
                interface compatibility but does not bound the sleeps, which
                use fixed random ranges.
        """
        scroll_sessions = random.randint(2, 5)
        for session in range(scroll_sessions):
            logger.info(f" 📜 滚动会话 {session + 1}")
            # A burst of quick scrolls.
            quick_scrolls = random.randint(2, 4)
            for _ in range(quick_scrolls):
                time.sleep(random.uniform(0.3, 1.0))
            # 70% of sessions end with a reading pause.
            if random.random() < 0.7:
                pause_time = random.uniform(1, 4)
                logger.info(f" ⏸️ 停顿阅读 {pause_time:.1f}")
                time.sleep(pause_time)

    def _simulate_link_hovering(self, links):
        """Log and sleep over one to three randomly sampled links.

        Args:
            links: Non-empty list of link strings.
        """
        hover_count = min(random.randint(1, 3), len(links))
        sample_links = random.sample(links, hover_count)
        for link in sample_links:
            logger.info(f" 🔗 查看链接: {link[:50]}...")
            time.sleep(random.uniform(0.5, 2.5))

    def _simulate_realistic_gaming(self, response):
        """Sleep through a simulated 2048 play period.

        Args:
            response: The game-page response; not inspected here.
        """
        logger.info("🎲 开始2048游戏模拟")
        # Loading / preparation pause before play starts.
        prep_time = self.user_db.simulate_human_delays("thinking")
        logger.info(f"🤔 游戏加载和准备: {prep_time:.1f}")
        time.sleep(prep_time)
        # Total play time comes from the configured range.
        base_time = random.uniform(*self.config['settings']['game_page_stay_time'])
        game_time = self.user_db.get_realistic_timing(base_time, self.current_behavior)
        logger.info(f"🎮 游戏总时长: {game_time:.1f}")
        self._simulate_game_sessions(game_time)

    def _simulate_game_sessions(self, total_time):
        """Split the play time into 2-4 sessions separated by breaks.

        Args:
            total_time: Nominal total play time in seconds.
        """
        sessions = random.randint(2, 4)
        for session in range(sessions):
            session_time = total_time / sessions * random.uniform(0.8, 1.2)
            logger.info(f"🎯 游戏会话 {session + 1}/{sessions}, 时长: {session_time:.1f}")
            self._simulate_single_game_session(session_time)
            # Break between sessions, but not after the last one.
            if session < sessions - 1:
                break_time = random.uniform(3, 10)
                logger.info(f"⏸️ 休息思考: {break_time:.1f}")
                time.sleep(break_time)

    def _simulate_single_game_session(self, session_time):
        """Log random moves with think-time sleeps until time runs out.

        Args:
            session_time: Wall-clock duration of the session in seconds.
        """
        game_moves = ["⬆️上", "⬇️下", "⬅️左", "➡️右"]
        start_time = time.time()
        move_count = 0
        while time.time() - start_time < session_time:
            move = random.choice(game_moves)
            move_count += 1
            logger.info(f" 🎮 第{move_count}步: {move}")
            if move_count % 8 == 0:  # every 8th move: long think
                think_time = random.uniform(4, 10)
                logger.info(f" 🧠 策略思考: {think_time:.1f}")
                time.sleep(think_time)
            elif random.random() < 0.4:  # 40%: short think
                think_time = random.uniform(0.8, 3)
                time.sleep(think_time)
            else:  # quick move
                time.sleep(random.uniform(0.4, 1.2))
            # 6% of moves are followed by a "mistake and correction" pair.
            if random.random() < 0.06:
                logger.info(" ❌ 误操作,立即纠正")
                time.sleep(0.3)
                corrective_move = random.choice(game_moves)
                logger.info(f" 🔄 纠正: {corrective_move}")
                time.sleep(random.uniform(0.5, 1.0))

    def run_single_visit(self):
        """Run one complete visit and always close the session afterwards.

        Session setup happens inside the guarded region, so a setup failure
        is reported as a failed visit and the session is still closed.

        Returns:
            ``True`` when both the main site and the game page were visited
            successfully, ``False`` otherwise.
        """
        logger.info("🚀 开始执行真实访问流程")
        try:
            self.setup_session()
            # 1. Source-page stage; a failure here is not fatal.
            if not self.simulate_realistic_source_visit():
                logger.warning("⚠️ 来源访问模拟失败,继续执行")
            # 2. Main site.
            if not self.visit_main_site_realistic():
                logger.error("❌ 主网站访问失败")
                return False
            # 3. Game page.
            if not self.visit_game_page_realistic():
                logger.error("❌ 游戏页面访问失败")
                return False
            logger.info("✅ 访问流程完美执行成功!")
            return True
        except Exception as e:
            logger.error(f"❌ 访问流程执行出错: {e}")
            return False
        finally:
            if self.session:
                self.session.close()

    @staticmethod
    def _success_rate(success_count, total_visits):
        """Return the success percentage, or 0.0 when nothing was attempted."""
        if total_visits <= 0:
            return 0.0
        return (success_count / total_visits) * 100

    def run_continuous(self, total_visits=None, delay_range=None):
        """Run several visits in a row with a pause between them.

        Args:
            total_visits: Number of visits; defaults to
                ``settings.default_visits``.
            delay_range: ``(min, max)`` pause in seconds; defaults to
                ``settings.min_delay`` / ``settings.max_delay``.

        Returns:
            The number of successful visits.
        """
        if total_visits is None:
            total_visits = self.config['settings']['default_visits']
        if delay_range is None:
            delay_range = (
                self.config['settings']['min_delay'],
                self.config['settings']['max_delay']
            )
        success_count = 0
        logger.info(f"🎯 开始连续访问,目标: {total_visits}")
        for i in range(total_visits):
            logger.info('=' * 60)
            logger.info(f"🔄 执行第 {i+1}/{total_visits} 次访问")
            logger.info('=' * 60)
            if self.run_single_visit():
                success_count += 1
                logger.info(f"✅ 第 {i+1} 次访问成功!累计成功: {success_count}")
            else:
                logger.error(f"❌ 第 {i+1} 次访问失败!")
            # Pause between visits, but not after the last one.
            if i < total_visits - 1:
                base_delay = random.uniform(delay_range[0], delay_range[1])
                behavior = self.user_db.get_visit_behavior()
                # Scale the pause by the reported time-of-day pattern.
                if behavior['pattern_type'] == "工作时间":
                    delay = base_delay * 0.7
                elif behavior['pattern_type'] == "深夜":
                    delay = base_delay * 1.8
                else:
                    delay = base_delay
                logger.info(f"⏳ 智能等待 {delay:.1f} 秒 (当前时段: {behavior['pattern_type']})")
                time.sleep(delay)
        # Guarded: total_visits may be 0, which used to divide by zero.
        success_rate = self._success_rate(success_count, total_visits)
        logger.info("🎉 访问任务完成!")
        logger.info(f"📊 成功率: {success_count}/{total_visits} ({success_rate:.1f}%)")
        return success_count
def main():
    """Interactive entry point.

    Checks that ``config.json`` exists, builds the bot, prints a banner and
    asks which mode to run: a single visit, continuous visits using the
    configured parameters, or continuous visits with parameters typed in.
    Returns ``None`` in every case; errors are printed and logged rather
    than raised.
    """
    config_file = 'config.json'
    if not os.path.exists(config_file):
        print(f"❌ 配置文件 {config_file} 不存在!")
        return
    try:
        bot = WebTrafficBotFinal(config_file)
        print("=" * 60)
        print("🎭 网站流量模拟脚本 (最终升级版)")
        print("=" * 60)
        print("🌟 特性:真实用户行为 + Google/GitHub来源 + 完整浏览器指纹")
        print("⚠️ 请确保仅用于测试自己的网站!")
        print()
        print(f"🎯 目标网站: {bot.config['targets']['main_site']}")
        print(f"🎮 游戏页面: {bot.config['targets']['game_page']}")
        print()
        print("请选择运行模式:")
        print("1. 💎 单次完整访问测试")
        print("2. 🚀 连续访问模式 (使用配置参数)")
        print("3. ⚙️ 自定义连续访问")
        choice = input("请输入选择 (1/2/3): ").strip()
        if choice == "1":
            logger.info("🎬 开始单次完整访问测试")
            success = bot.run_single_visit()
            if success:
                print("🎉 单次访问测试完美成功!")
            else:
                print("😞 单次访问测试失败!")
        elif choice == "2":
            logger.info("🎬 开始连续访问模式")
            success_count = bot.run_continuous()
            total = bot.config['settings']['default_visits']
            # Guard the percentage: a configured count of 0 must not crash.
            rate = (success_count / total) * 100 if total > 0 else 0.0
            print(f"🎉 连续访问完成!成功率: {success_count}/{total} ({rate:.1f}%)")
        elif choice == "3":
            try:
                visit_count = int(input("请输入访问次数: ").strip())
                min_delay = int(input("请输入最小延迟秒数: ").strip())
                max_delay = int(input("请输入最大延迟秒数: ").strip())
                # Reject values that cannot produce a meaningful run: a
                # non-positive count or a negative sleep duration.
                if visit_count <= 0 or min_delay < 0 or max_delay < 0:
                    raise ValueError("visit count must be positive and delays non-negative")
            except ValueError:
                print("❌ 输入参数错误!")
            else:
                # Run outside the try so that a ValueError raised during the
                # visits is not misreported as bad input.
                logger.info(f"🎬 开始自定义连续访问,总次数: {visit_count}")
                success_count = bot.run_continuous(
                    total_visits=visit_count,
                    delay_range=(min_delay, max_delay)
                )
                rate = (success_count / visit_count) * 100
                print(f"🎉 自定义访问完成!成功率: {success_count}/{visit_count} ({rate:.1f}%)")
        else:
            print("❌ 无效选择!")
    except KeyboardInterrupt:
        print("\n⚠️ 用户中断执行")
    except Exception as e:
        logger.error(f"程序执行出错: {e}")
        print("❌ 程序执行出错,请检查日志文件 traffic_bot_final.log")


if __name__ == "__main__":
    main()