chore: 清理一些无用的文件

This commit is contained in:
shaw
2026-03-04 10:15:42 +08:00
parent 72961c5858
commit 0aa3cf677a
27 changed files with 5 additions and 4129 deletions

View File

@@ -1,247 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
import sys
from datetime import date
HIGH_SEVERITIES = {"high", "critical"}
REQUIRED_FIELDS = {"package", "advisory", "severity", "mitigation", "expires_on"}
def split_kv(line: str) -> tuple[str, str]:
# 解析 "key: value" 形式的简单 YAML 行,并去除引号。
key, value = line.split(":", 1)
value = value.strip()
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
return key.strip(), value
def parse_exceptions(path: str) -> list[dict]:
# 轻量解析异常清单,避免引入额外依赖。
exceptions = []
current = None
with open(path, "r", encoding="utf-8") as handle:
for raw in handle:
line = raw.strip()
if not line or line.startswith("#"):
continue
if line.startswith("version:") or line.startswith("exceptions:"):
continue
if line.startswith("- "):
if current:
exceptions.append(current)
current = {}
line = line[2:].strip()
if line:
key, value = split_kv(line)
current[key] = value
continue
if current is not None and ":" in line:
key, value = split_kv(line)
current[key] = value
if current:
exceptions.append(current)
return exceptions
def pick_advisory_id(advisory: dict) -> str | None:
# 优先使用可稳定匹配的标识GHSA/URL/CVE避免误匹配到其他同名漏洞。
return (
advisory.get("github_advisory_id")
or advisory.get("url")
or (advisory.get("cves") or [None])[0]
or (str(advisory.get("id")) if advisory.get("id") is not None else None)
or advisory.get("title")
or advisory.get("advisory")
or advisory.get("overview")
)
def iter_vulns(data: dict):
# 兼容 pnpm audit 的不同输出结构advisories / vulnerabilities并提取 advisory 标识。
advisories = data.get("advisories")
if isinstance(advisories, dict):
for advisory in advisories.values():
name = advisory.get("module_name") or advisory.get("name")
severity = advisory.get("severity")
advisory_id = pick_advisory_id(advisory)
title = (
advisory.get("title")
or advisory.get("advisory")
or advisory.get("overview")
or advisory.get("url")
)
yield name, severity, advisory_id, title
vulnerabilities = data.get("vulnerabilities")
if isinstance(vulnerabilities, dict):
for name, vuln in vulnerabilities.items():
severity = vuln.get("severity")
via = vuln.get("via", [])
titles = []
advisories = []
if isinstance(via, list):
for item in via:
if isinstance(item, dict):
advisories.append(
item.get("github_advisory_id")
or item.get("url")
or item.get("source")
or item.get("title")
or item.get("name")
)
titles.append(
item.get("title")
or item.get("url")
or item.get("advisory")
or item.get("source")
)
elif isinstance(item, str):
advisories.append(item)
titles.append(item)
elif isinstance(via, str):
advisories.append(via)
titles.append(via)
title = "; ".join([t for t in titles if t])
for advisory_id in [a for a in advisories if a]:
yield name, severity, advisory_id, title
def normalize_severity(severity: str) -> str:
# 统一大小写,避免比较失败。
return (severity or "").strip().lower()
def normalize_package(name: str) -> str:
# 包名只去掉首尾空白,保留原始大小写,同时兼容非字符串输入。
if name is None:
return ""
return str(name).strip()
def normalize_advisory(advisory: str) -> str:
# advisory 统一为小写匹配,避免 GHSA/URL 因大小写差异导致漏匹配。
# pnpm 的 source 字段可能是数字,这里统一转为字符串以保证可比较。
if advisory is None:
return ""
return str(advisory).strip().lower()
def parse_date(value: str) -> date | None:
# 仅接受 ISO8601 日期格式,非法值视为无效。
try:
return date.fromisoformat(value)
except ValueError:
return None
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--audit", required=True)
parser.add_argument("--exceptions", required=True)
args = parser.parse_args()
with open(args.audit, "r", encoding="utf-8") as handle:
audit = json.load(handle)
# 读取异常清单并建立索引,便于快速匹配包名 + advisory。
exceptions = parse_exceptions(args.exceptions)
exception_index = {}
errors = []
for exc in exceptions:
missing = [field for field in REQUIRED_FIELDS if not exc.get(field)]
if missing:
errors.append(
f"Exception missing required fields {missing}: {exc.get('package', '<unknown>')}"
)
continue
exc_severity = normalize_severity(exc.get("severity"))
exc_package = normalize_package(exc.get("package"))
exc_advisory = normalize_advisory(exc.get("advisory"))
exc_date = parse_date(exc.get("expires_on"))
if exc_date is None:
errors.append(
f"Exception has invalid expires_on date: {exc.get('package', '<unknown>')}"
)
continue
if not exc_package or not exc_advisory:
errors.append("Exception missing package or advisory value")
continue
key = (exc_package, exc_advisory)
if key in exception_index:
errors.append(
f"Duplicate exception for {exc_package} advisory {exc.get('advisory')}"
)
continue
exception_index[key] = {
"raw": exc,
"severity": exc_severity,
"expires_on": exc_date,
}
today = date.today()
missing_exceptions = []
expired_exceptions = []
# 去重处理:同一包名 + advisory 可能在不同字段重复出现。
seen = set()
for name, severity, advisory_id, title in iter_vulns(audit):
sev = normalize_severity(severity)
if sev not in HIGH_SEVERITIES or not name:
continue
advisory_key = normalize_advisory(advisory_id)
if not advisory_key:
errors.append(
f"High/Critical vulnerability missing advisory id: {name} ({sev})"
)
continue
key = (normalize_package(name), advisory_key)
if key in seen:
continue
seen.add(key)
exc = exception_index.get(key)
if exc is None:
missing_exceptions.append((name, sev, advisory_id, title))
continue
if exc["severity"] and exc["severity"] != sev:
errors.append(
"Exception severity mismatch: "
f"{name} ({advisory_id}) expected {sev}, got {exc['severity']}"
)
if exc["expires_on"] and exc["expires_on"] < today:
expired_exceptions.append(
(name, sev, advisory_id, exc["expires_on"].isoformat())
)
if missing_exceptions:
errors.append("High/Critical vulnerabilities missing exceptions:")
for name, sev, advisory_id, title in missing_exceptions:
label = f"{name} ({sev})"
if advisory_id:
label = f"{label} [{advisory_id}]"
if title:
label = f"{label}: {title}"
errors.append(f"- {label}")
if expired_exceptions:
errors.append("Exceptions expired:")
for name, sev, advisory_id, expires_on in expired_exceptions:
errors.append(
f"- {name} ({sev}) [{advisory_id}] expired on {expires_on}"
)
if errors:
sys.stderr.write("\n".join(errors) + "\n")
return 1
print("Audit exceptions validated.")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,164 +0,0 @@
#!/usr/bin/env python3
"""OpenAI OAuth 灰度发布演练脚本(本地模拟)。
该脚本会启动本地 mock Ops API调用 openai_oauth_gray_guard.py
验证以下场景:
1) A/B/C/D 四个灰度批次均通过
2) 注入异常场景触发阈值告警并返回退出码 2模拟自动回滚触发
"""
from __future__ import annotations
import json
import subprocess
import threading
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Dict, Tuple
from urllib.parse import parse_qs, urlparse
ROOT = Path(__file__).resolve().parents[2]
GUARD_SCRIPT = ROOT / "tools" / "perf" / "openai_oauth_gray_guard.py"
REPORT_PATH = ROOT / "docs" / "perf" / "openai-oauth-gray-drill-report.md"
THRESHOLDS = {
"sla_percent_min": 99.5,
"ttft_p99_ms_max": 900,
"request_error_rate_percent_max": 2.0,
"upstream_error_rate_percent_max": 2.0,
}
STAGE_SNAPSHOTS: Dict[str, Dict[str, float]] = {
"A": {"sla": 99.78, "ttft": 780, "error_rate": 1.20, "upstream_error_rate": 1.05},
"B": {"sla": 99.82, "ttft": 730, "error_rate": 1.05, "upstream_error_rate": 0.92},
"C": {"sla": 99.86, "ttft": 680, "error_rate": 0.88, "upstream_error_rate": 0.80},
"D": {"sla": 99.89, "ttft": 640, "error_rate": 0.72, "upstream_error_rate": 0.67},
"rollback": {"sla": 97.10, "ttft": 1550, "error_rate": 6.30, "upstream_error_rate": 5.60},
}
class _MockHandler(BaseHTTPRequestHandler):
def _write_json(self, payload: dict) -> None:
raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(raw)))
self.end_headers()
self.wfile.write(raw)
def log_message(self, format: str, *args): # noqa: A003
return
def do_GET(self): # noqa: N802
parsed = urlparse(self.path)
if parsed.path.endswith("/api/v1/admin/ops/settings/metric-thresholds"):
self._write_json({"code": 0, "message": "success", "data": THRESHOLDS})
return
if parsed.path.endswith("/api/v1/admin/ops/dashboard/overview"):
q = parse_qs(parsed.query)
stage = (q.get("group_id") or ["A"])[0]
snapshot = STAGE_SNAPSHOTS.get(stage, STAGE_SNAPSHOTS["A"])
self._write_json(
{
"code": 0,
"message": "success",
"data": {
"sla": snapshot["sla"],
"error_rate": snapshot["error_rate"],
"upstream_error_rate": snapshot["upstream_error_rate"],
"ttft": {"p99_ms": snapshot["ttft"]},
},
}
)
return
self.send_response(404)
self.end_headers()
def run_guard(base_url: str, stage: str) -> Tuple[int, str]:
cmd = [
"python",
str(GUARD_SCRIPT),
"--base-url",
base_url,
"--platform",
"openai",
"--time-range",
"30m",
"--group-id",
stage,
]
proc = subprocess.run(cmd, cwd=str(ROOT), capture_output=True, text=True)
output = (proc.stdout + "\n" + proc.stderr).strip()
return proc.returncode, output
def main() -> int:
server = HTTPServer(("127.0.0.1", 0), _MockHandler)
host, port = server.server_address
base_url = f"http://{host}:{port}"
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
lines = [
"# OpenAI OAuth 灰度守护演练报告",
"",
"> 类型:本地 mock 演练(用于验证灰度守护与回滚触发机制)",
f"> 生成脚本:`tools/perf/openai_oauth_gray_drill.py`",
"",
"## 1. 灰度批次结果6.1",
"",
"| 批次 | 流量比例 | 守护脚本退出码 | 结果 |",
"|---|---:|---:|---|",
]
batch_plan = [("A", "5%"), ("B", "20%"), ("C", "50%"), ("D", "100%")]
all_pass = True
for stage, ratio in batch_plan:
code, _ = run_guard(base_url, stage)
ok = code == 0
all_pass = all_pass and ok
lines.append(f"| {stage} | {ratio} | {code} | {'通过' if ok else '失败'} |")
lines.extend([
"",
"## 2. 回滚触发演练6.2",
"",
])
rollback_code, rollback_output = run_guard(base_url, "rollback")
rollback_triggered = rollback_code == 2
lines.append(f"- 注入异常场景退出码:`{rollback_code}`")
lines.append(f"- 是否触发回滚条件:`{'' if rollback_triggered else ''}`")
lines.append("- 关键信息摘录:")
excerpt = "\n".join(rollback_output.splitlines()[:8])
lines.append("```text")
lines.append(excerpt)
lines.append("```")
lines.extend([
"",
"## 3. 验收结论6.3",
"",
f"- 批次灰度结果:`{'通过' if all_pass else '不通过'}`",
f"- 回滚触发机制:`{'通过' if rollback_triggered else '不通过'}`",
f"- 结论:`{'通过(可进入真实环境灰度)' if all_pass and rollback_triggered else '不通过(需修复后复测)'}`",
])
REPORT_PATH.parent.mkdir(parents=True, exist_ok=True)
REPORT_PATH.write_text("\n".join(lines) + "\n", encoding="utf-8")
server.shutdown()
server.server_close()
print(f"drill report generated: {REPORT_PATH}")
return 0 if all_pass and rollback_triggered else 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,213 +0,0 @@
#!/usr/bin/env python3
"""OpenAI OAuth 灰度阈值守护脚本。
用途:
- 拉取 Ops 指标阈值配置与 Dashboard Overview 实时数据
- 对比 P99 TTFT / 错误率 / SLA
- 作为 6.2 灰度守护的自动化门禁(退出码可直接用于 CI/CD
退出码:
- 0: 指标通过
- 1: 请求失败/参数错误
- 2: 指标超阈值(建议停止扩量并回滚)
"""
from __future__ import annotations
import argparse
import json
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class GuardThresholds:
sla_percent_min: Optional[float]
ttft_p99_ms_max: Optional[float]
request_error_rate_percent_max: Optional[float]
upstream_error_rate_percent_max: Optional[float]
@dataclass
class GuardSnapshot:
sla: Optional[float]
ttft_p99_ms: Optional[float]
request_error_rate_percent: Optional[float]
upstream_error_rate_percent: Optional[float]
def build_headers(token: str) -> Dict[str, str]:
headers = {"Accept": "application/json"}
if token.strip():
headers["Authorization"] = f"Bearer {token.strip()}"
return headers
def request_json(url: str, headers: Dict[str, str]) -> Dict[str, Any]:
req = urllib.request.Request(url=url, method="GET", headers=headers)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw)
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {e.code}: {body}") from e
except urllib.error.URLError as e:
raise RuntimeError(f"request failed: {e}") from e
def parse_envelope_data(payload: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(payload, dict):
raise RuntimeError("invalid response payload")
if payload.get("code") != 0:
raise RuntimeError(f"api error: code={payload.get('code')} message={payload.get('message')}")
data = payload.get("data")
if not isinstance(data, dict):
raise RuntimeError("invalid response data")
return data
def parse_thresholds(data: Dict[str, Any]) -> GuardThresholds:
return GuardThresholds(
sla_percent_min=to_float_or_none(data.get("sla_percent_min")),
ttft_p99_ms_max=to_float_or_none(data.get("ttft_p99_ms_max")),
request_error_rate_percent_max=to_float_or_none(data.get("request_error_rate_percent_max")),
upstream_error_rate_percent_max=to_float_or_none(data.get("upstream_error_rate_percent_max")),
)
def parse_snapshot(data: Dict[str, Any]) -> GuardSnapshot:
ttft = data.get("ttft") if isinstance(data.get("ttft"), dict) else {}
return GuardSnapshot(
sla=to_float_or_none(data.get("sla")),
ttft_p99_ms=to_float_or_none(ttft.get("p99_ms")),
request_error_rate_percent=to_float_or_none(data.get("error_rate")),
upstream_error_rate_percent=to_float_or_none(data.get("upstream_error_rate")),
)
def to_float_or_none(v: Any) -> Optional[float]:
if v is None:
return None
try:
return float(v)
except (TypeError, ValueError):
return None
def evaluate(snapshot: GuardSnapshot, thresholds: GuardThresholds) -> List[str]:
violations: List[str] = []
if thresholds.sla_percent_min is not None and snapshot.sla is not None:
if snapshot.sla < thresholds.sla_percent_min:
violations.append(
f"SLA 低于阈值: actual={snapshot.sla:.2f}% threshold={thresholds.sla_percent_min:.2f}%"
)
if thresholds.ttft_p99_ms_max is not None and snapshot.ttft_p99_ms is not None:
if snapshot.ttft_p99_ms > thresholds.ttft_p99_ms_max:
violations.append(
f"TTFT P99 超阈值: actual={snapshot.ttft_p99_ms:.2f}ms threshold={thresholds.ttft_p99_ms_max:.2f}ms"
)
if (
thresholds.request_error_rate_percent_max is not None
and snapshot.request_error_rate_percent is not None
and snapshot.request_error_rate_percent > thresholds.request_error_rate_percent_max
):
violations.append(
"请求错误率超阈值: "
f"actual={snapshot.request_error_rate_percent:.2f}% "
f"threshold={thresholds.request_error_rate_percent_max:.2f}%"
)
if (
thresholds.upstream_error_rate_percent_max is not None
and snapshot.upstream_error_rate_percent is not None
and snapshot.upstream_error_rate_percent > thresholds.upstream_error_rate_percent_max
):
violations.append(
"上游错误率超阈值: "
f"actual={snapshot.upstream_error_rate_percent:.2f}% "
f"threshold={thresholds.upstream_error_rate_percent_max:.2f}%"
)
return violations
def main() -> int:
parser = argparse.ArgumentParser(description="OpenAI OAuth 灰度阈值守护")
parser.add_argument("--base-url", required=True, help="服务地址,例如 http://127.0.0.1:5231")
parser.add_argument("--admin-token", default="", help="Admin JWT可选按部署策略")
parser.add_argument("--platform", default="openai", help="平台过滤,默认 openai")
parser.add_argument("--time-range", default="30m", help="时间窗口: 5m/30m/1h/6h/24h/7d/30d")
parser.add_argument("--group-id", default="", help="可选 group_id")
args = parser.parse_args()
base = args.base_url.rstrip("/")
headers = build_headers(args.admin_token)
try:
threshold_url = f"{base}/api/v1/admin/ops/settings/metric-thresholds"
thresholds_raw = request_json(threshold_url, headers)
thresholds = parse_thresholds(parse_envelope_data(thresholds_raw))
query = {"platform": args.platform, "time_range": args.time_range}
if args.group_id.strip():
query["group_id"] = args.group_id.strip()
overview_url = (
f"{base}/api/v1/admin/ops/dashboard/overview?"
+ urllib.parse.urlencode(query)
)
overview_raw = request_json(overview_url, headers)
snapshot = parse_snapshot(parse_envelope_data(overview_raw))
print("[OpenAI OAuth Gray Guard] 当前快照:")
print(
json.dumps(
{
"sla": snapshot.sla,
"ttft_p99_ms": snapshot.ttft_p99_ms,
"request_error_rate_percent": snapshot.request_error_rate_percent,
"upstream_error_rate_percent": snapshot.upstream_error_rate_percent,
},
ensure_ascii=False,
indent=2,
)
)
print("[OpenAI OAuth Gray Guard] 阈值配置:")
print(
json.dumps(
{
"sla_percent_min": thresholds.sla_percent_min,
"ttft_p99_ms_max": thresholds.ttft_p99_ms_max,
"request_error_rate_percent_max": thresholds.request_error_rate_percent_max,
"upstream_error_rate_percent_max": thresholds.upstream_error_rate_percent_max,
},
ensure_ascii=False,
indent=2,
)
)
violations = evaluate(snapshot, thresholds)
if violations:
print("[OpenAI OAuth Gray Guard] 检测到阈值违例:")
for idx, line in enumerate(violations, start=1):
print(f" {idx}. {line}")
print("[OpenAI OAuth Gray Guard] 建议:停止扩量并执行回滚。")
return 2
print("[OpenAI OAuth Gray Guard] 指标通过,可继续观察或按计划扩量。")
return 0
except Exception as exc:
print(f"[OpenAI OAuth Gray Guard] 执行失败: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,122 +0,0 @@
import http from 'k6/http';
import { check } from 'k6';
import { Rate, Trend } from 'k6/metrics';
const baseURL = __ENV.BASE_URL || 'http://127.0.0.1:5231';
const apiKey = __ENV.API_KEY || '';
const model = __ENV.MODEL || 'gpt-5';
const timeout = __ENV.TIMEOUT || '180s';
const nonStreamRPS = Number(__ENV.NON_STREAM_RPS || 8);
const streamRPS = Number(__ENV.STREAM_RPS || 4);
const duration = __ENV.DURATION || '3m';
const preAllocatedVUs = Number(__ENV.PRE_ALLOCATED_VUS || 30);
const maxVUs = Number(__ENV.MAX_VUS || 200);
const reqDurationMs = new Trend('openai_oauth_req_duration_ms', true);
const ttftMs = new Trend('openai_oauth_ttft_ms', true);
const non2xxRate = new Rate('openai_oauth_non2xx_rate');
const streamDoneRate = new Rate('openai_oauth_stream_done_rate');
export const options = {
scenarios: {
non_stream: {
executor: 'constant-arrival-rate',
rate: nonStreamRPS,
timeUnit: '1s',
duration,
preAllocatedVUs,
maxVUs,
exec: 'runNonStream',
tags: { request_type: 'non_stream' },
},
stream: {
executor: 'constant-arrival-rate',
rate: streamRPS,
timeUnit: '1s',
duration,
preAllocatedVUs,
maxVUs,
exec: 'runStream',
tags: { request_type: 'stream' },
},
},
thresholds: {
openai_oauth_non2xx_rate: ['rate<0.01'],
openai_oauth_req_duration_ms: ['p(95)<3000', 'p(99)<6000'],
openai_oauth_ttft_ms: ['p(99)<1200'],
openai_oauth_stream_done_rate: ['rate>0.99'],
},
};
function buildHeaders() {
const headers = {
'Content-Type': 'application/json',
'User-Agent': 'codex_cli_rs/0.1.0',
};
if (apiKey) {
headers.Authorization = `Bearer ${apiKey}`;
}
return headers;
}
function buildBody(stream) {
return JSON.stringify({
model,
stream,
input: [
{
role: 'user',
content: [
{
type: 'input_text',
text: '请返回一句极短的话pong',
},
],
},
],
max_output_tokens: 32,
});
}
function recordMetrics(res, stream) {
reqDurationMs.add(res.timings.duration, { request_type: stream ? 'stream' : 'non_stream' });
ttftMs.add(res.timings.waiting, { request_type: stream ? 'stream' : 'non_stream' });
non2xxRate.add(res.status < 200 || res.status >= 300, { request_type: stream ? 'stream' : 'non_stream' });
if (stream) {
const done = !!res.body && res.body.indexOf('[DONE]') >= 0;
streamDoneRate.add(done, { request_type: 'stream' });
}
}
function postResponses(stream) {
const url = `${baseURL}/v1/responses`;
const res = http.post(url, buildBody(stream), {
headers: buildHeaders(),
timeout,
tags: { endpoint: '/v1/responses', request_type: stream ? 'stream' : 'non_stream' },
});
check(res, {
'status is 2xx': (r) => r.status >= 200 && r.status < 300,
});
recordMetrics(res, stream);
return res;
}
export function runNonStream() {
postResponses(false);
}
export function runStream() {
postResponses(true);
}
export function handleSummary(data) {
return {
stdout: `\nOpenAI OAuth /v1/responses 基线完成\n${JSON.stringify(data.metrics, null, 2)}\n`,
'docs/perf/openai-oauth-k6-summary.json': JSON.stringify(data, null, 2),
};
}

View File

@@ -1,167 +0,0 @@
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';
const baseURL = (__ENV.BASE_URL || 'http://127.0.0.1:5231').replace(/\/$/, '');
const httpAPIKey = (__ENV.HTTP_API_KEY || '').trim();
const wsAPIKey = (__ENV.WS_API_KEY || '').trim();
const model = __ENV.MODEL || 'gpt-5.1';
const duration = __ENV.DURATION || '5m';
const timeout = __ENV.TIMEOUT || '180s';
const httpRPS = Number(__ENV.HTTP_RPS || 10);
const wsRPS = Number(__ENV.WS_RPS || 10);
const chainRPS = Number(__ENV.CHAIN_RPS || 1);
const chainRounds = Number(__ENV.CHAIN_ROUNDS || 20);
const preAllocatedVUs = Number(__ENV.PRE_ALLOCATED_VUS || 40);
const maxVUs = Number(__ENV.MAX_VUS || 300);
const httpDurationMs = new Trend('openai_http_req_duration_ms', true);
const wsDurationMs = new Trend('openai_ws_req_duration_ms', true);
const wsChainDurationMs = new Trend('openai_ws_chain_round_duration_ms', true);
const wsChainTTFTMs = new Trend('openai_ws_chain_round_ttft_ms', true);
const httpNon2xxRate = new Rate('openai_http_non2xx_rate');
const wsNon2xxRate = new Rate('openai_ws_non2xx_rate');
const wsChainRoundSuccessRate = new Rate('openai_ws_chain_round_success_rate');
export const options = {
scenarios: {
http_baseline: {
executor: 'constant-arrival-rate',
exec: 'runHTTPBaseline',
rate: httpRPS,
timeUnit: '1s',
duration,
preAllocatedVUs,
maxVUs,
tags: { path: 'http_baseline' },
},
ws_baseline: {
executor: 'constant-arrival-rate',
exec: 'runWSBaseline',
rate: wsRPS,
timeUnit: '1s',
duration,
preAllocatedVUs,
maxVUs,
tags: { path: 'ws_baseline' },
},
ws_chain_20_rounds: {
executor: 'constant-arrival-rate',
exec: 'runWSChain20Rounds',
rate: chainRPS,
timeUnit: '1s',
duration,
preAllocatedVUs: Math.max(2, Math.ceil(chainRPS * 2)),
maxVUs: Math.max(20, Math.ceil(chainRPS * 10)),
tags: { path: 'ws_chain_20_rounds' },
},
},
thresholds: {
openai_http_non2xx_rate: ['rate<0.02'],
openai_ws_non2xx_rate: ['rate<0.02'],
openai_http_req_duration_ms: ['p(95)<4000', 'p(99)<7000'],
openai_ws_req_duration_ms: ['p(95)<3000', 'p(99)<6000'],
openai_ws_chain_round_success_rate: ['rate>0.98'],
openai_ws_chain_round_ttft_ms: ['p(99)<1200'],
},
};
function buildHeaders(apiKey) {
const headers = {
'Content-Type': 'application/json',
'User-Agent': 'codex_cli_rs/0.98.0',
};
if (apiKey) {
headers.Authorization = `Bearer ${apiKey}`;
}
return headers;
}
function buildBody(previousResponseID) {
const body = {
model,
stream: false,
input: [
{
role: 'user',
content: [{ type: 'input_text', text: '请回复一个单词: pong' }],
},
],
max_output_tokens: 64,
};
if (previousResponseID) {
body.previous_response_id = previousResponseID;
}
return JSON.stringify(body);
}
function postResponses(apiKey, body, tags) {
const res = http.post(`${baseURL}/v1/responses`, body, {
headers: buildHeaders(apiKey),
timeout,
tags,
});
check(res, {
'status is 2xx': (r) => r.status >= 200 && r.status < 300,
});
return res;
}
function parseResponseID(res) {
if (!res || !res.body) {
return '';
}
try {
const payload = JSON.parse(res.body);
if (payload && typeof payload.id === 'string') {
return payload.id.trim();
}
} catch (_) {
return '';
}
return '';
}
export function runHTTPBaseline() {
const res = postResponses(httpAPIKey, buildBody(''), { transport: 'http' });
httpDurationMs.add(res.timings.duration, { transport: 'http' });
httpNon2xxRate.add(res.status < 200 || res.status >= 300, { transport: 'http' });
}
export function runWSBaseline() {
const res = postResponses(wsAPIKey, buildBody(''), { transport: 'ws_v2' });
wsDurationMs.add(res.timings.duration, { transport: 'ws_v2' });
wsNon2xxRate.add(res.status < 200 || res.status >= 300, { transport: 'ws_v2' });
}
// 20+ 轮续链专项,验证 previous_response_id 在长链下的稳定性与时延。
export function runWSChain20Rounds() {
let previousResponseID = '';
for (let round = 1; round <= chainRounds; round += 1) {
const roundStart = Date.now();
const res = postResponses(wsAPIKey, buildBody(previousResponseID), { transport: 'ws_v2_chain' });
const ok = res.status >= 200 && res.status < 300;
wsChainRoundSuccessRate.add(ok, { round: `${round}` });
wsChainDurationMs.add(Date.now() - roundStart, { round: `${round}` });
wsChainTTFTMs.add(res.timings.waiting, { round: `${round}` });
wsNon2xxRate.add(!ok, { transport: 'ws_v2_chain' });
if (!ok) {
return;
}
const respID = parseResponseID(res);
if (!respID) {
wsChainRoundSuccessRate.add(false, { round: `${round}`, reason: 'missing_response_id' });
return;
}
previousResponseID = respID;
sleep(0.01);
}
}
export function handleSummary(data) {
return {
stdout: `\nOpenAI WSv2 对比压测完成\n${JSON.stringify(data.metrics, null, 2)}\n`,
'docs/perf/openai-ws-v2-compare-summary.json': JSON.stringify(data, null, 2),
};
}

View File

@@ -1,123 +0,0 @@
import http from 'k6/http';
import { check } from 'k6';
import { Rate, Trend } from 'k6/metrics';
const pooledBaseURL = (__ENV.POOLED_BASE_URL || 'http://127.0.0.1:5231').replace(/\/$/, '');
const oneToOneBaseURL = (__ENV.ONE_TO_ONE_BASE_URL || '').replace(/\/$/, '');
const wsAPIKey = (__ENV.WS_API_KEY || '').trim();
const model = __ENV.MODEL || 'gpt-5.1';
const timeout = __ENV.TIMEOUT || '180s';
const duration = __ENV.DURATION || '5m';
const pooledRPS = Number(__ENV.POOLED_RPS || 12);
const oneToOneRPS = Number(__ENV.ONE_TO_ONE_RPS || 12);
const preAllocatedVUs = Number(__ENV.PRE_ALLOCATED_VUS || 50);
const maxVUs = Number(__ENV.MAX_VUS || 400);
const pooledDurationMs = new Trend('openai_ws_pooled_duration_ms', true);
const oneToOneDurationMs = new Trend('openai_ws_one_to_one_duration_ms', true);
const pooledTTFTMs = new Trend('openai_ws_pooled_ttft_ms', true);
const oneToOneTTFTMs = new Trend('openai_ws_one_to_one_ttft_ms', true);
const pooledNon2xxRate = new Rate('openai_ws_pooled_non2xx_rate');
const oneToOneNon2xxRate = new Rate('openai_ws_one_to_one_non2xx_rate');
export const options = {
scenarios: {
pooled_mode: {
executor: 'constant-arrival-rate',
exec: 'runPooledMode',
rate: pooledRPS,
timeUnit: '1s',
duration,
preAllocatedVUs,
maxVUs,
tags: { mode: 'pooled' },
},
one_to_one_mode: {
executor: 'constant-arrival-rate',
exec: 'runOneToOneMode',
rate: oneToOneRPS,
timeUnit: '1s',
duration,
preAllocatedVUs,
maxVUs,
tags: { mode: 'one_to_one' },
startTime: '5s',
},
},
thresholds: {
openai_ws_pooled_non2xx_rate: ['rate<0.02'],
openai_ws_one_to_one_non2xx_rate: ['rate<0.02'],
openai_ws_pooled_duration_ms: ['p(95)<3000', 'p(99)<6000'],
openai_ws_one_to_one_duration_ms: ['p(95)<6000', 'p(99)<10000'],
},
};
function buildHeaders() {
const headers = {
'Content-Type': 'application/json',
'User-Agent': 'codex_cli_rs/0.98.0',
};
if (wsAPIKey) {
headers.Authorization = `Bearer ${wsAPIKey}`;
}
return headers;
}
function buildBody() {
return JSON.stringify({
model,
stream: false,
input: [
{
role: 'user',
content: [{ type: 'input_text', text: '请回复: pong' }],
},
],
max_output_tokens: 48,
});
}
function send(baseURL, mode) {
if (!baseURL) {
return null;
}
const res = http.post(`${baseURL}/v1/responses`, buildBody(), {
headers: buildHeaders(),
timeout,
tags: { mode },
});
check(res, {
'status is 2xx': (r) => r.status >= 200 && r.status < 300,
});
return res;
}
export function runPooledMode() {
const res = send(pooledBaseURL, 'pooled');
if (!res) {
return;
}
pooledDurationMs.add(res.timings.duration, { mode: 'pooled' });
pooledTTFTMs.add(res.timings.waiting, { mode: 'pooled' });
pooledNon2xxRate.add(res.status < 200 || res.status >= 300, { mode: 'pooled' });
}
export function runOneToOneMode() {
if (!oneToOneBaseURL) {
return;
}
const res = send(oneToOneBaseURL, 'one_to_one');
if (!res) {
return;
}
oneToOneDurationMs.add(res.timings.duration, { mode: 'one_to_one' });
oneToOneTTFTMs.add(res.timings.waiting, { mode: 'one_to_one' });
oneToOneNon2xxRate.add(res.status < 200 || res.status >= 300, { mode: 'one_to_one' });
}
export function handleSummary(data) {
return {
stdout: `\nOpenAI WS 池化 vs 1:1 对比压测完成\n${JSON.stringify(data.metrics, null, 2)}\n`,
'docs/perf/openai-ws-pooling-compare-summary.json': JSON.stringify(data, null, 2),
};
}

View File

@@ -1,216 +0,0 @@
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';
const baseURL = (__ENV.BASE_URL || 'http://127.0.0.1:5231').replace(/\/$/, '');
const wsAPIKey = (__ENV.WS_API_KEY || '').trim();
const wsHotspotAPIKey = (__ENV.WS_HOTSPOT_API_KEY || wsAPIKey).trim();
const model = __ENV.MODEL || 'gpt-5.3-codex';
const duration = __ENV.DURATION || '5m';
const timeout = __ENV.TIMEOUT || '180s';
const shortRPS = Number(__ENV.SHORT_RPS || 12);
const longRPS = Number(__ENV.LONG_RPS || 4);
const errorRPS = Number(__ENV.ERROR_RPS || 2);
const hotspotRPS = Number(__ENV.HOTSPOT_RPS || 10);
const preAllocatedVUs = Number(__ENV.PRE_ALLOCATED_VUS || 50);
const maxVUs = Number(__ENV.MAX_VUS || 400);
const reqDurationMs = new Trend('openai_ws_v2_perf_req_duration_ms', true);
const ttftMs = new Trend('openai_ws_v2_perf_ttft_ms', true);
const non2xxRate = new Rate('openai_ws_v2_perf_non2xx_rate');
const doneRate = new Rate('openai_ws_v2_perf_done_rate');
const expectedErrorRate = new Rate('openai_ws_v2_perf_expected_error_rate');
export const options = {
scenarios: {
short_request: {
executor: 'constant-arrival-rate',
exec: 'runShortRequest',
rate: shortRPS,
timeUnit: '1s',
duration,
preAllocatedVUs,
maxVUs,
tags: { scenario: 'short_request' },
},
long_request: {
executor: 'constant-arrival-rate',
exec: 'runLongRequest',
rate: longRPS,
timeUnit: '1s',
duration,
preAllocatedVUs: Math.max(20, Math.ceil(longRPS * 6)),
maxVUs: Math.max(100, Math.ceil(longRPS * 20)),
tags: { scenario: 'long_request' },
},
error_injection: {
executor: 'constant-arrival-rate',
exec: 'runErrorInjection',
rate: errorRPS,
timeUnit: '1s',
duration,
preAllocatedVUs: Math.max(8, Math.ceil(errorRPS * 4)),
maxVUs: Math.max(40, Math.ceil(errorRPS * 12)),
tags: { scenario: 'error_injection' },
},
hotspot_account: {
executor: 'constant-arrival-rate',
exec: 'runHotspotAccount',
rate: hotspotRPS,
timeUnit: '1s',
duration,
preAllocatedVUs: Math.max(16, Math.ceil(hotspotRPS * 3)),
maxVUs: Math.max(80, Math.ceil(hotspotRPS * 10)),
tags: { scenario: 'hotspot_account' },
},
},
thresholds: {
openai_ws_v2_perf_non2xx_rate: ['rate<0.05'],
openai_ws_v2_perf_req_duration_ms: ['p(95)<5000', 'p(99)<9000'],
openai_ws_v2_perf_ttft_ms: ['p(99)<2000'],
openai_ws_v2_perf_done_rate: ['rate>0.95'],
},
};
function buildHeaders(apiKey, opts = {}) {
const headers = {
'Content-Type': 'application/json',
'User-Agent': 'codex_cli_rs/0.104.0',
'OpenAI-Beta': 'responses_websockets=2026-02-06,responses=experimental',
};
if (apiKey) {
headers.Authorization = `Bearer ${apiKey}`;
}
if (opts.sessionID) {
headers.session_id = opts.sessionID;
}
if (opts.conversationID) {
headers.conversation_id = opts.conversationID;
}
return headers;
}
function shortBody() {
return JSON.stringify({
model,
stream: false,
input: [
{
role: 'user',
content: [{ type: 'input_text', text: '请回复一个词pong' }],
},
],
max_output_tokens: 64,
});
}
function longBody() {
const tools = [];
for (let i = 0; i < 28; i += 1) {
tools.push({
type: 'function',
name: `perf_tool_${i}`,
description: 'load test tool schema',
parameters: {
type: 'object',
properties: {
query: { type: 'string' },
limit: { type: 'number' },
with_cache: { type: 'boolean' },
},
required: ['query'],
},
});
}
const input = [];
for (let i = 0; i < 20; i += 1) {
input.push({
role: 'user',
content: [{ type: 'input_text', text: `长请求压测消息 ${i}: 请输出简要摘要。` }],
});
}
return JSON.stringify({
model,
stream: false,
input,
tools,
parallel_tool_calls: true,
max_output_tokens: 256,
reasoning: { effort: 'medium' },
instructions: '你是压测助手,简洁回复。',
});
}
function errorInjectionBody() {
return JSON.stringify({
model,
stream: false,
previous_response_id: `resp_not_found_${__VU}_${__ITER}`,
input: [
{
role: 'user',
content: [{ type: 'input_text', text: '触发错误注入路径。' }],
},
],
});
}
function postResponses(apiKey, body, tags, opts = {}) {
const res = http.post(`${baseURL}/v1/responses`, body, {
headers: buildHeaders(apiKey, opts),
timeout,
tags,
});
reqDurationMs.add(res.timings.duration, tags);
ttftMs.add(res.timings.waiting, tags);
non2xxRate.add(res.status < 200 || res.status >= 300, tags);
return res;
}
function hasDone(res) {
return !!res && !!res.body && res.body.indexOf('[DONE]') >= 0;
}
export function runShortRequest() {
const tags = { scenario: 'short_request' };
const res = postResponses(wsAPIKey, shortBody(), tags);
check(res, { 'short status is 2xx': (r) => r.status >= 200 && r.status < 300 });
doneRate.add(hasDone(res) || (res.status >= 200 && res.status < 300), tags);
}
export function runLongRequest() {
const tags = { scenario: 'long_request' };
const res = postResponses(wsAPIKey, longBody(), tags);
check(res, { 'long status is 2xx': (r) => r.status >= 200 && r.status < 300 });
doneRate.add(hasDone(res) || (res.status >= 200 && res.status < 300), tags);
}
export function runErrorInjection() {
const tags = { scenario: 'error_injection' };
const res = postResponses(wsAPIKey, errorInjectionBody(), tags);
// 错误注入场景允许 4xx/5xx重点观测 fallback 和错误路径抖动。
expectedErrorRate.add(res.status >= 400, tags);
doneRate.add(hasDone(res), tags);
}
export function runHotspotAccount() {
const tags = { scenario: 'hotspot_account' };
const opts = {
sessionID: 'perf-hotspot-session-fixed',
conversationID: 'perf-hotspot-conversation-fixed',
};
const res = postResponses(wsHotspotAPIKey, shortBody(), tags, opts);
check(res, { 'hotspot status is 2xx': (r) => r.status >= 200 && r.status < 300 });
doneRate.add(hasDone(res) || (res.status >= 200 && res.status < 300), tags);
sleep(0.01);
}
export function handleSummary(data) {
return {
stdout: `\nOpenAI WSv2 性能套件压测完成\n${JSON.stringify(data.metrics, null, 2)}\n`,
'docs/perf/openai-ws-v2-perf-suite-summary.json': JSON.stringify(data, null, 2),
};
}

View File

@@ -1,149 +0,0 @@
#!/usr/bin/env python3
"""轻量 secret scanningCI 门禁 + 本地自检)。
目标:在不引入额外依赖的情况下,阻止常见敏感凭据误提交。
注意:
- 该脚本只扫描 git tracked files优先以避免误扫本地 .env。
- 输出仅包含 file:line 与命中类型,不回显完整命中内容(避免二次泄露)。
"""
from __future__ import annotations
import argparse
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence
@dataclass(frozen=True)
class Rule:
name: str
pattern: re.Pattern[str]
# allowlist 仅用于减少示例文档/占位符带来的误报
allowlist: Sequence[re.Pattern[str]]
RULES: list[Rule] = [
Rule(
name="google_oauth_client_secret",
# Google OAuth client_secret 常见前缀
# 真实值通常较长;提高最小长度以避免命中文档里的占位符(例如 GOCSPX-your-client-secret
pattern=re.compile(r"GOCSPX-[0-9A-Za-z_-]{24,}"),
allowlist=(
re.compile(r"GOCSPX-your-"),
re.compile(r"GOCSPX-REDACTED"),
),
),
Rule(
name="google_api_key",
# Gemini / Google API Key
# 典型格式AIza + 35 位字符。占位符如 'AIza...' 不会匹配。
pattern=re.compile(r"AIza[0-9A-Za-z_-]{35}"),
allowlist=(
re.compile(r"AIza\.{3}"),
re.compile(r"AIza-your-"),
re.compile(r"AIza-REDACTED"),
),
),
]
def iter_git_files(repo_root: Path) -> list[Path]:
try:
out = subprocess.check_output(
["git", "ls-files"], cwd=repo_root, stderr=subprocess.DEVNULL, text=True
)
except Exception:
return []
files: list[Path] = []
for line in out.splitlines():
p = (repo_root / line).resolve()
if p.is_file():
files.append(p)
return files
def iter_walk_files(repo_root: Path) -> Iterable[Path]:
for dirpath, _dirnames, filenames in os.walk(repo_root):
if "/.git/" in dirpath.replace("\\", "/"):
continue
for name in filenames:
yield Path(dirpath) / name
def should_skip(path: Path, repo_root: Path) -> bool:
rel = path.relative_to(repo_root).as_posix()
# 本地环境文件一般不应入库;若误入库也会被 git ls-files 扫出来。
# 这里仍跳过一些明显不该扫描的二进制。
if any(rel.endswith(s) for s in (".png", ".jpg", ".jpeg", ".gif", ".pdf", ".zip")):
return True
if rel.startswith("backend/bin/"):
return True
return False
def scan_file(path: Path, repo_root: Path) -> list[tuple[str, int]]:
try:
raw = path.read_bytes()
except Exception:
return []
# 尝试按 utf-8 解码,失败则当二进制跳过
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return []
findings: list[tuple[str, int]] = []
lines = text.splitlines()
for idx, line in enumerate(lines, start=1):
for rule in RULES:
if not rule.pattern.search(line):
continue
if any(allow.search(line) for allow in rule.allowlist):
continue
rel = path.relative_to(repo_root).as_posix()
findings.append((f"{rel}:{idx} ({rule.name})", idx))
return findings
def main(argv: Sequence[str]) -> int:
parser = argparse.ArgumentParser()
parser.add_argument(
"--repo-root",
default=str(Path(__file__).resolve().parents[1]),
help="仓库根目录(默认:脚本上两级目录)",
)
args = parser.parse_args(argv)
repo_root = Path(args.repo_root).resolve()
files = iter_git_files(repo_root)
if not files:
files = list(iter_walk_files(repo_root))
problems: list[str] = []
for f in files:
if should_skip(f, repo_root):
continue
for msg, _line in scan_file(f, repo_root):
problems.append(msg)
if problems:
sys.stderr.write("Secret scan FAILED. Potential secrets detected:\n")
for p in problems:
sys.stderr.write(f"- {p}\n")
sys.stderr.write("\n请移除/改为环境变量注入,或使用明确的占位符(例如 GOCSPX-your-client-secret\n")
return 1
print("Secret scan OK")
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))

View File

@@ -1,192 +0,0 @@
#!/usr/bin/env python3
"""
Sora access token tester.
Usage:
tools/sora-test -at "<ACCESS_TOKEN>"
"""
from __future__ import annotations
import argparse
import base64
import json
import sys
import textwrap
import urllib.error
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple
DEFAULT_BASE_URL = "https://sora.chatgpt.com"
DEFAULT_TIMEOUT = 20
DEFAULT_USER_AGENT = "Sora/1.2026.007 (Android 15; 24122RKC7C; build 2600700)"
@dataclass
class EndpointResult:
path: str
status: int
request_id: str
cf_ray: str
body_preview: str
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Test Sora access token against core backend endpoints.",
formatter_class=argparse.RawTextHelpFormatter,
epilog=textwrap.dedent(
"""\
Examples:
tools/sora-test -at "eyJhbGciOi..."
tools/sora-test -at "eyJhbGciOi..." --timeout 30
"""
),
)
parser.add_argument("-at", "--access-token", required=True, help="Sora/OpenAI access token (JWT)")
parser.add_argument(
"--base-url",
default=DEFAULT_BASE_URL,
help=f"Base URL for Sora backend (default: {DEFAULT_BASE_URL})",
)
parser.add_argument(
"--timeout",
type=int,
default=DEFAULT_TIMEOUT,
help=f"HTTP timeout seconds (default: {DEFAULT_TIMEOUT})",
)
return parser.parse_args()
def mask_token(token: str) -> str:
if len(token) <= 16:
return token
return f"{token[:10]}...{token[-6:]}"
def decode_jwt_payload(token: str) -> Optional[Dict]:
parts = token.split(".")
if len(parts) != 3:
return None
payload = parts[1]
payload += "=" * ((4 - len(payload) % 4) % 4)
payload = payload.replace("-", "+").replace("_", "/")
try:
decoded = base64.b64decode(payload)
return json.loads(decoded.decode("utf-8", errors="replace"))
except Exception:
return None
def ts_to_iso(ts: Optional[int]) -> str:
if not ts:
return "-"
try:
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
except Exception:
return "-"
def http_get(base_url: str, path: str, access_token: str, timeout: int) -> EndpointResult:
url = base_url.rstrip("/") + path
req = urllib.request.Request(url=url, method="GET")
req.add_header("Authorization", f"Bearer {access_token}")
req.add_header("Accept", "application/json, text/plain, */*")
req.add_header("Origin", DEFAULT_BASE_URL)
req.add_header("Referer", DEFAULT_BASE_URL + "/")
req.add_header("User-Agent", DEFAULT_USER_AGENT)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
body = raw.decode("utf-8", errors="replace")
return EndpointResult(
path=path,
status=resp.getcode(),
request_id=(resp.headers.get("x-request-id") or "").strip(),
cf_ray=(resp.headers.get("cf-ray") or "").strip(),
body_preview=body[:500].replace("\n", " "),
)
except urllib.error.HTTPError as e:
raw = e.read()
body = raw.decode("utf-8", errors="replace")
return EndpointResult(
path=path,
status=e.code,
request_id=(e.headers.get("x-request-id") if e.headers else "") or "",
cf_ray=(e.headers.get("cf-ray") if e.headers else "") or "",
body_preview=body[:500].replace("\n", " "),
)
except Exception as e:
return EndpointResult(
path=path,
status=0,
request_id="",
cf_ray="",
body_preview=f"network_error: {e}",
)
def classify(me_status: int) -> Tuple[str, int]:
if me_status == 200:
return "AT looks valid for Sora (/backend/me == 200).", 0
if me_status == 401:
return "AT is invalid or expired (/backend/me == 401).", 2
if me_status == 403:
return "AT may be blocked by policy/challenge or lacks permission (/backend/me == 403).", 3
if me_status == 0:
return "Request failed before reaching Sora (network/proxy/TLS issue).", 4
return f"Unexpected status on /backend/me: {me_status}", 5
def main() -> int:
args = parse_args()
token = args.access_token.strip()
if not token:
print("ERROR: empty access token")
return 1
payload = decode_jwt_payload(token)
print("=== Sora AT Test ===")
print(f"token: {mask_token(token)}")
if payload:
exp = payload.get("exp")
iat = payload.get("iat")
scopes = payload.get("scp")
scope_count = len(scopes) if isinstance(scopes, list) else 0
print(f"jwt.iat: {iat} ({ts_to_iso(iat)})")
print(f"jwt.exp: {exp} ({ts_to_iso(exp)})")
print(f"jwt.scope_count: {scope_count}")
else:
print("jwt: payload decode failed (token may not be JWT)")
endpoints = [
"/backend/me",
"/backend/nf/check",
"/backend/project_y/invite/mine",
"/backend/billing/subscriptions",
]
print("\n--- endpoint checks ---")
results = []
for path in endpoints:
res = http_get(args.base_url, path, token, args.timeout)
results.append(res)
print(f"{res.path} -> status={res.status} request_id={res.request_id or '-'} cf_ray={res.cf_ray or '-'}")
if res.body_preview:
print(f" body: {res.body_preview}")
me_result = next((r for r in results if r.path == "/backend/me"), None)
me_status = me_result.status if me_result else 0
summary, code = classify(me_status)
print("\n--- summary ---")
print(summary)
return code
if __name__ == "__main__":
sys.exit(main())