Files
shualiangv1/website_traffic_bot_config.py
huangzhenpc 1d4f6f8c33 正式2
2025-07-18 09:51:18 +08:00

464 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网站流量模拟脚本 (配置文件版本)
用途:测试网站访问流程和性能
请确保仅用于测试自己的网站!
"""
import requests
import time
import random
import json
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
import logging
from urllib.parse import urlparse
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('traffic_bot.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class WebTrafficBotConfig:
def __init__(self, config_file='config.json'):
"""
初始化配置版流量机器人
Args:
config_file (str): 配置文件路径
"""
self.config = self.load_config(config_file)
self.session = None
self.driver = None
# 用户代理列表
self.user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
]
def load_config(self, config_file):
"""加载配置文件"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
logger.info(f"配置文件加载成功: {config_file}")
return config
except FileNotFoundError:
logger.error(f"配置文件未找到: {config_file}")
raise
except json.JSONDecodeError as e:
logger.error(f"配置文件格式错误: {e}")
raise
def get_current_ip(self):
"""获取当前IP地址"""
ip_services = [
'https://httpbin.org/ip',
'https://api.ipify.org?format=json',
'https://ipinfo.io/json',
'https://ifconfig.me/ip'
]
for service in ip_services:
try:
logger.info(f"正在获取IP地址: {service}")
response = self.session.get(service, timeout=10)
if response.status_code == 200:
if 'json' in service or 'httpbin' in service or 'ipinfo' in service:
try:
data = response.json()
if 'origin' in data: # httpbin.org
ip = data['origin']
elif 'ip' in data: # ipify.org 或 ipinfo.io
ip = data['ip']
else:
ip = str(data)
except:
ip = response.text.strip()
else:
ip = response.text.strip()
logger.info(f"✅ 当前IP地址: {ip}")
# 如果配置了代理,显示代理信息
proxy_config = self.config.get('proxy')
if proxy_config:
logger.info(f"🌐 代理服务器: {proxy_config['host']}:{proxy_config['port']}")
else:
logger.info("🌐 未使用代理使用本地IP")
return ip
except Exception as e:
logger.warning(f"{service} 获取IP失败: {e}")
continue
logger.error("❌ 无法获取当前IP地址")
return None
def setup_session(self):
"""设置请求会话"""
self.session = requests.Session()
# 设置代理
proxy_config = self.config.get('proxy')
if proxy_config:
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['host']}:{proxy_config['port']}"
self.session.proxies = {
'http': proxy_url,
'https': proxy_url
}
# 设置随机用户代理
self.session.headers.update({
'User-Agent': random.choice(self.user_agents),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
})
# 获取并显示当前IP
current_ip = self.get_current_ip()
if current_ip:
print(f"🌍 当前IP地址: {current_ip}")
def setup_selenium_driver(self):
"""设置Selenium WebDriver"""
chrome_options = Options()
# 设置代理
proxy_config = self.config.get('proxy')
if proxy_config:
proxy_url = f"{proxy_config['host']}:{proxy_config['port']}"
chrome_options.add_argument(f'--proxy-server=http://{proxy_url}')
# 其他Chrome选项
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument(f'--user-agent={random.choice(self.user_agents)}')
# 禁用图片加载以提高速度(可选)
prefs = {"profile.managed_default_content_settings.images": 2}
chrome_options.add_experimental_option("prefs", prefs)
try:
self.driver = webdriver.Chrome(options=chrome_options)
return True
except WebDriverException as e:
logger.error(f"WebDriver初始化失败: {e}")
return False
def simulate_github_visit(self):
"""模拟从GitHub访问"""
github_referrers = self.config.get('github_referrers', [])
if not github_referrers:
return True
github_page = random.choice(github_referrers)
try:
logger.info(f"模拟访问GitHub页面: {github_page}")
# 设置referrer
self.session.headers.update({'Referer': github_page})
# 模拟在GitHub停留
stay_time = random.uniform(2, 8)
logger.info(f"在GitHub页面停留 {stay_time:.1f}")
time.sleep(stay_time)
return True
except Exception as e:
logger.error(f"GitHub访问模拟失败: {e}")
return False
def visit_main_site(self):
"""访问主网站"""
main_site = self.config['targets']['main_site']
stay_time_range = self.config['settings']['main_site_stay_time']
try:
logger.info(f"访问主网站: {main_site}")
if self.driver:
# 使用Selenium访问
self.driver.get(main_site)
# 等待页面加载
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# 模拟用户行为
self.simulate_user_behavior()
else:
# 使用requests访问
response = self.session.get(main_site, timeout=10)
response.raise_for_status()
logger.info(f"主网站访问成功,状态码: {response.status_code}")
# 随机停留时间
stay_time = random.uniform(stay_time_range[0], stay_time_range[1])
logger.info(f"在主网站停留 {stay_time:.1f}")
time.sleep(stay_time)
return True
except Exception as e:
logger.error(f"主网站访问失败: {e}")
return False
def visit_game_page(self):
"""访问游戏页面"""
game_page = self.config['targets']['game_page']
stay_time_range = self.config['settings']['game_page_stay_time']
try:
logger.info(f"访问游戏页面: {game_page}")
if self.driver:
# 使用Selenium访问
self.driver.get(game_page)
# 等待页面加载
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# 模拟游戏交互
self.simulate_game_interaction()
else:
# 使用requests访问
response = self.session.get(game_page, timeout=10)
response.raise_for_status()
logger.info(f"游戏页面访问成功,状态码: {response.status_code}")
# 在游戏页面停留时间
stay_time = random.uniform(stay_time_range[0], stay_time_range[1])
logger.info(f"在游戏页面停留 {stay_time:.1f}")
time.sleep(stay_time)
return True
except Exception as e:
logger.error(f"游戏页面访问失败: {e}")
return False
def simulate_user_behavior(self):
"""模拟真实用户行为"""
if not self.driver:
return
try:
# 模拟滚动
scroll_times = random.randint(1, 3)
for _ in range(scroll_times):
scroll_position = random.randint(200, 800)
self.driver.execute_script(f"window.scrollTo(0, {scroll_position});")
time.sleep(random.uniform(1, 3))
# 模拟点击(如果有链接)
try:
links = self.driver.find_elements(By.TAG_NAME, "a")
if links and random.random() < 0.3: # 30%概率点击链接
link = random.choice(links[:5]) # 只考虑前5个链接
if link.is_displayed() and link.is_enabled():
link.click()
time.sleep(random.uniform(1, 3))
self.driver.back() # 返回
except:
pass
except Exception as e:
logger.warning(f"用户行为模拟出错: {e}")
def simulate_game_interaction(self):
"""模拟游戏交互"""
if not self.driver:
return
try:
# 模拟键盘操作2048游戏使用方向键
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
actions = ActionChains(self.driver)
# 模拟几次游戏操作
game_moves = [Keys.ARROW_UP, Keys.ARROW_DOWN, Keys.ARROW_LEFT, Keys.ARROW_RIGHT]
move_count = random.randint(5, 15)
for _ in range(move_count):
move = random.choice(game_moves)
actions.send_keys(move).perform()
time.sleep(random.uniform(0.5, 2))
except Exception as e:
logger.warning(f"游戏交互模拟出错: {e}")
def run_single_visit(self):
"""执行一次完整的访问流程"""
logger.info("开始执行访问流程")
# 设置会话
self.setup_session()
# 随机选择是否使用Selenium
use_selenium = random.random() < 0.7 # 70%概率使用Selenium
if use_selenium:
if not self.setup_selenium_driver():
logger.warning("Selenium设置失败使用requests模式")
use_selenium = False
try:
# 1. 模拟从GitHub访问
if not self.simulate_github_visit():
logger.warning("GitHub访问模拟失败继续执行")
# 2. 访问主网站
if not self.visit_main_site():
logger.error("主网站访问失败")
return False
# 3. 访问游戏页面
if not self.visit_game_page():
logger.error("游戏页面访问失败")
return False
logger.info("访问流程执行成功")
return True
except Exception as e:
logger.error(f"访问流程执行出错: {e}")
return False
finally:
# 清理资源
if self.driver:
self.driver.quit()
if self.session:
self.session.close()
def run_continuous(self, total_visits=None, delay_range=None):
"""连续执行多次访问"""
if total_visits is None:
total_visits = self.config['settings']['default_visits']
if delay_range is None:
delay_range = (
self.config['settings']['min_delay'],
self.config['settings']['max_delay']
)
success_count = 0
for i in range(total_visits):
logger.info(f"执行第 {i+1}/{total_visits} 次访问")
if self.run_single_visit():
success_count += 1
# 随机延迟
if i < total_visits - 1: # 最后一次不需要延迟
delay = random.uniform(delay_range[0], delay_range[1])
logger.info(f"等待 {delay:.1f} 秒后进行下次访问")
time.sleep(delay)
logger.info(f"访问完成,成功: {success_count}/{total_visits}")
return success_count
def main():
"""主函数"""
config_file = 'config.json'
# 检查配置文件是否存在
if not os.path.exists(config_file):
print(f"❌ 配置文件 {config_file} 不存在!")
print("请先创建配置文件或使用 website_traffic_bot.py")
return
try:
# 创建机器人实例
bot = WebTrafficBotConfig(config_file)
print("=== 网站流量模拟脚本 (配置版) ===")
print("⚠️ 请确保仅用于测试自己的网站!")
print("目标网站:", bot.config['targets']['main_site'])
print("游戏页面:", bot.config['targets']['game_page'])
print()
# 询问用户选择
print("请选择运行模式:")
print("1. 单次访问测试")
print("2. 连续访问模式 (使用配置文件设置)")
print("3. 连续访问模式 (自定义参数)")
choice = input("请输入选择 (1/2/3): ").strip()
if choice == "1":
# 单次访问
logger.info("开始单次访问测试")
success = bot.run_single_visit()
if success:
print("✅ 单次访问测试成功!")
else:
print("❌ 单次访问测试失败!")
elif choice == "2":
# 使用配置文件的连续访问
logger.info("开始连续访问(配置文件模式)")
success_count = bot.run_continuous()
print(f"✅ 连续访问完成!成功: {success_count}/{bot.config['settings']['default_visits']}")
elif choice == "3":
# 自定义参数的连续访问
try:
visit_count = int(input("请输入访问次数: ").strip())
min_delay = int(input("请输入最小延迟秒数: ").strip())
max_delay = int(input("请输入最大延迟秒数: ").strip())
logger.info(f"开始连续访问,总次数: {visit_count}")
success_count = bot.run_continuous(
total_visits=visit_count,
delay_range=(min_delay, max_delay)
)
print(f"✅ 连续访问完成!成功: {success_count}/{visit_count}")
except ValueError:
print("❌ 输入参数错误!")
else:
print("❌ 无效选择!")
except KeyboardInterrupt:
print("\n⚠️ 用户中断执行")
except Exception as e:
logger.error(f"程序执行出错: {e}")
print("❌ 程序执行出错,请检查日志文件 traffic_bot.log")
if __name__ == "__main__":
main()