97 lines
3.5 KiB
Python
97 lines
3.5 KiB
Python
import asyncio
|
||
from typing import Any, List, Tuple
|
||
|
||
from loguru import logger
|
||
|
||
from core.config import Config
|
||
from core.exceptions import TokenGenerationError
|
||
from services.yescaptcha import TurnstileConfig, YesCaptcha
|
||
from services.capsolver import Capsolver
|
||
|
||
|
||
class TokenPool:
|
||
def __init__(self, config: Config):
|
||
self.config = config
|
||
|
||
if config.captcha_config.provider == "capsolver":
|
||
self.solver = Capsolver(
|
||
api_key=config.captcha_config.capsolver.api_key,
|
||
website_url=config.captcha_config.capsolver.website_url,
|
||
website_key=config.captcha_config.capsolver.website_key
|
||
)
|
||
else:
|
||
self.turnstile_config = TurnstileConfig(
|
||
client_key=config.captcha_config.yescaptcha.client_key,
|
||
website_url=config.captcha_config.yescaptcha.website_url,
|
||
website_key=config.captcha_config.yescaptcha.website_key,
|
||
use_cn_server=config.captcha_config.yescaptcha.use_cn_server
|
||
)
|
||
self.solver = YesCaptcha(self.turnstile_config)
|
||
|
||
async def _get_token(self) -> str:
|
||
"""获取单个token"""
|
||
try:
|
||
if isinstance(self.solver, Capsolver):
|
||
# Capsolver 是异步的,直接调用
|
||
token = await self.solver.solve_turnstile()
|
||
else:
|
||
# YesCaptcha 是同步的,需要转换
|
||
token = await asyncio.to_thread(self.solver.solve_turnstile)
|
||
|
||
if not token:
|
||
raise TokenGenerationError("Failed to get token")
|
||
return token
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取 token 失败: {str(e)}")
|
||
raise TokenGenerationError(f"Failed to get token: {str(e)}")
|
||
|
||
async def get_token_pair(self) -> Tuple[str, str]:
|
||
"""获取一对token"""
|
||
token1 = await self._get_token()
|
||
token2 = await self._get_token()
|
||
return token1, token2
|
||
|
||
async def batch_generate(self, num: int) -> List[Tuple[str, str]]:
|
||
"""批量生成token对
|
||
|
||
Args:
|
||
num: 需要的token对数量
|
||
|
||
Returns:
|
||
List[Tuple[str, str]]: token对列表,每个元素是(token1, token2)
|
||
"""
|
||
logger.info(f"开始批量生成 {num} 对 token")
|
||
|
||
# 创建所有token获取任务
|
||
tasks = []
|
||
for _ in range(num * 2): # 每对需要两个token
|
||
tasks.append(self._get_token())
|
||
|
||
# 并发执行所有任务
|
||
try:
|
||
tokens = await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
# 过滤出成功的token(仅保留字符串类型)
|
||
valid_tokens = [
|
||
token for token in tokens
|
||
if isinstance(token, str) and token.startswith('0.')
|
||
]
|
||
|
||
# 将token分组为对
|
||
token_pairs = []
|
||
for i in range(0, num * 2, 2):
|
||
try:
|
||
pair = (valid_tokens[i], valid_tokens[i+1])
|
||
token_pairs.append(pair)
|
||
except IndexError:
|
||
logger.error(f"生成token对时索引越界,i={i}, tokens数量={len(valid_tokens)}")
|
||
break
|
||
|
||
logger.success(f"成功生成 {len(token_pairs)} 对 token")
|
||
return token_pairs
|
||
|
||
except Exception as e:
|
||
logger.error(f"批量生成 token 失败: {str(e)}")
|
||
return []
|