128 lines
4.1 KiB
Python
128 lines
4.1 KiB
Python
import json
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
import requests
|
|
from typing import Dict, Optional
|
|
from config import Config
|
|
from logger import logging
|
|
|
|
class AccountManager:
|
|
def __init__(self, api_base_url: str = None):
|
|
self.config = Config()
|
|
self.api_base_url = api_base_url or os.getenv("API_BASE_URL", "http://api.example.com")
|
|
self.accounts_file = "accounts.json"
|
|
self._ensure_accounts_file()
|
|
|
|
def _ensure_accounts_file(self):
|
|
"""确保accounts.json文件存在"""
|
|
if not os.path.exists(self.accounts_file):
|
|
with open(self.accounts_file, "w", encoding="utf-8") as f:
|
|
json.dump({"accounts": []}, f, ensure_ascii=False, indent=2)
|
|
|
|
def save_account(self, account_info: Dict) -> bool:
|
|
"""
|
|
保存账号信息到本地JSON文件
|
|
|
|
Args:
|
|
account_info: 包含账号信息的字典
|
|
Returns:
|
|
bool: 保存是否成功
|
|
"""
|
|
try:
|
|
# 添加时间戳
|
|
account_info["created_at"] = datetime.now().isoformat()
|
|
|
|
# 读取现有数据
|
|
with open(self.accounts_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
# 添加新账号
|
|
data["accounts"].append(account_info)
|
|
|
|
# 保存回文件
|
|
with open(self.accounts_file, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
|
logging.info(f"账号信息已保存到本地: {account_info['email']}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logging.error(f"保存账号信息到本地失败: {str(e)}")
|
|
return False
|
|
|
|
def sync_to_server(self, account_info: Dict) -> bool:
|
|
"""
|
|
同步账号信息到服务器
|
|
|
|
Args:
|
|
account_info: 包含账号信息的字典
|
|
Returns:
|
|
bool: 同步是否成功
|
|
"""
|
|
try:
|
|
# 构建API请求
|
|
endpoint = f"{self.api_base_url}/api/accounts"
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"Authorization": f"Bearer {os.getenv('API_TOKEN', '')}"
|
|
}
|
|
|
|
# 发送POST请求
|
|
response = requests.post(
|
|
endpoint,
|
|
json=account_info,
|
|
headers=headers,
|
|
timeout=10
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
logging.info(f"账号信息已同步到服务器: {account_info['email']}")
|
|
return True
|
|
else:
|
|
logging.error(f"同步到服务器失败: {response.status_code} - {response.text}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logging.error(f"同步账号信息到服务器失败: {str(e)}")
|
|
return False
|
|
|
|
def process_account(self, account_info: Dict) -> bool:
|
|
"""
|
|
处理账号信息:保存到本地并同步到服务器
|
|
|
|
Args:
|
|
account_info: 包含账号信息的字典
|
|
Returns:
|
|
bool: 处理是否成功
|
|
"""
|
|
# 保存到本地
|
|
local_save_success = self.save_account(account_info)
|
|
|
|
# 同步到服务器
|
|
server_sync_success = self.sync_to_server(account_info)
|
|
|
|
return local_save_success and server_sync_success
|
|
|
|
def get_account_info(self, email: str) -> Optional[Dict]:
|
|
"""
|
|
获取指定邮箱的账号信息
|
|
|
|
Args:
|
|
email: 邮箱地址
|
|
Returns:
|
|
Optional[Dict]: 账号信息或None
|
|
"""
|
|
try:
|
|
with open(self.accounts_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
for account in data["accounts"]:
|
|
if account["email"] == email:
|
|
return account
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logging.error(f"获取账号信息失败: {str(e)}")
|
|
return None |