feat(monitor): admin channel monitor MVP with SSRF protection and batch aggregation

新增 admin「渠道监控」模块(参考 BingZi-233/check-cx),独立于现有 Channel 体系。
admin 配置 + 后台定时调用上游 LLM chat completions 健康检查 + 所有登录用户只读可见。

后端:
- ent: channel_monitor + channel_monitor_history(AES-256-GCM 加密 api_key)
- service 按职责拆分:service/aggregator/validate/checker/runner/ssrf
- provider strategy map 替代 switch(openai/anthropic/gemini)
- repository batch 聚合(ListLatestForMonitorIDs + ComputeAvailabilityForMonitors)消除 N+1
- runner: ticker(5s) + pond worker pool(5) + inFlight 防并发 + TrySubmit 防雪崩
  + 凌晨 3 点 cron 清理 30 天历史
- SSRF 防护:强制 https + 私网/loopback/云元数据 IP 拒绝(127/8、10/8、172.16/12、
  192.168/16、169.254/16、100.64/10、::1、fc00::/7、fe80::/10)+ DialContext
  在 socket 层防 DNS rebinding
- API key sanitize:擦除 url.Error 与上游响应 body 中的 sk-/sk-ant-/AIza/JWT 模式
- APIKeyDecryptFailed 标志位 + 单 monitor 路径检测,避免空 key 调用上游

handler:
- admin: CRUD + 手动触发 + 历史接口(api_key 脱敏)
- user: 只读列表 + 状态详情(去除 api_key/endpoint)
- ParseChannelMonitorID 共用 + dto.ChannelMonitorExtraModelStatus 共用

前端:
- 路由 /admin/channels/{pricing,monitor} + /monitor(用户只读)
- AppSidebar 父项 expandOnly 支持
- ChannelMonitorView 拆为 8 个子组件 + ChannelStatusView 拆出 detail dialog
- composables/useChannelMonitorFormat + constants/channelMonitor 共享
- i18n monitorCommon namespace 消除 admin/user 两 view 重复

合规:所有文件符合 CLAUDE.md(Go ≤ 500 行 / Vue ≤ 300 行 / 函数 ≤ 30 行)
CI: go build / gofmt / golangci-lint(0 issues) / make test-unit / pnpm build 全绿
This commit is contained in:
erio
2026-04-20 20:21:02 +08:00
parent 0b85a8da88
commit 20a4e41872
67 changed files with 14997 additions and 32 deletions

View File

@@ -0,0 +1,217 @@
package service
import (
"context"
"fmt"
"log/slog"
)
// 渠道监控聚合层:把 latest + availability 拼成 admin/user 视图所需的 summary / detail。
// 所有方法都遵守"失败仅日志,返回零值"的原则,避免 N+1 查询失败拖垮列表渲染。
// BatchMonitorStatusSummary 批量聚合多个监控的 latest + 7d 可用率admin/user list 用,消除 N+1
// 失败时返回空 map错误仅日志不影响列表渲染。
//
// 参数:
// - ids: 要聚合的 monitor ID 列表
// - primaryByID: monitor ID -> primary model用于读 7d 可用率与 latest 状态)
// - extrasByID: monitor ID -> extra models 列表(用于读 latest 状态填充 ExtraModels
func (s *ChannelMonitorService) BatchMonitorStatusSummary(
ctx context.Context,
ids []int64,
primaryByID map[int64]string,
extrasByID map[int64][]string,
) map[int64]MonitorStatusSummary {
out := make(map[int64]MonitorStatusSummary, len(ids))
if len(ids) == 0 {
return out
}
latestMap, err := s.repo.ListLatestForMonitorIDs(ctx, ids)
if err != nil {
slog.Warn("channel_monitor: batch load latest failed", "error", err)
latestMap = map[int64][]*ChannelMonitorLatest{}
}
availMap, err := s.repo.ComputeAvailabilityForMonitors(ctx, ids, monitorAvailability7Days)
if err != nil {
slog.Warn("channel_monitor: batch compute availability failed", "error", err)
availMap = map[int64][]*ChannelMonitorAvailability{}
}
for _, id := range ids {
out[id] = buildStatusSummary(
indexLatestByModel(latestMap[id]),
indexAvailabilityByModel(availMap[id]),
primaryByID[id],
extrasByID[id],
)
}
return out
}
// ListUserView 用户只读视图:列出所有 enabled 监控的概览。
// 使用批量聚合接口避免 N+11 次查 monitors1 次查 latest所有 monitor1 次查 availability。
func (s *ChannelMonitorService) ListUserView(ctx context.Context) ([]*UserMonitorView, error) {
monitors, err := s.repo.ListEnabled(ctx)
if err != nil {
return nil, fmt.Errorf("list enabled monitors: %w", err)
}
if len(monitors) == 0 {
return []*UserMonitorView{}, nil
}
ids := make([]int64, 0, len(monitors))
primaryByID := make(map[int64]string, len(monitors))
extrasByID := make(map[int64][]string, len(monitors))
for _, m := range monitors {
ids = append(ids, m.ID)
primaryByID[m.ID] = m.PrimaryModel
extrasByID[m.ID] = m.ExtraModels
}
summaries := s.BatchMonitorStatusSummary(ctx, ids, primaryByID, extrasByID)
views := make([]*UserMonitorView, 0, len(monitors))
for _, m := range monitors {
summary := summaries[m.ID]
views = append(views, buildUserViewFromSummary(m, summary))
}
return views, nil
}
// GetUserDetail 用户只读视图:单个监控详情(每个模型 7d/15d/30d 可用率与平均延迟)。
// 不暴露 api_key。
func (s *ChannelMonitorService) GetUserDetail(ctx context.Context, id int64) (*UserMonitorDetail, error) {
m, err := s.repo.GetByID(ctx, id)
if err != nil {
return nil, err
}
if !m.Enabled {
return nil, ErrChannelMonitorNotFound
}
latest, err := s.repo.ListLatestPerModel(ctx, id)
if err != nil {
return nil, fmt.Errorf("list latest per model: %w", err)
}
availMap, err := s.collectAvailabilityWindows(ctx, id)
if err != nil {
return nil, err
}
models := mergeModelDetails(m, latest, availMap)
return &UserMonitorDetail{
ID: m.ID,
Name: m.Name,
Provider: m.Provider,
GroupName: m.GroupName,
Models: models,
}, nil
}
// collectAvailabilityWindows 一次性查询 7/15/30 天三个窗口,按模型组织。
func (s *ChannelMonitorService) collectAvailabilityWindows(ctx context.Context, monitorID int64) (map[int]map[string]*ChannelMonitorAvailability, error) {
out := make(map[int]map[string]*ChannelMonitorAvailability, 3)
windows := []int{monitorAvailability7Days, monitorAvailability15Days, monitorAvailability30Days}
for _, w := range windows {
rows, err := s.repo.ComputeAvailability(ctx, monitorID, w)
if err != nil {
return nil, fmt.Errorf("compute availability %dd: %w", w, err)
}
out[w] = indexAvailabilityByModel(rows)
}
return out, nil
}
// ---------- 纯函数 helper无 IO可在 batch / 单 monitor / detail 路径复用)----------
// indexLatestByModel 把 latest 切片按 model 索引(小工具,避免在 hot path 重复写)。
func indexLatestByModel(rows []*ChannelMonitorLatest) map[string]*ChannelMonitorLatest {
m := make(map[string]*ChannelMonitorLatest, len(rows))
for _, r := range rows {
m[r.Model] = r
}
return m
}
// indexAvailabilityByModel 把 availability 切片按 model 索引。
func indexAvailabilityByModel(rows []*ChannelMonitorAvailability) map[string]*ChannelMonitorAvailability {
m := make(map[string]*ChannelMonitorAvailability, len(rows))
for _, r := range rows {
m[r.Model] = r
}
return m
}
// buildStatusSummary 由 latest + availability 字典构造 MonitorStatusSummary。
// 不做任何 IO纯组装便于在 batch 与单 monitor 路径复用。
func buildStatusSummary(
latestByModel map[string]*ChannelMonitorLatest,
availByModel map[string]*ChannelMonitorAvailability,
primary string,
extras []string,
) MonitorStatusSummary {
summary := MonitorStatusSummary{ExtraModels: make([]ExtraModelStatus, 0, len(extras))}
if primary != "" {
if l, ok := latestByModel[primary]; ok {
summary.PrimaryStatus = l.Status
summary.PrimaryLatencyMs = l.LatencyMs
}
if a, ok := availByModel[primary]; ok {
summary.Availability7d = a.AvailabilityPct
}
}
for _, model := range extras {
entry := ExtraModelStatus{Model: model}
if l, ok := latestByModel[model]; ok {
entry.Status = l.Status
entry.LatencyMs = l.LatencyMs
}
summary.ExtraModels = append(summary.ExtraModels, entry)
}
return summary
}
// buildUserViewFromSummary 用预聚合好的 MonitorStatusSummary 装填 UserMonitorView无 IO
func buildUserViewFromSummary(m *ChannelMonitor, summary MonitorStatusSummary) *UserMonitorView {
return &UserMonitorView{
ID: m.ID,
Name: m.Name,
Provider: m.Provider,
GroupName: m.GroupName,
PrimaryModel: m.PrimaryModel,
PrimaryStatus: summary.PrimaryStatus,
PrimaryLatencyMs: summary.PrimaryLatencyMs,
Availability7d: summary.Availability7d,
ExtraModels: summary.ExtraModels,
}
}
// mergeModelDetails 合并 latest + availability 三个窗口为 ModelDetail 列表。
// 复用 indexLatestByModel避免在多处重复写 build map 逻辑。
func mergeModelDetails(
m *ChannelMonitor,
latest []*ChannelMonitorLatest,
availMap map[int]map[string]*ChannelMonitorAvailability,
) []ModelDetail {
all := append([]string{m.PrimaryModel}, m.ExtraModels...)
latestByModel := indexLatestByModel(latest)
out := make([]ModelDetail, 0, len(all))
for _, model := range all {
d := ModelDetail{Model: model}
if l, ok := latestByModel[model]; ok {
d.LatestStatus = l.Status
d.LatestLatencyMs = l.LatencyMs
}
if a, ok := availMap[monitorAvailability7Days][model]; ok {
d.Availability7d = a.AvailabilityPct
d.AvgLatency7dMs = a.AvgLatencyMs
}
if a, ok := availMap[monitorAvailability15Days][model]; ok {
d.Availability15d = a.AvailabilityPct
}
if a, ok := availMap[monitorAvailability30Days][model]; ok {
d.Availability30d = a.AvailabilityPct
}
out = append(out, d)
}
return out
}

View File

@@ -0,0 +1,80 @@
package service
import (
"fmt"
"math/rand/v2"
"regexp"
"strconv"
)
// monitorChallengePromptTemplate 1:1 复刻 BingZi-233/check-cx 的 few-shot 模板。
const monitorChallengePromptTemplate = `Calculate and respond with ONLY the number, nothing else.
Q: 3 + 5 = ?
A: 8
Q: 12 - 7 = ?
A: 5
Q: %d %s %d = ?
A:`
// monitorChallengeNumberRegex 提取响应中的所有整数(含负号)。
var monitorChallengeNumberRegex = regexp.MustCompile(`-?\d+`)
// monitorChallenge 一次 challenge 的 prompt + 期望答案。
type monitorChallenge struct {
Prompt string
Expected string
}
// generateChallenge 生成一次随机算术 challenge
// - 随机两个 [monitorChallengeMin, monitorChallengeMax] 整数
// - 50% 加 / 50% 减;减法用 max - min 保证非负
// - 渲染 few-shot 模板
//
// 不强求加密随机math/rand/v2 足够分散,避免 crypto/rand 的开销。
func generateChallenge() monitorChallenge {
a := randIntInRange(monitorChallengeMin, monitorChallengeMax)
b := randIntInRange(monitorChallengeMin, monitorChallengeMax)
if rand.IntN(2) == 0 { //nolint:gosec // 仅用于生成测试问题,无安全影响
// 加法
return monitorChallenge{
Prompt: fmt.Sprintf(monitorChallengePromptTemplate, a, "+", b),
Expected: strconv.Itoa(a + b),
}
}
// 减法,保证非负
hi, lo := a, b
if lo > hi {
hi, lo = lo, hi
}
return monitorChallenge{
Prompt: fmt.Sprintf(monitorChallengePromptTemplate, hi, "-", lo),
Expected: strconv.Itoa(hi - lo),
}
}
// randIntInRange 返回 [min, max] 闭区间的随机整数。
func randIntInRange(minVal, maxVal int) int {
if maxVal <= minVal {
return minVal
}
return minVal + rand.IntN(maxVal-minVal+1) //nolint:gosec
}
// validateChallenge 在响应文本中查找 expected 整数答案,返回是否通过校验。
func validateChallenge(responseText, expected string) bool {
if responseText == "" || expected == "" {
return false
}
matches := monitorChallengeNumberRegex.FindAllString(responseText, -1)
for _, m := range matches {
if m == expected {
return true
}
}
return false
}

View File

@@ -0,0 +1,299 @@
package service
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strings"
"time"
"github.com/tidwall/gjson"
)
// monitorHTTPClient 共享一个 http.Client避免每次检测重建 transport。
// 自定义 Transport 在 dial 时强制再次校验 IP防止 DNS rebinding 绕过 validateEndpoint。
var monitorHTTPClient = newSSRFSafeHTTPClient(monitorRequestTimeout)
// monitorPingHTTPClient 用于 endpoint origin 的 HEAD ping超时更短。
var monitorPingHTTPClient = newSSRFSafeHTTPClient(monitorPingTimeout)
// newSSRFSafeHTTPClient 返回一个使用 safeDialContext 的 http.Client。
// 仅供监控模块对外发起请求使用——所有目标都应是公网 endpoint。
func newSSRFSafeHTTPClient(timeout time.Duration) *http.Client {
tr := &http.Transport{
DialContext: safeDialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 16,
IdleConnTimeout: monitorIdleConnTimeout,
TLSHandshakeTimeout: monitorTLSHandshakeTimeout,
ResponseHeaderTimeout: monitorResponseHeaderTimeout,
}
return &http.Client{Timeout: timeout, Transport: tr}
}
// runCheckForModel 对单个 (provider, model) 做一次完整检测。
// 不返回 error所有失败都包装进 CheckResult.Status=error/failed。
func runCheckForModel(ctx context.Context, provider, endpoint, apiKey, model string) *CheckResult {
res := &CheckResult{
Model: model,
Status: MonitorStatusError,
CheckedAt: time.Now(),
}
challenge := generateChallenge()
start := time.Now()
respText, statusCode, err := callProvider(ctx, provider, endpoint, apiKey, model, challenge.Prompt)
latency := time.Since(start)
latencyMs := int(latency / time.Millisecond)
res.LatencyMs = &latencyMs
if err != nil {
res.Status = MonitorStatusError
res.Message = truncateMessage(sanitizeErrorMessage(err.Error()))
return res
}
if statusCode < 200 || statusCode >= 300 {
res.Status = MonitorStatusError
res.Message = truncateMessage(sanitizeErrorMessage(fmt.Sprintf("upstream HTTP %d: %s", statusCode, respText)))
return res
}
if !validateChallenge(respText, challenge.Expected) {
res.Status = MonitorStatusFailed
res.Message = truncateMessage(sanitizeErrorMessage(fmt.Sprintf("challenge mismatch (expected %s, got %q)", challenge.Expected, respText)))
return res
}
if latency >= monitorDegradedThreshold {
res.Status = MonitorStatusDegraded
res.Message = truncateMessage(fmt.Sprintf("slow response: %dms", latencyMs))
return res
}
res.Status = MonitorStatusOperational
return res
}
// pingEndpointOrigin 对 endpoint 的 origin (scheme://host) 发起 HEAD 请求,返回耗时。
// 失败时返回 nil不影响主状态判定
func pingEndpointOrigin(ctx context.Context, endpoint string) *int {
origin, err := extractOrigin(endpoint)
if err != nil || origin == "" {
return nil
}
req, err := http.NewRequestWithContext(ctx, http.MethodHead, origin, nil)
if err != nil {
return nil
}
start := time.Now()
resp, err := monitorPingHTTPClient.Do(req)
if err != nil {
return nil
}
defer func() { _ = resp.Body.Close() }()
_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, monitorPingDiscardMaxBytes))
ms := int(time.Since(start) / time.Millisecond)
return &ms
}
// providerAdapter 描述某个 provider 在 challenge 检测中需要的 4 件事:
// - 拼出请求路径(含 model 占位)
// - 序列化请求体
// - 构造鉴权头
// - 从响应 JSON 中按 path 提取文本gjson path
//
// 加新 provider 只需要在 providerAdapters 里增加一个条目,无需触碰 callProvider / validateProvider。
type providerAdapter struct {
buildPath func(model string) string
buildBody func(model, prompt string) ([]byte, error)
buildHeaders func(apiKey string) map[string]string
textPath string // gjson 提取响应文本的 path
}
// providerAdapters 全部已支持的 provider。键值即 MonitorProvider* 字符串。
//
//nolint:gochecknoglobals // 适配器表是只读静态数据,初始化后不变更。
var providerAdapters = map[string]providerAdapter{
MonitorProviderOpenAI: {
buildPath: func(string) string { return providerOpenAIPath },
buildBody: func(model, prompt string) ([]byte, error) {
return json.Marshal(map[string]any{
"model": model,
"messages": []map[string]string{{"role": "user", "content": prompt}},
"max_tokens": monitorChallengeMaxTokens,
"stream": false,
})
},
buildHeaders: func(apiKey string) map[string]string {
return map[string]string{"Authorization": "Bearer " + apiKey}
},
textPath: "choices.0.message.content",
},
MonitorProviderAnthropic: {
buildPath: func(string) string { return providerAnthropicPath },
buildBody: func(model, prompt string) ([]byte, error) {
return json.Marshal(map[string]any{
"model": model,
"messages": []map[string]string{{"role": "user", "content": prompt}},
"max_tokens": monitorChallengeMaxTokens,
})
},
buildHeaders: func(apiKey string) map[string]string {
return map[string]string{
"x-api-key": apiKey,
"anthropic-version": monitorAnthropicAPIVersion,
}
},
textPath: "content.0.text",
},
MonitorProviderGemini: {
// Gemini 把 model 名写在 URL path 上:/v1beta/models/{model}:generateContent
buildPath: func(model string) string { return fmt.Sprintf(providerGeminiPathTemplate, model) },
buildBody: func(_, prompt string) ([]byte, error) {
return json.Marshal(map[string]any{
"contents": []map[string]any{
{"parts": []map[string]any{{"text": prompt}}},
},
"generationConfig": map[string]any{"maxOutputTokens": monitorChallengeMaxTokens},
})
},
// 使用 x-goog-api-key header 而不是 ?key= query避免 *url.Error 把 key 回填到错误日志。
buildHeaders: func(apiKey string) map[string]string {
return map[string]string{"x-goog-api-key": apiKey}
},
textPath: "candidates.0.content.parts.0.text",
},
}
// isSupportedProvider 校验 provider 字符串是否在 adapter 表中。
// 供 validate.go 的 validateProvider 复用,避免两份 switch 漂移。
func isSupportedProvider(p string) bool {
_, ok := providerAdapters[p]
return ok
}
// callProvider 通过 providerAdapters 分发到具体实现。
// 返回值响应中提取的文本、HTTP status、网络/序列化错误。
func callProvider(ctx context.Context, provider, endpoint, apiKey, model, prompt string) (string, int, error) {
adapter, ok := providerAdapters[provider]
if !ok {
return "", 0, fmt.Errorf("unsupported provider %q", provider)
}
body, err := adapter.buildBody(model, prompt)
if err != nil {
return "", 0, fmt.Errorf("marshal body: %w", err)
}
full := joinURL(endpoint, adapter.buildPath(model))
respBody, status, err := postRawJSON(ctx, full, body, adapter.buildHeaders(apiKey))
if err != nil {
return "", status, err
}
return gjson.GetBytes(respBody, adapter.textPath).String(), status, nil
}
// postRawJSON 发送 POST + 已序列化好的 JSON 字节限制响应体大小返回响应字节、HTTP status、错误。
// adapter 自行 marshal 是为了精确控制字段顺序与类型,所以这里直接收 []byte 而不是 any。
func postRawJSON(ctx context.Context, fullURL string, payload []byte, headers map[string]string) ([]byte, int, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL, bytes.NewReader(payload))
if err != nil {
return nil, 0, fmt.Errorf("build request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
for k, v := range headers {
req.Header.Set(k, v)
}
resp, err := monitorHTTPClient.Do(req)
if err != nil {
return nil, 0, fmt.Errorf("do request: %w", err)
}
defer func() { _ = resp.Body.Close() }()
respBody, err := io.ReadAll(io.LimitReader(resp.Body, monitorResponseMaxBytes))
if err != nil {
return nil, resp.StatusCode, fmt.Errorf("read body: %w", err)
}
return respBody, resp.StatusCode, nil
}
// joinURL 把 base origin 与 path 拼成完整 URL。
// 容忍 base 末尾有/无斜杠path 必带前导斜杠。
func joinURL(base, path string) string {
base = strings.TrimRight(base, "/")
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
return base + path
}
// extractOrigin 从一个 endpoint URL 中提取 scheme://host[:port] 部分。
func extractOrigin(endpoint string) (string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", err
}
if u.Scheme == "" || u.Host == "" {
return "", errors.New("endpoint missing scheme or host")
}
return u.Scheme + "://" + u.Host, nil
}
// monitorSensitiveQueryParamRegex 匹配 URL query 中可能泄露凭证的参数:
// key / api_key / api-key / access_token / token / authorization / x-api-key。
// 大小写不敏感,匹配 `?name=value` 或 `&name=value` 形式value 截到 & 或字符串末尾)。
var monitorSensitiveQueryParamRegex = regexp.MustCompile(`(?i)([?&](?:key|api[_-]?key|access[_-]?token|token|authorization|x-api-key)=)[^&\s"']+`)
// monitorAPIKeyPatterns 匹配常见 provider 的 API key 字面量。
// 顺序敏感sk-ant- 必须放在 sk- 之前,否则会被通用 sk- 模式先消费。
var monitorAPIKeyPatterns = []struct {
pattern *regexp.Regexp
replace string
}{
// Anthropic带前缀必须先匹配sk-ant-xxxxxxx
{regexp.MustCompile(`sk-ant-[A-Za-z0-9_-]{20,}`), "sk-ant-***REDACTED***"},
// OpenAI / Anthropic 通用 sk-: sk-xxxxxxx
{regexp.MustCompile(`sk-[A-Za-z0-9-]{20,}`), "sk-***REDACTED***"},
// Gemini / Google API Key固定前缀 + 35 位
{regexp.MustCompile(`AIza[A-Za-z0-9_-]{35}`), "AIza***REDACTED***"},
// JWT 三段式Bearer 后常出现eyJxxx.eyJxxx.signature
{regexp.MustCompile(`eyJ[A-Za-z0-9_-]{8,}\.eyJ[A-Za-z0-9_-]{8,}\.[A-Za-z0-9_-]{8,}`), "eyJ***REDACTED.JWT***"},
}
// sanitizeErrorMessage 擦除错误/响应文本中可能泄露的 API key。
// 处理两类来源:
// 1. URL query 中的 ?key= / ?api_key= 等Go *url.Error 会回填完整 URL
// 2. 上游 HTTP body 文本里直接出现的 sk-* / AIza* / JWT 等密钥碎片
//
// 注意:与 gemini_messages_compat_service.go 的 sanitizeUpstreamErrorMessage 关注点类似但参数集更广,
// 监控模块独立维护,避免互相耦合。
func sanitizeErrorMessage(msg string) string {
if msg == "" {
return msg
}
msg = monitorSensitiveQueryParamRegex.ReplaceAllString(msg, `${1}REDACTED`)
for _, p := range monitorAPIKeyPatterns {
msg = p.pattern.ReplaceAllString(msg, p.replace)
}
return msg
}
// truncateMessage 把消息按 monitorMessageMaxBytes 截断,避免 DB 列溢出与日志过长。
func truncateMessage(msg string) string {
if len(msg) <= monitorMessageMaxBytes {
return msg
}
const ellipsis = "...(truncated)"
cutoff := monitorMessageMaxBytes - len(ellipsis)
if cutoff < 0 {
cutoff = 0
}
return msg[:cutoff] + ellipsis
}

View File

@@ -0,0 +1,137 @@
package service
import (
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
// ChannelMonitor 全局常量。
// 这些是 MVP 阶段的硬编码值,按需可以提到 config 中。
const (
// monitorRequestTimeout 单次模型请求总超时(含 Body 读取)。
monitorRequestTimeout = 45 * time.Second
// monitorPingTimeout HEAD 请求 endpoint origin 的超时。
monitorPingTimeout = 8 * time.Second
// monitorDegradedThreshold 主请求成功但耗时超过该阈值视为 degraded。
monitorDegradedThreshold = 6 * time.Second
// monitorHistoryRetentionDays 历史保留天数(每天清理一次)。
monitorHistoryRetentionDays = 30
// monitorWorkerConcurrency 调度器并发执行的监控数pond 池容量)。
monitorWorkerConcurrency = 5
// monitorTickerInterval 调度器扫描"到期监控"的间隔。
monitorTickerInterval = 5 * time.Second
// monitorMinIntervalSeconds / monitorMaxIntervalSeconds 用户配置的检测间隔上下限。
monitorMinIntervalSeconds = 15
monitorMaxIntervalSeconds = 3600
// monitorMessageMaxBytes message 字段最大字节数(与 schema/migration 一致)。
monitorMessageMaxBytes = 500
// monitorResponseMaxBytes 单次模型响应最大读取字节,防止 OOM。
monitorResponseMaxBytes = 64 * 1024
// monitorChallengeMin / monitorChallengeMax challenge 操作数范围。
monitorChallengeMin = 1
monitorChallengeMax = 50
// providerOpenAIPath OpenAI Chat Completions 路径。
providerOpenAIPath = "/v1/chat/completions"
// providerAnthropicPath Anthropic Messages 路径。
providerAnthropicPath = "/v1/messages"
// providerGeminiPathTemplate Gemini generateContent 路径模板(含 model 占位)。
providerGeminiPathTemplate = "/v1beta/models/%s:generateContent"
// MonitorProviderOpenAI / Anthropic / Gemini provider 字符串常量(也是 ent enum 的实际值)。
MonitorProviderOpenAI = "openai"
MonitorProviderAnthropic = "anthropic"
MonitorProviderGemini = "gemini"
// MonitorStatusOperational 等监控状态字符串常量(与 ent enum 一致)。
MonitorStatusOperational = "operational"
MonitorStatusDegraded = "degraded"
MonitorStatusFailed = "failed"
MonitorStatusError = "error"
// monitorAvailability7Days / 15 / 30 用于聚合查询窗口。
monitorAvailability7Days = 7
monitorAvailability15Days = 15
monitorAvailability30Days = 30
// monitorCleanupCheckInterval 历史清理调度器的检查频率(每小时检查"是否到 03:00")。
monitorCleanupCheckInterval = time.Hour
// monitorCleanupHour 凌晨 3 点执行历史清理。
monitorCleanupHour = 3
// MonitorHistoryDefaultLimit 历史查询默认返回条数handler 层共享)。
MonitorHistoryDefaultLimit = 100
// MonitorHistoryMaxLimit 历史查询最大返回条数handler 层共享)。
MonitorHistoryMaxLimit = 1000
// monitorEndpointResolveTimeout validateEndpoint 解析 hostname 的最长耗时。
monitorEndpointResolveTimeout = 5 * time.Second
// ---- checker / runner 行为参数(消除 magic 值)----
// monitorAnthropicAPIVersion Anthropic Messages API 版本头。
monitorAnthropicAPIVersion = "2023-06-01"
// monitorChallengeMaxTokens 单次 challenge 请求的 max_tokens足够回答个位数算术
monitorChallengeMaxTokens = 50
// monitorListDueTimeout tickDueChecks 查询到期监控的总超时。
monitorListDueTimeout = 10 * time.Second
// monitorRunOneBuffer runOne 的总超时缓冲(除请求超时与 ping 超时外的额外裕量)。
monitorRunOneBuffer = 10 * time.Second
// monitorCleanupTimeout 历史清理任务的总超时。
monitorCleanupTimeout = 30 * time.Second
// monitorCleanupDayLayout 历史清理用于"今日是否已跑过"判定的日期格式。
monitorCleanupDayLayout = "2006-01-02"
// monitorIdleConnTimeout HTTP transport 空闲连接关闭超时。
monitorIdleConnTimeout = 30 * time.Second
// monitorTLSHandshakeTimeout HTTP transport TLS 握手超时。
monitorTLSHandshakeTimeout = 10 * time.Second
// monitorResponseHeaderTimeout HTTP transport 等待响应头超时。
monitorResponseHeaderTimeout = 30 * time.Second
// monitorPingDiscardMaxBytes ping 时丢弃响应体的最大字节数。
monitorPingDiscardMaxBytes = 1024
// monitorDialTimeout 自定义 dialer 单次连接超时。
monitorDialTimeout = 10 * time.Second
// monitorDialKeepAlive 自定义 dialer keep-alive 间隔。
monitorDialKeepAlive = 30 * time.Second
)
// 业务错误(统一在此声明,避免散落)。
var (
ErrChannelMonitorNotFound = infraerrors.NotFound(
"CHANNEL_MONITOR_NOT_FOUND", "channel monitor not found",
)
ErrChannelMonitorInvalidProvider = infraerrors.BadRequest(
"CHANNEL_MONITOR_INVALID_PROVIDER", "provider must be one of openai/anthropic/gemini",
)
ErrChannelMonitorInvalidInterval = infraerrors.BadRequest(
"CHANNEL_MONITOR_INVALID_INTERVAL", "interval_seconds must be in [15, 3600]",
)
ErrChannelMonitorInvalidEndpoint = infraerrors.BadRequest(
"CHANNEL_MONITOR_INVALID_ENDPOINT", "endpoint must be a valid https URL",
)
ErrChannelMonitorEndpointScheme = infraerrors.BadRequest(
"CHANNEL_MONITOR_ENDPOINT_SCHEME", "endpoint must use https scheme",
)
ErrChannelMonitorEndpointPath = infraerrors.BadRequest(
"CHANNEL_MONITOR_ENDPOINT_PATH", "endpoint must be base origin only (no path/query/fragment)",
)
ErrChannelMonitorEndpointPrivate = infraerrors.BadRequest(
"CHANNEL_MONITOR_ENDPOINT_PRIVATE", "endpoint must be a public host",
)
ErrChannelMonitorEndpointUnreachable = infraerrors.BadRequest(
"CHANNEL_MONITOR_ENDPOINT_UNREACHABLE", "endpoint hostname could not be resolved",
)
ErrChannelMonitorMissingAPIKey = infraerrors.BadRequest(
"CHANNEL_MONITOR_MISSING_API_KEY", "api_key is required when creating a monitor",
)
ErrChannelMonitorMissingPrimaryModel = infraerrors.BadRequest(
"CHANNEL_MONITOR_MISSING_PRIMARY_MODEL", "primary_model is required",
)
ErrChannelMonitorAPIKeyDecryptFailed = infraerrors.InternalServer(
"CHANNEL_MONITOR_KEY_DECRYPT_FAILED", "api key decryption failed; please re-edit the monitor with a fresh key",
)
)

View File

@@ -0,0 +1,208 @@
package service
import (
"context"
"log/slog"
"sync"
"time"
"github.com/alitto/pond/v2"
)
// ChannelMonitorRunner 渠道监控调度器。
//
// 职责:
// - 每 monitorTickerInterval 扫描一次"到期需要检测"的监控
// - 通过 pond 池(容量 monitorWorkerConcurrency异步执行检测
// - 每小时检查一次时钟,到 monitorCleanupHour 点时执行历史清理
// - Stop 时优雅关闭:池 drain + ticker.Stop + wg.Wait
//
// 不引入 cron 库;清理调度通过"每小时检查时间"实现,足够 MVP。
type ChannelMonitorRunner struct {
svc *ChannelMonitorService
pool pond.Pool
stopCh chan struct{}
once sync.Once
wg sync.WaitGroup
// inFlight 跟踪正在执行的 monitor.ID。tickDueChecks 调度前会检查避免重复提交,
// 防止单次检测耗时 > interval 时同一 monitor 被并发执行。
inFlight map[int64]struct{}
inFlightMu sync.Mutex
// 清理状态lastCleanupDay 记录上次清理的"年-月-日",避免同一天重复跑。
lastCleanupDay string
cleanupMu sync.Mutex
}
// NewChannelMonitorRunner 构造调度器。Start 在 wire 中调用。
func NewChannelMonitorRunner(svc *ChannelMonitorService) *ChannelMonitorRunner {
return &ChannelMonitorRunner{
svc: svc,
stopCh: make(chan struct{}),
inFlight: make(map[int64]struct{}),
}
}
// Start 启动 ticker + worker pool + cleanup loop。
// 调用方需保证只调一次wire ProvideChannelMonitorRunner 内只调一次)。
func (r *ChannelMonitorRunner) Start() {
if r == nil || r.svc == nil {
return
}
// 容量 5 的 pond 池:超出时调用方等待,避免调度堆积无限增长。
r.pool = pond.NewPool(monitorWorkerConcurrency)
r.wg.Add(2)
go r.dueCheckLoop()
go r.cleanupLoop()
}
// Stop 优雅停止close stopCh -> 等待两个 loop 退出 -> 池 drain。
func (r *ChannelMonitorRunner) Stop() {
if r == nil {
return
}
r.once.Do(func() {
close(r.stopCh)
})
r.wg.Wait()
if r.pool != nil {
r.pool.StopAndWait()
}
}
// dueCheckLoop 每 monitorTickerInterval 扫描一次"到期监控",提交到池。
func (r *ChannelMonitorRunner) dueCheckLoop() {
defer r.wg.Done()
ticker := time.NewTicker(monitorTickerInterval)
defer ticker.Stop()
for {
select {
case <-r.stopCh:
return
case <-ticker.C:
r.tickDueChecks()
}
}
}
// tickDueChecks 一次扫描:查询到期监控并逐个提交到池。
// 已在执行的 monitor 会被跳过(防止单次检测耗时 > interval 时重复调度)。
// 池满时使用 TrySubmit 跳过(不能阻塞 ticker同时立即释放已占用的 inFlight 槽。
func (r *ChannelMonitorRunner) tickDueChecks() {
ctx, cancel := context.WithTimeout(context.Background(), monitorListDueTimeout)
defer cancel()
due, err := r.svc.listDueForCheck(ctx)
if err != nil {
slog.Warn("channel_monitor: list due failed", "error", err)
return
}
for _, m := range due {
monitor := m
if !r.tryAcquireInFlight(monitor.ID) {
slog.Debug("channel_monitor: skip already in-flight",
"monitor_id", monitor.ID, "name", monitor.Name)
continue
}
if _, ok := r.pool.TrySubmit(func() {
r.runOne(monitor.ID, monitor.Name)
}); !ok {
// 池满:丢弃本次检测,但必须释放已占用的 inFlight 槽,否则该 monitor 会被永久卡住。
r.releaseInFlight(monitor.ID)
slog.Warn("channel_monitor: worker pool full, skip submission",
"monitor_id", monitor.ID, "name", monitor.Name)
}
}
}
// tryAcquireInFlight 原子地占用 monitor 的 in-flight 槽。
// 已被占用返回 false调用方应跳过本次提交
func (r *ChannelMonitorRunner) tryAcquireInFlight(id int64) bool {
r.inFlightMu.Lock()
defer r.inFlightMu.Unlock()
if _, exists := r.inFlight[id]; exists {
return false
}
r.inFlight[id] = struct{}{}
return true
}
// releaseInFlight 释放 in-flight 槽。runOne 完成(含 panic recover后必须调用。
func (r *ChannelMonitorRunner) releaseInFlight(id int64) {
r.inFlightMu.Lock()
delete(r.inFlight, id)
r.inFlightMu.Unlock()
}
// runOne 执行单个监控的检测。所有错误只记日志,不熔断。
// 任务结束时(含 panic recover必须释放 in-flight 槽。
//
// 单次解密路径:调 RunCheckByID内部统一 Get + APIKeyDecryptFailed 判定 + 跑检测,
// 避免 runner 自己再 Get 一次造成密文二次解密。
func (r *ChannelMonitorRunner) runOne(id int64, name string) {
// 单次任务上限 = 请求超时 + ping + 一些缓冲。
ctx, cancel := context.WithTimeout(context.Background(), monitorRequestTimeout+monitorPingTimeout+monitorRunOneBuffer)
defer cancel()
defer r.releaseInFlight(id)
defer func() {
if rec := recover(); rec != nil {
slog.Error("channel_monitor: runner panic",
"monitor_id", id, "name", name, "panic", rec)
}
}()
if _, err := r.svc.RunCheck(ctx, id); err != nil {
// ErrChannelMonitorAPIKeyDecryptFailed 是预期可恢复错误,降为 Warn 即可。
slog.Warn("channel_monitor: run check failed",
"monitor_id", id, "name", name, "error", err)
}
}
// cleanupLoop 每小时检查当前时间,到 monitorCleanupHour 点(且当天还没清理过)则跑一次清理。
// 启动时立即检查一次,避免长时间运行才跑首次清理。
func (r *ChannelMonitorRunner) cleanupLoop() {
defer r.wg.Done()
ticker := time.NewTicker(monitorCleanupCheckInterval)
defer ticker.Stop()
r.maybeRunCleanup()
for {
select {
case <-r.stopCh:
return
case <-ticker.C:
r.maybeRunCleanup()
}
}
}
// maybeRunCleanup 如果当前小时是 monitorCleanupHour 且当天未跑过,则执行清理。
func (r *ChannelMonitorRunner) maybeRunCleanup() {
now := time.Now()
if now.Hour() != monitorCleanupHour {
return
}
day := now.Format(monitorCleanupDayLayout)
r.cleanupMu.Lock()
if r.lastCleanupDay == day {
r.cleanupMu.Unlock()
return
}
r.lastCleanupDay = day
r.cleanupMu.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), monitorCleanupTimeout)
defer cancel()
if err := r.svc.cleanupOldHistory(ctx); err != nil {
slog.Warn("channel_monitor: cleanup history failed", "error", err)
}
}

View File

@@ -0,0 +1,374 @@
package service
import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
// ChannelMonitorRepository 渠道监控数据访问接口。
// 入参/返回的指针类型均使用 service 包的 ChannelMonitor 模型,
// repository 实现负责与 ent 模型互转,并保持 api_key_encrypted 字段为密文。
type ChannelMonitorRepository interface {
// CRUD
Create(ctx context.Context, m *ChannelMonitor) error
GetByID(ctx context.Context, id int64) (*ChannelMonitor, error)
Update(ctx context.Context, m *ChannelMonitor) error
Delete(ctx context.Context, id int64) error
List(ctx context.Context, params ChannelMonitorListParams) ([]*ChannelMonitor, int64, error)
// 调度器辅助
ListEnabled(ctx context.Context) ([]*ChannelMonitor, error)
MarkChecked(ctx context.Context, id int64, checkedAt time.Time) error
InsertHistoryBatch(ctx context.Context, rows []*ChannelMonitorHistoryRow) error
DeleteHistoryBefore(ctx context.Context, before time.Time) (int64, error)
// 历史记录
ListHistory(ctx context.Context, monitorID int64, model string, limit int) ([]*ChannelMonitorHistoryEntry, error)
// 用户视图聚合
ListLatestPerModel(ctx context.Context, monitorID int64) ([]*ChannelMonitorLatest, error)
ComputeAvailability(ctx context.Context, monitorID int64, windowDays int) ([]*ChannelMonitorAvailability, error)
// 批量聚合admin/user list 用,避免 N+1
ListLatestForMonitorIDs(ctx context.Context, ids []int64) (map[int64][]*ChannelMonitorLatest, error)
ComputeAvailabilityForMonitors(ctx context.Context, ids []int64, windowDays int) (map[int64][]*ChannelMonitorAvailability, error)
}
// ChannelMonitorService 渠道监控管理服务。
type ChannelMonitorService struct {
repo ChannelMonitorRepository
encryptor SecretEncryptor
}
// NewChannelMonitorService 创建渠道监控服务实例。
func NewChannelMonitorService(repo ChannelMonitorRepository, encryptor SecretEncryptor) *ChannelMonitorService {
return &ChannelMonitorService{repo: repo, encryptor: encryptor}
}
// ---------- CRUD ----------
// List 列表查询(支持 provider/enabled/search 过滤 + 分页)。
// 返回的 ChannelMonitor.APIKey 已解密为明文handler 层负责脱敏。
func (s *ChannelMonitorService) List(ctx context.Context, params ChannelMonitorListParams) ([]*ChannelMonitor, int64, error) {
if params.Page < 1 {
params.Page = 1
}
if params.PageSize < 1 || params.PageSize > 200 {
params.PageSize = 20
}
items, total, err := s.repo.List(ctx, params)
if err != nil {
return nil, 0, fmt.Errorf("list channel monitors: %w", err)
}
for _, it := range items {
s.decryptInPlace(it)
}
return items, total, nil
}
// Get 查询单个监控(解密 API Key
func (s *ChannelMonitorService) Get(ctx context.Context, id int64) (*ChannelMonitor, error) {
m, err := s.repo.GetByID(ctx, id)
if err != nil {
return nil, err
}
s.decryptInPlace(m)
return m, nil
}
// Create 创建监控(内部加密 api_key
func (s *ChannelMonitorService) Create(ctx context.Context, p ChannelMonitorCreateParams) (*ChannelMonitor, error) {
if err := validateCreateParams(p); err != nil {
return nil, err
}
encrypted, err := s.encryptor.Encrypt(p.APIKey)
if err != nil {
return nil, fmt.Errorf("encrypt api key: %w", err)
}
m := &ChannelMonitor{
Name: strings.TrimSpace(p.Name),
Provider: p.Provider,
Endpoint: normalizeEndpoint(p.Endpoint),
APIKey: encrypted, // 注意:传入 repository 时该字段为密文
PrimaryModel: strings.TrimSpace(p.PrimaryModel),
ExtraModels: normalizeModels(p.ExtraModels),
GroupName: strings.TrimSpace(p.GroupName),
Enabled: p.Enabled,
IntervalSeconds: p.IntervalSeconds,
CreatedBy: p.CreatedBy,
}
if err := s.repo.Create(ctx, m); err != nil {
return nil, fmt.Errorf("create channel monitor: %w", err)
}
// 不再调 s.Get 重走解密链:已知刚加密的明文,直接构造响应。
// 这样可避免 SecretEncryptor 解密失败时 APIKey 被静默清空的问题(见 Fix 4
m.APIKey = strings.TrimSpace(p.APIKey)
return m, nil
}
// validateCreateParams 把 Create 入参的所有校验聚拢为一个函数,避免 Create 主体超过 30 行。
func validateCreateParams(p ChannelMonitorCreateParams) error {
if err := validateProvider(p.Provider); err != nil {
return err
}
if err := validateInterval(p.IntervalSeconds); err != nil {
return err
}
if err := validateEndpoint(p.Endpoint); err != nil {
return err
}
if strings.TrimSpace(p.APIKey) == "" {
return ErrChannelMonitorMissingAPIKey
}
if strings.TrimSpace(p.PrimaryModel) == "" {
return ErrChannelMonitorMissingPrimaryModel
}
return nil
}
// Update 更新监控。APIKey 字段nil 或空字符串 = 不修改;非空 = 加密后覆盖。
func (s *ChannelMonitorService) Update(ctx context.Context, id int64, p ChannelMonitorUpdateParams) (*ChannelMonitor, error) {
existing, err := s.repo.GetByID(ctx, id)
if err != nil {
return nil, err
}
if err := applyMonitorUpdate(existing, p); err != nil {
return nil, err
}
newPlainAPIKey, apiKeyUpdated, err := s.applyAPIKeyUpdate(existing, p.APIKey)
if err != nil {
return nil, err
}
if err := s.repo.Update(ctx, existing); err != nil {
return nil, fmt.Errorf("update channel monitor: %w", err)
}
// 不再调 s.Get 重走解密链:避免二次解密带来的"密文被静默清空"风险(与 Create 一致)。
if apiKeyUpdated {
existing.APIKey = newPlainAPIKey
} else {
s.decryptInPlace(existing)
}
return existing, nil
}
// applyAPIKeyUpdate 处理 Update 中的 APIKey 字段:
// - 入参 raw 为 nil 或空白:不修改 existing.APIKey仍为密文返回 updated=false
// - 非空:加密后写入 existing.APIKey同时把明文返回给调用方
// 供写库成功后塞回 existing 避免把密文吐回客户端
func (s *ChannelMonitorService) applyAPIKeyUpdate(existing *ChannelMonitor, raw *string) (plain string, updated bool, err error) {
if raw == nil || strings.TrimSpace(*raw) == "" {
return "", false, nil
}
plain = strings.TrimSpace(*raw)
encrypted, encErr := s.encryptor.Encrypt(plain)
if encErr != nil {
return "", false, fmt.Errorf("encrypt api key: %w", encErr)
}
existing.APIKey = encrypted
return plain, true, nil
}
// Delete 删除监控(历史通过外键 CASCADE 自动清理)。
func (s *ChannelMonitorService) Delete(ctx context.Context, id int64) error {
if err := s.repo.Delete(ctx, id); err != nil {
return fmt.Errorf("delete channel monitor: %w", err)
}
return nil
}
// ListHistory 列出某个监控最近的检测历史。
// model 为空表示返回所有模型limit <= 0 时使用默认值,超过上限会被截断。
func (s *ChannelMonitorService) ListHistory(ctx context.Context, id int64, model string, limit int) ([]*ChannelMonitorHistoryEntry, error) {
if _, err := s.repo.GetByID(ctx, id); err != nil {
return nil, err
}
if limit <= 0 {
limit = MonitorHistoryDefaultLimit
}
if limit > MonitorHistoryMaxLimit {
limit = MonitorHistoryMaxLimit
}
entries, err := s.repo.ListHistory(ctx, id, strings.TrimSpace(model), limit)
if err != nil {
return nil, fmt.Errorf("list history: %w", err)
}
return entries, nil
}
// ---------- 业务 ----------
// RunCheck 同步触发对一个监控的检测:并发跑 primary + extra 模型,
// 写历史记录并更新 last_checked_at。返回每个模型的检测结果。
func (s *ChannelMonitorService) RunCheck(ctx context.Context, id int64) ([]*CheckResult, error) {
m, err := s.Get(ctx, id) // 已解密 APIKey
if err != nil {
return nil, err
}
if m.APIKeyDecryptFailed {
return nil, ErrChannelMonitorAPIKeyDecryptFailed
}
results := s.runChecksConcurrent(ctx, m)
s.persistCheckResults(ctx, m, results)
return results, nil
}
// persistCheckResults 写入本次检测的历史记录并更新 last_checked_at。
// 任一写库失败都只记日志,不影响调用方拿到 results与 MVP 期望一致:宁可漏记历史也要先返回结果)。
func (s *ChannelMonitorService) persistCheckResults(ctx context.Context, m *ChannelMonitor, results []*CheckResult) {
rows := make([]*ChannelMonitorHistoryRow, 0, len(results))
for _, r := range results {
rows = append(rows, &ChannelMonitorHistoryRow{
MonitorID: m.ID,
Model: r.Model,
Status: r.Status,
LatencyMs: r.LatencyMs,
PingLatencyMs: r.PingLatencyMs,
Message: r.Message,
CheckedAt: r.CheckedAt,
})
}
if err := s.repo.InsertHistoryBatch(ctx, rows); err != nil {
slog.Error("channel_monitor: insert history failed",
"monitor_id", m.ID, "name", m.Name, "error", err)
}
if err := s.repo.MarkChecked(ctx, m.ID, time.Now()); err != nil {
slog.Error("channel_monitor: mark checked failed",
"monitor_id", m.ID, "error", err)
}
}
// runChecksConcurrent 对 primary + extra 模型并发执行检测。
// errgroup 仅用于等待,不传播错误(每个 model 失败都已打包进 CheckResult
func (s *ChannelMonitorService) runChecksConcurrent(ctx context.Context, m *ChannelMonitor) []*CheckResult {
models := append([]string{m.PrimaryModel}, m.ExtraModels...)
results := make([]*CheckResult, len(models))
// ping 共享一次,所有模型记录同一个 ping 延迟。
pingMs := pingEndpointOrigin(ctx, m.Endpoint)
var eg errgroup.Group
var mu sync.Mutex
for i, model := range models {
i, model := i, model
eg.Go(func() error {
r := runCheckForModel(ctx, m.Provider, m.Endpoint, m.APIKey, model)
r.PingLatencyMs = pingMs
mu.Lock()
results[i] = r
mu.Unlock()
return nil
})
}
_ = eg.Wait()
return results
}
// ---------- 调度器内部 ----------
// listDueForCheck 返回需要立即检测的监控列表:
// enabled=true AND (last_checked_at IS NULL OR last_checked_at + interval <= now)。
// 实现下沉到 repository用 SQL 表达式比较),减少应用层数据传输。
func (s *ChannelMonitorService) listDueForCheck(ctx context.Context) ([]*ChannelMonitor, error) {
all, err := s.repo.ListEnabled(ctx)
if err != nil {
return nil, err
}
now := time.Now()
due := make([]*ChannelMonitor, 0, len(all))
for _, m := range all {
if m.LastCheckedAt == nil {
due = append(due, m)
continue
}
nextAt := m.LastCheckedAt.Add(time.Duration(m.IntervalSeconds) * time.Second)
if !nextAt.After(now) {
due = append(due, m)
}
}
return due, nil
}
// cleanupOldHistory 删除 monitorHistoryRetentionDays 天之前的历史记录。
func (s *ChannelMonitorService) cleanupOldHistory(ctx context.Context) error {
before := time.Now().AddDate(0, 0, -monitorHistoryRetentionDays)
deleted, err := s.repo.DeleteHistoryBefore(ctx, before)
if err != nil {
return fmt.Errorf("delete history before %s: %w", before.Format(time.RFC3339), err)
}
if deleted > 0 {
slog.Info("channel_monitor: history cleanup",
"deleted_rows", deleted, "before", before.Format(time.RFC3339))
}
return nil
}
// ---------- helpers ----------
// decryptInPlace 把 ChannelMonitor.APIKey 从密文解密为明文。
// 解密失败时把字段清空 + 设置 APIKeyDecryptFailed=true不返回错误避免阻断列表渲染
// runner / RunCheck 必须读取该标志位并拒绝执行检测。
func (s *ChannelMonitorService) decryptInPlace(m *ChannelMonitor) {
if m == nil || m.APIKey == "" {
return
}
plain, err := s.encryptor.Decrypt(m.APIKey)
if err != nil {
slog.Warn("channel_monitor: decrypt api key failed",
"monitor_id", m.ID, "error", err)
m.APIKey = ""
m.APIKeyDecryptFailed = true
return
}
m.APIKey = plain
}
// applyMonitorUpdate 把 update params 中非 nil 的字段应用到 existing 上。
// APIKey 字段在调用方单独处理(涉及加密)。
//
// 行数稍超过 30这是逐字段平铺的 dispatcher每个 if 都是 1-3 行的"非 nil 则覆盖"模式,
// 拆分反而会增加跳转噪音、影响可读性,故保留为单函数。
func applyMonitorUpdate(existing *ChannelMonitor, p ChannelMonitorUpdateParams) error {
if p.Name != nil {
existing.Name = strings.TrimSpace(*p.Name)
}
if p.Provider != nil {
if err := validateProvider(*p.Provider); err != nil {
return err
}
existing.Provider = *p.Provider
}
if p.Endpoint != nil {
if err := validateEndpoint(*p.Endpoint); err != nil {
return err
}
existing.Endpoint = normalizeEndpoint(*p.Endpoint)
}
if p.PrimaryModel != nil {
existing.PrimaryModel = strings.TrimSpace(*p.PrimaryModel)
}
if p.ExtraModels != nil {
existing.ExtraModels = normalizeModels(*p.ExtraModels)
}
if p.GroupName != nil {
existing.GroupName = strings.TrimSpace(*p.GroupName)
}
if p.Enabled != nil {
existing.Enabled = *p.Enabled
}
if p.IntervalSeconds != nil {
if err := validateInterval(*p.IntervalSeconds); err != nil {
return err
}
existing.IntervalSeconds = *p.IntervalSeconds
}
return nil
}

View File

@@ -0,0 +1,152 @@
package service
import (
"context"
"net"
"strings"
)
// SSRF 防护 helper
// - validateEndpoint 在 admin 提交时阻止 http/loopback/私网/云元数据 URL
// - safeDialContext 在 socket 层再次校验真实 IP防止 DNS rebinding
//
// 已知 cloud metadata hostname 拒绝列表(小写比较)。
var monitorBlockedHostnames = map[string]struct{}{
"localhost": {},
"localhost.localdomain": {},
"metadata": {},
"metadata.google.internal": {},
"metadata.goog": {},
"instance-data": {},
"instance-data.ec2.internal": {},
}
// CIDR 列表:包含所有需要拒绝的 IPv4/IPv6 段。
// 解析时只 panic 一次(启动时确认),生产路径只做 Contains。
var monitorBlockedCIDRs = mustParseCIDRs([]string{
"127.0.0.0/8", // IPv4 loopback
"10.0.0.0/8", // RFC1918
"172.16.0.0/12", // RFC1918
"192.168.0.0/16", // RFC1918
"169.254.0.0/16", // link-local含云元数据 169.254.169.254
"100.64.0.0/10", // CGNAT
"0.0.0.0/8", // "this network"
"::1/128", // IPv6 loopback
"fc00::/7", // IPv6 ULA
"fe80::/10", // IPv6 link-local
"::/128", // IPv6 unspecified
})
// monitorDialer 共享 Dialer与 net/http 默认值对齐。
var monitorDialer = &net.Dialer{
Timeout: monitorDialTimeout,
KeepAlive: monitorDialKeepAlive,
}
// mustParseCIDRs 在包初始化时解析 CIDR 字符串,失败 panic。
func mustParseCIDRs(cidrs []string) []*net.IPNet {
out := make([]*net.IPNet, 0, len(cidrs))
for _, c := range cidrs {
_, n, err := net.ParseCIDR(c)
if err != nil {
panic("channel_monitor_ssrf: invalid CIDR " + c + ": " + err.Error())
}
out = append(out, n)
}
return out
}
// isBlockedHostname 判断 hostname 是否命中黑名单。
func isBlockedHostname(hostname string) bool {
if hostname == "" {
return true
}
_, blocked := monitorBlockedHostnames[strings.ToLower(hostname)]
return blocked
}
// isPrivateIP 判断 IP 是否落在禁止段loopback/RFC1918/link-local/ULA 等)。
func isPrivateIP(ip net.IP) bool {
if ip == nil {
return true
}
if ip.IsUnspecified() || ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || ip.IsInterfaceLocalMulticast() {
return true
}
for _, n := range monitorBlockedCIDRs {
if n.Contains(ip) {
return true
}
}
return false
}
// isPrivateOrLoopbackHost 解析 hostname 的所有 A/AAAA 记录,
// 任一 IP 落在私网/loopback 段即认为不安全。
//
// hostname 是 IP 字面量时也走同一路径。
func isPrivateOrLoopbackHost(ctx context.Context, hostname string) (bool, error) {
if isBlockedHostname(hostname) {
return true, nil
}
// IP 字面量直接判断。
if ip := net.ParseIP(hostname); ip != nil {
return isPrivateIP(ip), nil
}
resolver := net.DefaultResolver
addrs, err := resolver.LookupIPAddr(ctx, hostname)
if err != nil {
return false, err
}
if len(addrs) == 0 {
return true, nil
}
for _, a := range addrs {
if isPrivateIP(a.IP) {
return true, nil
}
}
return false, nil
}
// safeDialContext 在真实 dial 前再次校验目标 IP防止 DNS rebinding。
// 解析 hostname 后逐个 IP 尝试连接,命中私网即拒绝(即便 validateEndpoint 时返回的是公网 IP
func safeDialContext(ctx context.Context, network, address string) (net.Conn, error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
return nil, err
}
// 字面量 IP 走快速路径。
if ip := net.ParseIP(host); ip != nil {
if isPrivateIP(ip) {
return nil, &net.AddrError{Err: "blocked by SSRF policy", Addr: address}
}
return monitorDialer.DialContext(ctx, network, address)
}
if isBlockedHostname(host) {
return nil, &net.AddrError{Err: "blocked by SSRF policy", Addr: address}
}
addrs, err := net.DefaultResolver.LookupIPAddr(ctx, host)
if err != nil {
return nil, err
}
if len(addrs) == 0 {
return nil, &net.AddrError{Err: "no addresses for host", Addr: host}
}
var lastErr error
for _, a := range addrs {
if isPrivateIP(a.IP) {
lastErr = &net.AddrError{Err: "blocked by SSRF policy", Addr: a.IP.String()}
continue
}
conn, err := monitorDialer.DialContext(ctx, network, net.JoinHostPort(a.IP.String(), port))
if err == nil {
return conn, nil
}
lastErr = err
}
if lastErr == nil {
lastErr = &net.AddrError{Err: "no usable addresses", Addr: host}
}
return nil, lastErr
}

View File

@@ -0,0 +1,161 @@
package service
import "time"
// ChannelMonitor 渠道监控配置service 层模型,不直接暴露 ent 类型)。
type ChannelMonitor struct {
ID int64
Name string
Provider string
Endpoint string
APIKey string // 解密后的明文 API Key仅在 service 内部使用handler 层不应直接序列化返回)
PrimaryModel string
ExtraModels []string
GroupName string
Enabled bool
IntervalSeconds int
LastCheckedAt *time.Time
CreatedBy int64
CreatedAt time.Time
UpdatedAt time.Time
// APIKeyDecryptFailed 表示 APIKey 字段无法解密(密钥不一致或损坏)。
// 此时 APIKey 为空字符串runner / RunCheck 必须跳过该监控并提示重填。
APIKeyDecryptFailed bool
}
// ChannelMonitorListParams 列表查询过滤参数。
type ChannelMonitorListParams struct {
Page int
PageSize int
Provider string
Enabled *bool
Search string
}
// ChannelMonitorCreateParams 创建参数。
type ChannelMonitorCreateParams struct {
Name string
Provider string
Endpoint string
APIKey string
PrimaryModel string
ExtraModels []string
GroupName string
Enabled bool
IntervalSeconds int
CreatedBy int64
}
// ChannelMonitorUpdateParams 更新参数(指针字段表示"未提供则不更新")。
type ChannelMonitorUpdateParams struct {
Name *string
Provider *string
Endpoint *string
APIKey *string // 空字符串表示不修改;非空字符串覆盖
PrimaryModel *string
ExtraModels *[]string
GroupName *string
Enabled *bool
IntervalSeconds *int
}
// CheckResult 单个模型一次检测的结果。
type CheckResult struct {
Model string
Status string // operational / degraded / failed / error
LatencyMs *int
PingLatencyMs *int
Message string
CheckedAt time.Time
}
// UserMonitorView 用户只读视图:监控概览(含主模型最近状态 + 7d 可用率 + 附加模型最近状态)。
type UserMonitorView struct {
ID int64
Name string
Provider string
GroupName string
PrimaryModel string
PrimaryStatus string
PrimaryLatencyMs *int
Availability7d float64 // 0-100
ExtraModels []ExtraModelStatus
}
// ExtraModelStatus 附加模型最近一次状态。
type ExtraModelStatus struct {
Model string
Status string
LatencyMs *int
}
// UserMonitorDetail 用户只读视图:监控详情(含全部模型 7d/15d/30d 可用率与平均延迟)。
type UserMonitorDetail struct {
ID int64
Name string
Provider string
GroupName string
Models []ModelDetail
}
// ModelDetail 单个模型的可用率/延迟统计。
type ModelDetail struct {
Model string
LatestStatus string
LatestLatencyMs *int
Availability7d float64 // 0-100
Availability15d float64
Availability30d float64
AvgLatency7dMs *int
}
// ChannelMonitorHistoryRow 历史记录入库行service 层向 repository 提交的数据)。
type ChannelMonitorHistoryRow struct {
MonitorID int64
Model string
Status string
LatencyMs *int
PingLatencyMs *int
Message string
CheckedAt time.Time
}
// ChannelMonitorHistoryEntry 历史记录查询返回行(含 ent 主键 ID
type ChannelMonitorHistoryEntry struct {
ID int64
Model string
Status string
LatencyMs *int
PingLatencyMs *int
Message string
CheckedAt time.Time
}
// ChannelMonitorLatest 最近一次检测的简明信息(用于 UserMonitorView 聚合)。
type ChannelMonitorLatest struct {
Model string
Status string
LatencyMs *int
CheckedAt time.Time
}
// ChannelMonitorAvailability 单个模型在某窗口内的可用率与平均延迟(用于 UserMonitorDetail 聚合)。
type ChannelMonitorAvailability struct {
Model string
WindowDays int
TotalChecks int
OperationalChecks int // operational + degraded 视为可用
AvailabilityPct float64
AvgLatencyMs *int
}
// MonitorStatusSummary 监控状态聚合admin list 用,单次 repo 查询消除前端 N+1
// PrimaryStatus / PrimaryLatencyMs 描述主模型最近状态Availability7d 是主模型 7 天可用率;
// ExtraModels 描述附加模型最近状态(用于 hover 展示)。
type MonitorStatusSummary struct {
PrimaryStatus string // 空字符串表示无历史
PrimaryLatencyMs *int
Availability7d float64 // 0-100无历史时为 0
ExtraModels []ExtraModelStatus
}

View File

@@ -0,0 +1,99 @@
package service
import (
"context"
"net/url"
"strings"
)
// 渠道监控参数校验与归一化辅助函数。
// 校验失败一律返回 channel_monitor_const.go 中预定义的 Err* 错误,错误信息不含具体 IP/hostname避免泄露内网拓扑。
// validateProvider 校验 provider 字符串。
// 唯一来源于 providerAdapters新增 provider 只需要在 channel_monitor_checker.go 注册 adapter。
func validateProvider(p string) error {
if !isSupportedProvider(p) {
return ErrChannelMonitorInvalidProvider
}
return nil
}
// validateInterval 校验 interval_seconds 范围。
func validateInterval(sec int) error {
if sec < monitorMinIntervalSeconds || sec > monitorMaxIntervalSeconds {
return ErrChannelMonitorInvalidInterval
}
return nil
}
// validateEndpoint 校验 endpoint
// - scheme 强制 https拒绝 http避免明文凭证 + 部分 SSRF 利用面)
// - 必须为 origin无 path/query/fragment防止用户填 https://api.openai.com/v1
// 导致 joinURL 拼出 /v1/v1/chat/completions
// - hostname 不能是 localhost/metadata 等已知元数据 hostname
// - 解析所有 IP任一落在 loopback/RFC1918/link-local/ULA 段即拒绝(防 SSRF
//
// 错误信息不暴露具体 IP / hostname避免泄露内网拓扑。
func validateEndpoint(ep string) error {
ep = strings.TrimSpace(ep)
if ep == "" {
return ErrChannelMonitorInvalidEndpoint
}
u, err := url.Parse(ep)
if err != nil {
return ErrChannelMonitorInvalidEndpoint
}
if u.Scheme != "https" {
return ErrChannelMonitorEndpointScheme
}
if u.Host == "" {
return ErrChannelMonitorInvalidEndpoint
}
if u.Path != "" && u.Path != "/" {
return ErrChannelMonitorEndpointPath
}
if u.RawQuery != "" || u.Fragment != "" {
return ErrChannelMonitorEndpointPath
}
hostname := u.Hostname()
ctx, cancel := context.WithTimeout(context.Background(), monitorEndpointResolveTimeout)
defer cancel()
blocked, err := isPrivateOrLoopbackHost(ctx, hostname)
if err != nil {
return ErrChannelMonitorEndpointUnreachable
}
if blocked {
return ErrChannelMonitorEndpointPrivate
}
return nil
}
// normalizeEndpoint 去除前后空白与末尾 `/`,保证存储统一为 origin。
// validateEndpoint 已确保格式合法(仅 origin这里只做最终归一化。
func normalizeEndpoint(ep string) string {
ep = strings.TrimSpace(ep)
ep = strings.TrimRight(ep, "/")
return ep
}
// normalizeModels 去除空白、重复模型名。保留输入顺序map 的迭代顺序无关)。
func normalizeModels(in []string) []string {
if len(in) == 0 {
return []string{}
}
seen := make(map[string]struct{}, len(in))
out := make([]string, 0, len(in))
for _, m := range in {
m = strings.TrimSpace(m)
if m == "" {
continue
}
if _, ok := seen[m]; ok {
continue
}
seen[m] = struct{}{}
out = append(out, m)
}
return out
}

View File

@@ -467,6 +467,8 @@ var ProviderSet = wire.NewSet(
NewPaymentService,
ProvidePaymentOrderExpiryService,
ProvideBalanceNotifyService,
ProvideChannelMonitorService,
ProvideChannelMonitorRunner,
)
// ProvidePaymentConfigService wraps NewPaymentConfigService to accept the named
@@ -486,3 +488,20 @@ func ProvidePaymentOrderExpiryService(paymentSvc *PaymentService) *PaymentOrderE
svc.Start()
return svc
}
// ProvideChannelMonitorService 创建渠道监控服务CRUD + RunCheck + 用户视图聚合)。
// 加密器复用 wire 中已注入的 SecretEncryptorAES-256-GCM
func ProvideChannelMonitorService(
repo ChannelMonitorRepository,
encryptor SecretEncryptor,
) *ChannelMonitorService {
return NewChannelMonitorService(repo, encryptor)
}
// ProvideChannelMonitorRunner 创建并启动渠道监控调度器。
// Runner.Stop 由 cleanup function 调用。
func ProvideChannelMonitorRunner(svc *ChannelMonitorService) *ChannelMonitorRunner {
r := NewChannelMonitorRunner(svc)
r.Start()
return r
}