#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ DuoPlus 数据追踪模块 处理 data4.net 的统计追踪请求 """ import requests import json import time import random import string from typing import Dict, Any, Optional from colorama import init, Fore # 初始化 colorama init(autoreset=True) class DataTracker: """Data4.net 数据追踪器""" def __init__(self): self.api_url = "https://api.data4.net/api" self.website_id = "f487e5f1981a4c459c973fa4355b7fab" self.session_id = self._generate_session_id() self.uuid = self._generate_uuid() self.visit_id = None self.tracking_id = None # 设置请求头 self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36', 'Accept': '*/*', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Content-Type': 'application/json', 'Origin': 'https://my.duoplus.cn', 'Referer': 'https://my.duoplus.cn/', 'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"', 'sec-fetch-site': 'cross-site', 'sec-fetch-mode': 'cors', 'sec-fetch-dest': 'empty', 'pragma': 'no-cache', 'cache-control': 'no-cache', 'priority': 'u=1, i' } def _generate_session_id(self) -> str: """生成会话ID""" return ''.join(random.choices(string.ascii_lowercase + string.digits, k=32)) def _generate_uuid(self) -> str: """生成UUID""" return ''.join(random.choices(string.ascii_lowercase + string.digits, k=32)) def send_initial_event(self, page_url: str = "/sign-up") -> bool: """ 发送初始事件 Args: page_url: 页面URL Returns: 是否成功 """ url = f"{self.api_url}/report/send" payload = { "type": "event", "payload": { "website": self.website_id, "hostname": "my.duoplus.cn", "screen": "2048x1152", "language": "zh-CN", "title": "DuoPlus 云手机", "url": page_url, "referrer": "my.duoplus.cn", "session_id": self.session_id, "uuid": self.uuid } } try: response = requests.post(url, json=payload, headers=self.headers) if response.status_code == 200: data = response.json() if data.get("code") == 200: self.session_id = data["data"]["session_id"] self.visit_id = data["data"]["visit_id"] self.tracking_id = data["data"]["id"] print(f"{Fore.GREEN}[Tracker] 初始事件发送成功") return True print(f"{Fore.YELLOW}[Tracker] 初始事件发送失败") return False except Exception as e: print(f"{Fore.YELLOW}[Tracker] 初始事件发送异常: {str(e)}") return False def send_pageview(self, page_url: str = "/sign-up") -> bool: """ 发送页面浏览事件 Args: page_url: 页面URL Returns: 是否成功 """ url = f"{self.api_url}/report/event" payload = { "payload": { "website": self.website_id, "hostname": "my.duoplus.cn", "screen": "2048x1152", "language": "zh-CN", "title": "DuoPlus 云手机", "url": page_url, "referrer": "my.duoplus.cn", "client_time": int(time.time()), "scroll_depth_ratio": 0, "session_id": self.session_id, "uuid": self.uuid }, "type": "pageview" } try: response = requests.post(url, json=payload, headers=self.headers) if response.status_code == 200: data = response.json() if data.get("code") == 200: print(f"{Fore.GREEN}[Tracker] 页面浏览事件发送成功") return True print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送失败") return False except Exception as e: print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送异常: {str(e)}") return False def send_view_time(self, view_time: int = 10) -> bool: """ 发送页面停留时间 Args: view_time: 停留时间(秒) Returns: 是否成功 """ if not self.visit_id or not self.tracking_id: print(f"{Fore.YELLOW}[Tracker] 缺少必要的ID,跳过发送停留时间") return False url = f"{self.api_url}/report/view_time" payload = { "website_id": self.website_id, "view_time": view_time, "id": self.tracking_id, "session_id": self.session_id, "visit_id": self.visit_id } try: response = requests.post(url, json=payload, headers=self.headers) if response.status_code == 200: data = response.json() if data.get("code") == 200: print(f"{Fore.GREEN}[Tracker] 停留时间发送成功: {view_time}秒") return True print(f"{Fore.YELLOW}[Tracker] 停留时间发送失败") return False except Exception as e: print(f"{Fore.YELLOW}[Tracker] 停留时间发送异常: {str(e)}") return False def track_registration_page(self) -> bool: """ 追踪注册页面访问 Returns: 是否成功 """ print(f"{Fore.CYAN}[Tracker] 开始追踪注册页面...") # 发送初始事件 if not self.send_initial_event("/sign-up"): print(f"{Fore.YELLOW}[Tracker] 初始事件发送失败,继续执行") # 发送页面浏览事件 time.sleep(0.5) if not self.send_pageview("/sign-up"): print(f"{Fore.YELLOW}[Tracker] 页面浏览事件发送失败,继续执行") # 模拟页面停留时间 time.sleep(1) if not self.send_view_time(5): print(f"{Fore.YELLOW}[Tracker] 停留时间发送失败,继续执行") print(f"{Fore.GREEN}[Tracker] 页面追踪完成") return True