Files
shualiangv1/website_traffic_bot_protocol.py
huangzhenpc 1d4f6f8c33 正式2
2025-07-18 09:51:18 +08:00

503 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网站流量模拟脚本 (纯协议版本)
用途:测试网站访问流程和性能
请确保仅用于测试自己的网站!
不会打开浏览器窗口完全基于HTTP协议
"""
import requests
import time
import random
import json
import os
import logging
from urllib.parse import urlparse, urljoin
import re
# Logging setup: INFO-and-above records go to a UTF-8 log file in the
# working directory and are echoed to the console as well.
logging.basicConfig(
    handlers=[
        logging.FileHandler("traffic_bot.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by every function and method in this file.
logger = logging.getLogger(__name__)
class WebTrafficBotProtocol:
    """Simulate a visitor's page flow over plain HTTP (no browser involved).

    Behaviour is driven by a JSON configuration file. The keys read by this
    class are:

    * ``proxy`` (optional): ``host``, ``port`` and optionally ``username`` /
      ``password``.
    * ``github_referrers`` (optional): list of URLs used as ``Referer``.
    * ``targets``: ``main_site`` and ``game_page`` URLs.
    * ``settings``: ``main_site_stay_time`` and ``game_page_stay_time``
      (two-element ranges in seconds), ``default_visits``, ``min_delay`` and
      ``max_delay``.
    """

    def __init__(self, config_file='config.json'):
        """Load the configuration and prepare the User-Agent pool.

        Args:
            config_file (str): Path to the JSON configuration file.

        Raises:
            FileNotFoundError: If ``config_file`` does not exist.
            json.JSONDecodeError: If ``config_file`` is not valid JSON.
        """
        self.config = self.load_config(config_file)
        # Created by setup_session() at the start of every visit.
        self.session = None
        # Pool from which setup_session() picks one User-Agent per session.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0",
        ]

    def load_config(self, config_file):
        """Read and parse the JSON configuration file.

        Args:
            config_file (str): Path to the JSON configuration file.

        Returns:
            The parsed JSON document.

        Raises:
            FileNotFoundError: If the file does not exist (logged, re-raised).
            json.JSONDecodeError: If the file is not valid JSON (logged,
                re-raised).
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            logger.info(f"配置文件加载成功: {config_file}")
            return config
        except FileNotFoundError:
            logger.error(f"配置文件未找到: {config_file}")
            raise
        except json.JSONDecodeError as e:
            logger.error(f"配置文件格式错误: {e}")
            raise

    @staticmethod
    def _build_proxy_url(proxy_config):
        """Build the ``http://[user[:password]@]host:port`` proxy URL.

        Credentials are percent-encoded so that characters such as ``@``,
        ``:`` or ``/`` inside them cannot corrupt the URL, and they are
        optional so a proxy without authentication can be configured.
        """
        from urllib.parse import quote

        endpoint = f"{proxy_config['host']}:{proxy_config['port']}"
        username = proxy_config.get('username')
        if not username:
            return f"http://{endpoint}"
        credentials = quote(str(username), safe='')
        password = proxy_config.get('password')
        if password:
            credentials += ':' + quote(str(password), safe='')
        return f"http://{credentials}@{endpoint}"

    @staticmethod
    def _extract_ip(service, response):
        """Pull the IP string out of an IP-echo service response.

        Services whose URL contains ``json``, ``httpbin`` or ``ipinfo`` are
        parsed as JSON (``origin`` key first, then ``ip``); anything else,
        or a body that fails to parse, falls back to the stripped text.
        """
        if not any(tag in service for tag in ('json', 'httpbin', 'ipinfo')):
            return response.text.strip()
        try:
            data = response.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses; a narrow
            # catch keeps KeyboardInterrupt/SystemExit propagating.
            return response.text.strip()
        if isinstance(data, dict):
            if 'origin' in data:  # httpbin.org
                return data['origin']
            if 'ip' in data:  # ipify.org or ipinfo.io
                return data['ip']
        return str(data)

    def get_current_ip(self):
        """Ask public IP-echo services for the current outbound IP.

        Services are tried in order through ``self.session``; the first one
        answering with HTTP 200 wins. Failures are logged as warnings and
        the next service is tried.

        Returns:
            The reported IP, or ``None`` when every service failed.
        """
        ip_services = [
            'https://httpbin.org/ip',
            'https://api.ipify.org?format=json',
            'https://ipinfo.io/json',
            'https://ifconfig.me/ip'
        ]
        for service in ip_services:
            try:
                logger.info(f"正在获取IP地址: {service}")
                response = self.session.get(service, timeout=10)
                if response.status_code == 200:
                    ip = self._extract_ip(service, response)
                    logger.info(f"✅ 当前IP地址: {ip}")
                    # Report whether the request went through a proxy.
                    proxy_config = self.config.get('proxy')
                    if proxy_config:
                        logger.info(f"🌐 代理服务器: {proxy_config['host']}:{proxy_config['port']}")
                    else:
                        logger.info("🌐 未使用代理使用本地IP")
                    return ip
            except Exception as e:
                logger.warning(f"{service} 获取IP失败: {e}")
                continue
        logger.error("❌ 无法获取当前IP地址")
        return None

    def setup_session(self):
        """Create a fresh ``requests.Session`` with proxy and headers set.

        Applies the optional ``proxy`` configuration, picks a random
        User-Agent from ``self.user_agents``, installs the default header
        set, then looks up and prints the current outbound IP.
        """
        self.session = requests.Session()
        # Optional proxy, used for both http and https traffic.
        proxy_config = self.config.get('proxy')
        if proxy_config:
            proxy_url = self._build_proxy_url(proxy_config)
            self.session.proxies = {
                'http': proxy_url,
                'https': proxy_url
            }
            logger.info(f"已配置代理: {proxy_config['host']}:{proxy_config['port']}")
        # Random User-Agent plus the remaining default request headers.
        user_agent = random.choice(self.user_agents)
        self.session.headers.update({
            'User-Agent': user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
        })
        logger.info(f"使用用户代理: {user_agent}")
        # Look up and display the current outbound IP.
        current_ip = self.get_current_ip()
        if current_ip:
            print(f"🌍 当前IP地址: {current_ip}")

    def simulate_github_visit(self):
        """Set a random configured GitHub URL as ``Referer`` and pause.

        No request is sent to GitHub; only the session headers change and
        the method sleeps for 2-8 seconds.

        Returns:
            ``True`` on success or when ``github_referrers`` is empty or
            missing, ``False`` if an exception occurred.
        """
        github_referrers = self.config.get('github_referrers', [])
        if not github_referrers:
            return True
        github_page = random.choice(github_referrers)
        try:
            logger.info(f"模拟访问GitHub页面: {github_page}")
            # Subsequent requests carry this page as their referrer.
            self.session.headers.update({
                'Referer': github_page,
                'Sec-Fetch-Site': 'cross-site'
            })
            stay_time = random.uniform(2, 8)
            logger.info(f"在GitHub页面停留 {stay_time:.1f}")
            time.sleep(stay_time)
            return True
        except Exception as e:
            logger.error(f"GitHub访问模拟失败: {e}")
            return False

    def check_login_required(self, response):
        """Heuristically detect whether ``response`` is a login page.

        Counts how many login-related keywords occur in the lower-cased
        body; three or more is treated as a probable login page and a
        warning with suggestions is logged.

        Args:
            response: A response object exposing ``.text``, or ``None``.

        Returns:
            ``True`` if the page looks like a login page, else ``False``.
        """
        # Compare against None explicitly: requests.Response is falsy for
        # 4xx/5xx statuses, and 401/403 login pages must still be inspected.
        if response is None:
            return False
        content = response.text.lower()
        login_indicators = [
            '登录', 'login', '用户名', 'username', 'password', '密码',
            'signin', 'sign in', '账号', 'account', '验证码'
        ]
        login_count = sum(1 for indicator in login_indicators if indicator in content)
        if login_count >= 3:
            logger.warning("⚠️ 检测到可能的登录页面")
            logger.info("建议:")
            logger.info("1. 检查网站是否需要登录访问")
            logger.info("2. 如需登录,请在配置文件中添加登录凭据")
            logger.info("3. 或者更换为公开可访问的页面")
            return True
        return False

    def make_request(self, url, timeout=10):
        """GET ``url`` through the session and log response details.

        Args:
            url (str): Address to fetch; redirects are followed.
            timeout: Timeout in seconds passed to ``requests``.

        Returns:
            The response on success, or ``None`` when the request failed
            or the server answered with an HTTP error status.
        """
        try:
            response = self.session.get(url, timeout=timeout, allow_redirects=True)
            logger.info(f"访问 {url} - 状态码: {response.status_code}")
            logger.info(f"响应大小: {len(response.content)} 字节")
            content_type = response.headers.get('content-type')
            if content_type:
                logger.info(f"内容类型: {content_type}")
            # A redirect that ends on a URL containing "login" is suspicious.
            if response.url != url and 'login' in response.url.lower():
                logger.warning(f"⚠️ 被重定向到登录页面: {response.url}")
            # Inspect the body as well; the result is only logged.
            self.check_login_required(response)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            logger.error(f"请求失败 {url}: {e}")
            return None

    def simulate_page_interaction(self, response):
        """Pretend to browse the page: pick a few links and pause.

        Up to three links are sampled from the first ten ``href`` values in
        the body. Each absolute or root-relative one has a 30% chance of
        being logged as "preloaded", followed by a short pause. No
        additional HTTP request is made.

        Args:
            response: A response exposing ``.text`` and ``.url``, or
                ``None`` (in which case nothing happens).
        """
        if response is None:
            return
        try:
            content = response.text
            links = re.findall(r'href=["\'](.*?)["\']', content)
            if links:
                candidates = links[:10]
                random_links = random.sample(candidates, min(3, len(candidates)))
                for link in random_links:
                    if link.startswith(('http', '/')):
                        if not link.startswith('http'):
                            link = urljoin(response.url, link)
                        # 30% chance of "clicking"; only logged, never fetched.
                        if random.random() < 0.3:
                            logger.info(f"模拟预加载链接: {link}")
                            time.sleep(random.uniform(0.5, 1.5))
            # Short idle period on the page.
            time.sleep(random.uniform(1, 3))
        except Exception as e:
            logger.warning(f"页面交互模拟出错: {e}")

    def visit_main_site(self):
        """Fetch ``targets.main_site``, simulate interaction, then idle.

        The idle time is drawn uniformly from
        ``settings.main_site_stay_time``.

        Returns:
            ``True`` if the page was fetched, ``False`` otherwise.
        """
        main_site = self.config['targets']['main_site']
        stay_time_range = self.config['settings']['main_site_stay_time']
        try:
            logger.info(f"访问主网站: {main_site}")
            response = self.make_request(main_site)
            if not response:
                return False
            self.simulate_page_interaction(response)
            stay_time = random.uniform(stay_time_range[0], stay_time_range[1])
            logger.info(f"在主网站停留 {stay_time:.1f}")
            time.sleep(stay_time)
            return True
        except Exception as e:
            logger.error(f"主网站访问失败: {e}")
            return False

    def visit_game_page(self):
        """Fetch ``targets.game_page`` with the main site as referrer.

        After a successful fetch the simulated game interaction runs,
        followed by the segmented stay drawn from
        ``settings.game_page_stay_time``.

        Returns:
            ``True`` if the page was fetched, ``False`` otherwise.
        """
        game_page = self.config['targets']['game_page']
        stay_time_range = self.config['settings']['game_page_stay_time']
        try:
            logger.info(f"访问游戏页面: {game_page}")
            # The game page is reached from the main site.
            main_site = self.config['targets']['main_site']
            self.session.headers.update({
                'Referer': main_site,
                'Sec-Fetch-Site': 'same-origin'
            })
            response = self.make_request(game_page)
            if not response:
                return False
            self.simulate_game_interaction(response)
            stay_time = random.uniform(stay_time_range[0], stay_time_range[1])
            logger.info(f"在游戏页面停留 {stay_time:.1f}")
            self.simulate_gaming_behavior(stay_time)
            return True
        except Exception as e:
            logger.error(f"游戏页面访问失败: {e}")
            return False

    def simulate_game_interaction(self, response):
        """Log 8-20 random game moves separated by random pauses.

        Purely time-based: no request is made and ``response`` is not
        inspected (the parameter is kept for interface compatibility).
        """
        try:
            logger.info("模拟2048游戏交互")
            # Initial pause before the first move.
            time.sleep(random.uniform(2, 5))
            game_actions = ["上移", "下移", "左移", "右移"]
            action_count = random.randint(8, 20)
            for _ in range(action_count):
                action = random.choice(game_actions)
                logger.info(f"模拟游戏动作: {action}")
                time.sleep(random.uniform(1, 3))
                # Occasionally insert a longer "thinking" pause.
                if random.random() < 0.2:
                    think_time = random.uniform(5, 15)
                    logger.info(f"模拟思考停顿 {think_time:.1f}")
                    time.sleep(think_time)
        except Exception as e:
            logger.warning(f"游戏交互模拟出错: {e}")

    def simulate_gaming_behavior(self, total_time):
        """Spread a stay over 3-6 segments, logging occasional activity.

        Args:
            total_time (float): Nominal stay duration in seconds.
        """
        try:
            segments = random.randint(3, 6)
            segment_time = total_time / segments
            for i in range(segments):
                # NOTE(review): no sleep precedes the first segment, so the
                # actual stay is total_time * (segments - 1) / segments.
                # Kept as-is; confirm whether the full duration is intended.
                if i > 0:
                    time.sleep(segment_time)
                # Log-only placeholders for game-related requests.
                if random.random() < 0.3:
                    logger.info("模拟游戏状态检查")
                if random.random() < 0.1:
                    logger.info("模拟分数同步")
        except Exception as e:
            logger.warning(f"游戏行为模拟出错: {e}")

    def run_single_visit(self):
        """Run one full flow: referrer step, main site, then game page.

        A new session is created for the visit and always closed at the
        end. A failed referrer step is only logged; a failed main-site or
        game-page step aborts the visit.

        Returns:
            ``True`` if both page visits succeeded, ``False`` otherwise.
        """
        logger.info("开始执行访问流程(纯协议模式)")
        self.setup_session()
        try:
            # 1. Referrer simulation (non-fatal).
            if not self.simulate_github_visit():
                logger.warning("GitHub访问模拟失败继续执行")
            # 2. Main site.
            if not self.visit_main_site():
                logger.error("主网站访问失败")
                return False
            # 3. Game page.
            if not self.visit_game_page():
                logger.error("游戏页面访问失败")
                return False
            logger.info("访问流程执行成功")
            return True
        except Exception as e:
            logger.error(f"访问流程执行出错: {e}")
            return False
        finally:
            # Release the session's connections.
            if self.session:
                self.session.close()

    def run_continuous(self, total_visits=None, delay_range=None):
        """Run several visits in a row with a random pause between them.

        Args:
            total_visits (int | None): Number of visits; defaults to
                ``settings.default_visits``.
            delay_range (tuple | None): ``(min, max)`` pause in seconds;
                defaults to ``settings.min_delay`` / ``settings.max_delay``.

        Returns:
            int: Number of visits that succeeded.
        """
        if total_visits is None:
            total_visits = self.config['settings']['default_visits']
        if delay_range is None:
            delay_range = (
                self.config['settings']['min_delay'],
                self.config['settings']['max_delay']
            )
        success_count = 0
        for i in range(total_visits):
            logger.info(f"执行第 {i+1}/{total_visits} 次访问")
            if self.run_single_visit():
                success_count += 1
            # No pause is needed after the final visit.
            if i < total_visits - 1:
                delay = random.uniform(delay_range[0], delay_range[1])
                logger.info(f"等待 {delay:.1f} 秒后进行下次访问")
                time.sleep(delay)
        logger.info(f"访问完成,成功: {success_count}/{total_visits}")
        return success_count
def main():
    """Interactive command-line entry point.

    Verifies that ``config.json`` exists, builds the bot, prints a banner
    and asks which mode to run: a single visit, a continuous run using the
    configured settings, or a continuous run with user-supplied parameters.
    Custom parameters are parsed and validated before any visit starts, so
    an input mistake is reported immediately and an unrelated ``ValueError``
    raised during the run is not mislabelled as bad input.
    """
    config_file = 'config.json'
    # Bail out early with a readable message when the config is missing.
    if not os.path.exists(config_file):
        print(f"❌ 配置文件 {config_file} 不存在!")
        print("请先创建配置文件")
        return
    try:
        bot = WebTrafficBotProtocol(config_file)
        print("=== 网站流量模拟脚本 (纯协议版) ===")
        print("🚀 不会打开浏览器窗口完全基于HTTP协议")
        print("⚠️ 请确保仅用于测试自己的网站!")
        print("目标网站:", bot.config['targets']['main_site'])
        print("游戏页面:", bot.config['targets']['game_page'])
        print()
        # Ask which mode to run.
        print("请选择运行模式:")
        print("1. 单次访问测试")
        print("2. 连续访问模式 (使用配置文件设置)")
        print("3. 连续访问模式 (自定义参数)")
        choice = input("请输入选择 (1/2/3): ").strip()
        if choice == "1":
            # Single visit.
            logger.info("开始单次访问测试")
            success = bot.run_single_visit()
            if success:
                print("✅ 单次访问测试成功!")
            else:
                print("❌ 单次访问测试失败!")
        elif choice == "2":
            # Continuous run using the settings from the config file.
            logger.info("开始连续访问(配置文件模式)")
            success_count = bot.run_continuous()
            print(f"✅ 连续访问完成!成功: {success_count}/{bot.config['settings']['default_visits']}")
        elif choice == "3":
            # Continuous run with user-supplied parameters.
            try:
                visit_count = int(input("请输入访问次数: ").strip())
                min_delay = int(input("请输入最小延迟秒数: ").strip())
                max_delay = int(input("请输入最大延迟秒数: ").strip())
            except ValueError:
                print("❌ 输入参数错误!")
            else:
                # A negative delay would make time.sleep() raise only after
                # the first visit had already run; reject it up front.
                if visit_count < 1 or min_delay < 0 or max_delay < 0:
                    print("❌ 输入参数错误!")
                else:
                    logger.info(f"开始连续访问,总次数: {visit_count}")
                    success_count = bot.run_continuous(
                        total_visits=visit_count,
                        delay_range=(min_delay, max_delay)
                    )
                    print(f"✅ 连续访问完成!成功: {success_count}/{visit_count}")
        else:
            print("❌ 无效选择!")
    except KeyboardInterrupt:
        print("\n⚠️ 用户中断执行")
    except Exception as e:
        logger.error(f"程序执行出错: {e}")
        print("❌ 程序执行出错,请检查日志文件 traffic_bot.log")


if __name__ == "__main__":
    main()