import os import sys import sqlite3 import requests import urllib3 from typing import Dict, Tuple, Optional from datetime import datetime from config import config from logger import logger class CursorTokenRefresher: """Cursor Token 刷新器 - 简化版""" def __init__(self): """初始化刷新器""" # 设置数据库路径 if sys.platform == "win32": appdata = os.getenv("APPDATA") if appdata is None: raise EnvironmentError("APPDATA 环境变量未设置") self.db_path = os.path.join(appdata, "Cursor", "User", "globalStorage", "state.vscdb") else: raise NotImplementedError(f"暂不支持的操作系统: {sys.platform}") # 获取日志记录器 self.logger = logger.get_logger("TokenRefresh") def update_auth(self, email: Optional[str] = None, access_token: Optional[str] = None, refresh_token: Optional[str] = None) -> bool: """ 更新SQLite数据库中的认证信息 :param email: 可选,新的邮箱地址 :param access_token: 可选,新的访问令牌 :param refresh_token: 可选,新的刷新令牌 :return: 是否成功更新 """ updates = [] # 登录状态 updates.append(("cursorAuth/cachedSignUpType", "Auth_0")) if email is not None: updates.append(("cursorAuth/cachedEmail", email)) if access_token is not None: updates.append(("cursorAuth/accessToken", access_token)) if refresh_token is not None: updates.append(("cursorAuth/refreshToken", refresh_token)) if not updates: self.logger.warning("没有提供任何要更新的值") return False conn = None try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() for key, value in updates: # 如果没有更新任何行,说明key不存在,执行插入 # 检查 key 是否存在 check_query = "SELECT COUNT(*) FROM itemTable WHERE key = ?" cursor.execute(check_query, (key,)) if cursor.fetchone()[0] == 0: insert_query = "INSERT INTO itemTable (key, value) VALUES (?, ?)" cursor.execute(insert_query, (key, value)) self.logger.info(f"已插入新记录: {key.split('/')[-1]}") else: update_query = "UPDATE itemTable SET value = ? WHERE key = ?" cursor.execute(update_query, (value, key)) if cursor.rowcount > 0: self.logger.info(f"成功更新 {key.split('/')[-1]}") else: self.logger.warning(f"未找到 {key.split('/')[-1]} 或值未变化") conn.commit() return True except sqlite3.Error as e: self.logger.error(f"数据库错误: {str(e)}") return False except Exception as e: self.logger.error(f"发生错误: {str(e)}") return False finally: if conn: conn.close() def refresh_token(self, machine_id: str) -> Tuple[bool, str, Optional[Dict]]: """ 刷新token(从API获取新账号并更新到数据库) :param machine_id: 机器ID :return: (是否成功, 消息, 可选的账号信息) """ try: # 禁用SSL警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # 发送请求 response = requests.post( config.get_unused_url, json={"machine_id": machine_id}, headers={"Content-Type": "application/json"}, timeout=30, verify=False ) # 解析响应 response_data = response.json() if response_data.get("code") == 200: account_data = response_data.get("data", {}) # 获取账号信息 email = account_data.get("email") access_token = account_data.get("access_token") refresh_token = account_data.get("refresh_token") expire_time = account_data.get("expire_time") days_left = account_data.get("days_left") if not all([email, access_token, refresh_token]): return False, "获取账号信息不完整", None # 更新SQLite数据库 if self.update_auth(email=email, access_token=access_token, refresh_token=refresh_token): account_info = { "email": email, "expire_time": expire_time, "days_left": days_left } return True, "认证信息更新成功", account_info else: return False, "更新认证信息失败", None elif response_data.get("code") == 404: return False, "没有可用的未使用账号", None else: error_msg = response_data.get("msg", "未知错误") return False, f"获取账号失败: {error_msg}", None except Exception as e: error_msg = str(e) self.logger.error(f"刷新token失败: {error_msg}") return False, f"刷新token失败: {error_msg}", None