#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 网站流量模拟脚本 用途:测试网站访问流程和性能 请确保仅用于测试自己的网站! """ import requests import time import random from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, WebDriverException import logging from urllib.parse import urlparse # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('traffic_bot.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class WebTrafficBot: def __init__(self, proxy_config): """ 初始化流量机器人 Args: proxy_config (dict): 代理配置 """ self.proxy_config = proxy_config self.session = None self.driver = None # 目标网站配置 self.main_site = "https://game.586vip.cn/" self.game_page = "https://game.586vip.cn/games/2048/index.html" # GitHub页面列表(模拟来源) self.github_referrers = [ "https://github.com/trending", "https://github.com/topics/javascript", "https://github.com/topics/game", "https://github.com/topics/html5", "https://github.com/search?q=2048+game", ] # 用户代理列表 self.user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ] def get_current_ip(self): """获取当前IP地址""" ip_services = [ 'https://httpbin.org/ip', 'https://api.ipify.org?format=json', 'https://ipinfo.io/json', 'https://ifconfig.me/ip' ] for service in ip_services: try: logger.info(f"正在获取IP地址: {service}") response = self.session.get(service, timeout=10) if response.status_code == 200: if 'json' in service or 'httpbin' in service or 'ipinfo' in service: try: data = response.json() if 'origin' in data: # httpbin.org ip = data['origin'] elif 'ip' in data: # ipify.org 或 ipinfo.io ip = data['ip'] else: ip = str(data) except: ip = response.text.strip() else: ip = response.text.strip() logger.info(f"✅ 当前IP地址: {ip}") # 如果配置了代理,显示代理信息 if self.proxy_config: logger.info(f"🌐 代理服务器: {self.proxy_config['host']}:{self.proxy_config['port']}") else: logger.info("🌐 未使用代理,使用本地IP") return ip except Exception as e: logger.warning(f"从 {service} 获取IP失败: {e}") continue logger.error("❌ 无法获取当前IP地址") return None def setup_session(self): """设置请求会话""" self.session = requests.Session() # 设置代理 if self.proxy_config: proxy_url = f"http://{self.proxy_config['username']}:{self.proxy_config['password']}@{self.proxy_config['host']}:{self.proxy_config['port']}" self.session.proxies = { 'http': proxy_url, 'https': proxy_url } # 设置随机用户代理 self.session.headers.update({ 'User-Agent': random.choice(self.user_agents), 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', }) # 获取并显示当前IP current_ip = self.get_current_ip() if current_ip: print(f"🌍 当前IP地址: {current_ip}") def setup_selenium_driver(self): """设置Selenium WebDriver""" chrome_options = Options() # 设置代理 if self.proxy_config: proxy_url = f"{self.proxy_config['host']}:{self.proxy_config['port']}" chrome_options.add_argument(f'--proxy-server=http://{proxy_url}') # 其他Chrome选项 chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--disable-dev-shm-usage') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('--window-size=1920,1080') chrome_options.add_argument(f'--user-agent={random.choice(self.user_agents)}') # 禁用图片加载以提高速度(可选) prefs = {"profile.managed_default_content_settings.images": 2} chrome_options.add_experimental_option("prefs", prefs) try: self.driver = webdriver.Chrome(options=chrome_options) return True except WebDriverException as e: logger.error(f"WebDriver初始化失败: {e}") return False def simulate_github_visit(self): """模拟从GitHub访问""" github_page = random.choice(self.github_referrers) try: logger.info(f"模拟访问GitHub页面: {github_page}") # 设置referrer self.session.headers.update({'Referer': github_page}) # 模拟在GitHub停留 stay_time = random.uniform(2, 8) logger.info(f"在GitHub页面停留 {stay_time:.1f} 秒") time.sleep(stay_time) return True except Exception as e: logger.error(f"GitHub访问模拟失败: {e}") return False def visit_main_site(self): """访问主网站""" try: logger.info(f"访问主网站: {self.main_site}") if self.driver: # 使用Selenium访问 self.driver.get(self.main_site) # 等待页面加载 WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # 模拟用户行为 self.simulate_user_behavior() else: # 使用requests访问 response = self.session.get(self.main_site, timeout=10) response.raise_for_status() logger.info(f"主网站访问成功,状态码: {response.status_code}") # 随机停留时间 stay_time = random.uniform(5, 20) logger.info(f"在主网站停留 {stay_time:.1f} 秒") time.sleep(stay_time) return True except Exception as e: logger.error(f"主网站访问失败: {e}") return False def visit_game_page(self): """访问2048游戏页面""" try: logger.info(f"访问游戏页面: {self.game_page}") if self.driver: # 使用Selenium访问 self.driver.get(self.game_page) # 等待页面加载 WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # 模拟游戏交互 self.simulate_game_interaction() else: # 使用requests访问 response = self.session.get(self.game_page, timeout=10) response.raise_for_status() logger.info(f"游戏页面访问成功,状态码: {response.status_code}") # 在游戏页面停留更长时间 stay_time = random.uniform(30, 120) logger.info(f"在游戏页面停留 {stay_time:.1f} 秒") time.sleep(stay_time) return True except Exception as e: logger.error(f"游戏页面访问失败: {e}") return False def simulate_user_behavior(self): """模拟真实用户行为""" if not self.driver: return try: # 模拟滚动 scroll_times = random.randint(1, 3) for _ in range(scroll_times): scroll_position = random.randint(200, 800) self.driver.execute_script(f"window.scrollTo(0, {scroll_position});") time.sleep(random.uniform(1, 3)) # 模拟点击(如果有链接) try: links = self.driver.find_elements(By.TAG_NAME, "a") if links and random.random() < 0.3: # 30%概率点击链接 link = random.choice(links[:5]) # 只考虑前5个链接 if link.is_displayed() and link.is_enabled(): link.click() time.sleep(random.uniform(1, 3)) self.driver.back() # 返回 except: pass except Exception as e: logger.warning(f"用户行为模拟出错: {e}") def simulate_game_interaction(self): """模拟游戏交互""" if not self.driver: return try: # 模拟键盘操作(2048游戏使用方向键) from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.action_chains import ActionChains actions = ActionChains(self.driver) # 模拟几次游戏操作 game_moves = [Keys.ARROW_UP, Keys.ARROW_DOWN, Keys.ARROW_LEFT, Keys.ARROW_RIGHT] move_count = random.randint(5, 15) for _ in range(move_count): move = random.choice(game_moves) actions.send_keys(move).perform() time.sleep(random.uniform(0.5, 2)) except Exception as e: logger.warning(f"游戏交互模拟出错: {e}") def run_single_visit(self): """执行一次完整的访问流程""" logger.info("开始执行访问流程") # 设置会话 self.setup_session() # 随机选择是否使用Selenium use_selenium = random.random() < 0.7 # 70%概率使用Selenium if use_selenium: if not self.setup_selenium_driver(): logger.warning("Selenium设置失败,使用requests模式") use_selenium = False try: # 1. 模拟从GitHub访问 if not self.simulate_github_visit(): logger.warning("GitHub访问模拟失败,继续执行") # 2. 访问主网站 if not self.visit_main_site(): logger.error("主网站访问失败") return False # 3. 访问游戏页面 if not self.visit_game_page(): logger.error("游戏页面访问失败") return False logger.info("访问流程执行成功") return True except Exception as e: logger.error(f"访问流程执行出错: {e}") return False finally: # 清理资源 if self.driver: self.driver.quit() if self.session: self.session.close() def run_continuous(self, total_visits=10, delay_range=(60, 300)): """连续执行多次访问""" success_count = 0 for i in range(total_visits): logger.info(f"执行第 {i+1}/{total_visits} 次访问") if self.run_single_visit(): success_count += 1 # 随机延迟 if i < total_visits - 1: # 最后一次不需要延迟 delay = random.uniform(delay_range[0], delay_range[1]) logger.info(f"等待 {delay:.1f} 秒后进行下次访问") time.sleep(delay) logger.info(f"访问完成,成功: {success_count}/{total_visits}") return success_count def main(): """主函数""" # 代理配置 proxy_config = { 'host': 'gw.dataimpulse.com', 'port': '823', 'username': '3b9936d2ce39b35c4bdf__cr.us', 'password': '2263006e0ff05530' } # 创建机器人实例 bot = WebTrafficBot(proxy_config) print("=== 网站流量模拟脚本 ===") print("⚠️ 请确保仅用于测试自己的网站!") print("目标网站:", bot.main_site) print("游戏页面:", bot.game_page) print() try: # 询问用户选择 print("请选择运行模式:") print("1. 单次访问测试") print("2. 连续访问模式") choice = input("请输入选择 (1/2): ").strip() if choice == "1": # 单次访问 logger.info("开始单次访问测试") success = bot.run_single_visit() if success: print("✅ 单次访问测试成功!") else: print("❌ 单次访问测试失败!") elif choice == "2": # 连续访问 try: visit_count = int(input("请输入访问次数 (默认10): ").strip() or "10") min_delay = int(input("请输入最小延迟秒数 (默认60): ").strip() or "60") max_delay = int(input("请输入最大延迟秒数 (默认300): ").strip() or "300") logger.info(f"开始连续访问,总次数: {visit_count}") success_count = bot.run_continuous( total_visits=visit_count, delay_range=(min_delay, max_delay) ) print(f"✅ 连续访问完成!成功: {success_count}/{visit_count}") except ValueError: print("❌ 输入参数错误!") else: print("❌ 无效选择!") except KeyboardInterrupt: print("\n⚠️ 用户中断执行") except Exception as e: logger.error(f"程序执行出错: {e}") print("❌ 程序执行出错,请检查日志文件 traffic_bot.log") if __name__ == "__main__": main()