503 lines
19 KiB
Python
503 lines
19 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
网站流量模拟脚本 (纯协议版本)
|
||
用途:测试网站访问流程和性能
|
||
请确保仅用于测试自己的网站!
|
||
不会打开浏览器窗口,完全基于HTTP协议
|
||
"""
|
||
|
||
import requests
|
||
import time
|
||
import random
|
||
import json
|
||
import os
|
||
import logging
|
||
from urllib.parse import urlparse, urljoin
|
||
import re
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.FileHandler('traffic_bot.log', encoding='utf-8'),
|
||
logging.StreamHandler()
|
||
]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class WebTrafficBotProtocol:
|
||
def __init__(self, config_file='config.json'):
|
||
"""
|
||
初始化纯协议版流量机器人
|
||
|
||
Args:
|
||
config_file (str): 配置文件路径
|
||
"""
|
||
self.config = self.load_config(config_file)
|
||
self.session = None
|
||
|
||
# 用户代理列表
|
||
self.user_agents = [
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
|
||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0",
|
||
]
|
||
|
||
def load_config(self, config_file):
|
||
"""加载配置文件"""
|
||
try:
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
logger.info(f"配置文件加载成功: {config_file}")
|
||
return config
|
||
except FileNotFoundError:
|
||
logger.error(f"配置文件未找到: {config_file}")
|
||
raise
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"配置文件格式错误: {e}")
|
||
raise
|
||
|
||
def get_current_ip(self):
|
||
"""获取当前IP地址"""
|
||
ip_services = [
|
||
'https://httpbin.org/ip',
|
||
'https://api.ipify.org?format=json',
|
||
'https://ipinfo.io/json',
|
||
'https://ifconfig.me/ip'
|
||
]
|
||
|
||
for service in ip_services:
|
||
try:
|
||
logger.info(f"正在获取IP地址: {service}")
|
||
response = self.session.get(service, timeout=10)
|
||
|
||
if response.status_code == 200:
|
||
if 'json' in service or 'httpbin' in service or 'ipinfo' in service:
|
||
try:
|
||
data = response.json()
|
||
if 'origin' in data: # httpbin.org
|
||
ip = data['origin']
|
||
elif 'ip' in data: # ipify.org 或 ipinfo.io
|
||
ip = data['ip']
|
||
else:
|
||
ip = str(data)
|
||
except:
|
||
ip = response.text.strip()
|
||
else:
|
||
ip = response.text.strip()
|
||
|
||
logger.info(f"✅ 当前IP地址: {ip}")
|
||
|
||
# 如果配置了代理,显示代理信息
|
||
proxy_config = self.config.get('proxy')
|
||
if proxy_config:
|
||
logger.info(f"🌐 代理服务器: {proxy_config['host']}:{proxy_config['port']}")
|
||
else:
|
||
logger.info("🌐 未使用代理,使用本地IP")
|
||
|
||
return ip
|
||
|
||
except Exception as e:
|
||
logger.warning(f"从 {service} 获取IP失败: {e}")
|
||
continue
|
||
|
||
logger.error("❌ 无法获取当前IP地址")
|
||
return None
|
||
|
||
def setup_session(self):
|
||
"""设置请求会话"""
|
||
self.session = requests.Session()
|
||
|
||
# 设置代理
|
||
proxy_config = self.config.get('proxy')
|
||
if proxy_config:
|
||
proxy_url = f"http://{proxy_config['username']}:{proxy_config['password']}@{proxy_config['host']}:{proxy_config['port']}"
|
||
self.session.proxies = {
|
||
'http': proxy_url,
|
||
'https': proxy_url
|
||
}
|
||
logger.info(f"已配置代理: {proxy_config['host']}:{proxy_config['port']}")
|
||
|
||
# 设置随机用户代理和其他头部
|
||
user_agent = random.choice(self.user_agents)
|
||
self.session.headers.update({
|
||
'User-Agent': user_agent,
|
||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
|
||
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
|
||
'Accept-Encoding': 'gzip, deflate, br',
|
||
'Connection': 'keep-alive',
|
||
'Upgrade-Insecure-Requests': '1',
|
||
'Sec-Fetch-Dest': 'document',
|
||
'Sec-Fetch-Mode': 'navigate',
|
||
'Sec-Fetch-Site': 'none',
|
||
'Sec-Fetch-User': '?1',
|
||
'Cache-Control': 'max-age=0',
|
||
})
|
||
|
||
logger.info(f"使用用户代理: {user_agent}")
|
||
|
||
# 获取并显示当前IP
|
||
current_ip = self.get_current_ip()
|
||
if current_ip:
|
||
print(f"🌍 当前IP地址: {current_ip}")
|
||
|
||
def simulate_github_visit(self):
|
||
"""模拟从GitHub访问"""
|
||
github_referrers = self.config.get('github_referrers', [])
|
||
if not github_referrers:
|
||
return True
|
||
|
||
github_page = random.choice(github_referrers)
|
||
|
||
try:
|
||
logger.info(f"模拟访问GitHub页面: {github_page}")
|
||
|
||
# 更新referrer头
|
||
self.session.headers.update({
|
||
'Referer': github_page,
|
||
'Sec-Fetch-Site': 'cross-site'
|
||
})
|
||
|
||
# 模拟在GitHub停留
|
||
stay_time = random.uniform(2, 8)
|
||
logger.info(f"在GitHub页面停留 {stay_time:.1f} 秒")
|
||
time.sleep(stay_time)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"GitHub访问模拟失败: {e}")
|
||
return False
|
||
|
||
def check_login_required(self, response):
|
||
"""检查是否需要登录"""
|
||
if not response:
|
||
return False
|
||
|
||
content = response.text.lower()
|
||
|
||
# 检查常见的登录页面特征
|
||
login_indicators = [
|
||
'登录', 'login', '用户名', 'username', 'password', '密码',
|
||
'signin', 'sign in', '账号', 'account', '验证码'
|
||
]
|
||
|
||
login_count = sum(1 for indicator in login_indicators if indicator in content)
|
||
|
||
# 如果发现多个登录相关的关键词,可能是登录页面
|
||
if login_count >= 3:
|
||
logger.warning("⚠️ 检测到可能的登录页面")
|
||
logger.info("建议:")
|
||
logger.info("1. 检查网站是否需要登录访问")
|
||
logger.info("2. 如需登录,请在配置文件中添加登录凭据")
|
||
logger.info("3. 或者更换为公开可访问的页面")
|
||
return True
|
||
|
||
return False
|
||
|
||
def make_request(self, url, timeout=10):
|
||
"""发起HTTP请求"""
|
||
try:
|
||
response = self.session.get(url, timeout=timeout, allow_redirects=True)
|
||
|
||
# 记录响应信息
|
||
logger.info(f"访问 {url} - 状态码: {response.status_code}")
|
||
logger.info(f"响应大小: {len(response.content)} 字节")
|
||
|
||
if response.headers.get('content-type'):
|
||
logger.info(f"内容类型: {response.headers.get('content-type')}")
|
||
|
||
# 检查是否被重定向到登录页面
|
||
if response.url != url and 'login' in response.url.lower():
|
||
logger.warning(f"⚠️ 被重定向到登录页面: {response.url}")
|
||
|
||
# 检查页面内容是否为登录页面
|
||
self.check_login_required(response)
|
||
|
||
response.raise_for_status()
|
||
return response
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"请求失败 {url}: {e}")
|
||
return None
|
||
|
||
def simulate_page_interaction(self, response):
|
||
"""模拟页面交互(解析页面内容,模拟点击等)"""
|
||
if not response:
|
||
return
|
||
|
||
try:
|
||
# 模拟解析页面内容
|
||
content = response.text
|
||
|
||
# 查找页面中的链接
|
||
links = re.findall(r'href=["\'](.*?)["\']', content)
|
||
if links:
|
||
# 随机选择一些链接进行"预加载"模拟
|
||
random_links = random.sample(links[:10], min(3, len(links)))
|
||
for link in random_links:
|
||
if link.startswith('http') or link.startswith('/'):
|
||
if not link.startswith('http'):
|
||
link = urljoin(response.url, link)
|
||
|
||
# 30%概率模拟点击链接
|
||
if random.random() < 0.3:
|
||
logger.info(f"模拟预加载链接: {link}")
|
||
# 这里不实际请求,只是记录行为
|
||
time.sleep(random.uniform(0.5, 1.5))
|
||
|
||
# 模拟页面停留时的一些行为
|
||
time.sleep(random.uniform(1, 3))
|
||
|
||
except Exception as e:
|
||
logger.warning(f"页面交互模拟出错: {e}")
|
||
|
||
def visit_main_site(self):
|
||
"""访问主网站"""
|
||
main_site = self.config['targets']['main_site']
|
||
stay_time_range = self.config['settings']['main_site_stay_time']
|
||
|
||
try:
|
||
logger.info(f"访问主网站: {main_site}")
|
||
|
||
# 发起请求
|
||
response = self.make_request(main_site)
|
||
if not response:
|
||
return False
|
||
|
||
# 模拟页面交互
|
||
self.simulate_page_interaction(response)
|
||
|
||
# 随机停留时间
|
||
stay_time = random.uniform(stay_time_range[0], stay_time_range[1])
|
||
logger.info(f"在主网站停留 {stay_time:.1f} 秒")
|
||
time.sleep(stay_time)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"主网站访问失败: {e}")
|
||
return False
|
||
|
||
def visit_game_page(self):
|
||
"""访问游戏页面"""
|
||
game_page = self.config['targets']['game_page']
|
||
stay_time_range = self.config['settings']['game_page_stay_time']
|
||
|
||
try:
|
||
logger.info(f"访问游戏页面: {game_page}")
|
||
|
||
# 更新referrer为主站
|
||
main_site = self.config['targets']['main_site']
|
||
self.session.headers.update({
|
||
'Referer': main_site,
|
||
'Sec-Fetch-Site': 'same-origin'
|
||
})
|
||
|
||
# 发起请求
|
||
response = self.make_request(game_page)
|
||
if not response:
|
||
return False
|
||
|
||
# 模拟游戏页面的特殊交互
|
||
self.simulate_game_interaction(response)
|
||
|
||
# 在游戏页面停留更长时间
|
||
stay_time = random.uniform(stay_time_range[0], stay_time_range[1])
|
||
logger.info(f"在游戏页面停留 {stay_time:.1f} 秒")
|
||
|
||
# 模拟游戏过程中的多次请求
|
||
self.simulate_gaming_behavior(stay_time)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"游戏页面访问失败: {e}")
|
||
return False
|
||
|
||
def simulate_game_interaction(self, response):
|
||
"""模拟游戏交互"""
|
||
try:
|
||
logger.info("模拟2048游戏交互")
|
||
|
||
# 模拟游戏开始
|
||
time.sleep(random.uniform(2, 5))
|
||
|
||
# 模拟一些游戏动作(通过延迟模拟)
|
||
game_actions = ["上移", "下移", "左移", "右移"]
|
||
action_count = random.randint(8, 20)
|
||
|
||
for i in range(action_count):
|
||
action = random.choice(game_actions)
|
||
logger.info(f"模拟游戏动作: {action}")
|
||
time.sleep(random.uniform(1, 3))
|
||
|
||
# 偶尔模拟长思考时间
|
||
if random.random() < 0.2:
|
||
think_time = random.uniform(5, 15)
|
||
logger.info(f"模拟思考停顿 {think_time:.1f} 秒")
|
||
time.sleep(think_time)
|
||
|
||
except Exception as e:
|
||
logger.warning(f"游戏交互模拟出错: {e}")
|
||
|
||
def simulate_gaming_behavior(self, total_time):
|
||
"""模拟游戏过程中的行为"""
|
||
try:
|
||
segments = random.randint(3, 6) # 将总时间分成几段
|
||
segment_time = total_time / segments
|
||
|
||
for i in range(segments):
|
||
# 每段时间内的行为
|
||
if i > 0:
|
||
time.sleep(segment_time)
|
||
|
||
# 模拟一些可能的游戏相关请求
|
||
if random.random() < 0.3:
|
||
logger.info("模拟游戏状态检查")
|
||
|
||
if random.random() < 0.1:
|
||
logger.info("模拟分数同步")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"游戏行为模拟出错: {e}")
|
||
|
||
def run_single_visit(self):
|
||
"""执行一次完整的访问流程"""
|
||
logger.info("开始执行访问流程(纯协议模式)")
|
||
|
||
# 设置会话
|
||
self.setup_session()
|
||
|
||
try:
|
||
# 1. 模拟从GitHub访问
|
||
if not self.simulate_github_visit():
|
||
logger.warning("GitHub访问模拟失败,继续执行")
|
||
|
||
# 2. 访问主网站
|
||
if not self.visit_main_site():
|
||
logger.error("主网站访问失败")
|
||
return False
|
||
|
||
# 3. 访问游戏页面
|
||
if not self.visit_game_page():
|
||
logger.error("游戏页面访问失败")
|
||
return False
|
||
|
||
logger.info("访问流程执行成功")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"访问流程执行出错: {e}")
|
||
return False
|
||
|
||
finally:
|
||
# 清理资源
|
||
if self.session:
|
||
self.session.close()
|
||
|
||
def run_continuous(self, total_visits=None, delay_range=None):
|
||
"""连续执行多次访问"""
|
||
if total_visits is None:
|
||
total_visits = self.config['settings']['default_visits']
|
||
|
||
if delay_range is None:
|
||
delay_range = (
|
||
self.config['settings']['min_delay'],
|
||
self.config['settings']['max_delay']
|
||
)
|
||
|
||
success_count = 0
|
||
|
||
for i in range(total_visits):
|
||
logger.info(f"执行第 {i+1}/{total_visits} 次访问")
|
||
|
||
if self.run_single_visit():
|
||
success_count += 1
|
||
|
||
# 随机延迟
|
||
if i < total_visits - 1: # 最后一次不需要延迟
|
||
delay = random.uniform(delay_range[0], delay_range[1])
|
||
logger.info(f"等待 {delay:.1f} 秒后进行下次访问")
|
||
time.sleep(delay)
|
||
|
||
logger.info(f"访问完成,成功: {success_count}/{total_visits}")
|
||
return success_count
|
||
|
||
def main():
|
||
"""主函数"""
|
||
config_file = 'config.json'
|
||
|
||
# 检查配置文件是否存在
|
||
if not os.path.exists(config_file):
|
||
print(f"❌ 配置文件 {config_file} 不存在!")
|
||
print("请先创建配置文件")
|
||
return
|
||
|
||
try:
|
||
# 创建机器人实例
|
||
bot = WebTrafficBotProtocol(config_file)
|
||
|
||
print("=== 网站流量模拟脚本 (纯协议版) ===")
|
||
print("🚀 不会打开浏览器窗口,完全基于HTTP协议")
|
||
print("⚠️ 请确保仅用于测试自己的网站!")
|
||
print("目标网站:", bot.config['targets']['main_site'])
|
||
print("游戏页面:", bot.config['targets']['game_page'])
|
||
print()
|
||
|
||
# 询问用户选择
|
||
print("请选择运行模式:")
|
||
print("1. 单次访问测试")
|
||
print("2. 连续访问模式 (使用配置文件设置)")
|
||
print("3. 连续访问模式 (自定义参数)")
|
||
|
||
choice = input("请输入选择 (1/2/3): ").strip()
|
||
|
||
if choice == "1":
|
||
# 单次访问
|
||
logger.info("开始单次访问测试")
|
||
success = bot.run_single_visit()
|
||
if success:
|
||
print("✅ 单次访问测试成功!")
|
||
else:
|
||
print("❌ 单次访问测试失败!")
|
||
|
||
elif choice == "2":
|
||
# 使用配置文件的连续访问
|
||
logger.info("开始连续访问(配置文件模式)")
|
||
success_count = bot.run_continuous()
|
||
print(f"✅ 连续访问完成!成功: {success_count}/{bot.config['settings']['default_visits']}")
|
||
|
||
elif choice == "3":
|
||
# 自定义参数的连续访问
|
||
try:
|
||
visit_count = int(input("请输入访问次数: ").strip())
|
||
min_delay = int(input("请输入最小延迟秒数: ").strip())
|
||
max_delay = int(input("请输入最大延迟秒数: ").strip())
|
||
|
||
logger.info(f"开始连续访问,总次数: {visit_count}")
|
||
success_count = bot.run_continuous(
|
||
total_visits=visit_count,
|
||
delay_range=(min_delay, max_delay)
|
||
)
|
||
|
||
print(f"✅ 连续访问完成!成功: {success_count}/{visit_count}")
|
||
|
||
except ValueError:
|
||
print("❌ 输入参数错误!")
|
||
else:
|
||
print("❌ 无效选择!")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n⚠️ 用户中断执行")
|
||
except Exception as e:
|
||
logger.error(f"程序执行出错: {e}")
|
||
print("❌ 程序执行出错,请检查日志文件 traffic_bot.log")
|
||
|
||
if __name__ == "__main__":
|
||
main() |