添加网站流量模拟脚本和相关配置文件
This commit is contained in:
26
config.json
Normal file
26
config.json
Normal file
@@ -0,0 +1,26 @@
|
||||
{
|
||||
"proxy": {
|
||||
"host": "gw.dataimpulse.com",
|
||||
"port": "823",
|
||||
"username": "3b9936d2ce39b35c4bdf__cr.us",
|
||||
"password": "2263006e0ff05530"
|
||||
},
|
||||
"targets": {
|
||||
"main_site": "https://game.586vip.cn/",
|
||||
"game_page": "https://game.586vip.cn/games/2048/index.html"
|
||||
},
|
||||
"settings": {
|
||||
"default_visits": 10,
|
||||
"min_delay": 60,
|
||||
"max_delay": 300,
|
||||
"main_site_stay_time": [5, 20],
|
||||
"game_page_stay_time": [30, 120]
|
||||
},
|
||||
"github_referrers": [
|
||||
"https://github.com/trending",
|
||||
"https://github.com/topics/javascript",
|
||||
"https://github.com/topics/game",
|
||||
"https://github.com/topics/html5",
|
||||
"https://github.com/search?q=2048+game"
|
||||
]
|
||||
}
|
||||
3
requirements.txt
Normal file
3
requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
requests>=2.31.0
|
||||
selenium>=4.15.0
|
||||
webdriver-manager>=4.0.0
|
||||
85
setup_and_run.bat
Normal file
85
setup_and_run.bat
Normal file
@@ -0,0 +1,85 @@
|
||||
@echo off
|
||||
chcp 65001 >nul
|
||||
title 网站流量模拟脚本
|
||||
|
||||
echo ======================================
|
||||
echo 网站流量模拟脚本 安装和运行工具
|
||||
echo ======================================
|
||||
echo.
|
||||
|
||||
:: 检查Python是否已安装
|
||||
python --version >nul 2>&1
|
||||
if %errorlevel% neq 0 (
|
||||
echo ❌ Python未安装!请先安装Python 3.7+
|
||||
echo 下载地址: https://www.python.org/downloads/
|
||||
pause
|
||||
exit /b 1
|
||||
)
|
||||
|
||||
echo ✅ Python已安装
|
||||
python --version
|
||||
|
||||
:: 检查pip是否可用
|
||||
pip --version >nul 2>&1
|
||||
if %errorlevel% neq 0 (
|
||||
echo ❌ pip不可用!
|
||||
pause
|
||||
exit /b 1
|
||||
)
|
||||
|
||||
echo ✅ pip可用
|
||||
|
||||
:: 安装依赖
|
||||
echo.
|
||||
echo 📦 安装Python依赖包...
|
||||
pip install -r requirements.txt
|
||||
|
||||
if %errorlevel% neq 0 (
|
||||
echo ❌ 依赖安装失败!
|
||||
pause
|
||||
exit /b 1
|
||||
)
|
||||
|
||||
echo ✅ 依赖安装成功
|
||||
|
||||
:: 检查Chrome Driver
|
||||
echo.
|
||||
echo 🔍 检查Chrome和ChromeDriver...
|
||||
echo 如果没有安装Chrome浏览器,请先安装:
|
||||
echo https://www.google.com/chrome/
|
||||
|
||||
:: 显示脚本选择菜单
|
||||
echo.
|
||||
echo 📋 请选择要运行的脚本:
|
||||
echo 1. 基础版本 (website_traffic_bot.py)
|
||||
echo 2. 配置文件版本 (website_traffic_bot_config.py)
|
||||
echo 3. 退出
|
||||
|
||||
set /p choice="请输入选择 (1/2/3): "
|
||||
|
||||
if "%choice%"=="1" (
|
||||
echo.
|
||||
echo 🚀 运行基础版本...
|
||||
python website_traffic_bot.py
|
||||
) else if "%choice%"=="2" (
|
||||
if not exist config.json (
|
||||
echo ❌ 配置文件 config.json 不存在!
|
||||
echo 请先检查配置文件是否存在。
|
||||
pause
|
||||
exit /b 1
|
||||
)
|
||||
echo.
|
||||
echo 🚀 运行配置文件版本...
|
||||
python website_traffic_bot_config.py
|
||||
) else if "%choice%"=="3" (
|
||||
echo 👋 再见!
|
||||
exit /b 0
|
||||
) else (
|
||||
echo ❌ 无效选择!
|
||||
pause
|
||||
exit /b 1
|
||||
)
|
||||
|
||||
echo.
|
||||
echo 🏁 脚本执行完成!
|
||||
pause
|
||||
381
website_traffic_bot.py
Normal file
381
website_traffic_bot.py
Normal file
@@ -0,0 +1,381 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
网站流量模拟脚本
|
||||
用途:测试网站访问流程和性能
|
||||
请确保仅用于测试自己的网站!
|
||||
"""
|
||||
|
||||
import requests
|
||||
import time
|
||||
import random
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.common.exceptions import TimeoutException, WebDriverException
|
||||
import logging
|
||||
from urllib.parse import urlparse
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler('traffic_bot.log', encoding='utf-8'),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class WebTrafficBot:
|
||||
def __init__(self, proxy_config):
|
||||
"""
|
||||
初始化流量机器人
|
||||
|
||||
Args:
|
||||
proxy_config (dict): 代理配置
|
||||
"""
|
||||
self.proxy_config = proxy_config
|
||||
self.session = None
|
||||
self.driver = None
|
||||
|
||||
# 目标网站配置
|
||||
self.main_site = "https://game.586vip.cn/"
|
||||
self.game_page = "https://game.586vip.cn/games/2048/index.html"
|
||||
|
||||
# GitHub页面列表(模拟来源)
|
||||
self.github_referrers = [
|
||||
"https://github.com/trending",
|
||||
"https://github.com/topics/javascript",
|
||||
"https://github.com/topics/game",
|
||||
"https://github.com/topics/html5",
|
||||
"https://github.com/search?q=2048+game",
|
||||
]
|
||||
|
||||
# 用户代理列表
|
||||
self.user_agents = [
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
||||
]
|
||||
|
||||
def setup_session(self):
|
||||
"""设置请求会话"""
|
||||
self.session = requests.Session()
|
||||
|
||||
# 设置代理
|
||||
if self.proxy_config:
|
||||
proxy_url = f"http://{self.proxy_config['username']}:{self.proxy_config['password']}@{self.proxy_config['host']}:{self.proxy_config['port']}"
|
||||
self.session.proxies = {
|
||||
'http': proxy_url,
|
||||
'https': proxy_url
|
||||
}
|
||||
|
||||
# 设置随机用户代理
|
||||
self.session.headers.update({
|
||||
'User-Agent': random.choice(self.user_agents),
|
||||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
||||
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
|
||||
'Accept-Encoding': 'gzip, deflate, br',
|
||||
'Connection': 'keep-alive',
|
||||
'Upgrade-Insecure-Requests': '1',
|
||||
})
|
||||
|
||||
def setup_selenium_driver(self):
|
||||
"""设置Selenium WebDriver"""
|
||||
chrome_options = Options()
|
||||
|
||||
# 设置代理
|
||||
if self.proxy_config:
|
||||
proxy_url = f"{self.proxy_config['host']}:{self.proxy_config['port']}"
|
||||
chrome_options.add_argument(f'--proxy-server=http://{proxy_url}')
|
||||
|
||||
# 其他Chrome选项
|
||||
chrome_options.add_argument('--no-sandbox')
|
||||
chrome_options.add_argument('--disable-dev-shm-usage')
|
||||
chrome_options.add_argument('--disable-gpu')
|
||||
chrome_options.add_argument('--window-size=1920,1080')
|
||||
chrome_options.add_argument(f'--user-agent={random.choice(self.user_agents)}')
|
||||
|
||||
# 禁用图片加载以提高速度(可选)
|
||||
prefs = {"profile.managed_default_content_settings.images": 2}
|
||||
chrome_options.add_experimental_option("prefs", prefs)
|
||||
|
||||
try:
|
||||
self.driver = webdriver.Chrome(options=chrome_options)
|
||||
return True
|
||||
except WebDriverException as e:
|
||||
logger.error(f"WebDriver初始化失败: {e}")
|
||||
return False
|
||||
|
||||
def simulate_github_visit(self):
|
||||
"""模拟从GitHub访问"""
|
||||
github_page = random.choice(self.github_referrers)
|
||||
|
||||
try:
|
||||
logger.info(f"模拟访问GitHub页面: {github_page}")
|
||||
|
||||
# 设置referrer
|
||||
self.session.headers.update({'Referer': github_page})
|
||||
|
||||
# 模拟在GitHub停留
|
||||
stay_time = random.uniform(2, 8)
|
||||
logger.info(f"在GitHub页面停留 {stay_time:.1f} 秒")
|
||||
time.sleep(stay_time)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"GitHub访问模拟失败: {e}")
|
||||
return False
|
||||
|
||||
def visit_main_site(self):
|
||||
"""访问主网站"""
|
||||
try:
|
||||
logger.info(f"访问主网站: {self.main_site}")
|
||||
|
||||
if self.driver:
|
||||
# 使用Selenium访问
|
||||
self.driver.get(self.main_site)
|
||||
|
||||
# 等待页面加载
|
||||
WebDriverWait(self.driver, 10).until(
|
||||
EC.presence_of_element_located((By.TAG_NAME, "body"))
|
||||
)
|
||||
|
||||
# 模拟用户行为
|
||||
self.simulate_user_behavior()
|
||||
|
||||
else:
|
||||
# 使用requests访问
|
||||
response = self.session.get(self.main_site, timeout=10)
|
||||
response.raise_for_status()
|
||||
logger.info(f"主网站访问成功,状态码: {response.status_code}")
|
||||
|
||||
# 随机停留时间
|
||||
stay_time = random.uniform(5, 20)
|
||||
logger.info(f"在主网站停留 {stay_time:.1f} 秒")
|
||||
time.sleep(stay_time)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"主网站访问失败: {e}")
|
||||
return False
|
||||
|
||||
def visit_game_page(self):
|
||||
"""访问2048游戏页面"""
|
||||
try:
|
||||
logger.info(f"访问游戏页面: {self.game_page}")
|
||||
|
||||
if self.driver:
|
||||
# 使用Selenium访问
|
||||
self.driver.get(self.game_page)
|
||||
|
||||
# 等待页面加载
|
||||
WebDriverWait(self.driver, 10).until(
|
||||
EC.presence_of_element_located((By.TAG_NAME, "body"))
|
||||
)
|
||||
|
||||
# 模拟游戏交互
|
||||
self.simulate_game_interaction()
|
||||
|
||||
else:
|
||||
# 使用requests访问
|
||||
response = self.session.get(self.game_page, timeout=10)
|
||||
response.raise_for_status()
|
||||
logger.info(f"游戏页面访问成功,状态码: {response.status_code}")
|
||||
|
||||
# 在游戏页面停留更长时间
|
||||
stay_time = random.uniform(30, 120)
|
||||
logger.info(f"在游戏页面停留 {stay_time:.1f} 秒")
|
||||
time.sleep(stay_time)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"游戏页面访问失败: {e}")
|
||||
return False
|
||||
|
||||
def simulate_user_behavior(self):
|
||||
"""模拟真实用户行为"""
|
||||
if not self.driver:
|
||||
return
|
||||
|
||||
try:
|
||||
# 模拟滚动
|
||||
scroll_times = random.randint(1, 3)
|
||||
for _ in range(scroll_times):
|
||||
scroll_position = random.randint(200, 800)
|
||||
self.driver.execute_script(f"window.scrollTo(0, {scroll_position});")
|
||||
time.sleep(random.uniform(1, 3))
|
||||
|
||||
# 模拟点击(如果有链接)
|
||||
try:
|
||||
links = self.driver.find_elements(By.TAG_NAME, "a")
|
||||
if links and random.random() < 0.3: # 30%概率点击链接
|
||||
link = random.choice(links[:5]) # 只考虑前5个链接
|
||||
if link.is_displayed() and link.is_enabled():
|
||||
link.click()
|
||||
time.sleep(random.uniform(1, 3))
|
||||
self.driver.back() # 返回
|
||||
except:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"用户行为模拟出错: {e}")
|
||||
|
||||
def simulate_game_interaction(self):
|
||||
"""模拟游戏交互"""
|
||||
if not self.driver:
|
||||
return
|
||||
|
||||
try:
|
||||
# 模拟键盘操作(2048游戏使用方向键)
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
from selenium.webdriver.common.action_chains import ActionChains
|
||||
|
||||
actions = ActionChains(self.driver)
|
||||
|
||||
# 模拟几次游戏操作
|
||||
game_moves = [Keys.ARROW_UP, Keys.ARROW_DOWN, Keys.ARROW_LEFT, Keys.ARROW_RIGHT]
|
||||
move_count = random.randint(5, 15)
|
||||
|
||||
for _ in range(move_count):
|
||||
move = random.choice(game_moves)
|
||||
actions.send_keys(move).perform()
|
||||
time.sleep(random.uniform(0.5, 2))
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"游戏交互模拟出错: {e}")
|
||||
|
||||
def run_single_visit(self):
|
||||
"""执行一次完整的访问流程"""
|
||||
logger.info("开始执行访问流程")
|
||||
|
||||
# 设置会话
|
||||
self.setup_session()
|
||||
|
||||
# 随机选择是否使用Selenium
|
||||
use_selenium = random.random() < 0.7 # 70%概率使用Selenium
|
||||
|
||||
if use_selenium:
|
||||
if not self.setup_selenium_driver():
|
||||
logger.warning("Selenium设置失败,使用requests模式")
|
||||
use_selenium = False
|
||||
|
||||
try:
|
||||
# 1. 模拟从GitHub访问
|
||||
if not self.simulate_github_visit():
|
||||
logger.warning("GitHub访问模拟失败,继续执行")
|
||||
|
||||
# 2. 访问主网站
|
||||
if not self.visit_main_site():
|
||||
logger.error("主网站访问失败")
|
||||
return False
|
||||
|
||||
# 3. 访问游戏页面
|
||||
if not self.visit_game_page():
|
||||
logger.error("游戏页面访问失败")
|
||||
return False
|
||||
|
||||
logger.info("访问流程执行成功")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"访问流程执行出错: {e}")
|
||||
return False
|
||||
|
||||
finally:
|
||||
# 清理资源
|
||||
if self.driver:
|
||||
self.driver.quit()
|
||||
if self.session:
|
||||
self.session.close()
|
||||
|
||||
def run_continuous(self, total_visits=10, delay_range=(60, 300)):
|
||||
"""连续执行多次访问"""
|
||||
success_count = 0
|
||||
|
||||
for i in range(total_visits):
|
||||
logger.info(f"执行第 {i+1}/{total_visits} 次访问")
|
||||
|
||||
if self.run_single_visit():
|
||||
success_count += 1
|
||||
|
||||
# 随机延迟
|
||||
if i < total_visits - 1: # 最后一次不需要延迟
|
||||
delay = random.uniform(delay_range[0], delay_range[1])
|
||||
logger.info(f"等待 {delay:.1f} 秒后进行下次访问")
|
||||
time.sleep(delay)
|
||||
|
||||
logger.info(f"访问完成,成功: {success_count}/{total_visits}")
|
||||
return success_count
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
# 代理配置
|
||||
proxy_config = {
|
||||
'host': 'gw.dataimpulse.com',
|
||||
'port': '823',
|
||||
'username': '3b9936d2ce39b35c4bdf__cr.us',
|
||||
'password': '2263006e0ff05530'
|
||||
}
|
||||
|
||||
# 创建机器人实例
|
||||
bot = WebTrafficBot(proxy_config)
|
||||
|
||||
print("=== 网站流量模拟脚本 ===")
|
||||
print("⚠️ 请确保仅用于测试自己的网站!")
|
||||
print("目标网站:", bot.main_site)
|
||||
print("游戏页面:", bot.game_page)
|
||||
print()
|
||||
|
||||
try:
|
||||
# 询问用户选择
|
||||
print("请选择运行模式:")
|
||||
print("1. 单次访问测试")
|
||||
print("2. 连续访问模式")
|
||||
|
||||
choice = input("请输入选择 (1/2): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
# 单次访问
|
||||
logger.info("开始单次访问测试")
|
||||
success = bot.run_single_visit()
|
||||
if success:
|
||||
print("✅ 单次访问测试成功!")
|
||||
else:
|
||||
print("❌ 单次访问测试失败!")
|
||||
|
||||
elif choice == "2":
|
||||
# 连续访问
|
||||
try:
|
||||
visit_count = int(input("请输入访问次数 (默认10): ").strip() or "10")
|
||||
min_delay = int(input("请输入最小延迟秒数 (默认60): ").strip() or "60")
|
||||
max_delay = int(input("请输入最大延迟秒数 (默认300): ").strip() or "300")
|
||||
|
||||
logger.info(f"开始连续访问,总次数: {visit_count}")
|
||||
success_count = bot.run_continuous(
|
||||
total_visits=visit_count,
|
||||
delay_range=(min_delay, max_delay)
|
||||
)
|
||||
|
||||
print(f"✅ 连续访问完成!成功: {success_count}/{visit_count}")
|
||||
|
||||
except ValueError:
|
||||
print("❌ 输入参数错误!")
|
||||
else:
|
||||
print("❌ 无效选择!")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n⚠️ 用户中断执行")
|
||||
except Exception as e:
|
||||
logger.error(f"程序执行出错: {e}")
|
||||
print("❌ 程序执行出错,请检查日志文件 traffic_bot.log")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
412
website_traffic_bot_config.py
Normal file
412
website_traffic_bot_config.py
Normal file
@@ -0,0 +1,412 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
网站流量模拟脚本 (配置文件版本)
|
||||
用途:测试网站访问流程和性能
|
||||
请确保仅用于测试自己的网站!
|
||||
"""
|
||||
|
||||
import requests
|
||||
import time
|
||||
import random
|
||||
import json
|
||||
import os
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.common.exceptions import TimeoutException, WebDriverException
|
||||
import logging
|
||||
from urllib.parse import urlparse
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler('traffic_bot.log', encoding='utf-8'),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class WebTrafficBotConfig:
|
||||
def __init__(self, config_file='config.json'):
|
||||
"""
|
||||
初始化配置版流量机器人
|
||||
|
||||
Args:
|
||||
config_file (str): 配置文件路径
|
||||
"""
|
||||
self.config = self.load_config(config_file)
|
||||
self.session = None
|
||||
self.driver = None
|
||||
|
||||
# 用户代理列表
|
||||
self.user_agents = [
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
||||
]
|
||||
|
||||
def load_config(self, config_file):
|
||||
"""加载配置文件"""
|
||||
try:
|
||||
with open(config_file, 'r', encoding='utf-8') as f:
|
||||
config = json.load(f)
|
||||
logger.info(f"配置文件加载成功: {config_file}")
|
||||
return config
|
||||
except FileNotFoundError:
|
||||
logger.error(f"配置文件未找到: {config_file}")
|
||||
raise
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"配置文件格式错误: {e}")
|
||||
raise
|
||||
|
||||
def setup_session(self):
|
||||
"""设置请求会话"""
|
||||
self.session = requests.Session()
|
||||
|
||||
# 设置代理
|
||||
proxy_config = self.config.get('proxy')
|
||||
if proxy_config:
|
||||
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['host']}:{proxy_config['port']}"
|
||||
self.session.proxies = {
|
||||
'http': proxy_url,
|
||||
'https': proxy_url
|
||||
}
|
||||
|
||||
# 设置随机用户代理
|
||||
self.session.headers.update({
|
||||
'User-Agent': random.choice(self.user_agents),
|
||||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
|
||||
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
|
||||
'Accept-Encoding': 'gzip, deflate, br',
|
||||
'Connection': 'keep-alive',
|
||||
'Upgrade-Insecure-Requests': '1',
|
||||
})
|
||||
|
||||
def setup_selenium_driver(self):
|
||||
"""设置Selenium WebDriver"""
|
||||
chrome_options = Options()
|
||||
|
||||
# 设置代理
|
||||
proxy_config = self.config.get('proxy')
|
||||
if proxy_config:
|
||||
proxy_url = f"{proxy_config['host']}:{proxy_config['port']}"
|
||||
chrome_options.add_argument(f'--proxy-server=http://{proxy_url}')
|
||||
|
||||
# 其他Chrome选项
|
||||
chrome_options.add_argument('--no-sandbox')
|
||||
chrome_options.add_argument('--disable-dev-shm-usage')
|
||||
chrome_options.add_argument('--disable-gpu')
|
||||
chrome_options.add_argument('--window-size=1920,1080')
|
||||
chrome_options.add_argument(f'--user-agent={random.choice(self.user_agents)}')
|
||||
|
||||
# 禁用图片加载以提高速度(可选)
|
||||
prefs = {"profile.managed_default_content_settings.images": 2}
|
||||
chrome_options.add_experimental_option("prefs", prefs)
|
||||
|
||||
try:
|
||||
self.driver = webdriver.Chrome(options=chrome_options)
|
||||
return True
|
||||
except WebDriverException as e:
|
||||
logger.error(f"WebDriver初始化失败: {e}")
|
||||
return False
|
||||
|
||||
def simulate_github_visit(self):
|
||||
"""模拟从GitHub访问"""
|
||||
github_referrers = self.config.get('github_referrers', [])
|
||||
if not github_referrers:
|
||||
return True
|
||||
|
||||
github_page = random.choice(github_referrers)
|
||||
|
||||
try:
|
||||
logger.info(f"模拟访问GitHub页面: {github_page}")
|
||||
|
||||
# 设置referrer
|
||||
self.session.headers.update({'Referer': github_page})
|
||||
|
||||
# 模拟在GitHub停留
|
||||
stay_time = random.uniform(2, 8)
|
||||
logger.info(f"在GitHub页面停留 {stay_time:.1f} 秒")
|
||||
time.sleep(stay_time)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"GitHub访问模拟失败: {e}")
|
||||
return False
|
||||
|
||||
def visit_main_site(self):
|
||||
"""访问主网站"""
|
||||
main_site = self.config['targets']['main_site']
|
||||
stay_time_range = self.config['settings']['main_site_stay_time']
|
||||
|
||||
try:
|
||||
logger.info(f"访问主网站: {main_site}")
|
||||
|
||||
if self.driver:
|
||||
# 使用Selenium访问
|
||||
self.driver.get(main_site)
|
||||
|
||||
# 等待页面加载
|
||||
WebDriverWait(self.driver, 10).until(
|
||||
EC.presence_of_element_located((By.TAG_NAME, "body"))
|
||||
)
|
||||
|
||||
# 模拟用户行为
|
||||
self.simulate_user_behavior()
|
||||
|
||||
else:
|
||||
# 使用requests访问
|
||||
response = self.session.get(main_site, timeout=10)
|
||||
response.raise_for_status()
|
||||
logger.info(f"主网站访问成功,状态码: {response.status_code}")
|
||||
|
||||
# 随机停留时间
|
||||
stay_time = random.uniform(stay_time_range[0], stay_time_range[1])
|
||||
logger.info(f"在主网站停留 {stay_time:.1f} 秒")
|
||||
time.sleep(stay_time)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"主网站访问失败: {e}")
|
||||
return False
|
||||
|
||||
def visit_game_page(self):
|
||||
"""访问游戏页面"""
|
||||
game_page = self.config['targets']['game_page']
|
||||
stay_time_range = self.config['settings']['game_page_stay_time']
|
||||
|
||||
try:
|
||||
logger.info(f"访问游戏页面: {game_page}")
|
||||
|
||||
if self.driver:
|
||||
# 使用Selenium访问
|
||||
self.driver.get(game_page)
|
||||
|
||||
# 等待页面加载
|
||||
WebDriverWait(self.driver, 10).until(
|
||||
EC.presence_of_element_located((By.TAG_NAME, "body"))
|
||||
)
|
||||
|
||||
# 模拟游戏交互
|
||||
self.simulate_game_interaction()
|
||||
|
||||
else:
|
||||
# 使用requests访问
|
||||
response = self.session.get(game_page, timeout=10)
|
||||
response.raise_for_status()
|
||||
logger.info(f"游戏页面访问成功,状态码: {response.status_code}")
|
||||
|
||||
# 在游戏页面停留时间
|
||||
stay_time = random.uniform(stay_time_range[0], stay_time_range[1])
|
||||
logger.info(f"在游戏页面停留 {stay_time:.1f} 秒")
|
||||
time.sleep(stay_time)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"游戏页面访问失败: {e}")
|
||||
return False
|
||||
|
||||
def simulate_user_behavior(self):
|
||||
"""模拟真实用户行为"""
|
||||
if not self.driver:
|
||||
return
|
||||
|
||||
try:
|
||||
# 模拟滚动
|
||||
scroll_times = random.randint(1, 3)
|
||||
for _ in range(scroll_times):
|
||||
scroll_position = random.randint(200, 800)
|
||||
self.driver.execute_script(f"window.scrollTo(0, {scroll_position});")
|
||||
time.sleep(random.uniform(1, 3))
|
||||
|
||||
# 模拟点击(如果有链接)
|
||||
try:
|
||||
links = self.driver.find_elements(By.TAG_NAME, "a")
|
||||
if links and random.random() < 0.3: # 30%概率点击链接
|
||||
link = random.choice(links[:5]) # 只考虑前5个链接
|
||||
if link.is_displayed() and link.is_enabled():
|
||||
link.click()
|
||||
time.sleep(random.uniform(1, 3))
|
||||
self.driver.back() # 返回
|
||||
except:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"用户行为模拟出错: {e}")
|
||||
|
||||
def simulate_game_interaction(self):
|
||||
"""模拟游戏交互"""
|
||||
if not self.driver:
|
||||
return
|
||||
|
||||
try:
|
||||
# 模拟键盘操作(2048游戏使用方向键)
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
from selenium.webdriver.common.action_chains import ActionChains
|
||||
|
||||
actions = ActionChains(self.driver)
|
||||
|
||||
# 模拟几次游戏操作
|
||||
game_moves = [Keys.ARROW_UP, Keys.ARROW_DOWN, Keys.ARROW_LEFT, Keys.ARROW_RIGHT]
|
||||
move_count = random.randint(5, 15)
|
||||
|
||||
for _ in range(move_count):
|
||||
move = random.choice(game_moves)
|
||||
actions.send_keys(move).perform()
|
||||
time.sleep(random.uniform(0.5, 2))
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"游戏交互模拟出错: {e}")
|
||||
|
||||
def run_single_visit(self):
|
||||
"""执行一次完整的访问流程"""
|
||||
logger.info("开始执行访问流程")
|
||||
|
||||
# 设置会话
|
||||
self.setup_session()
|
||||
|
||||
# 随机选择是否使用Selenium
|
||||
use_selenium = random.random() < 0.7 # 70%概率使用Selenium
|
||||
|
||||
if use_selenium:
|
||||
if not self.setup_selenium_driver():
|
||||
logger.warning("Selenium设置失败,使用requests模式")
|
||||
use_selenium = False
|
||||
|
||||
try:
|
||||
# 1. 模拟从GitHub访问
|
||||
if not self.simulate_github_visit():
|
||||
logger.warning("GitHub访问模拟失败,继续执行")
|
||||
|
||||
# 2. 访问主网站
|
||||
if not self.visit_main_site():
|
||||
logger.error("主网站访问失败")
|
||||
return False
|
||||
|
||||
# 3. 访问游戏页面
|
||||
if not self.visit_game_page():
|
||||
logger.error("游戏页面访问失败")
|
||||
return False
|
||||
|
||||
logger.info("访问流程执行成功")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"访问流程执行出错: {e}")
|
||||
return False
|
||||
|
||||
finally:
|
||||
# 清理资源
|
||||
if self.driver:
|
||||
self.driver.quit()
|
||||
if self.session:
|
||||
self.session.close()
|
||||
|
||||
def run_continuous(self, total_visits=None, delay_range=None):
|
||||
"""连续执行多次访问"""
|
||||
if total_visits is None:
|
||||
total_visits = self.config['settings']['default_visits']
|
||||
|
||||
if delay_range is None:
|
||||
delay_range = (
|
||||
self.config['settings']['min_delay'],
|
||||
self.config['settings']['max_delay']
|
||||
)
|
||||
|
||||
success_count = 0
|
||||
|
||||
for i in range(total_visits):
|
||||
logger.info(f"执行第 {i+1}/{total_visits} 次访问")
|
||||
|
||||
if self.run_single_visit():
|
||||
success_count += 1
|
||||
|
||||
# 随机延迟
|
||||
if i < total_visits - 1: # 最后一次不需要延迟
|
||||
delay = random.uniform(delay_range[0], delay_range[1])
|
||||
logger.info(f"等待 {delay:.1f} 秒后进行下次访问")
|
||||
time.sleep(delay)
|
||||
|
||||
logger.info(f"访问完成,成功: {success_count}/{total_visits}")
|
||||
return success_count
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
config_file = 'config.json'
|
||||
|
||||
# 检查配置文件是否存在
|
||||
if not os.path.exists(config_file):
|
||||
print(f"❌ 配置文件 {config_file} 不存在!")
|
||||
print("请先创建配置文件或使用 website_traffic_bot.py")
|
||||
return
|
||||
|
||||
try:
|
||||
# 创建机器人实例
|
||||
bot = WebTrafficBotConfig(config_file)
|
||||
|
||||
print("=== 网站流量模拟脚本 (配置版) ===")
|
||||
print("⚠️ 请确保仅用于测试自己的网站!")
|
||||
print("目标网站:", bot.config['targets']['main_site'])
|
||||
print("游戏页面:", bot.config['targets']['game_page'])
|
||||
print()
|
||||
|
||||
# 询问用户选择
|
||||
print("请选择运行模式:")
|
||||
print("1. 单次访问测试")
|
||||
print("2. 连续访问模式 (使用配置文件设置)")
|
||||
print("3. 连续访问模式 (自定义参数)")
|
||||
|
||||
choice = input("请输入选择 (1/2/3): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
# 单次访问
|
||||
logger.info("开始单次访问测试")
|
||||
success = bot.run_single_visit()
|
||||
if success:
|
||||
print("✅ 单次访问测试成功!")
|
||||
else:
|
||||
print("❌ 单次访问测试失败!")
|
||||
|
||||
elif choice == "2":
|
||||
# 使用配置文件的连续访问
|
||||
logger.info("开始连续访问(配置文件模式)")
|
||||
success_count = bot.run_continuous()
|
||||
print(f"✅ 连续访问完成!成功: {success_count}/{bot.config['settings']['default_visits']}")
|
||||
|
||||
elif choice == "3":
|
||||
# 自定义参数的连续访问
|
||||
try:
|
||||
visit_count = int(input("请输入访问次数: ").strip())
|
||||
min_delay = int(input("请输入最小延迟秒数: ").strip())
|
||||
max_delay = int(input("请输入最大延迟秒数: ").strip())
|
||||
|
||||
logger.info(f"开始连续访问,总次数: {visit_count}")
|
||||
success_count = bot.run_continuous(
|
||||
total_visits=visit_count,
|
||||
delay_range=(min_delay, max_delay)
|
||||
)
|
||||
|
||||
print(f"✅ 连续访问完成!成功: {success_count}/{visit_count}")
|
||||
|
||||
except ValueError:
|
||||
print("❌ 输入参数错误!")
|
||||
else:
|
||||
print("❌ 无效选择!")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n⚠️ 用户中断执行")
|
||||
except Exception as e:
|
||||
logger.error(f"程序执行出错: {e}")
|
||||
print("❌ 程序执行出错,请检查日志文件 traffic_bot.log")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user