Files
tingquanzhushou/cursor_token_refresher.py
2025-02-20 20:20:19 +08:00

145 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import sqlite3
import requests
import urllib3
from typing import Dict, Tuple, Optional
from datetime import datetime
from config import config
from logger import logger
class CursorTokenRefresher:
    """Cursor token refresher (simplified version).

    Requests an account from the remote API and writes its credentials into
    the ``itemTable`` table of Cursor's ``state.vscdb`` SQLite database.
    """

    def __init__(self):
        """Resolve the database path and obtain a logger.

        Raises:
            NotImplementedError: if ``sys.platform`` is not ``"win32"``.
            EnvironmentError: if the ``APPDATA`` environment variable is unset.
        """
        if sys.platform != "win32":
            raise NotImplementedError(f"暂不支持的操作系统: {sys.platform}")

        appdata = os.getenv("APPDATA")
        if appdata is None:
            raise EnvironmentError("APPDATA 环境变量未设置")

        # %APPDATA%\Cursor\User\globalStorage\state.vscdb
        self.db_path = os.path.join(
            appdata, "Cursor", "User", "globalStorage", "state.vscdb"
        )
        self.logger = logger.get_logger("TokenRefresh")

    def update_auth(self, email: Optional[str] = None,
                    access_token: Optional[str] = None,
                    refresh_token: Optional[str] = None) -> bool:
        """Write authentication values into the SQLite database.

        Each supplied value is stored under its ``cursorAuth/...`` key in
        ``itemTable``; whenever at least one value is supplied,
        ``cursorAuth/cachedSignUpType`` is also set to ``"Auth_0"``. All
        writes are committed together.

        :param email: optional new e-mail address
        :param access_token: optional new access token
        :param refresh_token: optional new refresh token
        :return: True if every write was committed; False if no value was
            supplied, the database file does not exist, or any error occurred
        """
        candidates = (
            ("cursorAuth/cachedEmail", email),
            ("cursorAuth/accessToken", access_token),
            ("cursorAuth/refreshToken", refresh_token),
        )
        updates = [(key, value) for key, value in candidates if value is not None]
        if not updates:
            # Decide this before adding the sign-up type entry; otherwise the
            # list is never empty and this guard can never trigger.
            self.logger.warning("没有提供任何要更新的值")
            return False

        # Login state is written first, ahead of the credentials.
        updates.insert(0, ("cursorAuth/cachedSignUpType", "Auth_0"))

        # sqlite3.connect() would silently create an empty database file at a
        # missing path, so refuse up front instead of leaving a stray file.
        if not os.path.isfile(self.db_path):
            self.logger.error(f"数据库错误: 数据库文件不存在: {self.db_path}")
            return False

        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            for key, value in updates:
                label = key.split("/")[-1]
                cursor.execute(
                    "UPDATE itemTable SET value = ? WHERE key = ?", (value, key)
                )
                if cursor.rowcount > 0:
                    self.logger.info(f"成功更新 {label}")
                    continue
                # No row matched, so the key does not exist yet: insert it.
                cursor.execute(
                    "INSERT INTO itemTable (key, value) VALUES (?, ?)", (key, value)
                )
                self.logger.info(f"已插入新记录: {label}")
            conn.commit()
            return True
        except sqlite3.Error as e:
            self.logger.error(f"数据库错误: {str(e)}")
            return False
        except Exception as e:
            # Boundary handler: callers rely on a bool result, not exceptions.
            self.logger.error(f"发生错误: {str(e)}")
            return False
        finally:
            # Closing without commit discards any partially applied writes.
            if conn is not None:
                conn.close()

    def refresh_token(self, machine_id: str) -> Tuple[bool, str, Optional[Dict]]:
        """Fetch a new account from the API and store it in the database.

        Sends ``{"machine_id": machine_id}`` as JSON via POST to
        ``config.get_unused_url`` and, when the response ``code`` is 200,
        passes the returned credentials to :meth:`update_auth`.

        :param machine_id: machine ID sent to the API
        :return: ``(success, message, account_info)``; ``account_info`` is a
            dict with ``email``, ``expire_time`` and ``days_left`` on success
            and ``None`` otherwise
        """
        try:
            # Silence the warning urllib3 emits for unverified HTTPS requests.
            # This setting is process-wide.
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
            # NOTE(review): verify=False disables TLS certificate verification
            # while credentials are in transit. Left as-is because the server
            # certificate setup is not visible here; confirm and enable
            # verification if the endpoint has a valid certificate.
            response = requests.post(
                config.get_unused_url,
                json={"machine_id": machine_id},
                headers={"Content-Type": "application/json"},
                timeout=30,
                verify=False,
            )
            response_data = response.json()

            code = response_data.get("code")
            if code == 404:
                return False, "没有可用的未使用账号", None
            if code != 200:
                error_msg = response_data.get("msg", "未知错误")
                return False, f"获取账号失败: {error_msg}", None

            # "data" may be present but null; treat that like a missing payload
            # so it is reported as incomplete rather than raising.
            account_data = response_data.get("data") or {}
            email = account_data.get("email")
            new_access_token = account_data.get("access_token")
            new_refresh_token = account_data.get("refresh_token")
            if not all([email, new_access_token, new_refresh_token]):
                return False, "获取账号信息不完整", None

            stored = self.update_auth(
                email=email,
                access_token=new_access_token,
                refresh_token=new_refresh_token,
            )
            if not stored:
                return False, "更新认证信息失败", None

            account_info = {
                "email": email,
                "expire_time": account_data.get("expire_time"),
                "days_left": account_data.get("days_left"),
            }
            return True, "认证信息更新成功", account_info
        except Exception as e:
            # Boundary handler: network, JSON and unexpected-shape errors are
            # all reported through the result tuple.
            error_msg = str(e)
            self.logger.error(f"刷新token失败: {error_msg}")
            return False, f"刷新token失败: {error_msg}", None