Merge branch 'main' into dev
This commit is contained in:
@@ -118,3 +118,96 @@ func (h *OpsHandler) GetAccountAvailability(c *gin.Context) {
|
||||
}
|
||||
response.Success(c, payload)
|
||||
}
|
||||
|
||||
func parseOpsRealtimeWindow(v string) (time.Duration, string, bool) {
|
||||
switch strings.ToLower(strings.TrimSpace(v)) {
|
||||
case "", "1min", "1m":
|
||||
return 1 * time.Minute, "1min", true
|
||||
case "5min", "5m":
|
||||
return 5 * time.Minute, "5min", true
|
||||
case "30min", "30m":
|
||||
return 30 * time.Minute, "30min", true
|
||||
case "1h", "60m", "60min":
|
||||
return 1 * time.Hour, "1h", true
|
||||
default:
|
||||
return 0, "", false
|
||||
}
|
||||
}
|
||||
|
||||
// GetRealtimeTrafficSummary returns QPS/TPS current/peak/avg for the selected window.
|
||||
// GET /api/v1/admin/ops/realtime-traffic
|
||||
//
|
||||
// Query params:
|
||||
// - window: 1min|5min|30min|1h (default: 1min)
|
||||
// - platform: optional
|
||||
// - group_id: optional
|
||||
func (h *OpsHandler) GetRealtimeTrafficSummary(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
windowDur, windowLabel, ok := parseOpsRealtimeWindow(c.Query("window"))
|
||||
if !ok {
|
||||
response.BadRequest(c, "Invalid window")
|
||||
return
|
||||
}
|
||||
|
||||
platform := strings.TrimSpace(c.Query("platform"))
|
||||
var groupID *int64
|
||||
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
|
||||
id, err := strconv.ParseInt(v, 10, 64)
|
||||
if err != nil || id <= 0 {
|
||||
response.BadRequest(c, "Invalid group_id")
|
||||
return
|
||||
}
|
||||
groupID = &id
|
||||
}
|
||||
|
||||
endTime := time.Now().UTC()
|
||||
startTime := endTime.Add(-windowDur)
|
||||
|
||||
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
|
||||
disabledSummary := &service.OpsRealtimeTrafficSummary{
|
||||
Window: windowLabel,
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
Platform: platform,
|
||||
GroupID: groupID,
|
||||
QPS: service.OpsRateSummary{},
|
||||
TPS: service.OpsRateSummary{},
|
||||
}
|
||||
response.Success(c, gin.H{
|
||||
"enabled": false,
|
||||
"summary": disabledSummary,
|
||||
"timestamp": endTime,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
filter := &service.OpsDashboardFilter{
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
Platform: platform,
|
||||
GroupID: groupID,
|
||||
QueryMode: service.OpsQueryModeRaw,
|
||||
}
|
||||
|
||||
summary, err := h.opsService.GetRealtimeTrafficSummary(c.Request.Context(), filter)
|
||||
if err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
if summary != nil {
|
||||
summary.Window = windowLabel
|
||||
}
|
||||
response.Success(c, gin.H{
|
||||
"enabled": true,
|
||||
"summary": summary,
|
||||
"timestamp": endTime,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -146,3 +146,49 @@ func (h *OpsHandler) UpdateAdvancedSettings(c *gin.Context) {
|
||||
}
|
||||
response.Success(c, updated)
|
||||
}
|
||||
|
||||
// GetMetricThresholds returns Ops metric thresholds (DB-backed).
|
||||
// GET /api/v1/admin/ops/settings/metric-thresholds
|
||||
func (h *OpsHandler) GetMetricThresholds(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
cfg, err := h.opsService.GetMetricThresholds(c.Request.Context())
|
||||
if err != nil {
|
||||
response.Error(c, http.StatusInternalServerError, "Failed to get metric thresholds")
|
||||
return
|
||||
}
|
||||
response.Success(c, cfg)
|
||||
}
|
||||
|
||||
// UpdateMetricThresholds updates Ops metric thresholds (DB-backed).
|
||||
// PUT /api/v1/admin/ops/settings/metric-thresholds
|
||||
func (h *OpsHandler) UpdateMetricThresholds(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
var req service.OpsMetricThresholds
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
response.BadRequest(c, "Invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
updated, err := h.opsService.UpdateMetricThresholds(c.Request.Context(), &req)
|
||||
if err != nil {
|
||||
response.Error(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
response.Success(c, updated)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package handler
|
||||
import (
|
||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||
"github.com/Wei-Shaw/sub2api/internal/handler/dto"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/response"
|
||||
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
@@ -76,7 +77,7 @@ func (h *AuthHandler) Register(c *gin.Context) {
|
||||
|
||||
// Turnstile 验证(当提供了邮箱验证码时跳过,因为发送验证码时已验证过)
|
||||
if req.VerifyCode == "" {
|
||||
if err := h.authService.VerifyTurnstile(c.Request.Context(), req.TurnstileToken, c.ClientIP()); err != nil {
|
||||
if err := h.authService.VerifyTurnstile(c.Request.Context(), req.TurnstileToken, ip.GetClientIP(c)); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
@@ -105,7 +106,7 @@ func (h *AuthHandler) SendVerifyCode(c *gin.Context) {
|
||||
}
|
||||
|
||||
// Turnstile 验证
|
||||
if err := h.authService.VerifyTurnstile(c.Request.Context(), req.TurnstileToken, c.ClientIP()); err != nil {
|
||||
if err := h.authService.VerifyTurnstile(c.Request.Context(), req.TurnstileToken, ip.GetClientIP(c)); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
@@ -132,7 +133,7 @@ func (h *AuthHandler) Login(c *gin.Context) {
|
||||
}
|
||||
|
||||
// Turnstile 验证
|
||||
if err := h.authService.VerifyTurnstile(c.Request.Context(), req.TurnstileToken, c.ClientIP()); err != nil {
|
||||
if err := h.authService.VerifyTurnstile(c.Request.Context(), req.TurnstileToken, ip.GetClientIP(c)); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
|
||||
pkgerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/openai"
|
||||
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
@@ -88,6 +89,9 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 检查是否为 Claude Code 客户端,设置到 context 中
|
||||
SetClaudeCodeClientContext(c, body)
|
||||
|
||||
setOpsRequestContext(c, "", false, body)
|
||||
|
||||
parsedReq, err := service.ParseGatewayRequest(body)
|
||||
@@ -271,12 +275,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
var failoverErr *service.UpstreamFailoverError
|
||||
if errors.As(err, &failoverErr) {
|
||||
failedAccountIDs[account.ID] = struct{}{}
|
||||
lastFailoverStatus = failoverErr.StatusCode
|
||||
if switchCount >= maxAccountSwitches {
|
||||
lastFailoverStatus = failoverErr.StatusCode
|
||||
h.handleFailoverExhausted(c, lastFailoverStatus, streamStarted)
|
||||
return
|
||||
}
|
||||
lastFailoverStatus = failoverErr.StatusCode
|
||||
switchCount++
|
||||
log.Printf("Account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches)
|
||||
continue
|
||||
@@ -286,8 +289,12 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context)
|
||||
userAgent := c.GetHeader("User-Agent")
|
||||
clientIP := ip.GetClientIP(c)
|
||||
|
||||
// 异步记录使用量(subscription已在函数开头获取)
|
||||
go func(result *service.ForwardResult, usedAccount *service.Account) {
|
||||
go func(result *service.ForwardResult, usedAccount *service.Account, ua, clientIP string) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
|
||||
@@ -296,10 +303,12 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
User: apiKey.User,
|
||||
Account: usedAccount,
|
||||
Subscription: subscription,
|
||||
UserAgent: ua,
|
||||
IPAddress: clientIP,
|
||||
}); err != nil {
|
||||
log.Printf("Record usage failed: %v", err)
|
||||
}
|
||||
}(result, account)
|
||||
}(result, account, userAgent, clientIP)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -399,12 +408,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
var failoverErr *service.UpstreamFailoverError
|
||||
if errors.As(err, &failoverErr) {
|
||||
failedAccountIDs[account.ID] = struct{}{}
|
||||
lastFailoverStatus = failoverErr.StatusCode
|
||||
if switchCount >= maxAccountSwitches {
|
||||
lastFailoverStatus = failoverErr.StatusCode
|
||||
h.handleFailoverExhausted(c, lastFailoverStatus, streamStarted)
|
||||
return
|
||||
}
|
||||
lastFailoverStatus = failoverErr.StatusCode
|
||||
switchCount++
|
||||
log.Printf("Account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches)
|
||||
continue
|
||||
@@ -414,8 +422,12 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context)
|
||||
userAgent := c.GetHeader("User-Agent")
|
||||
clientIP := ip.GetClientIP(c)
|
||||
|
||||
// 异步记录使用量(subscription已在函数开头获取)
|
||||
go func(result *service.ForwardResult, usedAccount *service.Account) {
|
||||
go func(result *service.ForwardResult, usedAccount *service.Account, ua, clientIP string) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
|
||||
@@ -424,10 +436,12 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
User: apiKey.User,
|
||||
Account: usedAccount,
|
||||
Subscription: subscription,
|
||||
UserAgent: ua,
|
||||
IPAddress: clientIP,
|
||||
}); err != nil {
|
||||
log.Printf("Record usage failed: %v", err)
|
||||
}
|
||||
}(result, account)
|
||||
}(result, account, userAgent, clientIP)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/gemini"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/googleapi"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
|
||||
"github.com/Wei-Shaw/sub2api/internal/server/middleware"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
|
||||
@@ -314,8 +315,12 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context)
|
||||
userAgent := c.GetHeader("User-Agent")
|
||||
clientIP := ip.GetClientIP(c)
|
||||
|
||||
// 6) record usage async
|
||||
go func(result *service.ForwardResult, usedAccount *service.Account) {
|
||||
go func(result *service.ForwardResult, usedAccount *service.Account, ua, ip string) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{
|
||||
@@ -324,10 +329,12 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
|
||||
User: apiKey.User,
|
||||
Account: usedAccount,
|
||||
Subscription: subscription,
|
||||
UserAgent: ua,
|
||||
IPAddress: ip,
|
||||
}); err != nil {
|
||||
log.Printf("Record usage failed: %v", err)
|
||||
}
|
||||
}(result, account)
|
||||
}(result, account, userAgent, clientIP)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/openai"
|
||||
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
@@ -263,8 +264,12 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context)
|
||||
userAgent := c.GetHeader("User-Agent")
|
||||
clientIP := ip.GetClientIP(c)
|
||||
|
||||
// Async record usage
|
||||
go func(result *service.OpenAIForwardResult, usedAccount *service.Account) {
|
||||
go func(result *service.OpenAIForwardResult, usedAccount *service.Account, ua, ip string) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := h.gatewayService.RecordUsage(ctx, &service.OpenAIRecordUsageInput{
|
||||
@@ -273,10 +278,12 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
|
||||
User: apiKey.User,
|
||||
Account: usedAccount,
|
||||
Subscription: subscription,
|
||||
UserAgent: ua,
|
||||
IPAddress: ip,
|
||||
}); err != nil {
|
||||
log.Printf("Record usage failed: %v", err)
|
||||
}
|
||||
}(result, account)
|
||||
}(result, account, userAgent, clientIP)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
|
||||
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -489,6 +490,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
Severity: classifyOpsSeverity("upstream_error", effectiveUpstreamStatus),
|
||||
StatusCode: status,
|
||||
IsBusinessLimited: false,
|
||||
IsCountTokens: isCountTokensRequest(c),
|
||||
|
||||
ErrorMessage: recoveredMsg,
|
||||
ErrorBody: "",
|
||||
@@ -521,7 +523,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
}
|
||||
|
||||
var clientIP string
|
||||
if ip := strings.TrimSpace(c.ClientIP()); ip != "" {
|
||||
if ip := strings.TrimSpace(ip.GetClientIP(c)); ip != "" {
|
||||
clientIP = ip
|
||||
entry.ClientIP = &clientIP
|
||||
}
|
||||
@@ -598,6 +600,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
Severity: classifyOpsSeverity(parsed.ErrorType, status),
|
||||
StatusCode: status,
|
||||
IsBusinessLimited: isBusinessLimited,
|
||||
IsCountTokens: isCountTokensRequest(c),
|
||||
|
||||
ErrorMessage: parsed.Message,
|
||||
// Keep the full captured error body (capture is already capped at 64KB) so the
|
||||
@@ -680,7 +683,7 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
}
|
||||
|
||||
var clientIP string
|
||||
if ip := strings.TrimSpace(c.ClientIP()); ip != "" {
|
||||
if ip := strings.TrimSpace(ip.GetClientIP(c)); ip != "" {
|
||||
clientIP = ip
|
||||
entry.ClientIP = &clientIP
|
||||
}
|
||||
@@ -704,6 +707,14 @@ var opsRetryRequestHeaderAllowlist = []string{
|
||||
"anthropic-version",
|
||||
}
|
||||
|
||||
// isCountTokensRequest checks if the request is a count_tokens request
|
||||
func isCountTokensRequest(c *gin.Context) bool {
|
||||
if c == nil || c.Request == nil || c.Request.URL == nil {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(c.Request.URL.Path, "/count_tokens")
|
||||
}
|
||||
|
||||
func extractOpsRetryRequestHeaders(c *gin.Context) *string {
|
||||
if c == nil || c.Request == nil {
|
||||
return nil
|
||||
|
||||
@@ -46,6 +46,7 @@ INSERT INTO ops_error_logs (
|
||||
severity,
|
||||
status_code,
|
||||
is_business_limited,
|
||||
is_count_tokens,
|
||||
error_message,
|
||||
error_body,
|
||||
error_source,
|
||||
@@ -64,7 +65,7 @@ INSERT INTO ops_error_logs (
|
||||
retry_count,
|
||||
created_at
|
||||
) VALUES (
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35
|
||||
) RETURNING id`
|
||||
|
||||
var id int64
|
||||
@@ -88,6 +89,7 @@ INSERT INTO ops_error_logs (
|
||||
opsNullString(input.Severity),
|
||||
opsNullInt(input.StatusCode),
|
||||
input.IsBusinessLimited,
|
||||
input.IsCountTokens,
|
||||
opsNullString(input.ErrorMessage),
|
||||
opsNullString(input.ErrorBody),
|
||||
opsNullString(input.ErrorSource),
|
||||
|
||||
@@ -964,8 +964,8 @@ func buildErrorWhere(filter *service.OpsDashboardFilter, start, end time.Time, s
|
||||
}
|
||||
|
||||
idx := startIndex
|
||||
clauses := make([]string, 0, 4)
|
||||
args = make([]any, 0, 4)
|
||||
clauses := make([]string, 0, 5)
|
||||
args = make([]any, 0, 5)
|
||||
|
||||
args = append(args, start)
|
||||
clauses = append(clauses, fmt.Sprintf("created_at >= $%d", idx))
|
||||
@@ -974,6 +974,8 @@ func buildErrorWhere(filter *service.OpsDashboardFilter, start, end time.Time, s
|
||||
clauses = append(clauses, fmt.Sprintf("created_at < $%d", idx))
|
||||
idx++
|
||||
|
||||
clauses = append(clauses, "is_count_tokens = FALSE")
|
||||
|
||||
if groupID != nil && *groupID > 0 {
|
||||
args = append(args, *groupID)
|
||||
clauses = append(clauses, fmt.Sprintf("group_id = $%d", idx))
|
||||
|
||||
@@ -78,7 +78,9 @@ error_base AS (
|
||||
status_code AS client_status_code,
|
||||
COALESCE(upstream_status_code, status_code, 0) AS effective_status_code
|
||||
FROM ops_error_logs
|
||||
-- Exclude count_tokens requests from error metrics as they are informational probes
|
||||
WHERE created_at >= $1 AND created_at < $2
|
||||
AND is_count_tokens = FALSE
|
||||
),
|
||||
error_agg AS (
|
||||
SELECT
|
||||
|
||||
129
backend/internal/repository/ops_repo_realtime_traffic.go
Normal file
129
backend/internal/repository/ops_repo_realtime_traffic.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
)
|
||||
|
||||
func (r *opsRepository) GetRealtimeTrafficSummary(ctx context.Context, filter *service.OpsDashboardFilter) (*service.OpsRealtimeTrafficSummary, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return nil, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if filter == nil {
|
||||
return nil, fmt.Errorf("nil filter")
|
||||
}
|
||||
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
|
||||
return nil, fmt.Errorf("start_time/end_time required")
|
||||
}
|
||||
|
||||
start := filter.StartTime.UTC()
|
||||
end := filter.EndTime.UTC()
|
||||
if start.After(end) {
|
||||
return nil, fmt.Errorf("start_time must be <= end_time")
|
||||
}
|
||||
|
||||
window := end.Sub(start)
|
||||
if window <= 0 {
|
||||
return nil, fmt.Errorf("invalid time window")
|
||||
}
|
||||
if window > time.Hour {
|
||||
return nil, fmt.Errorf("window too large")
|
||||
}
|
||||
|
||||
usageJoin, usageWhere, usageArgs, next := buildUsageWhere(filter, start, end, 1)
|
||||
errorWhere, errorArgs, _ := buildErrorWhere(filter, start, end, next)
|
||||
|
||||
q := `
|
||||
WITH usage_buckets AS (
|
||||
SELECT
|
||||
date_trunc('minute', ul.created_at) AS bucket,
|
||||
COALESCE(COUNT(*), 0) AS success_count,
|
||||
COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_sum
|
||||
FROM usage_logs ul
|
||||
` + usageJoin + `
|
||||
` + usageWhere + `
|
||||
GROUP BY 1
|
||||
),
|
||||
error_buckets AS (
|
||||
SELECT
|
||||
date_trunc('minute', created_at) AS bucket,
|
||||
COALESCE(COUNT(*), 0) AS error_count
|
||||
FROM ops_error_logs
|
||||
` + errorWhere + `
|
||||
AND COALESCE(status_code, 0) >= 400
|
||||
GROUP BY 1
|
||||
),
|
||||
combined AS (
|
||||
SELECT
|
||||
COALESCE(u.bucket, e.bucket) AS bucket,
|
||||
COALESCE(u.success_count, 0) AS success_count,
|
||||
COALESCE(u.token_sum, 0) AS token_sum,
|
||||
COALESCE(e.error_count, 0) AS error_count,
|
||||
COALESCE(u.success_count, 0) + COALESCE(e.error_count, 0) AS request_total
|
||||
FROM usage_buckets u
|
||||
FULL OUTER JOIN error_buckets e ON u.bucket = e.bucket
|
||||
)
|
||||
SELECT
|
||||
COALESCE(SUM(success_count), 0) AS success_total,
|
||||
COALESCE(SUM(error_count), 0) AS error_total,
|
||||
COALESCE(SUM(token_sum), 0) AS token_total,
|
||||
COALESCE(MAX(request_total), 0) AS peak_requests_per_min,
|
||||
COALESCE(MAX(token_sum), 0) AS peak_tokens_per_min
|
||||
FROM combined`
|
||||
|
||||
args := append(usageArgs, errorArgs...)
|
||||
var successCount int64
|
||||
var errorTotal int64
|
||||
var tokenConsumed int64
|
||||
var peakRequestsPerMin int64
|
||||
var peakTokensPerMin int64
|
||||
if err := r.db.QueryRowContext(ctx, q, args...).Scan(
|
||||
&successCount,
|
||||
&errorTotal,
|
||||
&tokenConsumed,
|
||||
&peakRequestsPerMin,
|
||||
&peakTokensPerMin,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
windowSeconds := window.Seconds()
|
||||
if windowSeconds <= 0 {
|
||||
windowSeconds = 1
|
||||
}
|
||||
|
||||
requestCountTotal := successCount + errorTotal
|
||||
qpsAvg := roundTo1DP(float64(requestCountTotal) / windowSeconds)
|
||||
tpsAvg := roundTo1DP(float64(tokenConsumed) / windowSeconds)
|
||||
|
||||
// Keep "current" consistent with the dashboard overview semantics: last 1 minute.
|
||||
// This remains "within the selected window" since end=start+window.
|
||||
qpsCurrent, tpsCurrent, err := r.queryCurrentRates(ctx, filter, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
qpsPeak := roundTo1DP(float64(peakRequestsPerMin) / 60.0)
|
||||
tpsPeak := roundTo1DP(float64(peakTokensPerMin) / 60.0)
|
||||
|
||||
return &service.OpsRealtimeTrafficSummary{
|
||||
StartTime: start,
|
||||
EndTime: end,
|
||||
Platform: strings.TrimSpace(filter.Platform),
|
||||
GroupID: filter.GroupID,
|
||||
QPS: service.OpsRateSummary{
|
||||
Current: qpsCurrent,
|
||||
Peak: qpsPeak,
|
||||
Avg: qpsAvg,
|
||||
},
|
||||
TPS: service.OpsRateSummary{
|
||||
Current: tpsCurrent,
|
||||
Peak: tpsPeak,
|
||||
Avg: tpsAvg,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@@ -170,6 +170,7 @@ error_totals AS (
|
||||
FROM ops_error_logs
|
||||
WHERE created_at >= $1 AND created_at < $2
|
||||
AND COALESCE(status_code, 0) >= 400
|
||||
AND is_count_tokens = FALSE -- 排除 count_tokens 请求的错误
|
||||
GROUP BY 1
|
||||
),
|
||||
combined AS (
|
||||
@@ -243,6 +244,7 @@ error_totals AS (
|
||||
AND platform = $3
|
||||
AND group_id IS NOT NULL
|
||||
AND COALESCE(status_code, 0) >= 400
|
||||
AND is_count_tokens = FALSE -- 排除 count_tokens 请求的错误
|
||||
GROUP BY 1
|
||||
),
|
||||
combined AS (
|
||||
|
||||
@@ -80,17 +80,17 @@ func enqueueSchedulerOutbox(ctx context.Context, exec sqlExecutor, eventType str
|
||||
if exec == nil {
|
||||
return nil
|
||||
}
|
||||
var payloadJSON []byte
|
||||
var payloadArg any
|
||||
if payload != nil {
|
||||
encoded, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
payloadJSON = encoded
|
||||
payloadArg = encoded
|
||||
}
|
||||
_, err := exec.ExecContext(ctx, `
|
||||
INSERT INTO scheduler_outbox (event_type, account_id, group_id, payload)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
`, eventType, accountID, groupID, payloadJSON)
|
||||
`, eventType, accountID, groupID, payloadArg)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -46,25 +46,12 @@ func TestSchedulerSnapshotOutboxReplay(t *testing.T) {
|
||||
Extra: map[string]any{},
|
||||
}
|
||||
require.NoError(t, accountRepo.Create(ctx, account))
|
||||
require.NoError(t, cache.SetAccount(ctx, account))
|
||||
|
||||
svc := service.NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, nil, cfg)
|
||||
svc.Start()
|
||||
t.Cleanup(svc.Stop)
|
||||
|
||||
bucket := service.SchedulerBucket{GroupID: 0, Platform: service.PlatformOpenAI, Mode: service.SchedulerModeSingle}
|
||||
require.Eventually(t, func() bool {
|
||||
accounts, hit, err := cache.GetSnapshot(ctx, bucket)
|
||||
if err != nil || !hit {
|
||||
return false
|
||||
}
|
||||
for _, acc := range accounts {
|
||||
if acc.ID == account.ID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
|
||||
require.NoError(t, accountRepo.UpdateLastUsed(ctx, account.ID))
|
||||
updated, err := accountRepo.GetByID(ctx, account.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -73,6 +73,7 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
|
||||
// Realtime ops signals
|
||||
ops.GET("/concurrency", h.Admin.Ops.GetConcurrencyStats)
|
||||
ops.GET("/account-availability", h.Admin.Ops.GetAccountAvailability)
|
||||
ops.GET("/realtime-traffic", h.Admin.Ops.GetRealtimeTrafficSummary)
|
||||
|
||||
// Alerts (rules + events)
|
||||
ops.GET("/alert-rules", h.Admin.Ops.ListAlertRules)
|
||||
@@ -96,6 +97,13 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
|
||||
ops.GET("/advanced-settings", h.Admin.Ops.GetAdvancedSettings)
|
||||
ops.PUT("/advanced-settings", h.Admin.Ops.UpdateAdvancedSettings)
|
||||
|
||||
// Settings group (DB-backed)
|
||||
settings := ops.Group("/settings")
|
||||
{
|
||||
settings.GET("/metric-thresholds", h.Admin.Ops.GetMetricThresholds)
|
||||
settings.PUT("/metric-thresholds", h.Admin.Ops.UpdateMetricThresholds)
|
||||
}
|
||||
|
||||
// WebSocket realtime (QPS/TPS)
|
||||
ws := ops.Group("/ws")
|
||||
{
|
||||
|
||||
@@ -523,6 +523,9 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// Sanitize thinking blocks (clean cache_control and flatten history thinking)
|
||||
sanitizeThinkingBlocks(&claudeReq)
|
||||
|
||||
// 获取转换选项
|
||||
// Antigravity 上游要求必须包含身份提示词,否则会返回 429
|
||||
transformOpts := s.getClaudeTransformOptions(ctx)
|
||||
@@ -534,6 +537,9 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
return nil, fmt.Errorf("transform request: %w", err)
|
||||
}
|
||||
|
||||
// Safety net: ensure no cache_control leaked into Gemini request
|
||||
geminiBody = cleanCacheControlFromGeminiJSON(geminiBody)
|
||||
|
||||
// Antigravity 上游只支持流式请求,统一使用 streamGenerateContent
|
||||
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回
|
||||
action := "streamGenerateContent"
|
||||
@@ -903,6 +909,143 @@ func extractAntigravityErrorMessage(body []byte) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// cleanCacheControlFromGeminiJSON removes cache_control from Gemini JSON (emergency fix)
|
||||
// This should not be needed if transformation is correct, but serves as a safety net
|
||||
func cleanCacheControlFromGeminiJSON(body []byte) []byte {
|
||||
// Try a more robust approach: parse and clean
|
||||
var data map[string]any
|
||||
if err := json.Unmarshal(body, &data); err != nil {
|
||||
log.Printf("[Antigravity] Failed to parse Gemini JSON for cache_control cleaning: %v", err)
|
||||
return body
|
||||
}
|
||||
|
||||
cleaned := removeCacheControlFromAny(data)
|
||||
if !cleaned {
|
||||
return body
|
||||
}
|
||||
|
||||
if result, err := json.Marshal(data); err == nil {
|
||||
log.Printf("[Antigravity] Successfully cleaned cache_control from Gemini JSON")
|
||||
return result
|
||||
}
|
||||
|
||||
return body
|
||||
}
|
||||
|
||||
// removeCacheControlFromAny recursively removes cache_control fields
|
||||
func removeCacheControlFromAny(v any) bool {
|
||||
cleaned := false
|
||||
|
||||
switch val := v.(type) {
|
||||
case map[string]any:
|
||||
for k, child := range val {
|
||||
if k == "cache_control" {
|
||||
delete(val, k)
|
||||
cleaned = true
|
||||
} else if removeCacheControlFromAny(child) {
|
||||
cleaned = true
|
||||
}
|
||||
}
|
||||
case []any:
|
||||
for _, item := range val {
|
||||
if removeCacheControlFromAny(item) {
|
||||
cleaned = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return cleaned
|
||||
}
|
||||
|
||||
// sanitizeThinkingBlocks cleans cache_control and flattens history thinking blocks
|
||||
// Thinking blocks do NOT support cache_control field (Anthropic API/Vertex AI requirement)
|
||||
// Additionally, history thinking blocks are flattened to text to avoid upstream validation errors
|
||||
func sanitizeThinkingBlocks(req *antigravity.ClaudeRequest) {
|
||||
if req == nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[Antigravity] sanitizeThinkingBlocks: processing request with %d messages", len(req.Messages))
|
||||
|
||||
// Clean system blocks
|
||||
if len(req.System) > 0 {
|
||||
var systemBlocks []map[string]any
|
||||
if err := json.Unmarshal(req.System, &systemBlocks); err == nil {
|
||||
for i := range systemBlocks {
|
||||
if blockType, _ := systemBlocks[i]["type"].(string); blockType == "thinking" || systemBlocks[i]["thinking"] != nil {
|
||||
if removeCacheControlFromAny(systemBlocks[i]) {
|
||||
log.Printf("[Antigravity] Deep cleaned cache_control from thinking block in system[%d]", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Marshal back
|
||||
if cleaned, err := json.Marshal(systemBlocks); err == nil {
|
||||
req.System = cleaned
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean message content blocks and flatten history
|
||||
lastMsgIdx := len(req.Messages) - 1
|
||||
for msgIdx := range req.Messages {
|
||||
raw := req.Messages[msgIdx].Content
|
||||
if len(raw) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Try to parse as blocks array
|
||||
var blocks []map[string]any
|
||||
if err := json.Unmarshal(raw, &blocks); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
cleaned := false
|
||||
for blockIdx := range blocks {
|
||||
blockType, _ := blocks[blockIdx]["type"].(string)
|
||||
|
||||
// Check for thinking blocks (typed or untyped)
|
||||
if blockType == "thinking" || blocks[blockIdx]["thinking"] != nil {
|
||||
// 1. Clean cache_control
|
||||
if removeCacheControlFromAny(blocks[blockIdx]) {
|
||||
log.Printf("[Antigravity] Deep cleaned cache_control from thinking block in messages[%d].content[%d]", msgIdx, blockIdx)
|
||||
cleaned = true
|
||||
}
|
||||
|
||||
// 2. Flatten to text if it's a history message (not the last one)
|
||||
if msgIdx < lastMsgIdx {
|
||||
log.Printf("[Antigravity] Flattening history thinking block to text at messages[%d].content[%d]", msgIdx, blockIdx)
|
||||
|
||||
// Extract thinking content
|
||||
var textContent string
|
||||
if t, ok := blocks[blockIdx]["thinking"].(string); ok {
|
||||
textContent = t
|
||||
} else {
|
||||
// Fallback for non-string content (marshal it)
|
||||
if b, err := json.Marshal(blocks[blockIdx]["thinking"]); err == nil {
|
||||
textContent = string(b)
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to text block
|
||||
blocks[blockIdx]["type"] = "text"
|
||||
blocks[blockIdx]["text"] = textContent
|
||||
delete(blocks[blockIdx], "thinking")
|
||||
delete(blocks[blockIdx], "signature")
|
||||
delete(blocks[blockIdx], "cache_control") // Ensure it's gone
|
||||
cleaned = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Marshal back if modified
|
||||
if cleaned {
|
||||
if marshaled, err := json.Marshal(blocks); err == nil {
|
||||
req.Messages[msgIdx].Content = marshaled
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// stripThinkingFromClaudeRequest converts thinking blocks to text blocks in a Claude Messages request.
|
||||
// This preserves the thinking content while avoiding signature validation errors.
|
||||
// Note: redacted_thinking blocks are removed because they cannot be converted to text.
|
||||
|
||||
@@ -1227,6 +1227,9 @@ func enforceCacheControlLimit(body []byte) []byte {
|
||||
return body
|
||||
}
|
||||
|
||||
// 清理 thinking 块中的非法 cache_control(thinking 块不支持该字段)
|
||||
removeCacheControlFromThinkingBlocks(data)
|
||||
|
||||
// 计算当前 cache_control 块数量
|
||||
count := countCacheControlBlocks(data)
|
||||
if count <= maxCacheControlBlocks {
|
||||
@@ -1254,6 +1257,7 @@ func enforceCacheControlLimit(body []byte) []byte {
|
||||
}
|
||||
|
||||
// countCacheControlBlocks 统计 system 和 messages 中的 cache_control 块数量
|
||||
// 注意:thinking 块不支持 cache_control,统计时跳过
|
||||
func countCacheControlBlocks(data map[string]any) int {
|
||||
count := 0
|
||||
|
||||
@@ -1261,6 +1265,10 @@ func countCacheControlBlocks(data map[string]any) int {
|
||||
if system, ok := data["system"].([]any); ok {
|
||||
for _, item := range system {
|
||||
if m, ok := item.(map[string]any); ok {
|
||||
// thinking 块不支持 cache_control,跳过
|
||||
if blockType, _ := m["type"].(string); blockType == "thinking" {
|
||||
continue
|
||||
}
|
||||
if _, has := m["cache_control"]; has {
|
||||
count++
|
||||
}
|
||||
@@ -1275,6 +1283,10 @@ func countCacheControlBlocks(data map[string]any) int {
|
||||
if content, ok := msgMap["content"].([]any); ok {
|
||||
for _, item := range content {
|
||||
if m, ok := item.(map[string]any); ok {
|
||||
// thinking 块不支持 cache_control,跳过
|
||||
if blockType, _ := m["type"].(string); blockType == "thinking" {
|
||||
continue
|
||||
}
|
||||
if _, has := m["cache_control"]; has {
|
||||
count++
|
||||
}
|
||||
@@ -1290,6 +1302,7 @@ func countCacheControlBlocks(data map[string]any) int {
|
||||
|
||||
// removeCacheControlFromMessages 从 messages 中移除一个 cache_control(从头开始)
|
||||
// 返回 true 表示成功移除,false 表示没有可移除的
|
||||
// 注意:跳过 thinking 块(它不支持 cache_control)
|
||||
func removeCacheControlFromMessages(data map[string]any) bool {
|
||||
messages, ok := data["messages"].([]any)
|
||||
if !ok {
|
||||
@@ -1307,6 +1320,10 @@ func removeCacheControlFromMessages(data map[string]any) bool {
|
||||
}
|
||||
for _, item := range content {
|
||||
if m, ok := item.(map[string]any); ok {
|
||||
// thinking 块不支持 cache_control,跳过
|
||||
if blockType, _ := m["type"].(string); blockType == "thinking" {
|
||||
continue
|
||||
}
|
||||
if _, has := m["cache_control"]; has {
|
||||
delete(m, "cache_control")
|
||||
return true
|
||||
@@ -1319,6 +1336,7 @@ func removeCacheControlFromMessages(data map[string]any) bool {
|
||||
|
||||
// removeCacheControlFromSystem 从 system 中移除一个 cache_control(从尾部开始,保护注入的 prompt)
|
||||
// 返回 true 表示成功移除,false 表示没有可移除的
|
||||
// 注意:跳过 thinking 块(它不支持 cache_control)
|
||||
func removeCacheControlFromSystem(data map[string]any) bool {
|
||||
system, ok := data["system"].([]any)
|
||||
if !ok {
|
||||
@@ -1328,6 +1346,10 @@ func removeCacheControlFromSystem(data map[string]any) bool {
|
||||
// 从尾部开始移除,保护开头注入的 Claude Code prompt
|
||||
for i := len(system) - 1; i >= 0; i-- {
|
||||
if m, ok := system[i].(map[string]any); ok {
|
||||
// thinking 块不支持 cache_control,跳过
|
||||
if blockType, _ := m["type"].(string); blockType == "thinking" {
|
||||
continue
|
||||
}
|
||||
if _, has := m["cache_control"]; has {
|
||||
delete(m, "cache_control")
|
||||
return true
|
||||
@@ -1337,6 +1359,44 @@ func removeCacheControlFromSystem(data map[string]any) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// removeCacheControlFromThinkingBlocks 强制清理所有 thinking 块中的非法 cache_control
|
||||
// thinking 块不支持 cache_control 字段,这个函数确保所有 thinking 块都不含该字段
|
||||
func removeCacheControlFromThinkingBlocks(data map[string]any) {
|
||||
// 清理 system 中的 thinking 块
|
||||
if system, ok := data["system"].([]any); ok {
|
||||
for _, item := range system {
|
||||
if m, ok := item.(map[string]any); ok {
|
||||
if blockType, _ := m["type"].(string); blockType == "thinking" {
|
||||
if _, has := m["cache_control"]; has {
|
||||
delete(m, "cache_control")
|
||||
log.Printf("[Warning] Removed illegal cache_control from thinking block in system")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 清理 messages 中的 thinking 块
|
||||
if messages, ok := data["messages"].([]any); ok {
|
||||
for msgIdx, msg := range messages {
|
||||
if msgMap, ok := msg.(map[string]any); ok {
|
||||
if content, ok := msgMap["content"].([]any); ok {
|
||||
for contentIdx, item := range content {
|
||||
if m, ok := item.(map[string]any); ok {
|
||||
if blockType, _ := m["type"].(string); blockType == "thinking" {
|
||||
if _, has := m["cache_control"]; has {
|
||||
delete(m, "cache_control")
|
||||
log.Printf("[Warning] Removed illegal cache_control from thinking block in messages[%d].content[%d]", msgIdx, contentIdx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Forward 转发请求到Claude API
|
||||
func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, parsed *ParsedRequest) (*ForwardResult, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
@@ -545,14 +545,12 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
|
||||
isCodexCLI := openai.IsCodexCLIRequest(c.GetHeader("User-Agent"))
|
||||
|
||||
// Apply model mapping (skip for Codex CLI for transparent forwarding)
|
||||
mappedModel := reqModel
|
||||
if !isCodexCLI {
|
||||
mappedModel = account.GetMappedModel(reqModel)
|
||||
if mappedModel != reqModel {
|
||||
reqBody["model"] = mappedModel
|
||||
bodyModified = true
|
||||
}
|
||||
// Apply model mapping for all requests (including Codex CLI)
|
||||
mappedModel := account.GetMappedModel(reqModel)
|
||||
if mappedModel != reqModel {
|
||||
log.Printf("[OpenAI] Model mapping applied: %s -> %s (account: %s, isCodexCLI: %v)", reqModel, mappedModel, account.Name, isCodexCLI)
|
||||
reqBody["model"] = mappedModel
|
||||
bodyModified = true
|
||||
}
|
||||
|
||||
if account.Type == AccountTypeOAuth && !isCodexCLI {
|
||||
@@ -568,6 +566,44 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
}
|
||||
}
|
||||
|
||||
// Handle max_output_tokens based on platform and account type
|
||||
if !isCodexCLI {
|
||||
if maxOutputTokens, hasMaxOutputTokens := reqBody["max_output_tokens"]; hasMaxOutputTokens {
|
||||
switch account.Platform {
|
||||
case PlatformOpenAI:
|
||||
// For OpenAI API Key, remove max_output_tokens (not supported)
|
||||
// For OpenAI OAuth (Responses API), keep it (supported)
|
||||
if account.Type == AccountTypeAPIKey {
|
||||
delete(reqBody, "max_output_tokens")
|
||||
bodyModified = true
|
||||
}
|
||||
case PlatformAnthropic:
|
||||
// For Anthropic (Claude), convert to max_tokens
|
||||
delete(reqBody, "max_output_tokens")
|
||||
if _, hasMaxTokens := reqBody["max_tokens"]; !hasMaxTokens {
|
||||
reqBody["max_tokens"] = maxOutputTokens
|
||||
}
|
||||
bodyModified = true
|
||||
case PlatformGemini:
|
||||
// For Gemini, remove (will be handled by Gemini-specific transform)
|
||||
delete(reqBody, "max_output_tokens")
|
||||
bodyModified = true
|
||||
default:
|
||||
// For unknown platforms, remove to be safe
|
||||
delete(reqBody, "max_output_tokens")
|
||||
bodyModified = true
|
||||
}
|
||||
}
|
||||
|
||||
// Also handle max_completion_tokens (similar logic)
|
||||
if _, hasMaxCompletionTokens := reqBody["max_completion_tokens"]; hasMaxCompletionTokens {
|
||||
if account.Type == AccountTypeAPIKey || account.Platform != PlatformOpenAI {
|
||||
delete(reqBody, "max_completion_tokens")
|
||||
bodyModified = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-serialize body only if modified
|
||||
if bodyModified {
|
||||
var err error
|
||||
|
||||
@@ -17,6 +17,8 @@ type OpsRepository interface {
|
||||
|
||||
// Lightweight window stats (for realtime WS / quick sampling).
|
||||
GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error)
|
||||
// Lightweight realtime traffic summary (for the Ops dashboard header card).
|
||||
GetRealtimeTrafficSummary(ctx context.Context, filter *OpsDashboardFilter) (*OpsRealtimeTrafficSummary, error)
|
||||
|
||||
GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error)
|
||||
GetThroughputTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsThroughputTrendResponse, error)
|
||||
@@ -71,6 +73,7 @@ type OpsInsertErrorLogInput struct {
|
||||
Severity string
|
||||
StatusCode int
|
||||
IsBusinessLimited bool
|
||||
IsCountTokens bool // 是否为 count_tokens 请求
|
||||
|
||||
ErrorMessage string
|
||||
ErrorBody string
|
||||
|
||||
36
backend/internal/service/ops_realtime_traffic.go
Normal file
36
backend/internal/service/ops_realtime_traffic.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
|
||||
)
|
||||
|
||||
// GetRealtimeTrafficSummary returns QPS/TPS current/peak/avg for the provided window.
|
||||
// This is used by the Ops dashboard "Realtime Traffic" card and is intentionally lightweight.
|
||||
func (s *OpsService) GetRealtimeTrafficSummary(ctx context.Context, filter *OpsDashboardFilter) (*OpsRealtimeTrafficSummary, error) {
|
||||
if err := s.RequireMonitoringEnabled(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.opsRepo == nil {
|
||||
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
|
||||
}
|
||||
if filter == nil {
|
||||
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
|
||||
}
|
||||
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
|
||||
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
|
||||
}
|
||||
if filter.StartTime.After(filter.EndTime) {
|
||||
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
|
||||
}
|
||||
if filter.EndTime.Sub(filter.StartTime) > time.Hour {
|
||||
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_TOO_LARGE", "invalid time range: max window is 1 hour")
|
||||
}
|
||||
|
||||
// Realtime traffic summary always uses raw logs (minute granularity peaks).
|
||||
filter.QueryMode = OpsQueryModeRaw
|
||||
|
||||
return s.opsRepo.GetRealtimeTrafficSummary(ctx, filter)
|
||||
}
|
||||
19
backend/internal/service/ops_realtime_traffic_models.go
Normal file
19
backend/internal/service/ops_realtime_traffic_models.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package service
|
||||
|
||||
import "time"
|
||||
|
||||
// OpsRealtimeTrafficSummary is a lightweight summary used by the Ops dashboard "Realtime Traffic" card.
|
||||
// It reports QPS/TPS current/peak/avg for the requested time window.
|
||||
type OpsRealtimeTrafficSummary struct {
|
||||
// Window is a normalized label (e.g. "1min", "5min", "30min", "1h").
|
||||
Window string `json:"window"`
|
||||
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
|
||||
Platform string `json:"platform"`
|
||||
GroupID *int64 `json:"group_id"`
|
||||
|
||||
QPS OpsRateSummary `json:"qps"`
|
||||
TPS OpsRateSummary `json:"tps"`
|
||||
}
|
||||
@@ -368,6 +368,9 @@ func defaultOpsAdvancedSettings() *OpsAdvancedSettings {
|
||||
Aggregation: OpsAggregationSettings{
|
||||
AggregationEnabled: false,
|
||||
},
|
||||
IgnoreCountTokensErrors: false,
|
||||
AutoRefreshEnabled: false,
|
||||
AutoRefreshIntervalSec: 30,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -388,6 +391,10 @@ func normalizeOpsAdvancedSettings(cfg *OpsAdvancedSettings) {
|
||||
if cfg.DataRetention.HourlyMetricsRetentionDays <= 0 {
|
||||
cfg.DataRetention.HourlyMetricsRetentionDays = 30
|
||||
}
|
||||
// Normalize auto refresh interval (default 30 seconds)
|
||||
if cfg.AutoRefreshIntervalSec <= 0 {
|
||||
cfg.AutoRefreshIntervalSec = 30
|
||||
}
|
||||
}
|
||||
|
||||
func validateOpsAdvancedSettings(cfg *OpsAdvancedSettings) error {
|
||||
@@ -403,6 +410,9 @@ func validateOpsAdvancedSettings(cfg *OpsAdvancedSettings) error {
|
||||
if cfg.DataRetention.HourlyMetricsRetentionDays < 1 || cfg.DataRetention.HourlyMetricsRetentionDays > 365 {
|
||||
return errors.New("hourly_metrics_retention_days must be between 1 and 365")
|
||||
}
|
||||
if cfg.AutoRefreshIntervalSec < 15 || cfg.AutoRefreshIntervalSec > 300 {
|
||||
return errors.New("auto_refresh_interval_seconds must be between 15 and 300")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -463,3 +473,93 @@ func (s *OpsService) UpdateOpsAdvancedSettings(ctx context.Context, cfg *OpsAdva
|
||||
_ = json.Unmarshal(raw, updated)
|
||||
return updated, nil
|
||||
}
|
||||
|
||||
// =========================
|
||||
// Metric thresholds
|
||||
// =========================
|
||||
|
||||
const SettingKeyOpsMetricThresholds = "ops_metric_thresholds"
|
||||
|
||||
func defaultOpsMetricThresholds() *OpsMetricThresholds {
|
||||
slaMin := 99.5
|
||||
latencyMax := 2000.0
|
||||
ttftMax := 500.0
|
||||
reqErrMax := 5.0
|
||||
upstreamErrMax := 5.0
|
||||
return &OpsMetricThresholds{
|
||||
SLAPercentMin: &slaMin,
|
||||
LatencyP99MsMax: &latencyMax,
|
||||
TTFTp99MsMax: &ttftMax,
|
||||
RequestErrorRatePercentMax: &reqErrMax,
|
||||
UpstreamErrorRatePercentMax: &upstreamErrMax,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OpsService) GetMetricThresholds(ctx context.Context) (*OpsMetricThresholds, error) {
|
||||
defaultCfg := defaultOpsMetricThresholds()
|
||||
if s == nil || s.settingRepo == nil {
|
||||
return defaultCfg, nil
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsMetricThresholds)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrSettingNotFound) {
|
||||
if b, mErr := json.Marshal(defaultCfg); mErr == nil {
|
||||
_ = s.settingRepo.Set(ctx, SettingKeyOpsMetricThresholds, string(b))
|
||||
}
|
||||
return defaultCfg, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &OpsMetricThresholds{}
|
||||
if err := json.Unmarshal([]byte(raw), cfg); err != nil {
|
||||
return defaultCfg, nil
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (s *OpsService) UpdateMetricThresholds(ctx context.Context, cfg *OpsMetricThresholds) (*OpsMetricThresholds, error) {
|
||||
if s == nil || s.settingRepo == nil {
|
||||
return nil, errors.New("setting repository not initialized")
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
if cfg == nil {
|
||||
return nil, errors.New("invalid config")
|
||||
}
|
||||
|
||||
// Validate thresholds
|
||||
if cfg.SLAPercentMin != nil && (*cfg.SLAPercentMin < 0 || *cfg.SLAPercentMin > 100) {
|
||||
return nil, errors.New("sla_percent_min must be between 0 and 100")
|
||||
}
|
||||
if cfg.LatencyP99MsMax != nil && *cfg.LatencyP99MsMax < 0 {
|
||||
return nil, errors.New("latency_p99_ms_max must be >= 0")
|
||||
}
|
||||
if cfg.TTFTp99MsMax != nil && *cfg.TTFTp99MsMax < 0 {
|
||||
return nil, errors.New("ttft_p99_ms_max must be >= 0")
|
||||
}
|
||||
if cfg.RequestErrorRatePercentMax != nil && (*cfg.RequestErrorRatePercentMax < 0 || *cfg.RequestErrorRatePercentMax > 100) {
|
||||
return nil, errors.New("request_error_rate_percent_max must be between 0 and 100")
|
||||
}
|
||||
if cfg.UpstreamErrorRatePercentMax != nil && (*cfg.UpstreamErrorRatePercentMax < 0 || *cfg.UpstreamErrorRatePercentMax > 100) {
|
||||
return nil, errors.New("upstream_error_rate_percent_max must be between 0 and 100")
|
||||
}
|
||||
|
||||
raw, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.settingRepo.Set(ctx, SettingKeyOpsMetricThresholds, string(raw)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updated := &OpsMetricThresholds{}
|
||||
_ = json.Unmarshal(raw, updated)
|
||||
return updated, nil
|
||||
}
|
||||
|
||||
@@ -61,17 +61,29 @@ type OpsAlertSilencingSettings struct {
|
||||
Entries []OpsAlertSilenceEntry `json:"entries,omitempty"`
|
||||
}
|
||||
|
||||
type OpsMetricThresholds struct {
|
||||
SLAPercentMin *float64 `json:"sla_percent_min,omitempty"` // SLA低于此值变红
|
||||
LatencyP99MsMax *float64 `json:"latency_p99_ms_max,omitempty"` // 延迟P99高于此值变红
|
||||
TTFTp99MsMax *float64 `json:"ttft_p99_ms_max,omitempty"` // TTFT P99高于此值变红
|
||||
RequestErrorRatePercentMax *float64 `json:"request_error_rate_percent_max,omitempty"` // 请求错误率高于此值变红
|
||||
UpstreamErrorRatePercentMax *float64 `json:"upstream_error_rate_percent_max,omitempty"` // 上游错误率高于此值变红
|
||||
}
|
||||
|
||||
type OpsAlertRuntimeSettings struct {
|
||||
EvaluationIntervalSeconds int `json:"evaluation_interval_seconds"`
|
||||
|
||||
DistributedLock OpsDistributedLockSettings `json:"distributed_lock"`
|
||||
Silencing OpsAlertSilencingSettings `json:"silencing"`
|
||||
Thresholds OpsMetricThresholds `json:"thresholds"` // 指标阈值配置
|
||||
}
|
||||
|
||||
// OpsAdvancedSettings stores advanced ops configuration (data retention, aggregation).
|
||||
type OpsAdvancedSettings struct {
|
||||
DataRetention OpsDataRetentionSettings `json:"data_retention"`
|
||||
Aggregation OpsAggregationSettings `json:"aggregation"`
|
||||
DataRetention OpsDataRetentionSettings `json:"data_retention"`
|
||||
Aggregation OpsAggregationSettings `json:"aggregation"`
|
||||
IgnoreCountTokensErrors bool `json:"ignore_count_tokens_errors"`
|
||||
AutoRefreshEnabled bool `json:"auto_refresh_enabled"`
|
||||
AutoRefreshIntervalSec int `json:"auto_refresh_interval_seconds"`
|
||||
}
|
||||
|
||||
type OpsDataRetentionSettings struct {
|
||||
|
||||
@@ -61,6 +61,7 @@ func (s *RateLimitService) SetSettingService(settingService *SettingService) {
|
||||
func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Account, statusCode int, headers http.Header, responseBody []byte) (shouldDisable bool) {
|
||||
// apikey 类型账号:检查自定义错误码配置
|
||||
// 如果启用且错误码不在列表中,则不处理(不停止调度、不标记限流/过载)
|
||||
customErrorCodesEnabled := account.IsCustomErrorCodesEnabled()
|
||||
if !account.ShouldHandleErrorCode(statusCode) {
|
||||
log.Printf("Account %d: error %d skipped (not in custom error codes)", account.ID, statusCode)
|
||||
return false
|
||||
@@ -105,11 +106,19 @@ func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Acc
|
||||
s.handle529(ctx, account)
|
||||
shouldDisable = false
|
||||
default:
|
||||
// 其他5xx错误:记录但不停止调度
|
||||
if statusCode >= 500 {
|
||||
// 自定义错误码启用时:在列表中的错误码都应该停止调度
|
||||
if customErrorCodesEnabled {
|
||||
msg := "Custom error code triggered"
|
||||
if upstreamMsg != "" {
|
||||
msg = upstreamMsg
|
||||
}
|
||||
s.handleCustomErrorCode(ctx, account, statusCode, msg)
|
||||
shouldDisable = true
|
||||
} else if statusCode >= 500 {
|
||||
// 未启用自定义错误码时:仅记录5xx错误
|
||||
log.Printf("Account %d received upstream error %d", account.ID, statusCode)
|
||||
shouldDisable = false
|
||||
}
|
||||
shouldDisable = false
|
||||
}
|
||||
|
||||
if tempMatched {
|
||||
@@ -285,6 +294,16 @@ func (s *RateLimitService) handleAuthError(ctx context.Context, account *Account
|
||||
log.Printf("Account %d disabled due to auth error: %s", account.ID, errorMsg)
|
||||
}
|
||||
|
||||
// handleCustomErrorCode 处理自定义错误码,停止账号调度
|
||||
func (s *RateLimitService) handleCustomErrorCode(ctx context.Context, account *Account, statusCode int, errorMsg string) {
|
||||
msg := "Custom error code " + strconv.Itoa(statusCode) + ": " + errorMsg
|
||||
if err := s.accountRepo.SetError(ctx, account.ID, msg); err != nil {
|
||||
log.Printf("SetError failed for account %d: %v", account.ID, err)
|
||||
return
|
||||
}
|
||||
log.Printf("Account %d disabled due to custom error code %d: %s", account.ID, statusCode, errorMsg)
|
||||
}
|
||||
|
||||
// handle429 处理429限流错误
|
||||
// 解析响应头获取重置时间,标记账号为限流状态
|
||||
func (s *RateLimitService) handle429(ctx context.Context, account *Account, headers http.Header) {
|
||||
|
||||
Reference in New Issue
Block a user