import asyncio from typing import Any, List, Tuple from loguru import logger from core.config import Config from core.exceptions import TokenGenerationError from services.yescaptcha import TurnstileConfig, YesCaptcha from services.capsolver import Capsolver class TokenPool: def __init__(self, config: Config): self.config = config if config.captcha_config.provider == "capsolver": self.solver = Capsolver( api_key=config.captcha_config.capsolver.api_key, website_url=config.captcha_config.capsolver.website_url, website_key=config.captcha_config.capsolver.website_key ) else: self.turnstile_config = TurnstileConfig( client_key=config.captcha_config.yescaptcha.client_key, website_url=config.captcha_config.yescaptcha.website_url, website_key=config.captcha_config.yescaptcha.website_key, use_cn_server=config.captcha_config.yescaptcha.use_cn_server ) self.solver = YesCaptcha(self.turnstile_config) async def _get_token(self) -> str: """获取单个token""" try: if isinstance(self.solver, Capsolver): # Capsolver 是异步的,直接调用 token = await self.solver.solve_turnstile() else: # YesCaptcha 是同步的,需要转换 token = await asyncio.to_thread(self.solver.solve_turnstile) if not token: raise TokenGenerationError("Failed to get token") return token except Exception as e: logger.error(f"获取 token 失败: {str(e)}") raise TokenGenerationError(f"Failed to get token: {str(e)}") async def get_token_pair(self) -> Tuple[str, str]: """获取一对token""" token1 = await self._get_token() token2 = await self._get_token() return token1, token2 async def batch_generate(self, num: int) -> List[Tuple[str, str]]: """批量生成token对 Args: num: 需要的token对数量 Returns: List[Tuple[str, str]]: token对列表,每个元素是(token1, token2) """ logger.info(f"开始批量生成 {num} 对 token") # 创建所有token获取任务 tasks = [] for _ in range(num * 2): # 每对需要两个token tasks.append(self._get_token()) # 并发执行所有任务 try: tokens = await asyncio.gather(*tasks, return_exceptions=True) # 过滤出成功的token(仅保留字符串类型) valid_tokens = [ token for token in tokens if isinstance(token, str) and token.startswith('0.') ] # 将token分组为对 token_pairs = [] for i in range(0, num * 2, 2): try: pair = (valid_tokens[i], valid_tokens[i+1]) token_pairs.append(pair) except IndexError: logger.error(f"生成token对时索引越界,i={i}, tokens数量={len(valid_tokens)}") break logger.success(f"成功生成 {len(token_pairs)} 对 token") return token_pairs except Exception as e: logger.error(f"批量生成 token 失败: {str(e)}") return []