# Scraped file-viewer header (not part of the program): 292 lines, 11 KiB, Python
import json
import logging
import os
import string
import sys
import tempfile
import zipfile

from dotenv import load_dotenv
from DrissionPage import ChromiumOptions, Chromium

# Populate the process environment from a local .env file (python-dotenv)
# before any os.getenv() lookup below runs.
load_dotenv()

# Built-in proxy defaults. BrowserManager.get_proxy uses them as fallbacks when
# the PROXY_HOST / PROXY_PORT / PROXY_USERNAME / PROXY_PASSWORD / BROWSER_PROXY
# environment variables are not set.
# NOTE(review): credentials are hard-coded in source; consider supplying them
# only through the environment.
PROXY_HOST = "h464.kdltpspro.com"
PROXY_PORT = "15818"
PROXY_USERNAME = "h464"  # proxy username
PROXY_PASSWORD = "kdltpspro"  # proxy password
PROXY_URL = "http://{}:{}".format(PROXY_HOST, PROXY_PORT)


class BrowserManager:
    """Start and stop a Chromium browser (DrissionPage) routed through a proxy.

    The proxy is configured by generating a small Chrome extension that sets a
    fixed proxy server and answers proxy authentication challenges.
    """

    # Endpoint queried by get_proxy() when ``use_api`` is true.
    PROXY_API_URL = "https://cursorapi.nosqli.com/admin/api.proxyinfo/getproxyarmybendi"
    # Seconds to wait for the proxy API before falling back to the defaults;
    # without a timeout a stalled API would block browser start-up forever.
    PROXY_API_TIMEOUT = 10

    # background.js of the generated extension. Every placeholder is filled
    # with a JSON-encoded value, so the placeholders carry no quotes of their own.
    _BACKGROUND_JS = string.Template("""
var config = {
    mode: "fixed_servers",
    rules: {
        singleProxy: {
            scheme: ${scheme},
            host: ${host},
            port: parseInt(${port}, 10)
        },
        bypassList: []
    }
};

chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});

function callbackFn(details) {
    return {
        authCredentials: {
            username: ${username},
            password: ${password}
        }
    };
}

chrome.webRequest.onAuthRequired.addListener(
    callbackFn,
    {urls: ["<all_urls>"]},
    ['blocking']
);
""")

    def __init__(self):
        # Chromium instance created by init_browser(); None until then.
        self.browser = None
        # (host, port, username, password) chosen by _get_browser_options().
        self.current_proxy_info = None

    def create_proxyauth_extension(self, proxy_host, proxy_port, proxy_username, proxy_password, scheme='http', plugin_folder=None):
        """Write an unpacked Chrome extension that applies an authenticated proxy.

        Args:
            proxy_host: Proxy server host name or IP.
            proxy_port: Proxy server port (string or int).
            proxy_username: Username returned on proxy auth challenges.
            proxy_password: Password returned on proxy auth challenges.
            scheme: Proxy scheme written into the extension config.
            plugin_folder: Target directory; defaults to ``kdl_Chromium_Proxy``
                next to this script.

        Returns:
            str: The directory containing ``manifest.json`` and ``background.js``.
        """
        if plugin_folder is None:
            # Default location: a folder beside this script.
            current_directory = os.path.dirname(os.path.abspath(__file__))
            plugin_folder = os.path.join(current_directory, 'kdl_Chromium_Proxy')

        os.makedirs(plugin_folder, exist_ok=True)
        logging.info(f"创建代理插件，路径: {plugin_folder}")

        # NOTE(review): Manifest V2 + webRequestBlocking; recent Chrome releases
        # are phasing out MV2 - confirm the target browser still loads it.
        manifest = {
            "version": "1.0.0",
            "manifest_version": 2,
            "name": "kdl_Chromium_Proxy",
            "permissions": [
                "proxy",
                "tabs",
                "unlimitedStorage",
                "storage",
                "<all_urls>",
                "webRequest",
                "webRequestBlocking",
                "browsingData",
            ],
            "background": {"scripts": ["background.js"]},
            "minimum_chrome_version": "22.0.0",
        }

        # json.dumps yields valid JavaScript string literals, so quotes or
        # backslashes in the credentials cannot break or alter the script.
        background_js = self._BACKGROUND_JS.substitute(
            host=json.dumps(str(proxy_host)),
            port=json.dumps(str(proxy_port)),
            username=json.dumps(str(proxy_username)),
            password=json.dumps(str(proxy_password)),
            scheme=json.dumps(str(scheme)),
        )

        with open(os.path.join(plugin_folder, "manifest.json"), "w", encoding="utf-8") as manifest_file:
            manifest_file.write(json.dumps(manifest, indent=4))
        with open(os.path.join(plugin_folder, "background.js"), "w", encoding="utf-8") as background_file:
            background_file.write(background_js)
        return plugin_folder

    def _fetch_api_proxy(self):
        """Ask the proxy API for a proxy; return a 4-tuple or None if unusable."""
        # Imported here, as in the original, so the dependency is only needed
        # when the API path is actually used.
        import requests

        logging.info("正在从API获取代理...")
        response = requests.get(self.PROXY_API_URL, timeout=self.PROXY_API_TIMEOUT)
        logging.info(f"API响应状态码: {response.status_code}")
        if response.status_code != 200:
            return None

        proxy_data = response.json()
        if not isinstance(proxy_data, dict):
            return None
        # Only the status code is logged: the payload carries proxy credentials.
        logging.info(f"API返回数据: code={proxy_data.get('code')}")
        if proxy_data.get("code") != 0:
            return None

        # ``or {}`` also covers an explicit ``"data": null`` in the response.
        proxy_info = proxy_data.get("data") or {}
        proxy_host = proxy_info.get("host")
        proxy_port = proxy_info.get("port")
        logging.info(f"解析到代理信息 - host:{proxy_host}, port:{proxy_port}")
        if not (proxy_host and proxy_port):
            return None
        return (
            proxy_host,
            proxy_port,
            proxy_info.get("username", ""),
            proxy_info.get("password", ""),
        )

    def _default_proxy(self, mode):
        """Return the default proxy from environment variables / module constants."""
        if mode:
            proxy_host = os.getenv("PROXY_HOST", PROXY_HOST)
            proxy_port = os.getenv("PROXY_PORT", PROXY_PORT)
            proxy_username = os.getenv("PROXY_USERNAME", PROXY_USERNAME)
            proxy_password = os.getenv("PROXY_PASSWORD", PROXY_PASSWORD)
            logging.info(f"使用默认代理信息(mode=True): host={proxy_host}, port={proxy_port}")
            return proxy_host, proxy_port, proxy_username, proxy_password

        default_proxy = os.getenv("BROWSER_PROXY", PROXY_URL)
        logging.info(f"使用默认代理: {default_proxy}")
        return default_proxy

    def get_proxy(self, use_api=False, mode=False):
        """Return the proxy configuration to use.

        Args:
            use_api: When true, try the proxy API first and fall back to the
                defaults if it fails or returns unusable data.
            mode: When true, return ``(host, port, username, password)``;
                otherwise return a proxy URL string.

        Returns:
            str or tuple: Proxy URL, or a ``(host, port, username, password)`` tuple.
        """
        logging.info(f"获取代理配置, use_api={use_api}, mode={mode}")

        if use_api:
            try:
                api_proxy = self._fetch_api_proxy()
            except Exception as e:
                # Best effort: any API/network/parse failure falls back to defaults.
                logging.error(f"获取代理出错: {str(e)}")
                api_proxy = None
            else:
                if api_proxy is None:
                    logging.warning("从API获取代理失败，使用默认代理配置")

            if api_proxy is not None:
                if mode:
                    logging.info("使用API代理(mode=True): 返回元组格式")
                    return api_proxy
                proxy_url = f"http://{api_proxy[0]}:{api_proxy[1]}"
                logging.info(f"使用API代理: {proxy_url}")
                return proxy_url
        else:
            logging.info("不使用API，返回默认代理配置")

        return self._default_proxy(mode)

    def init_browser(self, user_agent=None):
        """Launch the browser.

        Args:
            user_agent: Optional User-Agent string to apply.

        Returns:
            tuple: ``(browser, proxy_info)`` where ``proxy_info`` is the
            ``(host, port, username, password)`` tuple in use.

        Raises:
            Exception: Whatever ``Chromium(...)`` raises is logged and re-raised.
            ValueError: If the proxy configuration is malformed.
        """
        logging.info("正在初始化浏览器...")

        co = self._get_browser_options(user_agent)

        logging.info("正在启动临时浏览器...")
        try:
            browser = Chromium(co)
            self.browser = browser
            logging.info("浏览器初始化成功")
            return browser, self.current_proxy_info
        except Exception as e:
            logging.error(f"浏览器初始化失败: {str(e)}")
            if "DevToolsActivePort file doesn't exist" in str(e):
                logging.error("可能是Chrome实例未正确关闭，请检查进程")
            raise

    def _get_browser_options(self, user_agent=None):
        """Build the ChromiumOptions: extensions, proxy, port, UA and flags.

        Raises:
            ValueError: If get_proxy() does not return a 4-tuple.
        """
        co = ChromiumOptions()
        try:
            co.add_extension(self._get_extension_path())
        except FileNotFoundError as e:
            # The turnstilePatch extension is optional; continue without it.
            logging.warning(f"警告: {e}")

        co.set_pref("credentials_enable_service", False)
        co.set_argument("--hide-crash-restore-bubble")

        proxy = self.get_proxy(True, True)
        if not (isinstance(proxy, tuple) and len(proxy) == 4):
            # Fail loudly here instead of crashing on the unpack below.
            self.current_proxy_info = None
            logging.error("代理信息格式错误")
            raise ValueError("代理信息格式错误")

        self.current_proxy_info = proxy
        proxy_host, proxy_port, proxy_username, proxy_password = proxy
        # Credentials are deliberately left out of the log output.
        logging.info(f"代理-----: {proxy_host}:{proxy_port}")

        try:
            proxyauth_plugin_folder = self.create_proxyauth_extension(
                proxy_host=proxy_host,
                proxy_port=proxy_port,
                proxy_username=proxy_username,
                proxy_password=proxy_password,
            )
            logging.info(f"代理插件文件夹路径: {proxyauth_plugin_folder}")
            co.add_extension(proxyauth_plugin_folder)
        except Exception as e:
            logging.error(f"创建代理扩展失败: {str(e)}")
            # Fallback: hand the proxy URL straight to the browser options.
            # NOTE(review): Chrome may ignore credentials embedded in a proxy
            # URL - confirm this fallback authenticates as intended.
            proxy_url = f"http://{proxy_username}:{proxy_password}@{proxy_host}:{proxy_port}"
            co.set_proxy(proxy_url)
            logging.info(f"直接设置代理URL: http://{proxy_host}:{proxy_port}")

        co.auto_port()
        if user_agent:
            co.set_user_agent(user_agent)

        # The browser is started headless.
        # NOTE(review): the original comment claimed "headed mode for
        # observing verification" while the code passes True - confirm intent.
        co.headless(True)

        # macOS-specific launch flags.
        if sys.platform == "darwin":
            co.set_argument("--no-sandbox")
            co.set_argument("--disable-gpu")

        return co

    def _get_extension_path(self):
        """Return the path of the ``turnstilePatch`` extension directory.

        Looks under ``sys._MEIPASS`` when that attribute exists, otherwise
        under the current working directory.

        Raises:
            FileNotFoundError: If the directory does not exist.
        """
        base_dir = sys._MEIPASS if hasattr(sys, "_MEIPASS") else os.getcwd()
        extension_path = os.path.join(base_dir, "turnstilePatch")

        if not os.path.exists(extension_path):
            raise FileNotFoundError(f"插件不存在: {extension_path}")

        return extension_path

    def quit(self):
        """Close the browser if one was started; failures are logged, not raised."""
        if self.browser:
            try:
                self.browser.quit()
            except Exception as e:
                # Shutdown stays best effort, but is no longer silent and no
                # longer swallows KeyboardInterrupt/SystemExit.
                logging.warning(f"关闭浏览器失败: {e}")