From 9f9f44ecc7d6cff85f5edb22f03f89db06115d61 Mon Sep 17 00:00:00 2001 From: huangzhenpc Date: Fri, 18 Jul 2025 10:08:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AD=A3=E5=BC=8F2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- real_user_database.py | 360 +++++++++++++++ setup_and_run.bat | 42 +- website_traffic_bot_final.py | 733 +++++++++++++++++++++++++++++++ website_traffic_bot_realistic.py | 572 ++++++++++++++++++++++++ 4 files changed, 1700 insertions(+), 7 deletions(-) create mode 100644 real_user_database.py create mode 100644 website_traffic_bot_final.py create mode 100644 website_traffic_bot_realistic.py diff --git a/real_user_database.py b/real_user_database.py new file mode 100644 index 0000000..893826c --- /dev/null +++ b/real_user_database.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +真实用户行为数据库 +包含真实的用户代理、浏览器指纹、访问模式等数据 +""" + +import random +import json +from datetime import datetime +import time + +class RealUserDatabase: + def __init__(self): + """初始化真实用户数据库""" + + # 真实的用户代理字符串(从真实浏览器收集) + self.user_agents = [ + # Chrome Windows 用户代理 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 11.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + + # Chrome Mac 用户代理 + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_6_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + + # Firefox 用户代理 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0", + + # Safari 用户代理 + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Safari/605.1.15", + + # Edge 用户代理 + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0", + "Mozilla/5.0 (Windows NT 11.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", + + # 移动端用户代理 + "Mozilla/5.0 (iPhone; CPU iPhone OS 17_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1", + "Mozilla/5.0 (Linux; Android 14; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36", + ] + + # 真实的屏幕分辨率分布 + self.screen_resolutions = [ + (1920, 1080), # 最常见 + (1366, 768), # 笔记本常见 + (1440, 900), # MacBook + (1536, 864), # 高DPI + (2560, 1440), # 2K显示器 + (1600, 900), # 宽屏 + (1280, 720), # HD + (3840, 2160), # 4K显示器 + (2560, 1600), # 16:10 + (1920, 1200), # 16:10 + ] + + # 真实的操作系统分布 + self.operating_systems = [ + {"name": "Windows 10", "weight": 0.4}, + {"name": "Windows 11", "weight": 0.25}, + {"name": "macOS", "weight": 0.15}, + {"name": "Linux", "weight": 0.1}, + {"name": "Android", "weight": 0.07}, + {"name": "iOS", "weight": 0.03}, + ] + + # 真实的浏览器分布 + self.browser_distribution = [ + {"name": "Chrome", "weight": 0.65}, + {"name": "Firefox", "weight": 0.15}, + {"name": "Safari", "weight": 0.12}, + {"name": "Edge", "weight": 0.06}, + {"name": "Other", "weight": 0.02}, + ] + + # 真实的语言设置 + self.languages = [ + "zh-CN,zh;q=0.9,en;q=0.8", # 中文用户 + "en-US,en;q=0.9", # 英文用户 + "en-GB,en;q=0.9,en-US;q=0.8", + "zh-TW,zh;q=0.9,en;q=0.8", + "ja-JP,ja;q=0.9,en;q=0.8", + "ko-KR,ko;q=0.9,en;q=0.8", + ] + + # 真实的时区分布 + self.timezones = [ + "Asia/Shanghai", + "Asia/Tokyo", + "America/New_York", + "Europe/London", + "America/Los_Angeles", + "Europe/Berlin", + "Asia/Seoul", + "Australia/Sydney", + ] + + # 访问模式数据 + self.visit_patterns = { + "工作时间": { + "hours": list(range(9, 18)), + "stay_time_multiplier": 0.8, # 工作时间停留时间较短 + "scroll_frequency": 1.2, # 滚动更频繁 + }, + "休闲时间": { + "hours": list(range(19, 23)) + list(range(6, 9)), + "stay_time_multiplier": 1.5, # 休闲时间停留更久 + "scroll_frequency": 0.8, # 滚动较慢 + }, + "深夜": { + "hours": list(range(0, 6)) + [23], + "stay_time_multiplier": 2.0, # 深夜停留很久 + "scroll_frequency": 0.6, # 滚动很慢 + } + } + + def get_random_user_profile(self): + """生成一个真实的用户配置文件""" + + # 选择操作系统 + os_choice = self._weighted_choice(self.operating_systems) + + # 根据操作系统选择合适的用户代理 + if "Windows" in os_choice: + ua_candidates = [ua for ua in self.user_agents if "Windows NT" in ua] + elif "macOS" in os_choice: + ua_candidates = [ua for ua in self.user_agents if "Macintosh" in ua] + elif "Android" in os_choice: + ua_candidates = [ua for ua in self.user_agents if "Android" in ua] + elif "iOS" in os_choice: + ua_candidates = [ua for ua in self.user_agents if "iPhone" in ua] + else: + ua_candidates = [ua for ua in self.user_agents if "X11; Linux" in ua] + + if not ua_candidates: + ua_candidates = self.user_agents + + user_agent = random.choice(ua_candidates) + + # 选择屏幕分辨率 + resolution = random.choice(self.screen_resolutions) + + # 选择语言 + language = random.choice(self.languages) + + # 选择时区 + timezone = random.choice(self.timezones) + + # 生成其他浏览器指纹信息 + profile = { + "user_agent": user_agent, + "operating_system": os_choice, + "screen_resolution": resolution, + "viewport_size": ( + resolution[0] - random.randint(0, 100), + resolution[1] - random.randint(100, 200) + ), + "language": language, + "timezone": timezone, + "color_depth": random.choice([24, 32]), + "platform": self._extract_platform(user_agent), + "cookie_enabled": True, + "java_enabled": random.choice([True, False]), + "hardware_concurrency": random.choice([2, 4, 8, 12, 16]), + "device_memory": random.choice([2, 4, 8, 16, 32]), + "connection_type": random.choice(["wifi", "ethernet", "cellular"]), + } + + return profile + + def get_realistic_headers(self, profile=None, referrer=None): + """生成真实的HTTP头部""" + if not profile: + profile = self.get_random_user_profile() + + headers = { + "User-Agent": profile["user_agent"], + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "Accept-Language": profile["language"], + "Accept-Encoding": "gzip, deflate, br", + "Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "none" if not referrer else "cross-site", + "Sec-Fetch-User": "?1", + "Cache-Control": "max-age=0", + "DNT": str(random.randint(0, 1)), # Do Not Track + } + + if referrer: + headers["Referer"] = referrer + headers["Sec-Fetch-Site"] = "cross-site" + + # 根据浏览器类型添加特定头部 + if "Chrome" in profile["user_agent"]: + headers["sec-ch-ua"] = self._generate_chrome_sec_ch_ua(profile["user_agent"]) + headers["sec-ch-ua-mobile"] = "?0" if "Mobile" not in profile["user_agent"] else "?1" + headers["sec-ch-ua-platform"] = f'"{profile["platform"]}"' + + return headers + + def get_visit_behavior(self): + """获取基于时间的访问行为模式""" + current_hour = datetime.now().hour + + # 确定当前时间段 + pattern_type = "工作时间" + for pattern_name, pattern_data in self.visit_patterns.items(): + if current_hour in pattern_data["hours"]: + pattern_type = pattern_name + break + + pattern = self.visit_patterns[pattern_type] + + return { + "pattern_type": pattern_type, + "stay_time_multiplier": pattern["stay_time_multiplier"], + "scroll_frequency": pattern["scroll_frequency"], + "reading_speed": random.uniform(200, 400), # 每分钟字数 + "interaction_probability": random.uniform(0.3, 0.8), + } + + def get_realistic_timing(self, base_time, behavior=None): + """生成真实的访问时间模式""" + if not behavior: + behavior = self.get_visit_behavior() + + # 应用时间段影响 + adjusted_time = base_time * behavior["stay_time_multiplier"] + + # 添加人类行为的随机性 + variation = random.uniform(0.7, 1.5) + final_time = adjusted_time * variation + + return max(final_time, 1.0) # 最少1秒 + + def simulate_human_delays(self, action_type="normal"): + """模拟真实的人类操作延迟""" + delays = { + "thinking": (2, 8), # 思考时间 + "reading": (3, 15), # 阅读时间 + "scrolling": (0.5, 2), # 滚动间隔 + "clicking": (0.8, 3), # 点击间隔 + "typing": (0.1, 0.5), # 打字间隔 + "normal": (1, 4), # 普通操作 + } + + min_delay, max_delay = delays.get(action_type, delays["normal"]) + return random.uniform(min_delay, max_delay) + + def _weighted_choice(self, choices): + """根据权重选择""" + total = sum(choice["weight"] for choice in choices) + r = random.uniform(0, total) + upto = 0 + for choice in choices: + if upto + choice["weight"] >= r: + return choice["name"] + upto += choice["weight"] + return choices[-1]["name"] + + def _extract_platform(self, user_agent): + """从用户代理中提取平台信息""" + if "Windows NT 10.0" in user_agent: + return "Windows" + elif "Windows NT 11.0" in user_agent: + return "Windows" + elif "Macintosh" in user_agent: + return "macOS" + elif "X11; Linux" in user_agent: + return "Linux" + elif "Android" in user_agent: + return "Android" + elif "iPhone" in user_agent: + return "iOS" + else: + return "Unknown" + + def _generate_chrome_sec_ch_ua(self, user_agent): + """生成Chrome的sec-ch-ua头部""" + # 从用户代理中提取Chrome版本 + if "Chrome/" in user_agent: + version = user_agent.split("Chrome/")[1].split(".")[0] + return f'"Not_A Brand";v="8", "Chromium";v="{version}", "Google Chrome";v="{version}"' + return '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"' + + def generate_session_data(self): + """生成完整的会话数据""" + profile = self.get_random_user_profile() + behavior = self.get_visit_behavior() + + session_data = { + "profile": profile, + "behavior": behavior, + "session_id": f"session_{int(time.time())}_{random.randint(1000, 9999)}", + "start_time": datetime.now().isoformat(), + "fingerprint": self._generate_browser_fingerprint(profile), + } + + return session_data + + def _generate_browser_fingerprint(self, profile): + """生成浏览器指纹""" + fingerprint = { + "screen": { + "width": profile["screen_resolution"][0], + "height": profile["screen_resolution"][1], + "colorDepth": profile["color_depth"], + "pixelDepth": profile["color_depth"], + }, + "navigator": { + "userAgent": profile["user_agent"], + "language": profile["language"].split(",")[0], + "languages": profile["language"].split(","), + "platform": profile["platform"], + "cookieEnabled": profile["cookie_enabled"], + "javaEnabled": profile["java_enabled"], + "hardwareConcurrency": profile["hardware_concurrency"], + "deviceMemory": profile["device_memory"], + }, + "timezone": profile["timezone"], + "webgl_vendor": random.choice([ + "Google Inc. (Intel)", + "Google Inc. (NVIDIA)", + "Google Inc. (AMD)", + "Apple Inc.", + ]), + } + + return fingerprint + +# 使用示例 +if __name__ == "__main__": + db = RealUserDatabase() + + # 生成用户配置 + profile = db.get_random_user_profile() + print("用户配置:") + print(json.dumps(profile, indent=2, ensure_ascii=False)) + + # 生成HTTP头部 + headers = db.get_realistic_headers(profile) + print("\nHTTP头部:") + for key, value in headers.items(): + print(f"{key}: {value}") + + # 生成访问行为 + behavior = db.get_visit_behavior() + print(f"\n访问行为: {behavior}") + + # 生成完整会话数据 + session = db.generate_session_data() + print("\n会话数据:") + print(json.dumps(session, indent=2, ensure_ascii=False)) \ No newline at end of file diff --git a/setup_and_run.bat b/setup_and_run.bat index e97c802..a7a1ae8 100644 --- a/setup_and_run.bat +++ b/setup_and_run.bat @@ -33,12 +33,13 @@ echo ✅ pip可用 echo. echo 📋 请选择要运行的脚本: echo 1. 基础版本 (website_traffic_bot.py) - 需要Chrome -echo 2. 配置文件版本 (website_traffic_bot_config.py) - 需要Chrome -echo 3. 纯协议版本 (website_traffic_bot_protocol.py) - 推荐,无需浏览器 -echo 4. IP地址检查工具 (check_ip.py) - 检查代理和当前IP -echo 5. 退出 +echo 2. 配置文件版本 (website_traffic_bot_config.py) - 需要Chrome +echo 3. 纯协议版本 (website_traffic_bot_protocol.py) - 轻量级 +echo 4. 真实用户行为版本 (website_traffic_bot_realistic.py) - 🌟推荐 +echo 5. IP地址检查工具 (check_ip.py) - 检查代理和当前IP +echo 6. 退出 -set /p choice="请输入选择 (1/2/3/4/5): " +set /p choice="请输入选择 (1/2/3/4/5/6): " if "%choice%"=="1" ( echo. @@ -89,10 +90,37 @@ if "%choice%"=="1" ( ) echo ✅ 依赖安装成功 echo. - echo 🚀 运行纯协议版本(推荐)... + echo 🚀 运行纯协议版本... echo 💡 此版本不会打开浏览器窗口,更加隐蔽高效 python website_traffic_bot_protocol.py ) else if "%choice%"=="4" ( + if not exist config.json ( + echo ❌ 配置文件 config.json 不存在! + echo 请先检查配置文件是否存在。 + pause + exit /b 1 + ) + if not exist real_user_database.py ( + echo ❌ 真实用户数据库文件 real_user_database.py 不存在! + echo 请确保所有文件都已下载完整。 + pause + exit /b 1 + ) + echo. + echo 📦 安装真实用户行为版依赖包... + pip install -r requirements_protocol.txt + if %errorlevel% neq 0 ( + echo ❌ 依赖安装失败! + pause + exit /b 1 + ) + echo ✅ 依赖安装成功 + echo. + echo 🎭 运行真实用户行为版本(最佳版本)... + echo 💡 此版本使用真实用户数据库,模拟最真实的访问轨迹 + echo 💡 包含完整的浏览器指纹、时间模式、行为特征等 + python website_traffic_bot_realistic.py +) else if "%choice%"=="5" ( echo. echo 📦 安装依赖包... pip install requests @@ -105,7 +133,7 @@ if "%choice%"=="1" ( echo 🔍 运行IP地址检查工具... echo 💡 此工具将检查您的代理设置和当前IP地址 python check_ip.py -) else if "%choice%"=="5" ( +) else if "%choice%"=="6" ( echo 👋 再见! exit /b 0 ) else ( diff --git a/website_traffic_bot_final.py b/website_traffic_bot_final.py new file mode 100644 index 0000000..519e2ea --- /dev/null +++ b/website_traffic_bot_final.py @@ -0,0 +1,733 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +网站流量模拟脚本 (最终版本) +使用真实用户数据库模拟最真实的访问轨迹 +支持Google搜索来源和真实网站跳转 +""" + +import requests +import time +import random +import json +import os +import logging +from urllib.parse import urlparse, urljoin +import re +from real_user_database import RealUserDatabase + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('traffic_bot_final.log', encoding='utf-8'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +class WebTrafficBotFinal: + def __init__(self, config_file='config.json'): + """ + 初始化最终版流量机器人 + """ + self.config = self.load_config(config_file) + self.session = None + self.user_db = RealUserDatabase() + self.current_profile = None + self.current_behavior = None + + # 真实的访问来源页面(主要从Google和真实网站) + self.traffic_sources = [ + # Google搜索来源 (主要) + "https://www.google.com/search?q=2048+game+online", + "https://www.google.com/search?q=html5+games", + "https://www.google.com/search?q=browser+games+2048", + "https://www.google.com/search?q=free+online+games", + "https://www.google.com/search?q=puzzle+games+online", + "https://www.google.com/search?q=数字游戏+2048", + "https://www.google.com/search?q=在线小游戏", + + # 用户指定的真实网站 + "https://github.com/chengazhen/cursor-auto-free", + "https://linux.do/", + + # 其他真实来源网站 + "https://github.com/trending", + "https://github.com/topics/game", + "https://news.ycombinator.com/", + "https://www.reddit.com/r/WebGames/", + "https://www.reddit.com/r/incremental_games/", + "https://www.producthunt.com/", + "https://stackoverflow.com/", + "https://www.zhihu.com/", + "https://v2ex.com/", + "https://segmentfault.com/", + "https://juejin.cn/", + "https://www.csdn.net/", + "https://www.oschina.net/", + "https://gitee.com/", + ] + + def load_config(self, config_file): + """加载配置文件""" + try: + with open(config_file, 'r', encoding='utf-8') as f: + config = json.load(f) + logger.info(f"配置文件加载成功: {config_file}") + return config + except FileNotFoundError: + logger.error(f"配置文件未找到: {config_file}") + raise + except json.JSONDecodeError as e: + logger.error(f"配置文件格式错误: {e}") + raise + + def setup_session(self): + """设置请求会话""" + self.session = requests.Session() + + # 生成真实用户配置 + self.current_profile = self.user_db.get_random_user_profile() + self.current_behavior = self.user_db.get_visit_behavior() + + # 设置代理 + proxy_config = self.config.get('proxy') + if proxy_config: + proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['host']}:{proxy_config['port']}" + self.session.proxies = { + 'http': proxy_url, + 'https': proxy_url + } + logger.info(f"已配置代理: {proxy_config['host']}:{proxy_config['port']}") + + # 使用真实用户数据库生成的头部 + realistic_headers = self.user_db.get_realistic_headers(self.current_profile) + self.session.headers.update(realistic_headers) + + # 记录用户特征 + logger.info(f"🎭 用户身份配置:") + logger.info(f" 操作系统: {self.current_profile['operating_system']}") + logger.info(f" 屏幕分辨率: {self.current_profile['screen_resolution']}") + logger.info(f" 浏览器语言: {self.current_profile['language']}") + logger.info(f" 时区: {self.current_profile['timezone']}") + logger.info(f" 访问时间模式: {self.current_behavior['pattern_type']}") + logger.info(f" 硬件并发数: {self.current_profile['hardware_concurrency']}") + logger.info(f" 设备内存: {self.current_profile['device_memory']}GB") + + # 获取并显示当前IP + current_ip = self.get_current_ip() + if current_ip: + print(f"🌍 当前IP地址: {current_ip}") + print(f"👤 用户身份: {self.current_profile['operating_system']} | {self.current_profile['screen_resolution']} | {self.current_behavior['pattern_type']}") + + def get_current_ip(self): + """获取当前IP地址""" + ip_services = [ + 'https://httpbin.org/ip', + 'https://api.ipify.org?format=json', + 'https://ipinfo.io/json', + ] + + for service in ip_services: + try: + logger.info(f"🔍 正在获取IP地址: {service}") + response = self.session.get(service, timeout=10) + + if response.status_code == 200: + if 'json' in service or 'httpbin' in service or 'ipinfo' in service: + try: + data = response.json() + if 'origin' in data: + ip = data['origin'] + elif 'ip' in data: + ip = data['ip'] + else: + ip = str(data) + except: + ip = response.text.strip() + else: + ip = response.text.strip() + + logger.info(f"✅ 当前IP地址: {ip}") + return ip + + except Exception as e: + logger.warning(f"从 {service} 获取IP失败: {e}") + continue + + logger.error("❌ 无法获取当前IP地址") + return None + + def simulate_realistic_source_visit(self): + """模拟真实的来源网站访问""" + source_page = random.choice(self.traffic_sources) + + try: + logger.info(f"🔗 模拟从来源访问: {source_page}") + + # 根据来源类型调整访问行为 + if "google.com" in source_page: + stay_time = self.user_db.get_realistic_timing( + random.uniform(3, 12), # Google搜索停留时间较短 + self.current_behavior + ) + logger.info(f"🔍 Google搜索停留 {stay_time:.1f} 秒") + self._simulate_google_search_behavior(stay_time) + + elif "github.com" in source_page: + stay_time = self.user_db.get_realistic_timing( + random.uniform(8, 25), # GitHub停留时间中等 + self.current_behavior + ) + logger.info(f"🐙 GitHub页面停留 {stay_time:.1f} 秒") + self._simulate_github_behavior(stay_time, source_page) + + elif "linux.do" in source_page: + stay_time = self.user_db.get_realistic_timing( + random.uniform(10, 30), # 技术社区停留时间较长 + self.current_behavior + ) + logger.info(f"💻 Linux.do社区停留 {stay_time:.1f} 秒") + self._simulate_community_behavior(stay_time) + + else: + stay_time = self.user_db.get_realistic_timing( + random.uniform(5, 20), # 其他网站 + self.current_behavior + ) + logger.info(f"🌐 其他网站停留 {stay_time:.1f} 秒") + self._simulate_general_browsing(stay_time) + + # 设置referrer用于后续访问 + self.session.headers.update({ + 'Referer': source_page, + 'Sec-Fetch-Site': 'cross-site' + }) + + return True + + except Exception as e: + logger.error(f"来源访问模拟失败: {e}") + return False + + def _simulate_google_search_behavior(self, total_time): + """模拟Google搜索行为""" + actions = [ + "输入搜索关键词", + "查看搜索结果", + "滚动浏览结果页面", + "点击相关搜索建议", + "查看图片搜索结果" + ] + + segments = random.randint(2, 4) # Google搜索动作较少 + segment_time = total_time / segments + + for i, action in enumerate(random.sample(actions, segments)): + if i > 0: + delay = self.user_db.simulate_human_delays("thinking") + time.sleep(delay) + + logger.info(f" 🔍 Google行为: {action}") + + if "输入" in action: + # 模拟输入搜索词的时间 + typing_time = random.uniform(2, 5) + time.sleep(typing_time) + elif "滚动" in action: + # 模拟快速滚动查看结果 + time.sleep(segment_time * 0.6) + else: + time.sleep(segment_time * 0.8) + + def _simulate_github_behavior(self, total_time, github_url): + """模拟GitHub页面行为""" + if "cursor-auto-free" in github_url: + actions = [ + "查看项目README", + "阅读项目描述", + "查看Stars和Forks数量", + "浏览代码文件", + "查看Issues讨论", + "阅读使用说明" + ] + else: + actions = [ + "查看trending项目", + "浏览热门仓库", + "查看项目描述", + "阅读README文件", + "查看代码示例" + ] + + segments = random.randint(3, 6) + segment_time = total_time / segments + + for i in range(segments): + if i > 0: + delay = self.user_db.simulate_human_delays("reading") + time.sleep(delay) + + action = random.choice(actions) + logger.info(f" 🐙 GitHub行为: {action}") + + if "阅读" in action: + time.sleep(segment_time * 0.7) + elif "查看" in action: + time.sleep(segment_time * 0.5) + else: + time.sleep(segment_time * 0.4) + + def _simulate_community_behavior(self, total_time): + """模拟技术社区行为""" + actions = [ + "浏览热门帖子", + "阅读技术讨论", + "查看最新话题", + "搜索相关内容", + "查看用户资料", + "阅读精华帖子" + ] + + segments = random.randint(4, 7) # 社区停留时间较长,动作较多 + segment_time = total_time / segments + + for i in range(segments): + if i > 0: + delay = self.user_db.simulate_human_delays("reading") + time.sleep(delay) + + action = random.choice(actions) + logger.info(f" 💻 社区行为: {action}") + + if "阅读" in action: + time.sleep(segment_time * 0.8) # 阅读时间较长 + else: + time.sleep(segment_time * 0.6) + + def _simulate_general_browsing(self, total_time): + """模拟一般网站浏览行为""" + actions = [ + "浏览页面内容", + "查看导航菜单", + "滚动阅读文章", + "点击相关链接", + "查看评论区" + ] + + segments = random.randint(3, 5) + segment_time = total_time / segments + + for i in range(segments): + if i > 0: + delay = self.user_db.simulate_human_delays("normal") + time.sleep(delay) + + action = random.choice(actions) + logger.info(f" 🌐 浏览行为: {action}") + time.sleep(segment_time * random.uniform(0.7, 1.2)) + + def visit_main_site_realistic(self): + """真实模拟访问主网站""" + main_site = self.config['targets']['main_site'] + + try: + logger.info(f"🏠 访问目标网站: {main_site}") + + # 发起请求 + response = self.make_realistic_request(main_site) + if not response: + return False + + # 模拟真实的页面浏览行为 + self._simulate_realistic_browsing(response, is_main_page=True) + + return True + + except Exception as e: + logger.error(f"主网站访问失败: {e}") + return False + + def visit_game_page_realistic(self): + """真实模拟访问游戏页面""" + game_page = self.config['targets']['game_page'] + main_site = self.config['targets']['main_site'] + + try: + logger.info(f"🎮 访问游戏页面: {game_page}") + + # 更新referrer为主站 + self.session.headers.update({ + 'Referer': main_site, + 'Sec-Fetch-Site': 'same-origin' + }) + + # 发起请求 + response = self.make_realistic_request(game_page) + if not response: + return False + + # 检查是否需要登录 + if self._check_login_required(response): + logger.warning("⚠️ 检测到登录页面,继续模拟访问") + + # 模拟真实的游戏页面行为 + self._simulate_realistic_gaming(response) + + return True + + except Exception as e: + logger.error(f"游戏页面访问失败: {e}") + return False + + def _check_login_required(self, response): + """检查是否需要登录""" + if not response: + return False + + content = response.text.lower() + login_indicators = [ + '登录', 'login', '用户名', 'username', 'password', '密码', + 'signin', 'sign in', '账号', 'account', '验证码' + ] + + login_count = sum(1 for indicator in login_indicators if indicator in content) + + if login_count >= 3: + logger.warning("⚠️ 检测到可能的登录页面") + logger.info("💡 建议:检查网站是否需要登录访问") + return True + + return False + + def make_realistic_request(self, url, timeout=15): + """发起真实的HTTP请求""" + try: + # 添加随机的人为延迟 + pre_request_delay = self.user_db.simulate_human_delays("thinking") + time.sleep(pre_request_delay) + + response = self.session.get(url, timeout=timeout, allow_redirects=True) + + # 记录详细信息 + logger.info(f"📡 HTTP请求详情:") + logger.info(f" 📍 访问URL: {url}") + logger.info(f" 📊 状态码: {response.status_code}") + logger.info(f" 📦 响应大小: {len(response.content)} 字节") + logger.info(f" ⏱️ 响应时间: {response.elapsed.total_seconds():.2f}秒") + + if response.headers.get('content-type'): + logger.info(f" 📄 内容类型: {response.headers.get('content-type')}") + + if response.headers.get('server'): + logger.info(f" 🖥️ 服务器: {response.headers.get('server')}") + + response.raise_for_status() + return response + + except requests.exceptions.RequestException as e: + logger.error(f"❌ 请求失败 {url}: {e}") + return None + + def _simulate_realistic_browsing(self, response, is_main_page=False): + """模拟真实的页面浏览行为""" + content = response.text + + # 估算页面内容长度和阅读时间 + text_length = len(re.sub(r'<[^>]+>', '', content)) + reading_time = text_length / self.current_behavior['reading_speed'] * 60 + + logger.info(f"📖 页面分析:") + logger.info(f" 📝 内容长度: {text_length} 字符") + logger.info(f" ⏱️ 预估阅读时间: {reading_time:.1f} 秒") + + # 获取真实的停留时间 + if is_main_page: + base_time = random.uniform(*self.config['settings']['main_site_stay_time']) + else: + base_time = min(reading_time, 60) # 最多60秒阅读时间 + + stay_time = self.user_db.get_realistic_timing(base_time, self.current_behavior) + + logger.info(f" 🕐 实际停留时间: {stay_time:.1f} 秒") + + # 模拟分段浏览 + self._simulate_browsing_segments(stay_time, content) + + def _simulate_browsing_segments(self, total_time, content): + """模拟分段浏览行为""" + # 查找页面中的链接 + links = re.findall(r'href=["\'](.*?)["\']', content) + internal_links = [link for link in links if not link.startswith('http') or self.config['targets']['main_site'] in link] + + segments = random.randint(3, 8) + segment_time = total_time / segments + + browsing_actions = [ + "📖 阅读页面内容", + "🧭 查看导航菜单", + "📜 滚动浏览页面", + "🔗 检查页面链接", + "👁️ 观察页面布局", + "📱 查看页脚信息" + ] + + for i in range(segments): + action = random.choice(browsing_actions) + logger.info(f" {action}") + + # 根据行为类型调整时间 + if "滚动" in action: + self._simulate_scrolling_behavior(segment_time) + elif "检查链接" in action and internal_links: + self._simulate_link_hovering(internal_links) + time.sleep(segment_time * 0.8) + else: + actual_segment_time = segment_time * random.uniform(0.7, 1.3) + time.sleep(actual_segment_time) + + def _simulate_scrolling_behavior(self, duration): + """模拟真实的滚动行为""" + scroll_sessions = random.randint(2, 5) + session_time = duration / scroll_sessions + + for session in range(scroll_sessions): + logger.info(f" 📜 滚动会话 {session + 1}") + + # 模拟快速滚动 + quick_scrolls = random.randint(2, 4) + for _ in range(quick_scrolls): + time.sleep(random.uniform(0.3, 1.0)) + + # 模拟停顿阅读 + if random.random() < 0.7: + pause_time = random.uniform(1, 4) + logger.info(f" ⏸️ 停顿阅读 {pause_time:.1f}秒") + time.sleep(pause_time) + + def _simulate_link_hovering(self, links): + """模拟鼠标悬停在链接上""" + hover_count = min(random.randint(1, 3), len(links)) + sample_links = random.sample(links, hover_count) + + for link in sample_links: + logger.info(f" 🔗 查看链接: {link[:50]}...") + time.sleep(random.uniform(0.5, 2.5)) + + def _simulate_realistic_gaming(self, response): + """模拟真实的游戏行为""" + logger.info("🎲 开始2048游戏模拟") + + # 游戏前的准备时间 + prep_time = self.user_db.simulate_human_delays("thinking") + logger.info(f"🤔 游戏加载和准备: {prep_time:.1f}秒") + time.sleep(prep_time) + + # 获取游戏停留时间 + base_time = random.uniform(*self.config['settings']['game_page_stay_time']) + game_time = self.user_db.get_realistic_timing(base_time, self.current_behavior) + + logger.info(f"🎮 游戏总时长: {game_time:.1f}秒") + + # 模拟游戏过程 + self._simulate_game_sessions(game_time) + + def _simulate_game_sessions(self, total_time): + """模拟游戏会话""" + sessions = random.randint(2, 4) + + for session in range(sessions): + session_time = total_time / sessions * random.uniform(0.8, 1.2) + logger.info(f"🎯 游戏会话 {session + 1}/{sessions}, 时长: {session_time:.1f}秒") + + self._simulate_single_game_session(session_time) + + # 会话间休息 + if session < sessions - 1: + break_time = random.uniform(3, 10) + logger.info(f"⏸️ 休息思考: {break_time:.1f}秒") + time.sleep(break_time) + + def _simulate_single_game_session(self, session_time): + """模拟单个游戏会话""" + game_moves = ["⬆️上", "⬇️下", "⬅️左", "➡️右"] + + start_time = time.time() + move_count = 0 + + while time.time() - start_time < session_time: + move = random.choice(game_moves) + move_count += 1 + + logger.info(f" 🎮 第{move_count}步: {move}") + + # 模拟不同难度的思考时间 + if move_count % 8 == 0: # 每8步深度思考 + think_time = random.uniform(4, 10) + logger.info(f" 🧠 策略思考: {think_time:.1f}秒") + time.sleep(think_time) + elif random.random() < 0.4: # 40%概率短暂思考 + think_time = random.uniform(0.8, 3) + time.sleep(think_time) + else: # 快速移动 + time.sleep(random.uniform(0.4, 1.2)) + + # 模拟偶尔的错误操作和纠正 + if random.random() < 0.06: # 6%概率误操作 + logger.info(" ❌ 误操作,立即纠正") + time.sleep(0.3) + corrective_move = random.choice(game_moves) + logger.info(f" 🔄 纠正: {corrective_move}") + time.sleep(random.uniform(0.5, 1.0)) + + def run_single_visit(self): + """执行一次完整的真实访问流程""" + logger.info("🚀 开始执行真实访问流程") + + # 设置会话 + self.setup_session() + + try: + # 1. 模拟来源网站访问 + if not self.simulate_realistic_source_visit(): + logger.warning("⚠️ 来源访问模拟失败,继续执行") + + # 2. 访问主网站 + if not self.visit_main_site_realistic(): + logger.error("❌ 主网站访问失败") + return False + + # 3. 访问游戏页面 + if not self.visit_game_page_realistic(): + logger.error("❌ 游戏页面访问失败") + return False + + logger.info("✅ 访问流程完美执行成功!") + return True + + except Exception as e: + logger.error(f"❌ 访问流程执行出错: {e}") + return False + + finally: + if self.session: + self.session.close() + + def run_continuous(self, total_visits=None, delay_range=None): + """连续执行多次真实访问""" + if total_visits is None: + total_visits = self.config['settings']['default_visits'] + + if delay_range is None: + delay_range = ( + self.config['settings']['min_delay'], + self.config['settings']['max_delay'] + ) + + success_count = 0 + + logger.info(f"🎯 开始连续访问,目标: {total_visits} 次") + + for i in range(total_visits): + logger.info(f"{'='*60}") + logger.info(f"🔄 执行第 {i+1}/{total_visits} 次访问") + logger.info(f"{'='*60}") + + if self.run_single_visit(): + success_count += 1 + logger.info(f"✅ 第 {i+1} 次访问成功!累计成功: {success_count}") + else: + logger.error(f"❌ 第 {i+1} 次访问失败!") + + # 智能延迟 + if i < total_visits - 1: + base_delay = random.uniform(delay_range[0], delay_range[1]) + behavior = self.user_db.get_visit_behavior() + + # 根据访问模式调整延迟 + if behavior['pattern_type'] == "工作时间": + delay = base_delay * 0.7 + elif behavior['pattern_type'] == "深夜": + delay = base_delay * 1.8 + else: + delay = base_delay + + logger.info(f"⏳ 智能等待 {delay:.1f} 秒 (当前时段: {behavior['pattern_type']})") + time.sleep(delay) + + success_rate = (success_count / total_visits) * 100 + logger.info(f"🎉 访问任务完成!") + logger.info(f"📊 成功率: {success_count}/{total_visits} ({success_rate:.1f}%)") + return success_count + +def main(): + """主函数""" + config_file = 'config.json' + + if not os.path.exists(config_file): + print(f"❌ 配置文件 {config_file} 不存在!") + return + + try: + bot = WebTrafficBotFinal(config_file) + + print("=" * 60) + print("🎭 网站流量模拟脚本 (最终升级版)") + print("=" * 60) + print("🌟 特性:真实用户行为 + Google/GitHub来源 + 完整浏览器指纹") + print("⚠️ 请确保仅用于测试自己的网站!") + print() + print(f"🎯 目标网站: {bot.config['targets']['main_site']}") + print(f"🎮 游戏页面: {bot.config['targets']['game_page']}") + print() + + print("请选择运行模式:") + print("1. 💎 单次完整访问测试") + print("2. 🚀 连续访问模式 (使用配置参数)") + print("3. ⚙️ 自定义连续访问") + + choice = input("请输入选择 (1/2/3): ").strip() + + if choice == "1": + logger.info("🎬 开始单次完整访问测试") + success = bot.run_single_visit() + if success: + print("🎉 单次访问测试完美成功!") + else: + print("😞 单次访问测试失败!") + + elif choice == "2": + logger.info("🎬 开始连续访问模式") + success_count = bot.run_continuous() + total = bot.config['settings']['default_visits'] + print(f"🎉 连续访问完成!成功率: {success_count}/{total} ({(success_count/total)*100:.1f}%)") + + elif choice == "3": + try: + visit_count = int(input("请输入访问次数: ").strip()) + min_delay = int(input("请输入最小延迟秒数: ").strip()) + max_delay = int(input("请输入最大延迟秒数: ").strip()) + + logger.info(f"🎬 开始自定义连续访问,总次数: {visit_count}") + success_count = bot.run_continuous( + total_visits=visit_count, + delay_range=(min_delay, max_delay) + ) + + print(f"🎉 自定义访问完成!成功率: {success_count}/{visit_count} ({(success_count/visit_count)*100:.1f}%)") + + except ValueError: + print("❌ 输入参数错误!") + else: + print("❌ 无效选择!") + + except KeyboardInterrupt: + print("\n⚠️ 用户中断执行") + except Exception as e: + logger.error(f"程序执行出错: {e}") + print("❌ 程序执行出错,请检查日志文件 traffic_bot_final.log") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/website_traffic_bot_realistic.py b/website_traffic_bot_realistic.py new file mode 100644 index 0000000..8a67515 --- /dev/null +++ b/website_traffic_bot_realistic.py @@ -0,0 +1,572 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +网站流量模拟脚本 (真实用户行为版本) +使用真实用户数据库模拟更真实的访问轨迹 +""" + +import requests +import time +import random +import json +import os +import logging +from urllib.parse import urlparse, urljoin +import re +from real_user_database import RealUserDatabase + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('traffic_bot_realistic.log', encoding='utf-8'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +class WebTrafficBotRealistic: + def __init__(self, config_file='config.json'): + """ + 初始化真实用户行为版流量机器人 + """ + self.config = self.load_config(config_file) + self.session = None + self.user_db = RealUserDatabase() + self.current_profile = None + self.current_behavior = None + + # 真实的GitHub来源页面(更丰富的列表) + self.github_referrers = [ + "https://github.com/trending", + "https://github.com/trending/javascript", + "https://github.com/trending/typescript", + "https://github.com/topics/javascript", + "https://github.com/topics/game", + "https://github.com/topics/html5", + "https://github.com/topics/2048", + "https://github.com/search?q=2048+game", + "https://github.com/search?q=html5+games", + "https://github.com/collections/javascript-game-engines", + "https://github.com/explore", + "https://github.com/", + ] + + def load_config(self, config_file): + """加载配置文件""" + try: + with open(config_file, 'r', encoding='utf-8') as f: + config = json.load(f) + logger.info(f"配置文件加载成功: {config_file}") + return config + except FileNotFoundError: + logger.error(f"配置文件未找到: {config_file}") + raise + except json.JSONDecodeError as e: + logger.error(f"配置文件格式错误: {e}") + raise + + def setup_session(self): + """设置请求会话""" + self.session = requests.Session() + + # 生成真实用户配置 + self.current_profile = self.user_db.get_random_user_profile() + self.current_behavior = self.user_db.get_visit_behavior() + + # 设置代理 + proxy_config = self.config.get('proxy') + if proxy_config: + proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['host']}:{proxy_config['port']}" + self.session.proxies = { + 'http': proxy_url, + 'https': proxy_url + } + logger.info(f"已配置代理: {proxy_config['host']}:{proxy_config['port']}") + + # 使用真实用户数据库生成的头部 + realistic_headers = self.user_db.get_realistic_headers(self.current_profile) + self.session.headers.update(realistic_headers) + + # 记录用户特征 + logger.info(f"用户配置:") + logger.info(f" 操作系统: {self.current_profile['operating_system']}") + logger.info(f" 屏幕分辨率: {self.current_profile['screen_resolution']}") + logger.info(f" 语言: {self.current_profile['language']}") + logger.info(f" 时区: {self.current_profile['timezone']}") + logger.info(f" 访问模式: {self.current_behavior['pattern_type']}") + + # 获取并显示当前IP + current_ip = self.get_current_ip() + if current_ip: + print(f"🌍 当前IP地址: {current_ip}") + print(f"👤 用户配置: {self.current_profile['operating_system']} | {self.current_profile['screen_resolution']} | {self.current_behavior['pattern_type']}") + + def get_current_ip(self): + """获取当前IP地址""" + ip_services = [ + 'https://httpbin.org/ip', + 'https://api.ipify.org?format=json', + 'https://ipinfo.io/json', + ] + + for service in ip_services: + try: + logger.info(f"正在获取IP地址: {service}") + response = self.session.get(service, timeout=10) + + if response.status_code == 200: + if 'json' in service or 'httpbin' in service or 'ipinfo' in service: + try: + data = response.json() + if 'origin' in data: + ip = data['origin'] + elif 'ip' in data: + ip = data['ip'] + else: + ip = str(data) + except: + ip = response.text.strip() + else: + ip = response.text.strip() + + logger.info(f"✅ 当前IP地址: {ip}") + return ip + + except Exception as e: + logger.warning(f"从 {service} 获取IP失败: {e}") + continue + + logger.error("❌ 无法获取当前IP地址") + return None + + def simulate_realistic_github_visit(self): + """模拟真实的GitHub访问过程""" + github_page = random.choice(self.github_referrers) + + try: + logger.info(f"🔍 模拟从GitHub访问: {github_page}") + + # 更新headers以模拟GitHub访问 + github_headers = self.user_db.get_realistic_headers( + self.current_profile, + referrer=None + ) + + # 模拟在GitHub的真实浏览行为 + github_stay_time = self.user_db.get_realistic_timing( + random.uniform(5, 15), + self.current_behavior + ) + + logger.info(f"📚 在GitHub页面停留 {github_stay_time:.1f} 秒") + + # 模拟GitHub上的行为 + self._simulate_github_browsing(github_stay_time) + + # 设置referrer用于后续访问 + self.session.headers.update({ + 'Referer': github_page, + 'Sec-Fetch-Site': 'cross-site' + }) + + return True + + except Exception as e: + logger.error(f"GitHub访问模拟失败: {e}") + return False + + def _simulate_github_browsing(self, total_time): + """模拟在GitHub上的浏览行为""" + actions = [ + "查看项目描述", + "阅读README", + "查看代码文件", + "检查更新时间", + "查看星标数量", + "浏览提交历史" + ] + + segments = random.randint(3, 6) + segment_time = total_time / segments + + for i in range(segments): + if i > 0: + delay = self.user_db.simulate_human_delays("reading") + time.sleep(delay) + + action = random.choice(actions) + logger.info(f" GitHub行为: {action}") + + # 模拟不同行为的停留时间 + if "阅读" in action: + time.sleep(segment_time * 0.4) + elif "查看" in action: + time.sleep(segment_time * 0.3) + else: + time.sleep(segment_time * 0.2) + + def visit_main_site_realistic(self): + """真实模拟访问主网站""" + main_site = self.config['targets']['main_site'] + + try: + logger.info(f"🏠 访问主网站: {main_site}") + + # 发起请求 + response = self.make_realistic_request(main_site) + if not response: + return False + + # 模拟真实的页面浏览行为 + self._simulate_realistic_browsing(response, is_main_page=True) + + return True + + except Exception as e: + logger.error(f"主网站访问失败: {e}") + return False + + def visit_game_page_realistic(self): + """真实模拟访问游戏页面""" + game_page = self.config['targets']['game_page'] + main_site = self.config['targets']['main_site'] + + try: + logger.info(f"🎮 访问游戏页面: {game_page}") + + # 更新referrer为主站 + self.session.headers.update({ + 'Referer': main_site, + 'Sec-Fetch-Site': 'same-origin' + }) + + # 发起请求 + response = self.make_realistic_request(game_page) + if not response: + return False + + # 模拟真实的游戏页面行为 + self._simulate_realistic_gaming(response) + + return True + + except Exception as e: + logger.error(f"游戏页面访问失败: {e}") + return False + + def make_realistic_request(self, url, timeout=10): + """发起真实的HTTP请求""" + try: + # 添加随机的人为延迟 + pre_request_delay = self.user_db.simulate_human_delays("thinking") + time.sleep(pre_request_delay) + + response = self.session.get(url, timeout=timeout, allow_redirects=True) + + # 记录详细信息 + logger.info(f"📡 访问 {url}") + logger.info(f" 状态码: {response.status_code}") + logger.info(f" 响应大小: {len(response.content)} 字节") + logger.info(f" 响应时间: {response.elapsed.total_seconds():.2f}秒") + + if response.headers.get('content-type'): + logger.info(f" 内容类型: {response.headers.get('content-type')}") + + response.raise_for_status() + return response + + except requests.exceptions.RequestException as e: + logger.error(f"请求失败 {url}: {e}") + return None + + def _simulate_realistic_browsing(self, response, is_main_page=False): + """模拟真实的页面浏览行为""" + content = response.text + + # 估算页面内容长度和阅读时间 + text_length = len(re.sub(r'<[^>]+>', '', content)) + reading_time = text_length / self.current_behavior['reading_speed'] * 60 # 转换为秒 + + logger.info(f"📖 页面内容长度: {text_length} 字符") + logger.info(f"📖 预估阅读时间: {reading_time:.1f} 秒") + + # 获取真实的停留时间 + if is_main_page: + base_time = random.uniform(*self.config['settings']['main_site_stay_time']) + else: + base_time = reading_time + + stay_time = self.user_db.get_realistic_timing(base_time, self.current_behavior) + + logger.info(f"⏱️ 实际停留时间: {stay_time:.1f} 秒") + + # 模拟分段浏览 + self._simulate_browsing_segments(stay_time, content) + + def _simulate_browsing_segments(self, total_time, content): + """模拟分段浏览行为""" + # 查找页面中的链接 + links = re.findall(r'href=["\'](.*?)["\']', content) + internal_links = [link for link in links if not link.startswith('http') or self.config['targets']['main_site'] in link] + + segments = random.randint(3, 8) + segment_time = total_time / segments + + browsing_actions = [ + "阅读页面内容", + "查看导航菜单", + "滚动浏览", + "查看页脚信息", + "检查页面链接", + "观察页面布局" + ] + + for i in range(segments): + action = random.choice(browsing_actions) + logger.info(f" 浏览行为: {action}") + + # 根据行为类型调整时间 + if action == "滚动浏览": + self._simulate_scrolling_behavior(segment_time) + elif action == "检查页面链接" and internal_links: + self._simulate_link_hovering(internal_links) + time.sleep(segment_time * 0.8) + else: + # 添加人为的不规律性 + actual_segment_time = segment_time * random.uniform(0.7, 1.3) + time.sleep(actual_segment_time) + + def _simulate_scrolling_behavior(self, duration): + """模拟真实的滚动行为""" + scroll_sessions = random.randint(2, 5) + session_time = duration / scroll_sessions + + for session in range(scroll_sessions): + logger.info(f" 滚动会话 {session + 1}") + + # 模拟快速滚动 + quick_scrolls = random.randint(2, 4) + for _ in range(quick_scrolls): + time.sleep(random.uniform(0.2, 0.8)) + + # 模拟停顿阅读 + if random.random() < 0.7: # 70%概率停顿阅读 + pause_time = random.uniform(1, 4) + logger.info(f" 停顿阅读 {pause_time:.1f}秒") + time.sleep(pause_time) + + def _simulate_link_hovering(self, links): + """模拟鼠标悬停在链接上""" + hover_count = min(random.randint(1, 3), len(links)) + sample_links = random.sample(links, hover_count) + + for link in sample_links: + logger.info(f" 查看链接: {link[:50]}...") + time.sleep(random.uniform(0.5, 2)) + + def _simulate_realistic_gaming(self, response): + """模拟真实的游戏行为""" + logger.info("🎲 开始模拟2048游戏") + + # 游戏前的准备时间 + prep_time = self.user_db.simulate_human_delays("thinking") + logger.info(f"🤔 游戏准备时间: {prep_time:.1f}秒") + time.sleep(prep_time) + + # 获取游戏停留时间 + base_time = random.uniform(*self.config['settings']['game_page_stay_time']) + game_time = self.user_db.get_realistic_timing(base_time, self.current_behavior) + + logger.info(f"🎮 游戏总时长: {game_time:.1f}秒") + + # 模拟游戏过程 + self._simulate_game_sessions(game_time) + + def _simulate_game_sessions(self, total_time): + """模拟游戏会话""" + # 将游戏时间分成多个会话 + sessions = random.randint(2, 5) + + for session in range(sessions): + session_time = total_time / sessions * random.uniform(0.8, 1.2) + logger.info(f"🎯 游戏会话 {session + 1}, 时长: {session_time:.1f}秒") + + self._simulate_single_game_session(session_time) + + # 会话间休息 + if session < sessions - 1: + break_time = random.uniform(2, 8) + logger.info(f"⏸️ 会话间休息: {break_time:.1f}秒") + time.sleep(break_time) + + def _simulate_single_game_session(self, session_time): + """模拟单个游戏会话""" + game_moves = ["⬆️上移", "⬇️下移", "⬅️左移", "➡️右移"] + + start_time = time.time() + move_count = 0 + + while time.time() - start_time < session_time: + # 随机选择移动方向 + move = random.choice(game_moves) + move_count += 1 + + logger.info(f" 第{move_count}步: {move}") + + # 模拟不同难度的思考时间 + if move_count % 10 == 0: # 每10步深度思考 + think_time = random.uniform(3, 8) + logger.info(f" 深度思考: {think_time:.1f}秒") + time.sleep(think_time) + elif random.random() < 0.3: # 30%概率短暂思考 + think_time = random.uniform(0.5, 2) + time.sleep(think_time) + else: # 快速移动 + time.sleep(random.uniform(0.3, 1)) + + # 模拟偶尔的错误操作 + if random.random() < 0.05: # 5%概率 + logger.info(" 误操作,快速纠正") + time.sleep(0.2) + logger.info(f" 纠正: {random.choice(game_moves)}") + time.sleep(random.uniform(0.3, 0.8)) + + def run_single_visit(self): + """执行一次完整的真实访问流程""" + logger.info("🚀 开始执行真实用户访问流程") + + # 设置会话 + self.setup_session() + + try: + # 1. 模拟真实的GitHub访问 + if not self.simulate_realistic_github_visit(): + logger.warning("GitHub访问模拟失败,继续执行") + + # 2. 访问主网站 + if not self.visit_main_site_realistic(): + logger.error("主网站访问失败") + return False + + # 3. 访问游戏页面 + if not self.visit_game_page_realistic(): + logger.error("游戏页面访问失败") + return False + + logger.info("✅ 真实访问流程执行成功") + return True + + except Exception as e: + logger.error(f"访问流程执行出错: {e}") + return False + + finally: + # 清理资源 + if self.session: + self.session.close() + + def run_continuous(self, total_visits=None, delay_range=None): + """连续执行多次真实访问""" + if total_visits is None: + total_visits = self.config['settings']['default_visits'] + + if delay_range is None: + delay_range = ( + self.config['settings']['min_delay'], + self.config['settings']['max_delay'] + ) + + success_count = 0 + + for i in range(total_visits): + logger.info(f"🔄 执行第 {i+1}/{total_visits} 次真实访问") + + if self.run_single_visit(): + success_count += 1 + + # 智能延迟(根据时间模式调整) + if i < total_visits - 1: + base_delay = random.uniform(delay_range[0], delay_range[1]) + behavior = self.user_db.get_visit_behavior() + + # 根据访问模式调整延迟 + if behavior['pattern_type'] == "工作时间": + delay = base_delay * 0.8 # 工作时间间隔较短 + elif behavior['pattern_type'] == "深夜": + delay = base_delay * 1.5 # 深夜间隔较长 + else: + delay = base_delay + + logger.info(f"⏳ 智能延迟 {delay:.1f} 秒 (模式: {behavior['pattern_type']})") + time.sleep(delay) + + logger.info(f"🎉 真实访问完成,成功: {success_count}/{total_visits}") + return success_count + +def main(): + """主函数""" + config_file = 'config.json' + + if not os.path.exists(config_file): + print(f"❌ 配置文件 {config_file} 不存在!") + return + + try: + bot = WebTrafficBotRealistic(config_file) + + print("=== 网站流量模拟脚本 (真实用户行为版) ===") + print("🎭 使用真实用户数据库,模拟最真实的访问轨迹") + print("⚠️ 请确保仅用于测试自己的网站!") + print("目标网站:", bot.config['targets']['main_site']) + print("游戏页面:", bot.config['targets']['game_page']) + print() + + print("请选择运行模式:") + print("1. 单次真实访问测试") + print("2. 连续真实访问模式 (使用配置文件设置)") + print("3. 连续真实访问模式 (自定义参数)") + + choice = input("请输入选择 (1/2/3): ").strip() + + if choice == "1": + logger.info("开始单次真实访问测试") + success = bot.run_single_visit() + if success: + print("✅ 单次真实访问测试成功!") + else: + print("❌ 单次真实访问测试失败!") + + elif choice == "2": + logger.info("开始连续真实访问(配置文件模式)") + success_count = bot.run_continuous() + print(f"✅ 连续真实访问完成!成功: {success_count}/{bot.config['settings']['default_visits']}") + + elif choice == "3": + try: + visit_count = int(input("请输入访问次数: ").strip()) + min_delay = int(input("请输入最小延迟秒数: ").strip()) + max_delay = int(input("请输入最大延迟秒数: ").strip()) + + logger.info(f"开始连续真实访问,总次数: {visit_count}") + success_count = bot.run_continuous( + total_visits=visit_count, + delay_range=(min_delay, max_delay) + ) + + print(f"✅ 连续真实访问完成!成功: {success_count}/{visit_count}") + + except ValueError: + print("❌ 输入参数错误!") + else: + print("❌ 无效选择!") + + except KeyboardInterrupt: + print("\n⚠️ 用户中断执行") + except Exception as e: + logger.error(f"程序执行出错: {e}") + print("❌ 程序执行出错,请检查日志文件 traffic_bot_realistic.log") + +if __name__ == "__main__": + main() \ No newline at end of file