""" Cursor 官方用量 API 服务 用于验证账号有效性和查询用量信息 """ import httpx import asyncio from typing import Optional, Dict, Any, Tuple, List from dataclasses import dataclass from datetime import datetime @dataclass class CursorUsageInfo: """Cursor 用量信息""" is_valid: bool = False # 账号是否有效 error_message: Optional[str] = None # 错误信息 # 用户信息 user_id: Optional[int] = None email: Optional[str] = None team_id: Optional[int] = None is_enterprise: bool = False # 会员信息 membership_type: str = "free" # free, free_trial, pro, business billing_cycle_start: Optional[str] = None billing_cycle_end: Optional[str] = None days_remaining_on_trial: Optional[int] = None # 试用剩余天数 (free_trial) # 套餐用量 plan_used: int = 0 plan_limit: int = 0 plan_remaining: int = 0 # Token 用量 total_input_tokens: int = 0 total_output_tokens: int = 0 total_cache_read_tokens: int = 0 total_cost_cents: float = 0.0 # 请求次数 total_requests: int = 0 # totalUsageEventsCount @property def pool_type(self) -> str: """ 判断账号应归入哪个号池 - 'pro': Pro池 (free_trial, pro, business) - 'auto': Auto池 (free) """ if self.membership_type in ('free_trial', 'pro', 'business'): return 'pro' return 'auto' @property def is_pro_trial(self) -> bool: """是否为 Pro 试用账号""" return self.membership_type == 'free_trial' @property def is_usable(self) -> bool: """账号是否可用 (有效且有剩余额度)""" if not self.is_valid: return False # Pro试用和Pro需要检查剩余额度 if self.pool_type == 'pro': return self.plan_remaining > 0 # Auto池 free账号始终可用 return True class CursorUsageService: """Cursor 用量查询服务""" BASE_URL = "https://cursor.com" TIMEOUT = 15.0 def __init__(self): self.headers = { "accept": "*/*", "content-type": "application/json", "origin": "https://cursor.com", "referer": "https://cursor.com/dashboard", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } def _get_cookie_header(self, token: str) -> str: """构造 Cookie Header""" # 支持直接传 token 或完整 cookie if token.startswith("WorkosCursorSessionToken="): return token return f"WorkosCursorSessionToken={token}" async def _request( self, method: str, path: str, token: str, json_data: Optional[Dict] = None ) -> Dict[str, Any]: """发送请求""" headers = {**self.headers, "Cookie": self._get_cookie_header(token)} async with httpx.AsyncClient(timeout=self.TIMEOUT) as client: if method == "GET": resp = await client.get(f"{self.BASE_URL}{path}", headers=headers) else: resp = await client.post( f"{self.BASE_URL}{path}", headers=headers, json=json_data or {} ) if resp.status_code == 200: return {"success": True, "data": resp.json()} elif resp.status_code in [401, 403]: return {"success": False, "error": "认证失败,Token 无效或已过期"} else: return {"success": False, "error": f"请求失败: {resp.status_code}"} async def get_usage_summary(self, token: str) -> Dict[str, Any]: """获取用量摘要""" return await self._request("GET", "/api/usage-summary", token) async def get_billing_cycle(self, token: str) -> Dict[str, Any]: """获取当前计费周期""" return await self._request("POST", "/api/dashboard/get-current-billing-cycle", token, {}) async def get_filtered_usage( self, token: str, start_date_ms: str, end_date_ms: str, page: int = 1, page_size: int = 100 ) -> Dict[str, Any]: """获取过滤后的使用事件""" return await self._request( "POST", "/api/dashboard/get-filtered-usage-events", token, { "startDate": start_date_ms, "endDate": end_date_ms, "page": page, "pageSize": page_size } ) async def get_aggregated_usage( self, token: str, start_date_ms: int, team_id: Optional[int] = None ) -> Dict[str, Any]: """获取聚合使用事件""" data = {"startDate": start_date_ms} if team_id: data["teamId"] = team_id return await self._request( "POST", "/api/dashboard/get-aggregated-usage-events", token, data ) async def validate_and_get_usage(self, token: str) -> CursorUsageInfo: """ 验证账号并获取完整用量信息 这是主要的对外接口 """ result = CursorUsageInfo() try: # 1. 获取用量摘要 (验证 token 有效性) summary_resp = await self.get_usage_summary(token) if not summary_resp["success"]: result.error_message = summary_resp["error"] return result summary = summary_resp["data"] result.is_valid = True result.membership_type = summary.get("membershipType", "free") result.billing_cycle_start = summary.get("billingCycleStart") result.billing_cycle_end = summary.get("billingCycleEnd") result.days_remaining_on_trial = summary.get("daysRemainingOnTrial") # 试用剩余天数 # 套餐用量 individual = summary.get("individualUsage", {}) plan = individual.get("plan", {}) result.plan_used = plan.get("used", 0) result.plan_limit = plan.get("limit", 0) result.plan_remaining = plan.get("remaining", 0) # 2. 获取计费周期 billing_resp = await self.get_billing_cycle(token) if billing_resp["success"]: billing = billing_resp["data"] start_ms = billing.get("startDateEpochMillis", "0") end_ms = billing.get("endDateEpochMillis", "0") # 3. 获取请求次数 (totalUsageEventsCount) filtered_resp = await self.get_filtered_usage(token, start_ms, end_ms, 1, 1) if filtered_resp["success"]: filtered = filtered_resp["data"] result.total_requests = filtered.get("totalUsageEventsCount", 0) # 4. 获取 Token 用量 aggregated_resp = await self.get_aggregated_usage(token, int(start_ms)) if aggregated_resp["success"]: agg = aggregated_resp["data"] result.total_input_tokens = int(agg.get("totalInputTokens", "0")) result.total_output_tokens = int(agg.get("totalOutputTokens", "0")) result.total_cache_read_tokens = int(agg.get("totalCacheReadTokens", "0")) result.total_cost_cents = agg.get("totalCostCents", 0.0) return result except httpx.TimeoutException: result.error_message = "请求超时" return result except Exception as e: result.error_message = f"请求异常: {str(e)}" return result def validate_and_get_usage_sync(self, token: str) -> CursorUsageInfo: """同步版本的验证和获取用量""" return asyncio.run(self.validate_and_get_usage(token)) # 单例 cursor_usage_service = CursorUsageService() # ============ 便捷函数 ============ async def check_account_valid(token: str) -> Tuple[bool, Optional[str]]: """ 检查账号是否有效 返回: (是否有效, 错误信息) """ try: resp = await cursor_usage_service.get_usage_summary(token) if resp["success"]: return True, None return False, resp["error"] except Exception as e: return False, str(e) async def get_account_usage(token: str) -> Dict[str, Any]: """ 获取账号用量信息 返回格式化的用量数据 """ info = await cursor_usage_service.validate_and_get_usage(token) if not info.is_valid: return { "success": False, "error": info.error_message } return { "success": True, "data": { "membership_type": info.membership_type, "pool_type": info.pool_type, # 号池类型: pro/auto "is_pro_trial": info.is_pro_trial, # 是否Pro试用 "is_usable": info.is_usable, # 是否可用 "days_remaining_on_trial": info.days_remaining_on_trial, "billing_cycle": { "start": info.billing_cycle_start, "end": info.billing_cycle_end }, "plan_usage": { "used": info.plan_used, "limit": info.plan_limit, "remaining": info.plan_remaining }, "token_usage": { "input_tokens": info.total_input_tokens, "output_tokens": info.total_output_tokens, "cache_read_tokens": info.total_cache_read_tokens, "total_cost_usd": round(info.total_cost_cents / 100, 4) }, "total_requests": info.total_requests } } async def batch_check_accounts(tokens: List[str]) -> List[Dict[str, Any]]: """ 批量检查多个账号 """ results = [] for token in tokens: info = await cursor_usage_service.validate_and_get_usage(token) results.append({ "token": token[:20] + "...", # 脱敏 "is_valid": info.is_valid, "is_usable": info.is_usable, "pool_type": info.pool_type, # pro/auto "membership_type": info.membership_type if info.is_valid else None, "days_remaining_on_trial": info.days_remaining_on_trial, "plan_used": info.plan_used if info.is_valid else 0, "plan_limit": info.plan_limit if info.is_valid else 0, "plan_remaining": info.plan_remaining if info.is_valid else 0, "total_requests": info.total_requests if info.is_valid else 0, "error": info.error_message }) return results async def check_and_classify_account(token: str) -> Dict[str, Any]: """ 检查账号并分类到对应号池 返回账号信息和推荐的号池 """ info = await cursor_usage_service.validate_and_get_usage(token) if not info.is_valid: return { "success": False, "error": info.error_message } return { "success": True, "pool_type": info.pool_type, # 'pro' 或 'auto' "is_usable": info.is_usable, # 是否可用 "membership_type": info.membership_type, "is_pro_trial": info.is_pro_trial, "plan_remaining": info.plan_remaining, "total_requests": info.total_requests, "recommendation": f"建议放入 {'Pro' if info.pool_type == 'pro' else 'Auto'} 号池" }