Files
nezhacursor/cursor_auth_manager.py

452 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import sys
import time
import logging
import sqlite3
from pathlib import Path
import subprocess
from typing import Optional, Dict, Tuple
from datetime import datetime
class CursorAuthManager:
    """Manage the authentication entries stored in Cursor's ``state.vscdb`` database.

    The database is a SQLite file holding a key/value table named ``ItemTable``.
    This class can back the file up, read/write/verify/clear authentication
    keys in it, and stop or restart the Cursor editor process.

    NOTE(review): ``update_auth``/``verify_auth`` operate on ``cursorAuth/*``
    keys while ``get_auth_info``/``clear_auth`` operate on ``authentication.*``
    keys. Confirm which key family the targeted Cursor version actually uses.
    """

    def __init__(self):
        """Resolve the platform-specific paths used by the other methods.

        Raises:
            EnvironmentError: on Windows when ``APPDATA`` is not set.
            NotImplementedError: on platforms other than Windows, macOS, Linux.
        """
        if sys.platform == "win32":  # Windows
            appdata = os.getenv("APPDATA")
            if appdata is None:
                raise EnvironmentError("APPDATA 环境变量未设置")
            self.db_path = os.path.join(
                appdata, "Cursor", "User", "globalStorage", "state.vscdb"
            )
        elif sys.platform == "darwin":  # macOS
            self.db_path = os.path.abspath(os.path.expanduser(
                "~/Library/Application Support/Cursor/User/globalStorage/state.vscdb"
            ))
        elif sys.platform == "linux":  # Linux
            self.db_path = os.path.abspath(os.path.expanduser(
                "~/.config/Cursor/User/globalStorage/state.vscdb"
            ))
        else:
            raise NotImplementedError(f"不支持的操作系统: {sys.platform}")

        # Windows install directory; only read by restart_cursor() on win32.
        self.cursor_path = (
            Path(os.path.expanduser("~")) / "AppData" / "Local" / "Programs" / "Cursor"
        )
        # Backups live next to the database. On Windows this is the same
        # %APPDATA%/Cursor/User/globalStorage/backups directory as before;
        # deriving it from db_path avoids Path(None) raising TypeError on
        # macOS/Linux, where APPDATA is not defined.
        self.backup_dir = Path(self.db_path).parent / "backups"
        # Polling parameters for close_cursor_process() on Windows.
        self.max_retries = 5
        self.wait_time = 1

    def backup_database(self) -> Optional[Path]:
        """Copy the database file into ``self.backup_dir`` under a timestamped name.

        Returns:
            Optional[Path]: path of the backup file, or None when the database
            does not exist or the copy failed (the failure is logged).
        """
        try:
            if not Path(self.db_path).exists():
                logging.warning(f"数据库文件不存在: {self.db_path}")
                return None
            self.backup_dir.mkdir(parents=True, exist_ok=True)
            backup_name = f"state.vscdb.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            backup_path = self.backup_dir / backup_name
            # Best-effort probe: open and immediately close a connection before
            # copying. A failure here must not prevent the copy attempt.
            try:
                sqlite3.connect(self.db_path).close()
            except sqlite3.Error as e:
                logging.debug(f"备份前探测数据库连接失败: {str(e)}")
            import shutil
            shutil.copy2(self.db_path, backup_path)
            logging.info(f"已备份数据库: {backup_path}")
            return backup_path
        except Exception as e:
            logging.error(f"备份数据库失败: {str(e)}")
            return None

    def get_auth_info(self) -> Optional[Dict]:
        """Read the currently stored ``authentication.*`` entries.

        Each value is JSON-decoded when possible; otherwise the raw stored
        value is returned unchanged.

        Returns:
            Optional[Dict]: mapping of key to value for the keys that exist,
            or None when the database is missing, no key exists, or an error
            occurred (the error is logged).
        """
        auth_keys = [
            'authentication.currentToken',
            'authentication.refreshToken',
            'authentication.accessToken',
            'authentication.email'
        ]
        conn = None
        try:
            if not Path(self.db_path).exists():
                return None
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            result = {}
            for key in auth_keys:
                cursor.execute('SELECT value FROM ItemTable WHERE key = ?', (key,))
                row = cursor.fetchone()
                if row is None:
                    continue
                try:
                    result[key] = json.loads(row[0])
                except (TypeError, ValueError):
                    # Not valid JSON (or not text at all): keep the raw value.
                    result[key] = row[0]
            return result if result else None
        except Exception as e:
            logging.error(f"获取认证信息失败: {str(e)}")
            return None
        finally:
            # Close the connection on every path, including query failures.
            if conn is not None:
                conn.close()

    def update_auth(self, email: Optional[str] = None,
                    access_token: Optional[str] = None,
                    refresh_token: Optional[str] = None) -> bool:
        """Write new authentication values into the database.

        ``cursorAuth/cachedSignUpType`` is always set to ``Auth_0`` together
        with whichever of the three values are provided. All writes happen in
        a single transaction. A backup is attempted first; a failed backup is
        logged but does not abort the update.

        Args:
            email: new e-mail address, or None to leave it untouched.
            access_token: new access token, or None to leave it untouched.
            refresh_token: new refresh token, or None to leave it untouched.

        Returns:
            bool: True when the transaction was committed; False when no value
            was provided or an error occurred (the error is logged).
        """
        # Reject an empty request before touching the filesystem. The original
        # check ran after the sign-up-type entry had been queued, so it could
        # never trigger.
        if email is None and access_token is None and refresh_token is None:
            logging.warning("没有提供任何要更新的值")
            return False

        conn = None
        try:
            if not self.backup_database():
                logging.warning("数据库备份失败")

            updates = [("cursorAuth/cachedSignUpType", "Auth_0")]
            if email is not None:
                updates.append(("cursorAuth/cachedEmail", email))
            if access_token is not None:
                updates.append(("cursorAuth/accessToken", access_token))
            if refresh_token is not None:
                updates.append(("cursorAuth/refreshToken", refresh_token))

            # Make sure the database directory and table exist.
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS ItemTable (
                    key TEXT PRIMARY KEY,
                    value TEXT
                )
            ''')
            try:
                cursor.execute('BEGIN TRANSACTION')
                for key, value in updates:
                    # Update in place; insert only when the key is absent.
                    cursor.execute(
                        'UPDATE ItemTable SET value = ? WHERE key = ?',
                        (value, key)
                    )
                    if cursor.rowcount == 0:
                        cursor.execute(
                            'INSERT INTO ItemTable (key, value) VALUES (?, ?)',
                            (key, value)
                        )
                    logging.info(f"成功更新 {key.split('/')[-1]}")
                cursor.execute('COMMIT')
            except Exception:
                # conn.rollback() is a no-op when no transaction is open, so it
                # cannot mask the original error the way a raw ROLLBACK could.
                conn.rollback()
                raise
            logging.info(f"认证信息更新成功: {email}")
            return True
        except Exception as e:
            logging.error(f"更新认证信息失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.close()

    def verify_auth(self, email: str, access_token: str, refresh_token: str) -> bool:
        """Check that the database holds exactly the given authentication values.

        Also requires ``cursorAuth/cachedSignUpType`` to equal ``Auth_0``.

        NOTE(review): on a mismatch the expected and actual values, tokens
        included, are written to the log.

        Args:
            email: expected e-mail address.
            access_token: expected access token.
            refresh_token: expected refresh token.

        Returns:
            bool: True when every key exists with the expected value; False on
            a missing key, a mismatch, or an error (all logged).
        """
        expected = {
            'cursorAuth/cachedEmail': email,
            'cursorAuth/accessToken': access_token,
            'cursorAuth/refreshToken': refresh_token,
            'cursorAuth/cachedSignUpType': 'Auth_0'
        }
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            for key, expected_value in expected.items():
                cursor.execute('SELECT value FROM ItemTable WHERE key = ?', (key,))
                row = cursor.fetchone()
                if not row:
                    logging.error(f"缺少认证信息: {key}")
                    return False
                actual_value = row[0]
                if actual_value != expected_value:
                    logging.error(f"认证信息不匹配: {key}")
                    logging.error(f"预期: {expected_value}")
                    logging.error(f"实际: {actual_value}")
                    return False
            return True
        except Exception as e:
            logging.error(f"验证认证信息失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.close()

    def clear_auth(self) -> bool:
        """Delete the ``authentication.*`` entries from the database.

        A backup is attempted first; a failed backup is logged but does not
        abort the deletion.

        Returns:
            bool: True when the deletions were committed, False on error
            (the error is logged).
        """
        auth_keys = [
            'authentication.currentToken',
            'authentication.refreshToken',
            'authentication.accessToken',
            'authentication.email'
        ]
        conn = None
        try:
            if not self.backup_database():
                logging.warning("数据库备份失败")
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            for key in auth_keys:
                cursor.execute('DELETE FROM ItemTable WHERE key = ?', (key,))
            conn.commit()
            logging.info("已清除认证信息")
            return True
        except Exception as e:
            logging.error(f"清除认证信息失败: {str(e)}")
            return False
        finally:
            # Close the connection even when a statement fails.
            if conn is not None:
                conn.close()

    def close_cursor_process(self) -> bool:
        """Terminate all running Cursor processes.

        On Windows the processes are killed with ``taskkill`` and the process
        list is then polled up to ``self.max_retries`` times, sleeping
        ``self.wait_time`` seconds between polls. On other platforms a kill
        command is issued, followed by a fixed two-second wait; its result is
        not checked.

        Returns:
            bool: False when Windows still lists ``Cursor.exe`` after the last
            poll or an unexpected error occurred; True otherwise.
        """
        try:
            if sys.platform == "win32":
                # STARTUPINFO with SW_HIDE keeps the helper console hidden.
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE
                subprocess.run(
                    "taskkill /f /im Cursor.exe >nul 2>&1",
                    startupinfo=startupinfo,
                    shell=True
                )
                retry_count = 0
                while retry_count < self.max_retries:
                    try:
                        # findstr exits non-zero when nothing matches, which
                        # check_output reports as CalledProcessError.
                        subprocess.check_output(
                            "tasklist | findstr Cursor.exe",
                            startupinfo=startupinfo,
                            shell=True
                        )
                    except subprocess.CalledProcessError:
                        # No matching process is left.
                        break
                    retry_count += 1
                    if retry_count >= self.max_retries:
                        logging.error("无法关闭所有Cursor进程")
                        return False
                    time.sleep(self.wait_time)
                return True

            if sys.platform == "darwin":
                subprocess.run("killall Cursor 2>/dev/null", shell=True)
            else:
                # NOTE(review): `pkill -f` matches full command lines, so any
                # process whose command line contains "cursor" is targeted.
                # Confirm this cannot hit the calling script itself.
                subprocess.run("pkill -f cursor", shell=True)
            time.sleep(2)
            return True
        except Exception as e:
            logging.error(f"关闭进程失败: {str(e)}")
            return False

    def restart_cursor(self) -> bool:
        """Stop Cursor (if running) and start it again.

        On Windows the executable ``self.cursor_path / "Cursor.exe"`` is
        launched, with up to three attempts and ``os.startfile`` as a last
        resort. On macOS ``open -a Cursor`` is used and on Linux ``cursor &``.

        Returns:
            bool: True when a launch was issued and considered successful,
            False otherwise (the reason is logged).
        """
        try:
            logging.info("正在重启Cursor...")
            if not self.close_cursor_process():
                logging.error("无法关闭Cursor进程")
                return False
            # Give the system time to release resources held by the old process.
            time.sleep(3)

            if sys.platform == "win32":
                cursor_exe = self.cursor_path / "Cursor.exe"
                if not cursor_exe.exists():
                    logging.error(f"未找到Cursor程序: {cursor_exe}")
                    return False

                max_retries = 3
                for attempt in range(max_retries):
                    try:
                        # Refuse to launch while an old process is still listed.
                        try:
                            subprocess.check_output(
                                "tasklist | findstr Cursor.exe",
                                startupinfo=subprocess.STARTUPINFO(),
                                shell=True
                            )
                        except subprocess.CalledProcessError:
                            # Nothing found: safe to launch.
                            pass
                        else:
                            if attempt < max_retries - 1:
                                logging.info("等待进程完全关闭...")
                                time.sleep(3)
                                continue
                            logging.error("无法确保所有Cursor进程已关闭")
                            return False

                        startupinfo = subprocess.STARTUPINFO()
                        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                        process = subprocess.Popen(
                            str(cursor_exe),
                            startupinfo=startupinfo,
                            creationflags=subprocess.CREATE_NEW_CONSOLE
                        )
                        # Wait, then treat a still-running child as success.
                        time.sleep(5)
                        if process.poll() is None:
                            logging.info("Cursor启动成功")
                            return True

                        if attempt < max_retries - 1:
                            logging.warning(f"启动尝试 {attempt + 1} 失败,准备重试...")
                            time.sleep(3)
                            continue
                        logging.error("Cursor进程启动失败")
                        # Last attempt failed: fall back to os.startfile.
                        try:
                            os.startfile(str(cursor_exe))
                            time.sleep(5)
                            logging.info("使用备选方案启动Cursor")
                            return True
                        except Exception as e:
                            logging.error(f"备选启动方案失败: {str(e)}")
                            return False
                    except Exception as e:
                        if attempt < max_retries - 1:
                            logging.warning(f"启动尝试 {attempt + 1} 失败: {str(e)},准备重试...")
                            time.sleep(3)
                            continue
                        logging.error(f"启动Cursor失败: {str(e)}")
                        return False
            elif sys.platform == "darwin":
                try:
                    subprocess.run("open -a Cursor", shell=True, check=True)
                    time.sleep(5)
                    logging.info("Cursor启动成功")
                    return True
                except subprocess.CalledProcessError as e:
                    logging.error(f"启动Cursor失败: {str(e)}")
                    return False
            elif sys.platform == "linux":
                try:
                    subprocess.run("cursor &", shell=True, check=True)
                    time.sleep(5)
                    logging.info("Cursor启动成功")
                    return True
                except subprocess.CalledProcessError as e:
                    logging.error(f"启动Cursor失败: {str(e)}")
                    return False
            return False
        except Exception as e:
            logging.error(f"重启Cursor失败: {str(e)}")
            return False