Files
cursornew2026/backend/app/api/client.py

769 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
客户端 API - 兼容原 CursorPro 插件
"""
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from typing import Optional, List
from pydantic import BaseModel
from app.database import get_db
from app.services import AccountService, KeyService, LogService, GlobalSettingsService
from app.schemas import VerifyKeyRequest, VerifyKeyResponse, SwitchAccountRequest, SwitchAccountResponse
from app.models import AccountStatus, MembershipType
router = APIRouter(prefix="/api", tags=["Client API"])
# ========== 账号数据响应模型 ==========
class AccountData(BaseModel):
    """Account data - the payload written into the local Cursor installation."""
    accessToken: str
    refreshToken: Optional[str] = None
    workosSessionToken: Optional[str] = None
    email: str
    membership_type: str
    usage_type: Optional[str] = "default"
    # Machine-ID related fields (optional)
    serviceMachineId: Optional[str] = None
    machineId: Optional[str] = None
    macMachineId: Optional[str] = None
    devDeviceId: Optional[str] = None
    sqmId: Optional[str] = None
    machineIdFile: Optional[str] = None
    # Usage statistics
    requestCount: Optional[int] = 0
    usageAmount: Optional[float] = 0.0
    # Quota information
    quota: Optional[int] = None
    quotaUsed: Optional[int] = None
    quotaRemaining: Optional[int] = None
    expireDate: Optional[str] = None
class ApiResponse(BaseModel):
    """Generic API response envelope."""
    # True when the request succeeded
    success: bool
    # Human-readable status message (set on success paths in this module)
    message: Optional[str] = None
    # Error description (set on failure paths in this module)
    error: Optional[str] = None
    # Account payload returned to the client, when there is one
    data: Optional[AccountData] = None
# ========== 验证和切换 API ==========
def build_account_data(account, key) -> AccountData:
    """Assemble the AccountData payload for an account/key pair.

    Tokens, email and membership type come from ``account``; the quota figures
    come from ``key``. ``expireDate`` is ``key.expire_at`` formatted as
    ``YYYY-MM-DD HH:MM:SS``, or ``None`` when the key has no expiry set.
    """
    expires_at = key.expire_at
    formatted_expiry = expires_at.strftime("%Y-%m-%d %H:%M:%S") if expires_at else None

    fields = {
        "accessToken": account.access_token,
        "refreshToken": account.refresh_token,
        "workosSessionToken": account.workos_session_token,
        "email": account.email,
        "membership_type": account.membership_type.value,
        "quota": key.quota,
        "quotaUsed": key.quota_used,
        "quotaRemaining": key.quota - key.quota_used,
        "expireDate": formatted_expiry,
    }
    return AccountData(**fields)
@router.post("/verify")
async def verify(request: VerifyKeyRequest, req: Request, db: Session = Depends(get_db)):
    """Verify an activation key (path called by the frontend)."""
    outcome = await verify_key_impl(request, req, db)
    return outcome
@router.post("/verify-key")
async def verify_key(request: VerifyKeyRequest, req: Request, db: Session = Depends(get_db)):
    """Verify an activation key (compatibility path)."""
    outcome = await verify_key_impl(request, req, db)
    return outcome
async def verify_key_impl(request: VerifyKeyRequest, req: Request, db: Session):
    """Shared implementation behind the /verify and /verify-key routes.

    Looks up ``request.key``, calls ``KeyService.activate``, enforces the
    device limit when ``request.device_id`` is set, checks key validity and
    then returns the account bound to the key, binding a new one when the
    bound account is missing, disabled or expired.

    Returns:
        A dict. Failures carry ``success=False``, ``valid=False`` and an
        ``error`` message; success carries the expiry date, the remaining and
        total quota, and the account payload under ``data``.
    """
    key = KeyService.get_by_key(db, request.key)
    if not key:
        return {"success": False, "valid": False, "error": "激活码不存在"}

    # Starlette documents ``Request.client`` as optional, so guard it instead
    # of letting a missing peer address abort the request with AttributeError.
    client_ip = req.client.host if req.client else None

    # First activation sets the activation and expiry times (per the original
    # note; presumably a no-op for already-activated keys - confirm).
    KeyService.activate(db, key)

    # Enforce the device limit
    if request.device_id:
        device_ok, device_msg = KeyService.check_device(db, key, request.device_id)
        if not device_ok:
            LogService.log(db, key.id, "verify", ip_address=client_ip, success=False, message=device_msg)
            return {"success": False, "valid": False, "error": device_msg}

    # Check that the activation key is still valid
    is_valid, message = KeyService.is_valid(key, db)
    if not is_valid:
        LogService.log(db, key.id, "verify", ip_address=client_ip, success=False, message=message)
        return {"success": False, "valid": False, "error": message}

    # Reuse the currently bound account when there is one
    account = None
    if key.current_account_id:
        account = AccountService.get_by_id(db, key.current_account_id)

    # Only a missing, disabled or expired account triggers a new assignment;
    # an account in any other status keeps being used.
    if not account or account.status in (AccountStatus.DISABLED, AccountStatus.EXPIRED):
        account = AccountService.get_available(db, key.membership_type)
        if not account:
            LogService.log(db, key.id, "verify", ip_address=client_ip, success=False, message="无可用账号")
            return {"success": False, "valid": False, "error": "暂无可用账号,请稍后重试"}
        KeyService.bind_account(db, key, account)
        AccountService.mark_used(db, account, key.id)

    LogService.log(db, key.id, "verify", account.id, ip_address=client_ip, success=True)

    # Response shape matches what the original plugin expects
    expire_date = key.expire_at.strftime("%Y-%m-%d %H:%M:%S") if key.expire_at else None
    return {
        "success": True,
        "valid": True,
        "expire_date": expire_date,
        "switch_remaining": key.quota - key.quota_used,
        "switch_limit": key.quota,
        "data": build_account_data(account, key)
    }
@router.post("/switch")
async def switch(request: SwitchAccountRequest, req: Request, db: Session = Depends(get_db)):
    """Switch to another account (path called by the frontend)."""
    outcome = await switch_account_impl(request, req, db)
    return outcome
@router.post("/switch-account")
async def switch_account(request: SwitchAccountRequest, req: Request, db: Session = Depends(get_db)):
    """Switch to another account (compatibility path)."""
    outcome = await switch_account_impl(request, req, db)
    return outcome
async def switch_account_impl(request: SwitchAccountRequest, req: Request, db: Session):
    """Shared implementation behind the /switch and /switch-account routes.

    Checks, in order: the key exists, the device limit (when
    ``request.device_id`` is set), key validity and the switch rate limit.
    It then releases the currently bound account, binds a newly available one
    and records the switch via ``KeyService.use_switch``.

    Returns:
        An ``ApiResponse``; ``error`` is set on failure, ``message`` and
        ``data`` on success.
    """
    key = KeyService.get_by_key(db, request.key)
    if not key:
        return ApiResponse(success=False, error="激活码不存在")

    # Starlette documents ``Request.client`` as optional, so guard it instead
    # of letting a missing peer address abort the request with AttributeError.
    client_ip = req.client.host if req.client else None

    # Enforce the device limit
    if request.device_id:
        device_ok, device_msg = KeyService.check_device(db, key, request.device_id)
        if not device_ok:
            LogService.log(db, key.id, "switch", ip_address=client_ip, success=False, message=device_msg)
            return ApiResponse(success=False, error=device_msg)

    # Check that the activation key is still valid
    is_valid, message = KeyService.is_valid(key, db)
    if not is_valid:
        LogService.log(db, key.id, "switch", ip_address=client_ip, success=False, message=message)
        return ApiResponse(success=False, error=message)

    # Enforce the switch rate limit
    can_switch, switch_msg = KeyService.can_switch(db, key)
    if not can_switch:
        LogService.log(db, key.id, "switch", ip_address=client_ip, success=False, message=switch_msg)
        return ApiResponse(success=False, error=switch_msg)

    # Release the currently bound account.
    # NOTE(review): the old account is released before a replacement is
    # secured; if none is available the key still references the released
    # account - confirm this is intended.
    if key.current_account_id:
        old_account = AccountService.get_by_id(db, key.current_account_id)
        if old_account:
            AccountService.release(db, old_account)

    # Fetch a replacement account
    account = AccountService.get_available(db, key.membership_type)
    if not account:
        LogService.log(db, key.id, "switch", ip_address=client_ip, success=False, message="无可用账号")
        return ApiResponse(success=False, error="暂无可用账号,请稍后重试")

    # Bind the new account and charge the switch against the key
    KeyService.bind_account(db, key, account)
    KeyService.use_switch(db, key)
    AccountService.mark_used(db, account, key.id)
    LogService.log(db, key.id, "switch", account.id, ip_address=client_ip, success=True)

    return ApiResponse(
        success=True,
        message="切换成功",
        data=build_account_data(account, key)
    )
# ========== 版本 API ==========
@router.get("/version")
async def get_version():
    """Return the hard-coded version descriptor."""
    version_info = {
        "success": True,
        "version": "1.0.0",
        "latestVersion": "1.0.0",
        "updateUrl": None,
        "message": None,
        "forceUpdate": False,
    }
    return version_info
# ========== 代理配置 API ==========
# In-memory proxy configuration (a production deployment should persist this in the database).
# Mutated in place by the POST /proxy-config handler and returned by the GET handler.
proxy_config = {
    "is_enabled": False,
    "proxy_url": ""
}
@router.get("/proxy-config")
async def get_proxy_config():
    """Return the module-level in-memory proxy configuration."""
    response = {"success": True, "data": proxy_config}
    return response
@router.post("/proxy-config")
async def update_proxy_config(config: dict):
    """Update the in-memory proxy configuration.

    Only the ``is_enabled`` and ``proxy_url`` keys of ``config`` are applied;
    any other key is ignored. The updated configuration is echoed back.
    """
    accepted = ("is_enabled", "proxy_url")
    proxy_config.update({name: config[name] for name in accepted if name in config})
    response = {
        "success": True,
        "message": "代理配置已更新",
        "data": proxy_config,
    }
    return response
# ========== 无感换号 API ==========
# In-memory seamless-switch configuration (a production deployment should persist this in the database).
# Read and mutated in place by the /seamless/* handlers below.
seamless_config = {
    "enabled": False,
    "mode": "auto",
    "switchThreshold": 10,
    "accountPool": [],
    "currentIndex": 0
}
@router.get("/seamless/status")
async def seamless_status(db: Session = Depends(get_db)):
    """Report whether seamless switching is enabled in the in-memory config."""
    enabled = seamless_config["enabled"]
    if enabled:
        status_text = "无感换号功能已就绪"
    else:
        status_text = "无感换号功能未启用"
    return {
        "success": True,
        "enabled": enabled,
        "message": status_text,
    }
@router.get("/seamless/user-status")
async def seamless_user_status(key: str = None, userKey: str = None, db: Session = Depends(get_db)):
    """Return the switch status of an activation key.

    The key may be supplied as either ``key`` or ``userKey``. The response
    reports the remaining switches, the currently bound ("locked") account
    when it is neither disabled nor expired, and the key's membership type.
    """
    # Accept both parameter spellings
    supplied_key = key or userKey
    if not supplied_key:
        return {"success": False, "valid": False, "error": "缺少激活码参数"}

    activation_key = KeyService.get_by_key(db, supplied_key)
    if not activation_key:
        return {"success": False, "valid": False, "error": "激活码不存在"}

    # Validity check
    key_ok, reason = KeyService.is_valid(activation_key, db)
    if not key_ok:
        return {"success": False, "valid": False, "error": reason}

    # Currently bound account, exposed only while it is usable
    locked_account = None
    bound_id = activation_key.current_account_id
    if bound_id:
        bound = AccountService.get_by_id(db, bound_id)
        unusable = (AccountStatus.DISABLED, AccountStatus.EXPIRED)
        if bound and bound.status not in unusable:
            locked_account = {
                "email": bound.email,
                "membership_type": bound.membership_type.value,
            }

    # Pro keys are metered by quota (points); Auto keys are limited by switch
    # count instead, so they report a fixed 999 here.
    if activation_key.membership_type == MembershipType.PRO:
        switch_remaining = activation_key.quota - activation_key.quota_used
    else:
        switch_remaining = 999

    details = {
        "canSwitch": True,
        "quotaRemaining": activation_key.quota - activation_key.quota_used,
        "switchCount": activation_key.switch_count,
    }
    return {
        "success": True,
        "valid": True,
        "switchRemaining": switch_remaining,
        "canSwitch": switch_remaining > 0,
        "lockedAccount": locked_account,
        "membershipType": activation_key.membership_type.value,
        "data": details,
    }
@router.get("/seamless/config")
async def get_seamless_config():
    """Return the module-level in-memory seamless configuration."""
    response = {"success": True, "data": seamless_config}
    return response
@router.post("/seamless/config")
async def update_seamless_config(config: dict):
    """Update the in-memory seamless configuration.

    Only the known keys (``enabled``, ``mode``, ``switchThreshold``,
    ``accountPool``, ``currentIndex``) are copied from ``config``; anything
    else is ignored. The updated configuration is echoed back.
    """
    known_fields = ("enabled", "mode", "switchThreshold", "accountPool", "currentIndex")
    seamless_config.update(
        {field: config[field] for field in known_fields if field in config}
    )
    response = {
        "success": True,
        "message": "配置已更新",
        "data": seamless_config,
    }
    return response
@router.post("/seamless/inject")
async def inject_seamless(data: dict, req: Request, db: Session = Depends(get_db)):
    """Inject seamless mode: return the account data for an activation key.

    The key is read from ``data["user_key"]``; ``data["userKey"]`` is accepted
    as a fallback because the other seamless endpoints in this module use that
    spelling. The currently bound account is reused unless it is missing,
    disabled or expired, in which case a new one is bound.

    Returns:
        A dict with ``success`` and either ``error`` or ``message`` plus the
        dumped account payload under ``data``.
    """
    user_key = data.get("user_key") or data.get("userKey")
    if not user_key:
        return {"success": False, "error": "缺少user_key参数"}

    key = KeyService.get_by_key(db, user_key)
    if not key:
        return {"success": False, "error": "激活码不存在"}

    # Validity check
    is_valid, message = KeyService.is_valid(key, db)
    if not is_valid:
        return {"success": False, "error": message}

    # Reuse the currently bound account when there is one
    account = None
    if key.current_account_id:
        account = AccountService.get_by_id(db, key.current_account_id)

    # Only a missing, disabled or expired account triggers a new assignment
    if not account or account.status in (AccountStatus.DISABLED, AccountStatus.EXPIRED):
        account = AccountService.get_available(db, key.membership_type)
        if not account:
            return {"success": False, "error": "暂无可用账号"}
        KeyService.bind_account(db, key, account)
        AccountService.mark_used(db, account, key.id)

    # Starlette documents ``Request.client`` as optional, so guard it instead
    # of letting a missing peer address abort the request with AttributeError.
    client_ip = req.client.host if req.client else None
    LogService.log(db, key.id, "seamless_inject", account.id, ip_address=client_ip, success=True)

    return {
        "success": True,
        "message": "无感模式已注入",
        "data": build_account_data(account, key).model_dump()
    }
@router.post("/seamless/restore")
async def restore_seamless():
    """Restore seamless mode settings by disabling it in the in-memory config."""
    seamless_config.update(enabled=False)
    response = {"success": True, "message": "已恢复默认设置"}
    return response
def _mask_email(email: str) -> str:
    """Mask an email, keeping at most the first three characters of the local part."""
    # partition() never raises, and slicing the local part (not the whole
    # address) keeps the "@" from leaking into the prefix for short names.
    local, at, domain = email.partition("@")
    return local[:3] + "***" + at + domain


@router.get("/seamless/accounts")
async def get_seamless_accounts(db: Session = Depends(get_db)):
    """List the account pool with masked email addresses.

    Fetches up to 100 accounts via ``AccountService.get_all`` and returns the
    ones whose status is ``AccountStatus.ACTIVE``.

    Returns:
        A dict with ``success`` and ``data`` holding the ``accounts`` list and
        its ``total``.
    """
    accounts = AccountService.get_all(db, limit=100)
    account_list = [
        {
            "id": acc.id,
            "email": _mask_email(acc.email),
            "membership_type": acc.membership_type.value,
            "status": acc.status.value,
        }
        for acc in accounts
        if acc.status == AccountStatus.ACTIVE
    ]
    return {
        "success": True,
        "data": {
            "accounts": account_list,
            "total": len(account_list)
        }
    }
@router.post("/seamless/accounts")
async def sync_seamless_accounts(data: dict):
    """Sync the account pool (no-op).

    This endpoint needs no real behaviour in this architecture: accounts are
    managed centrally by the admin backend, so the payload is ignored.
    """
    response = {"success": True, "message": "账号由管理后台统一管理"}
    return response
@router.get("/seamless/token")
async def get_seamless_token(key: str, db: Session = Depends(get_db)):
    """Return the token payload of the account currently bound to ``key``.

    Unlike /seamless/get-token this endpoint never assigns a new account; it
    fails when the key has no bound account.

    Returns:
        A dict with ``success`` and either ``error`` or the dumped account
        payload under ``data``.
    """
    activation_key = KeyService.get_by_key(db, key)
    if not activation_key:
        return {"success": False, "error": "激活码不存在"}

    # Apply the same validity gate as the other token-returning endpoints so an
    # invalid key cannot keep fetching account credentials through this path.
    is_valid, message = KeyService.is_valid(activation_key, db)
    if not is_valid:
        return {"success": False, "error": message}

    account = None
    if activation_key.current_account_id:
        account = AccountService.get_by_id(db, activation_key.current_account_id)
    if not account:
        return {"success": False, "error": "未绑定账号"}

    return {
        "success": True,
        "data": build_account_data(account, activation_key).model_dump()
    }
@router.get("/seamless/get-token")
async def get_seamless_token_v2(userKey: str = None, key: str = None, req: Request = None, db: Session = Depends(get_db)):
    """Return the seamless token (path called by the injected code).

    Accepts the activation key as either ``userKey`` or ``key``. The response
    carries ``accessToken``, ``email`` and related fields at the top level
    because the injected code reads them directly from the response object.
    """
    actual_key = userKey or key
    if not actual_key:
        return {"success": False, "error": "缺少激活码参数"}

    activation_key = KeyService.get_by_key(db, actual_key)
    if not activation_key:
        return {"success": False, "error": "激活码不存在"}

    # Validity check
    is_valid, message = KeyService.is_valid(activation_key, db)
    if not is_valid:
        return {"success": False, "error": message}

    # Reuse the bound account, or assign a new one below
    account = None
    is_new = False
    if activation_key.current_account_id:
        account = AccountService.get_by_id(db, activation_key.current_account_id)

    # Only a missing, disabled or expired account triggers a new assignment
    if not account or account.status in (AccountStatus.DISABLED, AccountStatus.EXPIRED):
        # Auto keys are rate-limited on obtaining a new account
        if activation_key.membership_type == MembershipType.FREE:
            can_switch, switch_msg = KeyService.can_switch(db, activation_key)
            if not can_switch:
                return {"success": False, "error": switch_msg}

        account = AccountService.get_available(db, activation_key.membership_type)
        if not account:
            return {"success": False, "error": "暂无可用账号"}
        KeyService.bind_account(db, activation_key, account)
        AccountService.mark_used(db, account, activation_key.id)
        # Record the switch (feeds the rate limit)
        KeyService.use_switch(db, activation_key)
        is_new = True

    # Audit log; ``Request.client`` is documented as optional, so guard it
    # rather than failing the whole request when no peer address is known.
    if req:
        client_ip = req.client.host if req.client else None
        LogService.log(db, activation_key.id, "seamless_get_token", account.id, ip_address=client_ip, success=True)

    # The injected code checks: if(d && d.accessToken) { ... }
    return {
        "success": True,
        "accessToken": account.access_token,
        "refreshToken": account.refresh_token or "",
        "email": account.email,
        "membership_type": account.membership_type.value,
        "switchVersion": activation_key.switch_count,  # lets the client detect that an update is needed
        "switchRemaining": activation_key.quota - activation_key.quota_used,
        "is_new": is_new,
        "machineIds": None  # machine IDs are managed locally by the client
    }
@router.post("/seamless/switch")
async def switch_seamless_token(data: dict, req: Request, db: Session = Depends(get_db)):
    """Switch the seamless token for the key given in ``data["userKey"]``."""
    supplied_key = data.get("userKey")
    if supplied_key:
        # Reuse the shared switch logic.
        # NOTE(review): only the key is forwarded, so the device check in
        # switch_token_impl sees no device_id from this route - confirm intended.
        switch_request = SwitchAccountRequest(key=supplied_key)
        return await switch_token_impl(switch_request, req, db)
    return {"switched": False, "message": "缺少userKey参数"}
@router.post("/seamless/switch-token")
async def switch_seamless_token_v2(data: dict, req: Request, db: Session = Depends(get_db)):
    """Switch the seamless token (path called by client.js)."""
    supplied_key = data.get("userKey")
    if supplied_key:
        switch_request = SwitchAccountRequest(key=supplied_key)
        return await switch_token_impl(switch_request, req, db)
    return {"switched": False, "message": "缺少userKey参数"}
async def switch_token_impl(request: SwitchAccountRequest, req: Request, db: Session):
    """Switch-token implementation returning the shape the plugin expects.

    Performs the same checks and account rotation as the /switch routes, but
    reports the outcome as a plain dict keyed by ``switched``.

    Returns:
        ``{"switched": False, "message": ...}`` on failure; on success
        ``switched=True`` with the remaining quota, the new account's email
        and the account payload under ``data``.
    """
    key = KeyService.get_by_key(db, request.key)
    if not key:
        return {"switched": False, "message": "激活码不存在"}

    # Starlette documents ``Request.client`` as optional, so guard it instead
    # of letting a missing peer address abort the request with AttributeError.
    client_ip = req.client.host if req.client else None

    # Enforce the device limit
    if request.device_id:
        device_ok, device_msg = KeyService.check_device(db, key, request.device_id)
        if not device_ok:
            LogService.log(db, key.id, "switch", ip_address=client_ip, success=False, message=device_msg)
            return {"switched": False, "message": device_msg}

    # Check that the activation key is still valid
    is_valid, message = KeyService.is_valid(key, db)
    if not is_valid:
        LogService.log(db, key.id, "switch", ip_address=client_ip, success=False, message=message)
        return {"switched": False, "message": message}

    # Enforce the switch rate limit
    can_switch, switch_msg = KeyService.can_switch(db, key)
    if not can_switch:
        LogService.log(db, key.id, "switch", ip_address=client_ip, success=False, message=switch_msg)
        return {"switched": False, "message": switch_msg}

    # Release the currently bound account.
    # NOTE(review): the old account is released before a replacement is
    # secured; if none is available the key still references the released
    # account - confirm this is intended.
    if key.current_account_id:
        old_account = AccountService.get_by_id(db, key.current_account_id)
        if old_account:
            AccountService.release(db, old_account)

    # Fetch a replacement account
    account = AccountService.get_available(db, key.membership_type)
    if not account:
        LogService.log(db, key.id, "switch", ip_address=client_ip, success=False, message="无可用账号")
        return {"switched": False, "message": "暂无可用账号,请稍后重试"}

    # Bind the new account and charge the switch against the key
    KeyService.bind_account(db, key, account)
    KeyService.use_switch(db, key)
    AccountService.mark_used(db, account, key.id)
    LogService.log(db, key.id, "switch", account.id, ip_address=client_ip, success=True)

    return {
        "switched": True,
        "switchRemaining": key.quota - key.quota_used,
        "email": account.email,
        "data": build_account_data(account, key)
    }
# ========== 代理配置 API ==========
# NOTE(review): GET /proxy-config is already registered by an earlier handler
# in this file; routes are presumably matched in registration order, which
# would make this handler unreachable - confirm and consolidate.
@router.get("/proxy-config")
async def get_proxy_config(db: Session = Depends(get_db)):
    """Return a fixed, disabled proxy configuration."""
    disabled_proxy = {
        "success": True,
        "enabled": False,
        "host": "",
        "port": 0,
        "username": "",
        "password": "",
    }
    return disabled_proxy
@router.put("/proxy-config")
async def update_proxy_config(config: dict, db: Session = Depends(get_db)):
    """Acknowledge a proxy configuration update.

    The configuration is used locally by the client; the backend stores
    nothing and only reports success.
    """
    acknowledgement = {"success": True, "message": "代理配置已保存"}
    return acknowledgement
# ========== 无感换号配置 API ==========
# NOTE(review): GET /seamless/status is already registered by an earlier
# handler in this file; routes are presumably matched in registration order,
# which would make this handler unreachable - confirm and consolidate.
@router.get("/seamless/status")
async def get_seamless_status(db: Session = Depends(get_db)):
    """Return the global seamless-switch status.

    Combines the account counters from ``AccountService.count`` with the
    switch-interval and daily-limit values from the global settings.
    """
    global_settings = GlobalSettingsService.get_all(db)
    counters = AccountService.count(db)
    status = {
        "success": True,
        "enabled": True,
        "available_accounts": counters.get("active", 0),
        "total_accounts": counters.get("total", 0),
        "switch_interval_minutes": global_settings.auto_switch_interval_minutes,
        "max_switches_per_day": global_settings.auto_max_switches_per_day,
    }
    return status
# NOTE(review): GET /seamless/config is already registered by an earlier
# handler in this file; routes are presumably matched in registration order,
# which would make this handler unreachable - confirm and consolidate.
@router.get("/seamless/config")
async def get_seamless_config(db: Session = Depends(get_db)):
    """Return the seamless-switch configuration read from the global settings."""
    global_settings = GlobalSettingsService.get_all(db)
    configuration = {
        "success": True,
        "auto_switch": True,
        "switch_interval_minutes": global_settings.auto_switch_interval_minutes,
        "max_switches_per_day": global_settings.auto_max_switches_per_day,
        "pro_quota_cost": global_settings.pro_quota_cost,
    }
    return configuration
# NOTE(review): POST /seamless/config is already registered by an earlier
# handler in this file; routes are presumably matched in registration order,
# which would make this handler unreachable - confirm and consolidate.
@router.post("/seamless/config")
async def update_seamless_config(config: dict, db: Session = Depends(get_db)):
    """Update the seamless-switch settings stored in the global settings.

    Recognised request keys are mapped to their settings names; the settings
    service is only called when at least one recognised key is present.
    """
    # request key -> global settings field
    field_map = {
        "switch_interval_minutes": "auto_switch_interval_minutes",
        "max_switches_per_day": "auto_max_switches_per_day",
        "pro_quota_cost": "pro_quota_cost",
    }
    updates = {
        setting: config[request_key]
        for request_key, setting in field_map.items()
        if request_key in config
    }
    if updates:
        GlobalSettingsService.update_all(db, **updates)
    return {"success": True, "message": "配置已更新"}
# NOTE(review): POST /seamless/inject is already registered by an earlier
# handler in this file; routes are presumably matched in registration order,
# which would make this handler unreachable - confirm and consolidate.
@router.post("/seamless/inject")
async def seamless_inject(data: dict, db: Session = Depends(get_db)):
    """Record that the client injected the seamless-switch code.

    Injection itself happens on the client; the backend only writes a log
    entry when ``data["userKey"]`` resolves to a known key, and always
    reports success.
    """
    supplied_key = data.get("userKey")
    key = KeyService.get_by_key(db, supplied_key) if supplied_key else None
    if key:
        LogService.log(db, key.id, "seamless_inject", success=True, message="启用无感换号")
    return {"success": True, "message": "无感换号已启用"}
# NOTE(review): POST /seamless/restore is already registered by an earlier
# handler in this file; routes are presumably matched in registration order,
# which would make this handler unreachable - confirm and consolidate.
@router.post("/seamless/restore")
async def seamless_restore(data: dict = None, db: Session = Depends(get_db)):
    """Record that the client restored the original code.

    Restoration itself happens on the client; the backend only writes a log
    entry when a ``userKey`` is supplied and resolves to a known key, and
    always reports success.
    """
    supplied_key = data.get("userKey") if data else None
    if supplied_key:
        key = KeyService.get_by_key(db, data["userKey"])
        if key:
            LogService.log(db, key.id, "seamless_restore", success=True, message="禁用无感换号")
    return {"success": True, "message": "已恢复原始状态"}
# NOTE(review): GET /seamless/accounts is already registered by an earlier
# handler in this file; routes are presumably matched in registration order,
# which would make this handler unreachable - confirm and consolidate.
@router.get("/seamless/accounts")
async def get_seamless_accounts(userKey: str = None, db: Session = Depends(get_db)):
    """Report how many accounts are available for the key's membership type.

    Only the (at most 100) accounts returned by ``AccountService.get_all`` are
    inspected, counting those that are ACTIVE and match the key's type.
    """
    if not userKey:
        return {"success": False, "error": "缺少 userKey 参数"}

    key = KeyService.get_by_key(db, userKey)
    if not key:
        return {"success": False, "error": "激活码不存在"}

    # Count the active accounts matching this key's membership type
    candidates = AccountService.get_all(db, limit=100)
    matching = [
        candidate
        for candidate in candidates
        if candidate.status == AccountStatus.ACTIVE
        and candidate.membership_type == key.membership_type
    ]
    return {
        "success": True,
        "available_count": len(matching),
        "membership_type": key.membership_type.value,
    }
@router.post("/seamless/sync-accounts")
async def sync_seamless_accounts(data: dict, db: Session = Depends(get_db)):
    """Acknowledge an account sync reported by the client.

    The client may report the state of the account it is using; the payload
    is currently ignored and success is always returned.
    """
    acknowledgement = {"success": True, "message": "同步成功"}
    return acknowledgement
# ========== 版本检查 API ==========
# NOTE(review): GET /version is already registered by an earlier handler in
# this file; routes are presumably matched in registration order, which would
# make this handler unreachable - confirm and consolidate.
@router.get("/version")
async def get_version():
    """Return the hard-coded latest-version descriptor."""
    version_info = {
        "success": True,
        "current_version": "0.4.5",
        "latest_version": "0.4.5",
        "has_update": False,
        "download_url": "",
        "changelog": "",
    }
    return version_info
# ========== 公告 API ==========
@router.get("/announcement")
async def get_announcement(db: Session = Depends(get_db)):
    """Return the hard-coded welcome announcement."""
    announcement = {
        "is_active": True,
        "title": "欢迎使用蜂鸟Pro",
        "content": "感谢使用蜂鸟Pro\n\n如有问题请联系客服。",
        "type": "info",
        "created_at": "2024-12-17 00:00:00",
    }
    return {
        "success": True,
        "has_announcement": True,
        "data": announcement,
    }
@router.get("/announcements/latest")
async def get_announcements_latest(db: Session = Depends(get_db)):
    """Return the latest announcement (path called by the frontend)."""
    announcement = {
        "is_active": True,
        "title": "欢迎使用蜂鸟Pro",
        "content": "感谢使用蜂鸟Pro\n\n如有问题请联系客服。",
        "type": "info",
        "created_at": "2024-12-17 00:00:00",
    }
    return {"success": True, "data": announcement}
# ========== 用量查询 API ==========
@router.get("/usage")
async def get_usage(userKey: str = None, db: Session = Depends(get_db)):
    """Return quota and usage figures for an activation key.

    The response includes the key's membership type, quota totals, switch
    count, formatted expiry (or ``None``) and the email of the currently
    bound account (or ``None`` when nothing is bound).
    """
    if not userKey:
        return {"success": False, "error": "缺少 userKey 参数"}

    key = KeyService.get_by_key(db, userKey)
    if not key:
        return {"success": False, "error": "激活码不存在"}

    bound_account = (
        AccountService.get_by_id(db, key.current_account_id)
        if key.current_account_id
        else None
    )

    if key.expire_at:
        expires = key.expire_at.strftime("%Y-%m-%d %H:%M:%S")
    else:
        expires = None

    return {
        "success": True,
        "membership_type": key.membership_type.value,
        "quota": key.quota,
        "quota_used": key.quota_used,
        "quota_remaining": key.quota - key.quota_used,
        "switch_count": key.switch_count,
        "expire_at": expires,
        "current_email": bound_account.email if bound_account else None,
    }