200 lines
7.1 KiB
Python
200 lines
7.1 KiB
Python
import json
|
||
import os
|
||
import time
|
||
from datetime import datetime
|
||
import requests
|
||
from typing import Dict, Optional
|
||
from config import Config
|
||
from logger import logging
|
||
import patch_cursor_get_machine_id
|
||
|
||
class AccountManager:
|
||
def __init__(self, api_base_url: str = None):
|
||
self.config = Config()
|
||
self.api_base_url = api_base_url or "https://cursorapi.nosqli.com/admin"
|
||
self.api_token = os.getenv("API_TOKEN", "")
|
||
self.accounts_file = "accounts.json"
|
||
self._ensure_accounts_file()
|
||
|
||
def _ensure_accounts_file(self):
|
||
"""确保accounts.json文件存在"""
|
||
if not os.path.exists(self.accounts_file):
|
||
with open(self.accounts_file, "w", encoding="utf-8") as f:
|
||
json.dump({"accounts": []}, f, ensure_ascii=False, indent=2)
|
||
|
||
def save_account(self, account_info: Dict) -> bool:
|
||
"""
|
||
保存账号信息到本地JSON文件
|
||
|
||
Args:
|
||
account_info: 包含账号信息的字典
|
||
Returns:
|
||
bool: 保存是否成功
|
||
"""
|
||
try:
|
||
# 添加时间戳
|
||
account_info["created_at"] = datetime.now().isoformat()
|
||
|
||
# 读取现有数据
|
||
with open(self.accounts_file, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
# 添加新账号
|
||
data["accounts"].append(account_info)
|
||
|
||
# 保存回文件
|
||
with open(self.accounts_file, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
logging.info(f"账号信息已保存到本地: {account_info['email']}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logging.error(f"保存账号信息到本地失败: {str(e)}")
|
||
return False
|
||
|
||
def sync_to_server(self, account_info: Dict) -> bool:
|
||
"""
|
||
同步账号信息到服务器
|
||
|
||
Args:
|
||
account_info: 包含账号信息的字典
|
||
Returns:
|
||
bool: 同步是否成功
|
||
"""
|
||
try:
|
||
# 构建API请求
|
||
endpoint = f"{self.api_base_url}/api.account/add"
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"Authorization": f"Bearer {self.api_token}"
|
||
}
|
||
|
||
# 确保machine_id不为空
|
||
machine_id = account_info.get("machine_id")
|
||
if not machine_id:
|
||
logging.warning("machine_id为空,尝试重新获取")
|
||
try:
|
||
# 先执行patch操作
|
||
patch_cursor_get_machine_id.patch_cursor_get_machine_id()
|
||
|
||
# 然后获取machine_id
|
||
pkg_path, _ = patch_cursor_get_machine_id.get_cursor_paths()
|
||
with open(pkg_path, "r", encoding="utf-8") as f:
|
||
pkg_data = json.load(f)
|
||
machine_id = pkg_data.get("machineId", "")
|
||
if machine_id:
|
||
logging.info(f"成功获取machine_id: {machine_id}")
|
||
except Exception as e:
|
||
logging.error(f"获取machine_id失败: {str(e)}")
|
||
machine_id = ""
|
||
|
||
# 准备请求数据
|
||
data = {
|
||
"email": account_info["email"],
|
||
"password": account_info["password"],
|
||
"first_name": account_info["first_name"],
|
||
"last_name": account_info["last_name"],
|
||
"access_token": account_info["access_token"],
|
||
"refresh_token": account_info["refresh_token"],
|
||
"machine_id": machine_id,
|
||
"user_agent": account_info.get("user_agent", ""),
|
||
"registration_time": datetime.fromisoformat(account_info["registration_time"]).strftime("%Y-%m-%d %H:%M:%S")
|
||
}
|
||
|
||
# 发送POST请求
|
||
response = requests.post(
|
||
endpoint,
|
||
json=data,
|
||
headers=headers,
|
||
timeout=10
|
||
)
|
||
|
||
response_data = response.json()
|
||
if response_data["code"] == 0 and "添加成功" in response_data.get("msg", ""):
|
||
logging.info(f"账号信息已同步到服务器: {account_info['email']}")
|
||
return True
|
||
elif response_data["code"] == 400:
|
||
logging.warning(f"账号已存在: {account_info['email']}")
|
||
return False
|
||
else:
|
||
logging.error(f"同步到服务器失败: {response_data['msg']}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logging.error(f"同步账号信息到服务器失败: {str(e)}")
|
||
return False
|
||
|
||
def get_unused_account(self) -> Optional[Dict]:
|
||
"""
|
||
从服务器获取一个未使用的账号
|
||
|
||
Returns:
|
||
Optional[Dict]: 账号信息或None
|
||
"""
|
||
try:
|
||
endpoint = f"{self.api_base_url}/api.account/getUnused"
|
||
headers = {
|
||
"Authorization": f"Bearer {self.api_token}"
|
||
}
|
||
|
||
response = requests.get(
|
||
endpoint,
|
||
headers=headers,
|
||
timeout=10
|
||
)
|
||
|
||
response_data = response.json()
|
||
if response_data["code"] == 0:
|
||
logging.info("成功获取未使用账号")
|
||
return response_data["data"]
|
||
elif response_data["code"] == 404:
|
||
logging.warning("没有可用的未使用账号")
|
||
return None
|
||
else:
|
||
logging.error(f"获取未使用账号失败: {response_data['msg']}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取未使用账号失败: {str(e)}")
|
||
return None
|
||
|
||
def process_account(self, account_info: Dict) -> bool:
|
||
"""
|
||
处理账号信息:保存到本地并同步到服务器
|
||
|
||
Args:
|
||
account_info: 包含账号信息的字典
|
||
Returns:
|
||
bool: 处理是否成功
|
||
"""
|
||
# 保存到本地
|
||
local_save_success = self.save_account(account_info)
|
||
|
||
# 同步到服务器
|
||
server_sync_success = self.sync_to_server(account_info)
|
||
|
||
return local_save_success and server_sync_success
|
||
|
||
def get_account_info(self, email: str) -> Optional[Dict]:
|
||
"""
|
||
获取指定邮箱的账号信息
|
||
|
||
Args:
|
||
email: 邮箱地址
|
||
Returns:
|
||
Optional[Dict]: 账号信息或None
|
||
"""
|
||
try:
|
||
with open(self.accounts_file, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
for account in data["accounts"]:
|
||
if account["email"] == email:
|
||
return account
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
logging.error(f"获取账号信息失败: {str(e)}")
|
||
return None |