168 lines
5.7 KiB
Python
168 lines
5.7 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import requests
|
||
import json
|
||
import os
|
||
from typing import Dict, List, Any, Optional, Union
|
||
|
||
# Disable proxy settings: blank out both spellings of the proxy variables
# as soon as this module is imported.
os.environ.update({
    'HTTP_PROXY': '',
    'HTTPS_PROXY': '',
    'http_proxy': '',
    'https_proxy': '',
})
|
||
|
||
class StalwartClient:
    """Client for the Stalwart HTTP API.

    Holds a ``requests.Session`` carrying the authentication settings and
    exposes one method per API operation. Every operation is sent through
    :meth:`_make_request`.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None,
                 oauth_token: Optional[str] = None,
                 timeout: Optional[float] = None):
        """Initialise the Stalwart API client.

        Args:
            base_url: Base URL of the API, e.g. https://mail.example.org/api.
                Trailing slashes are stripped.
            api_key: Optional API key. A value containing a colon is split
                at the first colon into user name and password for HTTP
                basic authentication; any other value is sent as a Bearer
                token.
            oauth_token: Optional OAuth token, sent as a Bearer token. It is
                applied after ``api_key``, so it replaces a Bearer header
                derived from ``api_key``.
            timeout: Optional default timeout handed to ``requests`` for
                every call that does not pass its own ``timeout``. ``None``
                (the default) sends no timeout at all.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.oauth_token = oauth_token
        self.timeout = timeout
        self.session = requests.Session()

        # Do not pick up proxy settings from the environment.
        self.session.trust_env = False

        if api_key:
            if ':' in api_key:
                # "user:password" form -> HTTP basic authentication.
                username, password = api_key.split(':', 1)
                self.session.auth = (username, password)
            else:
                # No colon in the key -> treat it as a Bearer token.
                self.session.headers.update({"Authorization": f"Bearer {api_key}"})

        # NOTE(review): when a "user:password" api_key and an oauth_token are
        # both supplied, session.auth and the Authorization header are both
        # set; confirm which one requests actually sends.
        if oauth_token:
            self.session.headers.update({"Authorization": f"Bearer {oauth_token}"})

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Any:
        """Send one API request and return its decoded result.

        Args:
            method: HTTP method (GET, POST, PATCH, DELETE, ...).
            endpoint: API path; it is joined to ``base_url`` with exactly
                one slash.
            **kwargs: Extra keyword arguments forwarded to
                ``self.session.request``.

        Returns:
            The parsed JSON body when the response content type starts with
            ``application/json``, otherwise the response object itself.

        Raises:
            requests.HTTPError: Raised by ``raise_for_status`` for a failed
                request.
            requests.RequestException: Connection errors, timeouts and any
                other request failure; each is printed and re-raised.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Work on a copy so the caller's headers dict is never modified;
        # ``headers=None`` is treated the same as "no extra headers".
        headers = dict(kwargs.pop('headers', None) or {})
        headers.setdefault('Accept', 'application/json')

        # Apply the client-wide timeout unless this call brings its own.
        if self.timeout is not None:
            kwargs.setdefault('timeout', self.timeout)

        # Debug output.
        print(f"发送请求: {method} {url}")

        try:
            response = self.session.request(method, url, headers=headers, **kwargs)

            # Debug output.
            print(f"响应状态码: {response.status_code}")

            response.raise_for_status()

            if response.headers.get('content-type', '').startswith('application/json'):
                return response.json()

            return response
        except requests.exceptions.HTTPError as e:
            print(f"HTTP错误: {e}")
            if hasattr(e.response, 'text'):
                print(f"响应内容: {e.response.text}")
            raise
        except requests.exceptions.ConnectionError as e:
            print(f"连接错误: {e}")
            raise
        except requests.exceptions.Timeout as e:
            print(f"超时错误: {e}")
            raise
        except requests.exceptions.RequestException as e:
            print(f"请求异常: {e}")
            raise

    # -------------------- OAuth methods --------------------

    def obtain_oauth_token(self, client_id: str, redirect_uri: str, nonce: str) -> Dict:
        """Obtain an OAuth token by POSTing a ``code`` request to ``/oauth``."""
        data = {
            "type": "code",
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "nonce": nonce,
        }
        return self._make_request('POST', '/oauth', json=data)

    # -------------------- Principal methods --------------------

    def list_principals(self, page: Optional[int] = None, limit: Optional[int] = None,
                        types: Optional[str] = None) -> Dict:
        """List principals.

        Each argument is sent as the query parameter of the same name;
        falsy values (``None``, ``0``, ``''``) are left out of the query.
        """
        candidates = {'page': page, 'limit': limit, 'types': types}
        params = {key: value for key, value in candidates.items() if value}
        return self._make_request('GET', '/principal', params=params)

    def create_principal(self, principal_data: Dict) -> Dict:
        """Create a principal from ``principal_data`` (sent as the JSON body)."""
        return self._make_request('POST', '/principal', json=principal_data)

    def fetch_principal(self, principal_id: str) -> Dict:
        """Fetch the details of the principal ``principal_id``."""
        return self._make_request('GET', f'/principal/{principal_id}')

    def update_principal(self, principal_id: str, updates: List[Dict]) -> Dict:
        """Update the principal ``principal_id``; ``updates`` is the JSON body."""
        return self._make_request('PATCH', f'/principal/{principal_id}', json=updates)

    def delete_principal(self, principal_id: str) -> Dict:
        """Delete the principal ``principal_id``."""
        return self._make_request('DELETE', f'/principal/{principal_id}')

    # -------------------- Queue methods --------------------

    def list_queued_messages(self, page: Optional[int] = None, max_total: Optional[int] = None,
                             limit: Optional[int] = None, values: Optional[int] = None) -> Dict:
        """List queued messages.

        ``max_total`` is sent as the ``max-total`` query parameter; the other
        arguments keep their names. Falsy values (``None``, ``0``) are left
        out of the query.
        """
        candidates = {
            'page': page,
            'max-total': max_total,
            'limit': limit,
            'values': values,
        }
        params = {key: value for key, value in candidates.items() if value}
        return self._make_request('GET', '/queue/messages', params=params)
|
||
|
||
def disable_proxy():
    """Disable proxy settings by blanking the proxy environment variables.

    Sets ``HTTP_PROXY``, ``HTTPS_PROXY``, ``http_proxy`` and ``https_proxy``
    to the empty string in ``os.environ``.
    """
    import os

    for variable in ('HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy'):
        os.environ[variable] = ''