Files
auto_cursor/auto_cursor_service.py
huangzhenpc 6d618d36ac xx
2025-04-01 17:25:45 +08:00

476 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Cursor自动化服务
- 监控API控制注册进程
- 自动获取邮箱账号
- 自动上传注册成功的账号
"""
import asyncio
import sys
import json
import signal
import subprocess
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
import aiohttp
from loguru import logger
# Windows平台特殊处理强制使用SelectorEventLoop
if sys.platform.startswith("win"):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
from core.config import Config
from core.database import DatabaseManager
from import_emails import import_emails
class AutoCursorService:
def __init__(self):
self.config = Config.from_yaml()
self.db_manager = DatabaseManager(self.config)
# API相关
self.api_base_url = "https://cursorapi.nosqli.com/admin/api.AutoCursor"
self.proxy = None
# 获取hostname用于API请求参数
self.hostname = getattr(self.config, "hostname", None)
if not self.hostname:
# 尝试从config.yaml中的server_config获取
server_config = getattr(self.config, "server_config", None)
if server_config:
self.hostname = getattr(server_config, "hostname", "unknown")
else:
self.hostname = "unknown"
logger.warning(f"未在配置中找到hostname使用默认值: {self.hostname}")
# 子进程控制
self.registration_process = None
self.reg_enabled = False
# 从配置中获取自动服务参数
auto_service_config = getattr(self.config, "auto_service_config", None)
if auto_service_config:
self.check_interval = getattr(auto_service_config, "check_interval", 60)
self.upload_interval = getattr(auto_service_config, "upload_interval", 300)
self.email_check_threshold = getattr(auto_service_config, "email_check_threshold", 30)
self.email_fetch_count = getattr(auto_service_config, "email_fetch_count", 2)
else:
self.check_interval = 60 # 检查API状态的间隔
self.upload_interval = 300 # 上传账号间隔(秒)
self.email_check_threshold = 30 # 当可用邮箱少于这个数时获取新邮箱
self.email_fetch_count = 2 # 获取邮箱的次数每次15个
# 处理邮箱的最小数量阈值,只要有这么多邮箱就会立即处理
self.min_email_to_process = 1
# 运行状态控制
self.running = True
# 设置信号处理
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
def _handle_signal(self, signum, frame):
"""处理信号,优雅关闭服务"""
logger.info(f"收到信号 {signum},准备关闭服务")
self.running = False
# 确保这不是在异步上下文中调用的
if self.registration_process:
logger.info("正在终止注册进程...")
try:
if sys.platform.startswith("win"):
subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)])
else:
self.registration_process.terminate()
except Exception as e:
logger.error(f"终止注册进程时出错: {e}")
async def initialize(self):
"""初始化服务"""
logger.info("初始化自动化服务")
await self.db_manager.initialize()
# 检查并设置代理
if hasattr(self.config, "proxy_config") and self.config.proxy_config:
if hasattr(self.config.proxy_config, "api_proxy") and self.config.proxy_config.api_proxy:
self.proxy = self.config.proxy_config.api_proxy
logger.info(f"使用API代理: {self.proxy}")
async def cleanup(self):
"""清理资源"""
logger.info("清理服务资源")
if self.registration_process and self.registration_process.poll() is None:
logger.info("终止注册进程")
try:
if sys.platform.startswith("win"):
subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)])
else:
self.registration_process.terminate()
self.registration_process.wait(timeout=5)
except Exception as e:
logger.error(f"终止注册进程时出错: {e}")
await self.db_manager.cleanup()
async def check_registration_status(self) -> bool:
"""检查注册API是否开启
Returns:
bool: True表示开启注册False表示关闭注册
"""
url = f"{self.api_base_url}/regstates"
params = {"hostname": self.hostname}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response:
if response.status != 200:
logger.error(f"API请求失败状态码: {response.status}")
return self.reg_enabled # 保持当前状态
data = await response.json()
if data.get("code") != 0:
logger.error(f"API返回错误: {data.get('msg', 'Unknown error')}")
return self.reg_enabled # 保持当前状态
reg_state = data.get("data", {}).get("reg_state", False)
logger.info(f"注册状态: {'开启' if reg_state else '关闭'}")
return reg_state
except Exception as e:
logger.error(f"检查注册状态时出错: {e}")
return self.reg_enabled # 出错时保持当前状态
async def fetch_email_accounts(self) -> List[Dict[str, str]]:
"""从API获取邮箱账号
Returns:
List[Dict[str, str]]: 邮箱账号列表
"""
url = f"{self.api_base_url}/getemailaccount"
params = {"hostname": self.hostname}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, proxy=self.proxy, ssl=False) as response:
if response.status != 200:
logger.error(f"获取邮箱API请求失败状态码: {response.status}")
return []
data = await response.json()
if data.get("code") != 0:
logger.error(f"获取邮箱API返回错误: {data.get('msg', 'Unknown error')}")
return []
accounts = data.get("data", {}).get("accounts", [])
logger.info(f"成功获取 {len(accounts)} 个邮箱账号")
return accounts
except Exception as e:
logger.error(f"获取邮箱账号时出错: {e}")
return []
async def import_email_accounts(self, accounts: List[Dict[str, str]]) -> int:
"""导入邮箱账号到数据库
Args:
accounts: 邮箱账号列表
Returns:
int: 成功导入的账号数量
"""
if not accounts:
logger.warning("没有邮箱账号可导入")
return 0
count = 0
for account in accounts:
try:
email = account.get("email", "")
password = account.get("password", "")
client_id = account.get("client_id", "")
refresh_token = account.get("refresh_token", "")
if not (email and password and client_id and refresh_token):
logger.warning(f"账号数据不完整: {account}")
continue
# 插入数据库
insert_query = '''
INSERT INTO email_accounts
(email, password, client_id, refresh_token, status)
VALUES (%s, %s, %s, %s, 'pending')
ON DUPLICATE KEY UPDATE
password = VALUES(password),
client_id = VALUES(client_id),
refresh_token = VALUES(refresh_token),
status = 'pending',
updated_at = CURRENT_TIMESTAMP
'''
await self.db_manager.execute(
insert_query,
(email, password, client_id, refresh_token)
)
count += 1
except Exception as e:
logger.error(f"导入邮箱账号时出错: {e}")
logger.success(f"成功导入 {count} 个邮箱账号")
return count
async def count_pending_accounts(self) -> int:
"""统计可用的pending状态账号数量"""
query = """
SELECT COUNT(*)
FROM email_accounts
WHERE status = 'pending' AND in_use = 0 AND sold = 0
"""
result = await self.db_manager.fetch_one(query)
if result:
return result.get("COUNT(*)", 0)
return 0
async def check_and_fetch_emails(self) -> int:
"""检查并获取邮箱账号(如果需要)
Returns:
int: 获取的邮箱账号数量
"""
# 检查当前可用邮箱数量
pending_count = await self.count_pending_accounts()
logger.info(f"当前可用邮箱数量: {pending_count}")
if pending_count >= self.email_check_threshold:
logger.info(f"可用邮箱数量充足 ({pending_count} >= {self.email_check_threshold})")
# 确保注册进程已启动
if pending_count >= self.min_email_to_process and self.reg_enabled:
if not self.registration_process or self.registration_process.poll() is not None:
logger.info("有足够邮箱但注册进程未运行,启动注册进程")
await self.start_registration_process()
return 0
# 需要获取新邮箱
logger.info(f"可用邮箱不足 ({pending_count} < {self.email_check_threshold}),准备获取新邮箱")
total_imported = 0
for i in range(self.email_fetch_count):
accounts = await self.fetch_email_accounts()
if not accounts:
logger.warning(f"{i+1} 次获取邮箱失败或无可用邮箱")
break
imported = await self.import_email_accounts(accounts)
total_imported += imported
# 每次获取到新邮箱并成功导入后,立即确保注册进程在运行
if imported > 0 and self.reg_enabled:
new_count = pending_count + imported
if new_count >= self.min_email_to_process:
logger.info(f"已获取到 {imported} 个新邮箱,总可用邮箱数 {new_count},确保注册进程运行")
if not self.registration_process or self.registration_process.poll() is not None:
await self.start_registration_process()
pending_count = new_count # 更新计数器
if imported < 15: # 每次API应返回15个账号
logger.warning(f"获取到的邮箱少于预期 ({imported} < 15),可能没有更多邮箱可用")
break
if i < self.email_fetch_count - 1:
# 在多次请求之间添加延迟
await asyncio.sleep(2)
return total_imported
async def start_registration_process(self):
"""启动注册进程"""
if self.registration_process and self.registration_process.poll() is None:
logger.info("注册进程已在运行中")
return
# 直接检查可用邮箱数量
pending_count = await self.count_pending_accounts()
if pending_count < self.min_email_to_process:
logger.warning(f"可用邮箱数量不足 ({pending_count} < {self.min_email_to_process}),暂不启动注册进程")
return
logger.info(f"{pending_count} 个可用邮箱,启动注册进程")
try:
# 使用subprocess启动main.py
if sys.platform.startswith("win"):
self.registration_process = subprocess.Popen(
["python", "main.py"],
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
)
else:
self.registration_process = subprocess.Popen(
["python3", "main.py"]
)
logger.info(f"注册进程已启动PID: {self.registration_process.pid}")
except Exception as e:
logger.error(f"启动注册进程时出错: {e}")
self.registration_process = None
def stop_registration_process(self):
"""停止注册进程"""
if not self.registration_process or self.registration_process.poll() is not None:
logger.info("注册进程未在运行")
self.registration_process = None
return
logger.info("停止注册进程")
try:
if sys.platform.startswith("win"):
# Windows下使用taskkill
subprocess.run(["taskkill", "/F", "/T", "/PID", str(self.registration_process.pid)])
else:
# Linux/Mac下使用terminate
self.registration_process.terminate()
self.registration_process.wait(timeout=5)
logger.info("注册进程已停止")
except Exception as e:
logger.error(f"停止注册进程时出错: {e}")
self.registration_process = None
async def upload_accounts(self):
"""上传已注册成功的账号"""
logger.info("开始上传注册成功的账号")
try:
# 使用subprocess运行upload_account.py
if sys.platform.startswith("win"):
process = subprocess.Popen(
["python", "upload_account.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.CREATE_NO_WINDOW
)
else:
process = subprocess.Popen(
["python3", "upload_account.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 等待进程完成
stdout, stderr = process.communicate()
if process.returncode != 0:
logger.error(f"上传账号进程异常终止,退出码: {process.returncode}")
if stderr:
logger.error(f"错误输出: {stderr.decode('utf-8', errors='ignore')}")
else:
logger.info("账号上传完成")
if stdout:
# 只记录最后几行输出
output_lines = stdout.decode('utf-8', errors='ignore').strip().split('\n')
for line in output_lines[-5:]:
logger.info(f"Upload: {line}")
except Exception as e:
logger.error(f"执行账号上传脚本时出错: {e}")
async def run(self):
"""运行服务主循环"""
logger.info("启动Cursor自动化服务")
last_upload_time = 0
while self.running:
try:
# 1. 检查注册API状态
current_status = await self.check_registration_status()
# 状态变化处理
if current_status != self.reg_enabled:
self.reg_enabled = current_status
logger.info(f"注册状态变化为: {'开启' if self.reg_enabled else '关闭'}")
if self.reg_enabled:
# 开启注册时,先检查并获取邮箱
await self.check_and_fetch_emails()
# 检查是否有可用邮箱,有则启动注册进程
pending_count = await self.count_pending_accounts()
if pending_count >= self.min_email_to_process:
logger.info(f"{pending_count} 个可用邮箱,启动注册进程")
await self.start_registration_process()
else:
# 关闭注册时,停止注册进程
self.stop_registration_process()
# 2. 如果注册已开启,检查并获取邮箱
if self.reg_enabled:
await self.check_and_fetch_emails()
# 确保注册进程正在运行
if not self.registration_process or self.registration_process.poll() is not None:
pending_count = await self.count_pending_accounts()
if pending_count >= self.min_email_to_process:
logger.info(f"发现 {pending_count} 个未处理的邮箱,重新启动注册进程")
await self.start_registration_process()
# 3. 定期上传账号
current_time = time.time()
if current_time - last_upload_time >= self.upload_interval:
await self.upload_accounts()
last_upload_time = current_time
# 4. 等待下一次检查
for _ in range(self.check_interval):
if not self.running:
break
await asyncio.sleep(1)
except Exception as e:
logger.error(f"服务运行时出错: {e}")
await asyncio.sleep(30) # 出错后等待30秒再继续
logger.info("服务已停止")
@classmethod
async def start_service(cls):
"""启动服务的静态方法"""
service = cls()
await service.initialize()
try:
await service.run()
finally:
await service.cleanup()
async def main():
"""主函数"""
# 设置日志
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add(
"auto_cursor_service.log",
rotation="50 MB",
retention="10 days",
level="DEBUG",
compression="zip"
)
logger.info(f"Cursor自动化服务启动于: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
try:
await AutoCursorService.start_service()
except Exception as e:
logger.error(f"服务异常终止: {e}")
import traceback
logger.error(traceback.format_exc())
if __name__ == "__main__":
if sys.platform.startswith("win"):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())