import os import json import sys import time import logging import sqlite3 from pathlib import Path import subprocess from typing import Optional, Dict, Tuple from datetime import datetime class CursorAuthManager: """Cursor认证信息管理器""" def __init__(self): # 判断操作系统 if sys.platform == "win32": # Windows appdata = os.getenv("APPDATA") if appdata is None: raise EnvironmentError("APPDATA 环境变量未设置") self.db_path = os.path.join( appdata, "Cursor", "User", "globalStorage", "state.vscdb" ) elif sys.platform == "darwin": # macOS self.db_path = os.path.abspath(os.path.expanduser( "~/Library/Application Support/Cursor/User/globalStorage/state.vscdb" )) elif sys.platform == "linux" : # Linux 和其他类Unix系统 self.db_path = os.path.abspath(os.path.expanduser( "~/.config/Cursor/User/globalStorage/state.vscdb" )) else: raise NotImplementedError(f"不支持的操作系统: {sys.platform}") self.cursor_path = Path(os.path.expanduser("~")) / "AppData" / "Local" / "Programs" / "Cursor" self.backup_dir = Path(os.getenv('APPDATA')) / "Cursor" / "User" / "globalStorage" / "backups" self.max_retries = 5 self.wait_time = 1 def backup_database(self) -> Optional[Path]: """备份数据库文件 Returns: Optional[Path]: 备份文件路径,失败返回None """ try: if not Path(self.db_path).exists(): logging.warning(f"数据库文件不存在: {self.db_path}") return None self.backup_dir.mkdir(parents=True, exist_ok=True) backup_name = f"state.vscdb.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" backup_path = self.backup_dir / backup_name # 如果数据库正在使用,先关闭连接 try: conn = sqlite3.connect(self.db_path) conn.close() except: pass import shutil shutil.copy2(self.db_path, backup_path) logging.info(f"已备份数据库: {backup_path}") return backup_path except Exception as e: logging.error(f"备份数据库失败: {str(e)}") return None def get_auth_info(self) -> Optional[Dict]: """获取当前的认证信息 Returns: Optional[Dict]: 认证信息字典,失败返回None """ try: if not Path(self.db_path).exists(): return None conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 查询认证相关的键值 auth_keys = [ 'authentication.currentToken', 'authentication.refreshToken', 'authentication.accessToken', 'authentication.email' ] result = {} for key in auth_keys: cursor.execute('SELECT value FROM ItemTable WHERE key = ?', (key,)) row = cursor.fetchone() if row: try: value = json.loads(row[0]) result[key] = value except: result[key] = row[0] conn.close() return result if result else None except Exception as e: logging.error(f"获取认证信息失败: {str(e)}") return None def update_auth(self, email: str = None, access_token: str = None, refresh_token: str = None) -> bool: """更新Cursor的认证信息 Args: email: 新的邮箱地址 access_token: 新的访问令牌 refresh_token: 新的刷新令牌 Returns: bool: 是否成功更新 """ try: # 备份数据库 if not self.backup_database(): logging.warning("数据库备份失败") # 准备更新数据 updates = [] # 登录状态 updates.append(("cursorAuth/cachedSignUpType", "Auth_0")) if email is not None: updates.append(("cursorAuth/cachedEmail", email)) if access_token is not None: updates.append(("cursorAuth/accessToken", access_token)) if refresh_token is not None: updates.append(("cursorAuth/refreshToken", refresh_token)) if not updates: logging.warning("没有提供任何要更新的值") return False # 确保数据库目录存在 db_dir = Path(self.db_path).parent db_dir.mkdir(parents=True, exist_ok=True) # 确保数据库表存在 conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 创建表(如果不存在) cursor.execute(''' CREATE TABLE IF NOT EXISTS ItemTable ( key TEXT PRIMARY KEY, value TEXT ) ''') try: # 开始事务 cursor.execute('BEGIN TRANSACTION') # 执行更新 for key, value in updates: # 检查key是否存在 cursor.execute('SELECT COUNT(*) FROM ItemTable WHERE key = ?', (key,)) if cursor.fetchone()[0] == 0: # 插入新值 cursor.execute( 'INSERT INTO ItemTable (key, value) VALUES (?, ?)', (key, value) ) else: # 更新现有值 cursor.execute( 'UPDATE ItemTable SET value = ? WHERE key = ?', (value, key) ) if cursor.rowcount > 0: logging.info(f"成功更新 {key.split('/')[-1]}") else: logging.warning(f"未找到 {key.split('/')[-1]} 或值未变化") # 提交事务 cursor.execute('COMMIT') logging.info(f"认证信息更新成功: {email}") return True except Exception as e: # 如果出错,回滚事务 cursor.execute('ROLLBACK') raise e except Exception as e: logging.error(f"更新认证信息失败: {str(e)}") return False finally: if 'conn' in locals(): conn.close() def verify_auth(self, email: str, access_token: str, refresh_token: str) -> bool: """验证认证信息是否正确写入 Args: email: 邮箱 access_token: 访问令牌 refresh_token: 新的刷新令牌 Returns: bool: 是否正确写入 """ try: # 连接数据库 conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 验证每个字段 expected = { 'cursorAuth/cachedEmail': email, 'cursorAuth/accessToken': access_token, 'cursorAuth/refreshToken': refresh_token, 'cursorAuth/cachedSignUpType': 'Auth_0' } for key, expected_value in expected.items(): cursor.execute('SELECT value FROM ItemTable WHERE key = ?', (key,)) row = cursor.fetchone() if not row: logging.error(f"缺少认证信息: {key}") return False actual_value = row[0] if actual_value != expected_value: logging.error(f"认证信息不匹配: {key}") logging.error(f"预期: {expected_value}") logging.error(f"实际: {actual_value}") return False conn.close() return True except Exception as e: logging.error(f"验证认证信息失败: {str(e)}") return False finally: if 'conn' in locals(): conn.close() def clear_auth(self) -> bool: """清除认证信息 Returns: bool: 是否成功 """ try: # 备份数据库 if not self.backup_database(): logging.warning("数据库备份失败") # 清除数据库中的认证信息 conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 要清除的键 auth_keys = [ 'authentication.currentToken', 'authentication.refreshToken', 'authentication.accessToken', 'authentication.email' ] # 执行删除 for key in auth_keys: cursor.execute('DELETE FROM ItemTable WHERE key = ?', (key,)) conn.commit() conn.close() logging.info("已清除认证信息") return True except Exception as e: logging.error(f"清除认证信息失败: {str(e)}") return False def close_cursor_process(self) -> bool: """关闭所有Cursor进程 Returns: bool: 是否成功 """ try: if sys.platform == "win32": # 创建startupinfo对象来隐藏命令行窗口 startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE # 关闭进程 subprocess.run( "taskkill /f /im Cursor.exe >nul 2>&1", startupinfo=startupinfo, shell=True ) # 等待进程关闭 retry_count = 0 while retry_count < self.max_retries: try: subprocess.check_output( "tasklist | findstr Cursor.exe", startupinfo=startupinfo, shell=True ) retry_count += 1 if retry_count >= self.max_retries: logging.error("无法关闭所有Cursor进程") return False time.sleep(self.wait_time) except subprocess.CalledProcessError: # 进程已关闭 break return True else: # 其他系统的处理 if sys.platform == "darwin": subprocess.run("killall Cursor 2>/dev/null", shell=True) else: subprocess.run("pkill -f cursor", shell=True) time.sleep(2) return True except Exception as e: logging.error(f"关闭进程失败: {str(e)}") return False def restart_cursor(self) -> bool: """重启Cursor编辑器 Returns: bool: 是否成功 """ try: logging.info("正在重启Cursor...") # 确保进程已关闭 if not self.close_cursor_process(): return False # 启动Cursor if sys.platform == "win32": cursor_exe = self.cursor_path / "Cursor.exe" if cursor_exe.exists(): os.startfile(str(cursor_exe)) logging.info("Cursor启动成功") return True else: logging.error(f"未找到Cursor程序: {cursor_exe}") return False elif sys.platform == "darwin": subprocess.run("open -a Cursor", shell=True) logging.info("Cursor启动成功") return True elif sys.platform == "linux": subprocess.run("cursor &", shell=True) logging.info("Cursor启动成功") return True return False except Exception as e: logging.error(f"重启Cursor失败: {str(e)}") return False