214 lines
7.0 KiB
Python
214 lines
7.0 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
DuoPlus 数据追踪模块
|
||
处理 data4.net 的统计追踪请求
|
||
"""
|
||
|
||
import requests
|
||
import json
|
||
import time
|
||
import random
|
||
import string
|
||
from typing import Dict, Any, Optional
|
||
from colorama import init, Fore
|
||
|
||
# 初始化 colorama
|
||
init(autoreset=True)
|
||
|
||
class DataTracker:
|
||
"""Data4.net 数据追踪器"""
|
||
|
||
def __init__(self):
|
||
self.api_url = "https://api.data4.net/api"
|
||
self.website_id = "f487e5f1981a4c459c973fa4355b7fab"
|
||
self.session_id = self._generate_session_id()
|
||
self.uuid = self._generate_uuid()
|
||
self.visit_id = None
|
||
self.tracking_id = None
|
||
|
||
# 设置请求头
|
||
self.headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
|
||
'Accept': '*/*',
|
||
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
|
||
'Accept-Encoding': 'gzip, deflate, br, zstd',
|
||
'Content-Type': 'application/json',
|
||
'Origin': 'https://my.duoplus.cn',
|
||
'Referer': 'https://my.duoplus.cn/',
|
||
'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
|
||
'sec-ch-ua-mobile': '?0',
|
||
'sec-ch-ua-platform': '"Windows"',
|
||
'sec-fetch-site': 'cross-site',
|
||
'sec-fetch-mode': 'cors',
|
||
'sec-fetch-dest': 'empty',
|
||
'pragma': 'no-cache',
|
||
'cache-control': 'no-cache',
|
||
'priority': 'u=1, i'
|
||
}
|
||
|
||
def _generate_session_id(self) -> str:
|
||
"""生成会话ID"""
|
||
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=32))
|
||
|
||
def _generate_uuid(self) -> str:
|
||
"""生成UUID"""
|
||
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=32))
|
||
|
||
def send_initial_event(self, page_url: str = "/sign-up") -> bool:
|
||
"""
|
||
发送初始事件
|
||
|
||
Args:
|
||
page_url: 页面URL
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
url = f"{self.api_url}/report/send"
|
||
|
||
payload = {
|
||
"type": "event",
|
||
"payload": {
|
||
"website": self.website_id,
|
||
"hostname": "my.duoplus.cn",
|
||
"screen": "2048x1152",
|
||
"language": "zh-CN",
|
||
"title": "DuoPlus 云手机",
|
||
"url": page_url,
|
||
"referrer": "my.duoplus.cn",
|
||
"session_id": self.session_id,
|
||
"uuid": self.uuid
|
||
}
|
||
}
|
||
|
||
try:
|
||
response = requests.post(url, json=payload, headers=self.headers)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get("code") == 200:
|
||
self.session_id = data["data"]["session_id"]
|
||
self.visit_id = data["data"]["visit_id"]
|
||
self.tracking_id = data["data"]["id"]
|
||
print(f"{Fore.GREEN}[Tracker] 初始事件发送成功")
|
||
return True
|
||
|
||
print(f"{Fore.YELLOW}[Tracker] 初始事件发送失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"{Fore.YELLOW}[Tracker] 初始事件发送异常: {str(e)}")
|
||
return False
|
||
|
||
def send_pageview(self, page_url: str = "/sign-up") -> bool:
|
||
"""
|
||
发送页面浏览事件
|
||
|
||
Args:
|
||
page_url: 页面URL
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
url = f"{self.api_url}/report/event"
|
||
|
||
payload = {
|
||
"payload": {
|
||
"website": self.website_id,
|
||
"hostname": "my.duoplus.cn",
|
||
"screen": "2048x1152",
|
||
"language": "zh-CN",
|
||
"title": "DuoPlus 云手机",
|
||
"url": page_url,
|
||
"referrer": "my.duoplus.cn",
|
||
"client_time": int(time.time()),
|
||
"scroll_depth_ratio": 0,
|
||
"session_id": self.session_id,
|
||
"uuid": self.uuid
|
||
},
|
||
"type": "pageview"
|
||
}
|
||
|
||
try:
|
||
response = requests.post(url, json=payload, headers=self.headers)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get("code") == 200:
|
||
print(f"{Fore.GREEN}[Tracker] 页面浏览事件发送成功")
|
||
return True
|
||
|
||
print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送异常: {str(e)}")
|
||
return False
|
||
|
||
def send_view_time(self, view_time: int = 10) -> bool:
|
||
"""
|
||
发送页面停留时间
|
||
|
||
Args:
|
||
view_time: 停留时间(秒)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
if not self.visit_id or not self.tracking_id:
|
||
print(f"{Fore.YELLOW}[Tracker] 缺少必要的ID,跳过发送停留时间")
|
||
return False
|
||
|
||
url = f"{self.api_url}/report/view_time"
|
||
|
||
payload = {
|
||
"website_id": self.website_id,
|
||
"view_time": view_time,
|
||
"id": self.tracking_id,
|
||
"session_id": self.session_id,
|
||
"visit_id": self.visit_id
|
||
}
|
||
|
||
try:
|
||
response = requests.post(url, json=payload, headers=self.headers)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get("code") == 200:
|
||
print(f"{Fore.GREEN}[Tracker] 停留时间发送成功: {view_time}秒")
|
||
return True
|
||
|
||
print(f"{Fore.YELLOW}[Tracker] 停留时间发送失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"{Fore.YELLOW}[Tracker] 停留时间发送异常: {str(e)}")
|
||
return False
|
||
|
||
def track_registration_page(self) -> bool:
|
||
"""
|
||
追踪注册页面访问
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
print(f"{Fore.CYAN}[Tracker] 开始追踪注册页面...")
|
||
|
||
# 发送初始事件
|
||
if not self.send_initial_event("/sign-up"):
|
||
print(f"{Fore.YELLOW}[Tracker] 初始事件发送失败,继续执行")
|
||
|
||
# 发送页面浏览事件
|
||
time.sleep(0.5)
|
||
if not self.send_pageview("/sign-up"):
|
||
print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送失败,继续执行")
|
||
|
||
# 模拟页面停留时间
|
||
time.sleep(1)
|
||
if not self.send_view_time(5):
|
||
print(f"{Fore.YELLOW}[Tracker] 停留时间发送失败,继续执行")
|
||
|
||
print(f"{Fore.GREEN}[Tracker] 页面追踪完成")
|
||
return True |