diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index f095f317..3f3deefc 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -280,6 +280,9 @@ type GatewayConfig struct { // ForceCodexCLI: 强制将 OpenAI `/v1/responses` 请求按 Codex CLI 处理。 // 用于网关未透传/改写 User-Agent 时的兼容兜底(默认关闭,避免影响其他客户端)。 ForceCodexCLI bool `mapstructure:"force_codex_cli"` + // OpenAIPassthroughAllowTimeoutHeaders: OpenAI 透传模式是否放行客户端超时头 + // 关闭(默认)可避免 x-stainless-timeout 等头导致上游提前断流。 + OpenAIPassthroughAllowTimeoutHeaders bool `mapstructure:"openai_passthrough_allow_timeout_headers"` // HTTP 上游连接池配置(性能优化:支持高并发场景调优) // MaxIdleConns: 所有主机的最大空闲连接总数 @@ -995,6 +998,7 @@ func setDefaults() { viper.SetDefault("gateway.max_account_switches", 10) viper.SetDefault("gateway.max_account_switches_gemini", 3) viper.SetDefault("gateway.force_codex_cli", false) + viper.SetDefault("gateway.openai_passthrough_allow_timeout_headers", false) viper.SetDefault("gateway.antigravity_fallback_cooldown_minutes", 1) viper.SetDefault("gateway.max_body_size", int64(100*1024*1024)) viper.SetDefault("gateway.sora_max_body_size", int64(256*1024*1024)) diff --git a/backend/internal/handler/admin/admin_helpers_test.go b/backend/internal/handler/admin/admin_helpers_test.go index 863c755c..3833d32e 100644 --- a/backend/internal/handler/admin/admin_helpers_test.go +++ b/backend/internal/handler/admin/admin_helpers_test.go @@ -58,6 +58,96 @@ func TestParseOpsDuration(t *testing.T) { require.False(t, ok) } +func TestParseOpsOpenAITokenStatsDuration(t *testing.T) { + tests := []struct { + input string + want time.Duration + ok bool + }{ + {input: "30m", want: 30 * time.Minute, ok: true}, + {input: "1h", want: time.Hour, ok: true}, + {input: "1d", want: 24 * time.Hour, ok: true}, + {input: "15d", want: 15 * 24 * time.Hour, ok: true}, + {input: "30d", want: 30 * 24 * time.Hour, ok: true}, + {input: "7d", want: 0, ok: false}, + } + + for _, tt := range tests { + got, ok := parseOpsOpenAITokenStatsDuration(tt.input) + require.Equal(t, tt.ok, ok, "input=%s", tt.input) + require.Equal(t, tt.want, got, "input=%s", tt.input) + } +} + +func TestParseOpsOpenAITokenStatsFilter_Defaults(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/", nil) + + before := time.Now().UTC() + filter, err := parseOpsOpenAITokenStatsFilter(c) + after := time.Now().UTC() + + require.NoError(t, err) + require.NotNil(t, filter) + require.Equal(t, "30d", filter.TimeRange) + require.Equal(t, 1, filter.Page) + require.Equal(t, 20, filter.PageSize) + require.Equal(t, 0, filter.TopN) + require.Nil(t, filter.GroupID) + require.Equal(t, "", filter.Platform) + require.True(t, filter.StartTime.Before(filter.EndTime)) + require.WithinDuration(t, before.Add(-30*24*time.Hour), filter.StartTime, 2*time.Second) + require.WithinDuration(t, after, filter.EndTime, 2*time.Second) +} + +func TestParseOpsOpenAITokenStatsFilter_WithTopN(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest( + http.MethodGet, + "/?time_range=1h&platform=openai&group_id=12&top_n=50", + nil, + ) + + filter, err := parseOpsOpenAITokenStatsFilter(c) + require.NoError(t, err) + require.Equal(t, "1h", filter.TimeRange) + require.Equal(t, "openai", filter.Platform) + require.NotNil(t, filter.GroupID) + require.Equal(t, int64(12), *filter.GroupID) + require.Equal(t, 50, filter.TopN) 
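+	// TopN mode: the pagination fields are intentionally left at their zero
+	// values, so the repository applies only LIMIT top_n for this request.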
+ require.Equal(t, 0, filter.Page) + require.Equal(t, 0, filter.PageSize) +} + +func TestParseOpsOpenAITokenStatsFilter_InvalidParams(t *testing.T) { + tests := []string{ + "/?time_range=7d", + "/?group_id=0", + "/?group_id=abc", + "/?top_n=0", + "/?top_n=101", + "/?top_n=10&page=1", + "/?top_n=10&page_size=20", + "/?page=0", + "/?page_size=0", + "/?page_size=101", + } + + gin.SetMode(gin.TestMode) + for _, rawURL := range tests { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, rawURL, nil) + + _, err := parseOpsOpenAITokenStatsFilter(c) + require.Error(t, err, "url=%s", rawURL) + } +} + func TestParseOpsTimeRange(t *testing.T) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() diff --git a/backend/internal/handler/admin/ops_dashboard_handler.go b/backend/internal/handler/admin/ops_dashboard_handler.go index 2c87f734..01f7bc2b 100644 --- a/backend/internal/handler/admin/ops_dashboard_handler.go +++ b/backend/internal/handler/admin/ops_dashboard_handler.go @@ -1,6 +1,7 @@ package admin import ( + "fmt" "net/http" "strconv" "strings" @@ -218,6 +219,115 @@ func (h *OpsHandler) GetDashboardErrorDistribution(c *gin.Context) { response.Success(c, data) } +// GetDashboardOpenAITokenStats returns OpenAI token efficiency stats grouped by model. +// GET /api/v1/admin/ops/dashboard/openai-token-stats +func (h *OpsHandler) GetDashboardOpenAITokenStats(c *gin.Context) { + if h.opsService == nil { + response.Error(c, http.StatusServiceUnavailable, "Ops service not available") + return + } + if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil { + response.ErrorFrom(c, err) + return + } + + filter, err := parseOpsOpenAITokenStatsFilter(c) + if err != nil { + response.BadRequest(c, err.Error()) + return + } + + data, err := h.opsService.GetOpenAITokenStats(c.Request.Context(), filter) + if err != nil { + response.ErrorFrom(c, err) + return + } + response.Success(c, data) +} + +func parseOpsOpenAITokenStatsFilter(c *gin.Context) (*service.OpsOpenAITokenStatsFilter, error) { + if c == nil { + return nil, fmt.Errorf("invalid request") + } + + timeRange := strings.TrimSpace(c.Query("time_range")) + if timeRange == "" { + timeRange = "30d" + } + dur, ok := parseOpsOpenAITokenStatsDuration(timeRange) + if !ok { + return nil, fmt.Errorf("invalid time_range") + } + end := time.Now().UTC() + start := end.Add(-dur) + + filter := &service.OpsOpenAITokenStatsFilter{ + TimeRange: timeRange, + StartTime: start, + EndTime: end, + Platform: strings.TrimSpace(c.Query("platform")), + } + + if v := strings.TrimSpace(c.Query("group_id")); v != "" { + id, err := strconv.ParseInt(v, 10, 64) + if err != nil || id <= 0 { + return nil, fmt.Errorf("invalid group_id") + } + filter.GroupID = &id + } + + topNRaw := strings.TrimSpace(c.Query("top_n")) + pageRaw := strings.TrimSpace(c.Query("page")) + pageSizeRaw := strings.TrimSpace(c.Query("page_size")) + if topNRaw != "" && (pageRaw != "" || pageSizeRaw != "") { + return nil, fmt.Errorf("invalid query: top_n cannot be used with page/page_size") + } + + if topNRaw != "" { + topN, err := strconv.Atoi(topNRaw) + if err != nil || topN < 1 || topN > 100 { + return nil, fmt.Errorf("invalid top_n") + } + filter.TopN = topN + return filter, nil + } + + filter.Page = 1 + filter.PageSize = 20 + if pageRaw != "" { + page, err := strconv.Atoi(pageRaw) + if err != nil || page < 1 { + return nil, fmt.Errorf("invalid page") + } + filter.Page = page + } + if pageSizeRaw != "" { + pageSize, err := 
strconv.Atoi(pageSizeRaw) + if err != nil || pageSize < 1 || pageSize > 100 { + return nil, fmt.Errorf("invalid page_size") + } + filter.PageSize = pageSize + } + return filter, nil +} + +func parseOpsOpenAITokenStatsDuration(v string) (time.Duration, bool) { + switch strings.TrimSpace(v) { + case "30m": + return 30 * time.Minute, true + case "1h": + return time.Hour, true + case "1d": + return 24 * time.Hour, true + case "15d": + return 15 * 24 * time.Hour, true + case "30d": + return 30 * 24 * time.Hour, true + default: + return 0, false + } +} + func pickThroughputBucketSeconds(window time.Duration) int { // Keep buckets predictable and avoid huge responses. switch { diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index 67d607fa..fce3fc1c 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -28,7 +28,6 @@ type OpenAIGatewayHandler struct { errorPassthroughService *service.ErrorPassthroughService concurrencyHelper *ConcurrencyHelper maxAccountSwitches int - cfg *config.Config } // NewOpenAIGatewayHandler creates a new OpenAIGatewayHandler @@ -55,7 +54,6 @@ func NewOpenAIGatewayHandler( errorPassthroughService: errorPassthroughService, concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatComment, pingInterval), maxAccountSwitches: maxAccountSwitches, - cfg: cfg, } } diff --git a/backend/internal/repository/concurrency_cache.go b/backend/internal/repository/concurrency_cache.go index 974ad0f8..e047bff0 100644 --- a/backend/internal/repository/concurrency_cache.go +++ b/backend/internal/repository/concurrency_cache.go @@ -147,108 +147,6 @@ var ( return 1 `) - // WARNING: Redis Cluster 不兼容 — 脚本内部拼接 key,Cluster 模式下可能路由到错误节点。 - // 调用时传递空 KEYS 数组,所有 key 在 Lua 内通过 ARGV 动态拼接, - // 无法被 Redis Cluster 正确路由到对应 slot,仅适用于单节点或 Sentinel 模式。 - // - // getAccountsLoadBatchScript - batch load query with expired slot cleanup - // ARGV[1] = slot TTL (seconds) - // ARGV[2..n] = accountID1, maxConcurrency1, accountID2, maxConcurrency2, ... - getAccountsLoadBatchScript = redis.NewScript(` - local result = {} - local slotTTL = tonumber(ARGV[1]) - - -- Get current server time - local timeResult = redis.call('TIME') - local nowSeconds = tonumber(timeResult[1]) - local cutoffTime = nowSeconds - slotTTL - - local i = 2 - while i <= #ARGV do - local accountID = ARGV[i] - local maxConcurrency = tonumber(ARGV[i + 1]) - - local slotKey = 'concurrency:account:' .. accountID - - -- Clean up expired slots before counting - redis.call('ZREMRANGEBYSCORE', slotKey, '-inf', cutoffTime) - local currentConcurrency = redis.call('ZCARD', slotKey) - - local waitKey = 'wait:account:' .. 
accountID - local waitingCount = redis.call('GET', waitKey) - if waitingCount == false then - waitingCount = 0 - else - waitingCount = tonumber(waitingCount) - end - - local loadRate = 0 - if maxConcurrency > 0 then - loadRate = math.floor((currentConcurrency + waitingCount) * 100 / maxConcurrency) - end - - table.insert(result, accountID) - table.insert(result, currentConcurrency) - table.insert(result, waitingCount) - table.insert(result, loadRate) - - i = i + 2 - end - - return result - `) - - // WARNING: Redis Cluster 不兼容 — 脚本内部拼接 key,Cluster 模式下可能路由到错误节点。 - // 调用时传递空 KEYS 数组,所有 key 在 Lua 内通过 ARGV 动态拼接, - // 无法被 Redis Cluster 正确路由到对应 slot,仅适用于单节点或 Sentinel 模式。 - // - // getUsersLoadBatchScript - batch load query for users with expired slot cleanup - // ARGV[1] = slot TTL (seconds) - // ARGV[2..n] = userID1, maxConcurrency1, userID2, maxConcurrency2, ... - getUsersLoadBatchScript = redis.NewScript(` - local result = {} - local slotTTL = tonumber(ARGV[1]) - - -- Get current server time - local timeResult = redis.call('TIME') - local nowSeconds = tonumber(timeResult[1]) - local cutoffTime = nowSeconds - slotTTL - - local i = 2 - while i <= #ARGV do - local userID = ARGV[i] - local maxConcurrency = tonumber(ARGV[i + 1]) - - local slotKey = 'concurrency:user:' .. userID - - -- Clean up expired slots before counting - redis.call('ZREMRANGEBYSCORE', slotKey, '-inf', cutoffTime) - local currentConcurrency = redis.call('ZCARD', slotKey) - - local waitKey = 'concurrency:wait:' .. userID - local waitingCount = redis.call('GET', waitKey) - if waitingCount == false then - waitingCount = 0 - else - waitingCount = tonumber(waitingCount) - end - - local loadRate = 0 - if maxConcurrency > 0 then - loadRate = math.floor((currentConcurrency + waitingCount) * 100 / maxConcurrency) - end - - table.insert(result, userID) - table.insert(result, currentConcurrency) - table.insert(result, waitingCount) - table.insert(result, loadRate) - - i = i + 2 - end - - return result - `) - // cleanupExpiredSlotsScript - remove expired slots // KEYS[1] = concurrency:account:{accountID} // ARGV[1] = TTL (seconds) diff --git a/backend/internal/repository/ops_repo_openai_token_stats.go b/backend/internal/repository/ops_repo_openai_token_stats.go new file mode 100644 index 00000000..6aea416e --- /dev/null +++ b/backend/internal/repository/ops_repo_openai_token_stats.go @@ -0,0 +1,145 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/Wei-Shaw/sub2api/internal/service" +) + +func (r *opsRepository) GetOpenAITokenStats(ctx context.Context, filter *service.OpsOpenAITokenStatsFilter) (*service.OpsOpenAITokenStatsResponse, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + return nil, fmt.Errorf("nil filter") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, fmt.Errorf("start_time/end_time required") + } + // 允许 start_time == end_time(结果为空),与 service 层校验口径保持一致。 + if filter.StartTime.After(filter.EndTime) { + return nil, fmt.Errorf("start_time must be <= end_time") + } + + dashboardFilter := &service.OpsDashboardFilter{ + StartTime: filter.StartTime.UTC(), + EndTime: filter.EndTime.UTC(), + Platform: strings.TrimSpace(strings.ToLower(filter.Platform)), + GroupID: filter.GroupID, + } + + join, where, baseArgs, next := buildUsageWhere(dashboardFilter, dashboardFilter.StartTime, dashboardFilter.EndTime, 1) + where += " AND ul.model LIKE 'gpt%'" + + baseCTE := ` +WITH stats AS ( + SELECT + ul.model 
AS model, + COUNT(*)::bigint AS request_count, + ROUND( + AVG( + CASE + WHEN ul.duration_ms > 0 AND ul.output_tokens > 0 + THEN ul.output_tokens * 1000.0 / ul.duration_ms + END + )::numeric, + 2 + )::float8 AS avg_tokens_per_sec, + ROUND(AVG(ul.first_token_ms)::numeric, 2)::float8 AS avg_first_token_ms, + COALESCE(SUM(ul.output_tokens), 0)::bigint AS total_output_tokens, + COALESCE(ROUND(AVG(ul.duration_ms)::numeric, 0), 0)::bigint AS avg_duration_ms, + COUNT(CASE WHEN ul.first_token_ms IS NOT NULL THEN 1 END)::bigint AS requests_with_first_token + FROM usage_logs ul + ` + join + ` + ` + where + ` + GROUP BY ul.model +) +` + + countSQL := baseCTE + `SELECT COUNT(*) FROM stats` + var total int64 + if err := r.db.QueryRowContext(ctx, countSQL, baseArgs...).Scan(&total); err != nil { + return nil, err + } + + querySQL := baseCTE + ` +SELECT + model, + request_count, + avg_tokens_per_sec, + avg_first_token_ms, + total_output_tokens, + avg_duration_ms, + requests_with_first_token +FROM stats +ORDER BY request_count DESC, model ASC` + + args := make([]any, 0, len(baseArgs)+2) + args = append(args, baseArgs...) + + if filter.IsTopNMode() { + querySQL += fmt.Sprintf("\nLIMIT $%d", next) + args = append(args, filter.TopN) + } else { + offset := (filter.Page - 1) * filter.PageSize + querySQL += fmt.Sprintf("\nLIMIT $%d OFFSET $%d", next, next+1) + args = append(args, filter.PageSize, offset) + } + + rows, err := r.db.QueryContext(ctx, querySQL, args...) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + items := make([]*service.OpsOpenAITokenStatsItem, 0, 32) + for rows.Next() { + item := &service.OpsOpenAITokenStatsItem{} + var avgTPS sql.NullFloat64 + var avgFirstToken sql.NullFloat64 + if err := rows.Scan( + &item.Model, + &item.RequestCount, + &avgTPS, + &avgFirstToken, + &item.TotalOutputTokens, + &item.AvgDurationMs, + &item.RequestsWithFirstToken, + ); err != nil { + return nil, err + } + if avgTPS.Valid { + v := avgTPS.Float64 + item.AvgTokensPerSec = &v + } + if avgFirstToken.Valid { + v := avgFirstToken.Float64 + item.AvgFirstTokenMs = &v + } + items = append(items, item) + } + if err := rows.Err(); err != nil { + return nil, err + } + + resp := &service.OpsOpenAITokenStatsResponse{ + TimeRange: strings.TrimSpace(filter.TimeRange), + StartTime: dashboardFilter.StartTime, + EndTime: dashboardFilter.EndTime, + Platform: dashboardFilter.Platform, + GroupID: dashboardFilter.GroupID, + Items: items, + Total: total, + } + if filter.IsTopNMode() { + topN := filter.TopN + resp.TopN = &topN + } else { + resp.Page = filter.Page + resp.PageSize = filter.PageSize + } + return resp, nil +} diff --git a/backend/internal/repository/ops_repo_openai_token_stats_test.go b/backend/internal/repository/ops_repo_openai_token_stats_test.go new file mode 100644 index 00000000..bb01d820 --- /dev/null +++ b/backend/internal/repository/ops_repo_openai_token_stats_test.go @@ -0,0 +1,156 @@ +package repository + +import ( + "context" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/stretchr/testify/require" +) + +func TestOpsRepositoryGetOpenAITokenStats_PaginationMode(t *testing.T) { + db, mock := newSQLMock(t) + repo := &opsRepository{db: db} + + start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + end := start.Add(24 * time.Hour) + groupID := int64(9) + + filter := &service.OpsOpenAITokenStatsFilter{ + TimeRange: "1d", + StartTime: start, + EndTime: end, + Platform: " OpenAI ", + GroupID: &groupID, + Page: 2, + 
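+		// Page 2 with PageSize 10 should surface below as LIMIT 10 OFFSET 10
+		// ($5/$6), and the padded " OpenAI " above is expected to be trimmed
+		// and lowercased to "openai" before it reaches WithArgs.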
PageSize: 10, + } + + mock.ExpectQuery(`SELECT COUNT\(\*\) FROM stats`). + WithArgs(start, end, groupID, "openai"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(int64(3))) + + rows := sqlmock.NewRows([]string{ + "model", + "request_count", + "avg_tokens_per_sec", + "avg_first_token_ms", + "total_output_tokens", + "avg_duration_ms", + "requests_with_first_token", + }). + AddRow("gpt-4o-mini", int64(20), 21.56, 120.34, int64(3000), int64(850), int64(18)). + AddRow("gpt-4.1", int64(20), 10.2, 240.0, int64(2500), int64(900), int64(20)) + + mock.ExpectQuery(`ORDER BY request_count DESC, model ASC\s+LIMIT \$5 OFFSET \$6`). + WithArgs(start, end, groupID, "openai", 10, 10). + WillReturnRows(rows) + + resp, err := repo.GetOpenAITokenStats(context.Background(), filter) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, int64(3), resp.Total) + require.Equal(t, 2, resp.Page) + require.Equal(t, 10, resp.PageSize) + require.Nil(t, resp.TopN) + require.Equal(t, "openai", resp.Platform) + require.NotNil(t, resp.GroupID) + require.Equal(t, groupID, *resp.GroupID) + require.Len(t, resp.Items, 2) + require.Equal(t, "gpt-4o-mini", resp.Items[0].Model) + require.NotNil(t, resp.Items[0].AvgTokensPerSec) + require.InDelta(t, 21.56, *resp.Items[0].AvgTokensPerSec, 0.0001) + require.NotNil(t, resp.Items[0].AvgFirstTokenMs) + require.InDelta(t, 120.34, *resp.Items[0].AvgFirstTokenMs, 0.0001) + + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestOpsRepositoryGetOpenAITokenStats_TopNMode(t *testing.T) { + db, mock := newSQLMock(t) + repo := &opsRepository{db: db} + + start := time.Date(2026, 1, 1, 10, 0, 0, 0, time.UTC) + end := start.Add(time.Hour) + filter := &service.OpsOpenAITokenStatsFilter{ + TimeRange: "1h", + StartTime: start, + EndTime: end, + TopN: 5, + } + + mock.ExpectQuery(`SELECT COUNT\(\*\) FROM stats`). + WithArgs(start, end). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(int64(1))) + + rows := sqlmock.NewRows([]string{ + "model", + "request_count", + "avg_tokens_per_sec", + "avg_first_token_ms", + "total_output_tokens", + "avg_duration_ms", + "requests_with_first_token", + }). + AddRow("gpt-4o", int64(5), nil, nil, int64(0), int64(0), int64(0)) + + mock.ExpectQuery(`ORDER BY request_count DESC, model ASC\s+LIMIT \$3`). + WithArgs(start, end, 5). + WillReturnRows(rows) + + resp, err := repo.GetOpenAITokenStats(context.Background(), filter) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, resp.TopN) + require.Equal(t, 5, *resp.TopN) + require.Equal(t, 0, resp.Page) + require.Equal(t, 0, resp.PageSize) + require.Len(t, resp.Items, 1) + require.Nil(t, resp.Items[0].AvgTokensPerSec) + require.Nil(t, resp.Items[0].AvgFirstTokenMs) + + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestOpsRepositoryGetOpenAITokenStats_EmptyResult(t *testing.T) { + db, mock := newSQLMock(t) + repo := &opsRepository{db: db} + + start := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC) + end := start.Add(30 * time.Minute) + filter := &service.OpsOpenAITokenStatsFilter{ + TimeRange: "30m", + StartTime: start, + EndTime: end, + Page: 1, + PageSize: 20, + } + + mock.ExpectQuery(`SELECT COUNT\(\*\) FROM stats`). + WithArgs(start, end). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(int64(0))) + + mock.ExpectQuery(`ORDER BY request_count DESC, model ASC\s+LIMIT \$3 OFFSET \$4`). + WithArgs(start, end, 20, 0). 
+ WillReturnRows(sqlmock.NewRows([]string{ + "model", + "request_count", + "avg_tokens_per_sec", + "avg_first_token_ms", + "total_output_tokens", + "avg_duration_ms", + "requests_with_first_token", + })) + + resp, err := repo.GetOpenAITokenStats(context.Background(), filter) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, int64(0), resp.Total) + require.Len(t, resp.Items, 0) + require.Equal(t, 1, resp.Page) + require.Equal(t, 20, resp.PageSize) + + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 39c5d2fc..ca1fdf98 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -150,6 +150,7 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) { ops.GET("/dashboard/latency-histogram", h.Admin.Ops.GetDashboardLatencyHistogram) ops.GET("/dashboard/error-trend", h.Admin.Ops.GetDashboardErrorTrend) ops.GET("/dashboard/error-distribution", h.Admin.Ops.GetDashboardErrorDistribution) + ops.GET("/dashboard/openai-token-stats", h.Admin.Ops.GetDashboardOpenAITokenStats) } } diff --git a/backend/internal/service/gateway_request.go b/backend/internal/service/gateway_request.go index 46a4ef94..4a004ad4 100644 --- a/backend/internal/service/gateway_request.go +++ b/backend/internal/service/gateway_request.go @@ -189,45 +189,6 @@ func sliceRawFromBody(body []byte, r gjson.Result) []byte { return []byte(r.Raw) } -// parseIntegralNumber 将 JSON 解码后的数字安全转换为 int。 -// 仅接受“整数值”的输入,小数/NaN/Inf/越界值都会返回 false。 -func parseIntegralNumber(raw any) (int, bool) { - switch v := raw.(type) { - case float64: - if math.IsNaN(v) || math.IsInf(v, 0) || v != math.Trunc(v) { - return 0, false - } - if v > float64(math.MaxInt) || v < float64(math.MinInt) { - return 0, false - } - return int(v), true - case int: - return v, true - case int8: - return int(v), true - case int16: - return int(v), true - case int32: - return int(v), true - case int64: - if v > int64(math.MaxInt) || v < int64(math.MinInt) { - return 0, false - } - return int(v), true - case json.Number: - i64, err := v.Int64() - if err != nil { - return 0, false - } - if i64 > int64(math.MaxInt) || i64 < int64(math.MinInt) { - return 0, false - } - return int(i64), true - default: - return 0, false - } -} - // FilterThinkingBlocks removes thinking blocks from request body // Returns filtered body or original body if filtering fails (fail-safe) // This prevents 400 errors from invalid thinking block signatures diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 1de60665..4cd0f171 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -1020,6 +1020,23 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( reqModel, reqStream, ) + if reqStream && c != nil && c.Request != nil { + if timeoutHeaders := collectOpenAIPassthroughTimeoutHeaders(c.Request.Header); len(timeoutHeaders) > 0 { + if s.isOpenAIPassthroughTimeoutHeadersAllowed() { + log.Printf( + "[WARN] [OpenAI passthrough] 透传请求包含超时相关请求头,且当前配置为放行,可能导致上游提前断流: account=%d headers=%s", + account.ID, + strings.Join(timeoutHeaders, ", "), + ) + } else { + log.Printf( + "[WARN] [OpenAI passthrough] 检测到超时相关请求头,将按配置过滤以降低断流风险: account=%d headers=%s", + account.ID, + strings.Join(timeoutHeaders, ", "), + ) + } + } + } // Get access token token, _, err := s.GetAccessToken(ctx, account) @@ -1135,12 +1152,16 @@ 
func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough( } // 透传客户端请求头(尽可能原样),并做安全剔除。 + allowTimeoutHeaders := s.isOpenAIPassthroughTimeoutHeadersAllowed() if c != nil && c.Request != nil { for key, values := range c.Request.Header { lower := strings.ToLower(key) if isOpenAIPassthroughBlockedRequestHeader(lower) { continue } + if !allowTimeoutHeaders && isOpenAIPassthroughTimeoutHeader(lower) { + continue + } for _, v := range values { req.Header.Add(key, v) } @@ -1233,6 +1254,38 @@ func isOpenAIPassthroughBlockedRequestHeader(lowerKey string) bool { } } +func isOpenAIPassthroughTimeoutHeader(lowerKey string) bool { + switch lowerKey { + case "x-stainless-timeout", "x-stainless-read-timeout", "x-stainless-connect-timeout", "x-request-timeout", "request-timeout", "grpc-timeout": + return true + default: + return false + } +} + +func (s *OpenAIGatewayService) isOpenAIPassthroughTimeoutHeadersAllowed() bool { + return s != nil && s.cfg != nil && s.cfg.Gateway.OpenAIPassthroughAllowTimeoutHeaders +} + +func collectOpenAIPassthroughTimeoutHeaders(h http.Header) []string { + if h == nil { + return nil + } + var matched []string + for key, values := range h { + lowerKey := strings.ToLower(strings.TrimSpace(key)) + if isOpenAIPassthroughTimeoutHeader(lowerKey) { + entry := lowerKey + if len(values) > 0 { + entry = fmt.Sprintf("%s=%s", lowerKey, strings.Join(values, "|")) + } + matched = append(matched, entry) + } + } + sort.Strings(matched) + return matched +} + type openaiStreamingResultPassthrough struct { usage *OpenAIUsage firstTokenMs *int @@ -1265,6 +1318,8 @@ func (s *OpenAIGatewayService) handleStreamingResponsePassthrough( usage := &OpenAIUsage{} var firstTokenMs *int clientDisconnected := false + sawDone := false + upstreamRequestID := strings.TrimSpace(resp.Header.Get("x-request-id")) scanner := bufio.NewScanner(resp.Body) maxLineSize := defaultMaxLineSize @@ -1278,7 +1333,11 @@ func (s *OpenAIGatewayService) handleStreamingResponsePassthrough( for scanner.Scan() { line := scanner.Text() if data, ok := extractOpenAISSEDataLine(line); ok { - if firstTokenMs == nil && strings.TrimSpace(data) != "" { + trimmedData := strings.TrimSpace(data) + if trimmedData == "[DONE]" { + sawDone = true + } + if firstTokenMs == nil && trimmedData != "" && trimmedData != "[DONE]" { ms := int(time.Since(startTime).Milliseconds()) firstTokenMs = &ms } @@ -1300,14 +1359,34 @@ func (s *OpenAIGatewayService) handleStreamingResponsePassthrough( return &openaiStreamingResultPassthrough{usage: usage, firstTokenMs: firstTokenMs}, nil } if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + log.Printf( + "[WARN] [OpenAI passthrough] 流读取被取消,可能发生断流: account=%d request_id=%s err=%v ctx_err=%v", + account.ID, + upstreamRequestID, + err, + ctx.Err(), + ) return &openaiStreamingResultPassthrough{usage: usage, firstTokenMs: firstTokenMs}, nil } if errors.Is(err, bufio.ErrTooLong) { log.Printf("[OpenAI passthrough] SSE line too long: account=%d max_size=%d error=%v", account.ID, maxLineSize, err) return &openaiStreamingResultPassthrough{usage: usage, firstTokenMs: firstTokenMs}, err } + log.Printf( + "[WARN] [OpenAI passthrough] 流读取异常中断: account=%d request_id=%s err=%v", + account.ID, + upstreamRequestID, + err, + ) return &openaiStreamingResultPassthrough{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream read error: %w", err) } + if !clientDisconnected && !sawDone && ctx.Err() == nil { + log.Printf( + "[WARN] [OpenAI passthrough] 上游流在未收到 [DONE] 时结束,疑似断流: account=%d 
request_id=%s", + account.ID, + upstreamRequestID, + ) + } return &openaiStreamingResultPassthrough{usage: usage, firstTokenMs: firstTokenMs}, nil } diff --git a/backend/internal/service/openai_oauth_passthrough_test.go b/backend/internal/service/openai_oauth_passthrough_test.go index f6932469..970c4b84 100644 --- a/backend/internal/service/openai_oauth_passthrough_test.go +++ b/backend/internal/service/openai_oauth_passthrough_test.go @@ -4,9 +4,12 @@ import ( "bytes" "context" "io" + "log" "net/http" "net/http/httptest" + "os" "strings" + "sync" "testing" "time" @@ -43,6 +46,27 @@ func (u *httpUpstreamRecorder) DoWithTLS(req *http.Request, proxyURL string, acc return u.Do(req, proxyURL, accountID, accountConcurrency) } +var stdLogCaptureMu sync.Mutex + +func captureStdLog(t *testing.T) (*bytes.Buffer, func()) { + t.Helper() + stdLogCaptureMu.Lock() + buf := &bytes.Buffer{} + prevWriter := log.Writer() + prevFlags := log.Flags() + log.SetFlags(0) + log.SetOutput(buf) + return buf, func() { + log.SetOutput(prevWriter) + log.SetFlags(prevFlags) + // 防御性恢复,避免其他测试改动了底层 writer。 + if prevWriter == nil { + log.SetOutput(os.Stderr) + } + stdLogCaptureMu.Unlock() + } +} + func TestOpenAIGatewayService_OAuthPassthrough_StreamKeepsToolNameAndBodyUnchanged(t *testing.T) { gin.SetMode(gin.TestMode) @@ -459,3 +483,170 @@ func TestOpenAIGatewayService_APIKeyPassthrough_PreservesBodyAndUsesResponsesEnd require.Equal(t, "curl/8.0", upstream.lastReq.Header.Get("User-Agent")) require.Equal(t, "keep", upstream.lastReq.Header.Get("X-Test")) } + +func TestOpenAIGatewayService_OAuthPassthrough_WarnOnTimeoutHeadersForStream(t *testing.T) { + gin.SetMode(gin.TestMode) + logBuf, restore := captureStdLog(t) + defer restore() + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("x-stainless-timeout", "10000") + + originalBody := []byte(`{"model":"gpt-5.2","stream":true,"input":[{"type":"text","text":"hi"}]}`) + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid-timeout"}}, + Body: io.NopCloser(strings.NewReader("data: [DONE]\n\n")), + } + upstream := &httpUpstreamRecorder{resp: resp} + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}}, + httpUpstream: upstream, + } + account := &Account{ + ID: 321, + Name: "acc", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"}, + Extra: map[string]any{"openai_passthrough": true}, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + } + + _, err := svc.Forward(context.Background(), c, account, originalBody) + require.NoError(t, err) + require.Contains(t, logBuf.String(), "检测到超时相关请求头,将按配置过滤以降低断流风险") + require.Contains(t, logBuf.String(), "x-stainless-timeout=10000") +} + +func TestOpenAIGatewayService_OAuthPassthrough_WarnWhenStreamEndsWithoutDone(t *testing.T) { + gin.SetMode(gin.TestMode) + logBuf, restore := captureStdLog(t) + defer restore() + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + + originalBody := 
[]byte(`{"model":"gpt-5.2","stream":true,"input":[{"type":"text","text":"hi"}]}`) + // 注意:刻意不发送 [DONE],模拟上游中途断流。 + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}, "X-Request-Id": []string{"rid-truncate"}}, + Body: io.NopCloser(strings.NewReader("data: {\"type\":\"response.output_text.delta\",\"delta\":\"h\"}\n\n")), + } + upstream := &httpUpstreamRecorder{resp: resp} + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}}, + httpUpstream: upstream, + } + account := &Account{ + ID: 654, + Name: "acc", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"}, + Extra: map[string]any{"openai_passthrough": true}, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + } + + _, err := svc.Forward(context.Background(), c, account, originalBody) + require.NoError(t, err) + require.Contains(t, logBuf.String(), "上游流在未收到 [DONE] 时结束,疑似断流") + require.Contains(t, logBuf.String(), "rid-truncate") +} + +func TestOpenAIGatewayService_OAuthPassthrough_DefaultFiltersTimeoutHeaders(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("x-stainless-timeout", "120000") + c.Request.Header.Set("X-Test", "keep") + + originalBody := []byte(`{"model":"gpt-5.2","stream":false,"input":[{"type":"text","text":"hi"}]}`) + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "X-Request-Id": []string{"rid-filter-default"}}, + Body: io.NopCloser(strings.NewReader(`{"output":[],"usage":{"input_tokens":1,"output_tokens":1,"input_tokens_details":{"cached_tokens":0}}}`)), + } + upstream := &httpUpstreamRecorder{resp: resp} + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}}, + httpUpstream: upstream, + } + account := &Account{ + ID: 111, + Name: "acc", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"}, + Extra: map[string]any{"openai_passthrough": true}, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + } + + _, err := svc.Forward(context.Background(), c, account, originalBody) + require.NoError(t, err) + require.NotNil(t, upstream.lastReq) + require.Empty(t, upstream.lastReq.Header.Get("x-stainless-timeout")) + require.Equal(t, "keep", upstream.lastReq.Header.Get("X-Test")) +} + +func TestOpenAIGatewayService_OAuthPassthrough_AllowTimeoutHeadersWhenConfigured(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("x-stainless-timeout", "120000") + c.Request.Header.Set("X-Test", "keep") + + originalBody := []byte(`{"model":"gpt-5.2","stream":false,"input":[{"type":"text","text":"hi"}]}`) + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "X-Request-Id": []string{"rid-filter-allow"}}, + 
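+		// Non-streaming JSON upstream response: Forward reads it without SSE
+		// scanning, so the assertions below focus on which request headers
+		// actually reached the upstream.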
Body: io.NopCloser(strings.NewReader(`{"output":[],"usage":{"input_tokens":1,"output_tokens":1,"input_tokens_details":{"cached_tokens":0}}}`)), + } + upstream := &httpUpstreamRecorder{resp: resp} + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + ForceCodexCLI: false, + OpenAIPassthroughAllowTimeoutHeaders: true, + }}, + httpUpstream: upstream, + } + account := &Account{ + ID: 222, + Name: "acc", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"}, + Extra: map[string]any{"openai_passthrough": true}, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + } + + _, err := svc.Forward(context.Background(), c, account, originalBody) + require.NoError(t, err) + require.NotNil(t, upstream.lastReq) + require.Equal(t, "120000", upstream.lastReq.Header.Get("x-stainless-timeout")) + require.Equal(t, "keep", upstream.lastReq.Header.Get("X-Test")) +} diff --git a/backend/internal/service/ops_openai_token_stats.go b/backend/internal/service/ops_openai_token_stats.go new file mode 100644 index 00000000..63f88ba0 --- /dev/null +++ b/backend/internal/service/ops_openai_token_stats.go @@ -0,0 +1,55 @@ +package service + +import ( + "context" + + infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" +) + +func (s *OpsService) GetOpenAITokenStats(ctx context.Context, filter *OpsOpenAITokenStatsFilter) (*OpsOpenAITokenStatsResponse, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if filter == nil { + return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required") + } + if filter.StartTime.After(filter.EndTime) { + return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time") + } + + if filter.GroupID != nil && *filter.GroupID <= 0 { + return nil, infraerrors.BadRequest("OPS_GROUP_ID_INVALID", "group_id must be > 0") + } + + // top_n cannot be mixed with page/page_size params. 
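+	// For example, ?top_n=10&page=2 is rejected with OPS_PAGINATION_CONFLICT
+	// below rather than silently preferring one of the two modes.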
+ if filter.TopN > 0 && (filter.Page > 0 || filter.PageSize > 0) { + return nil, infraerrors.BadRequest("OPS_PAGINATION_CONFLICT", "top_n cannot be used with page/page_size") + } + + if filter.TopN > 0 { + if filter.TopN < 1 || filter.TopN > 100 { + return nil, infraerrors.BadRequest("OPS_TOPN_INVALID", "top_n must be between 1 and 100") + } + } else { + if filter.Page <= 0 { + filter.Page = 1 + } + if filter.PageSize <= 0 { + filter.PageSize = 20 + } + if filter.Page < 1 { + return nil, infraerrors.BadRequest("OPS_PAGE_INVALID", "page must be >= 1") + } + if filter.PageSize < 1 || filter.PageSize > 100 { + return nil, infraerrors.BadRequest("OPS_PAGE_SIZE_INVALID", "page_size must be between 1 and 100") + } + } + + return s.opsRepo.GetOpenAITokenStats(ctx, filter) +} diff --git a/backend/internal/service/ops_openai_token_stats_models.go b/backend/internal/service/ops_openai_token_stats_models.go new file mode 100644 index 00000000..ef40fa1f --- /dev/null +++ b/backend/internal/service/ops_openai_token_stats_models.go @@ -0,0 +1,54 @@ +package service + +import "time" + +type OpsOpenAITokenStatsFilter struct { + TimeRange string + StartTime time.Time + EndTime time.Time + + Platform string + GroupID *int64 + + // Pagination mode (default): page/page_size + Page int + PageSize int + + // TopN mode: top_n + TopN int +} + +func (f *OpsOpenAITokenStatsFilter) IsTopNMode() bool { + return f != nil && f.TopN > 0 +} + +type OpsOpenAITokenStatsItem struct { + Model string `json:"model"` + RequestCount int64 `json:"request_count"` + AvgTokensPerSec *float64 `json:"avg_tokens_per_sec"` + AvgFirstTokenMs *float64 `json:"avg_first_token_ms"` + TotalOutputTokens int64 `json:"total_output_tokens"` + AvgDurationMs int64 `json:"avg_duration_ms"` + RequestsWithFirstToken int64 `json:"requests_with_first_token"` +} + +type OpsOpenAITokenStatsResponse struct { + TimeRange string `json:"time_range"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + + Platform string `json:"platform,omitempty"` + GroupID *int64 `json:"group_id,omitempty"` + + Items []*OpsOpenAITokenStatsItem `json:"items"` + + // Total model rows before pagination/topN trimming. + Total int64 `json:"total"` + + // Pagination mode metadata. + Page int `json:"page,omitempty"` + PageSize int `json:"page_size,omitempty"` + + // TopN mode metadata. 
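+	// A pointer with omitempty, so pagination-mode responses drop the field
+	// entirely instead of serializing top_n: 0.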
+ TopN *int `json:"top_n,omitempty"` +} diff --git a/backend/internal/service/ops_openai_token_stats_test.go b/backend/internal/service/ops_openai_token_stats_test.go new file mode 100644 index 00000000..ee332f91 --- /dev/null +++ b/backend/internal/service/ops_openai_token_stats_test.go @@ -0,0 +1,162 @@ +package service + +import ( + "context" + "testing" + "time" + + infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" + "github.com/stretchr/testify/require" +) + +type openAITokenStatsRepoStub struct { + OpsRepository + resp *OpsOpenAITokenStatsResponse + err error + captured *OpsOpenAITokenStatsFilter +} + +func (s *openAITokenStatsRepoStub) GetOpenAITokenStats(ctx context.Context, filter *OpsOpenAITokenStatsFilter) (*OpsOpenAITokenStatsResponse, error) { + s.captured = filter + if s.err != nil { + return nil, s.err + } + if s.resp != nil { + return s.resp, nil + } + return &OpsOpenAITokenStatsResponse{}, nil +} + +func TestOpsServiceGetOpenAITokenStats_Validation(t *testing.T) { + now := time.Now().UTC() + + tests := []struct { + name string + filter *OpsOpenAITokenStatsFilter + wantCode int + wantReason string + }{ + { + name: "filter 不能为空", + filter: nil, + wantCode: 400, + wantReason: "OPS_FILTER_REQUIRED", + }, + { + name: "start_time/end_time 必填", + filter: &OpsOpenAITokenStatsFilter{ + StartTime: time.Time{}, + EndTime: now, + }, + wantCode: 400, + wantReason: "OPS_TIME_RANGE_REQUIRED", + }, + { + name: "start_time 不能晚于 end_time", + filter: &OpsOpenAITokenStatsFilter{ + StartTime: now, + EndTime: now.Add(-1 * time.Minute), + }, + wantCode: 400, + wantReason: "OPS_TIME_RANGE_INVALID", + }, + { + name: "group_id 必须大于 0", + filter: &OpsOpenAITokenStatsFilter{ + StartTime: now.Add(-time.Hour), + EndTime: now, + GroupID: int64Ptr(0), + }, + wantCode: 400, + wantReason: "OPS_GROUP_ID_INVALID", + }, + { + name: "top_n 与分页参数互斥", + filter: &OpsOpenAITokenStatsFilter{ + StartTime: now.Add(-time.Hour), + EndTime: now, + TopN: 10, + Page: 1, + }, + wantCode: 400, + wantReason: "OPS_PAGINATION_CONFLICT", + }, + { + name: "top_n 参数越界", + filter: &OpsOpenAITokenStatsFilter{ + StartTime: now.Add(-time.Hour), + EndTime: now, + TopN: 101, + }, + wantCode: 400, + wantReason: "OPS_TOPN_INVALID", + }, + { + name: "page_size 参数越界", + filter: &OpsOpenAITokenStatsFilter{ + StartTime: now.Add(-time.Hour), + EndTime: now, + Page: 1, + PageSize: 101, + }, + wantCode: 400, + wantReason: "OPS_PAGE_SIZE_INVALID", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + svc := &OpsService{ + opsRepo: &openAITokenStatsRepoStub{}, + } + + _, err := svc.GetOpenAITokenStats(context.Background(), tt.filter) + require.Error(t, err) + require.Equal(t, tt.wantCode, infraerrors.Code(err)) + require.Equal(t, tt.wantReason, infraerrors.Reason(err)) + }) + } +} + +func TestOpsServiceGetOpenAITokenStats_DefaultPagination(t *testing.T) { + now := time.Now().UTC() + repo := &openAITokenStatsRepoStub{ + resp: &OpsOpenAITokenStatsResponse{ + Items: []*OpsOpenAITokenStatsItem{ + {Model: "gpt-4o-mini", RequestCount: 10}, + }, + Total: 1, + }, + } + svc := &OpsService{opsRepo: repo} + + filter := &OpsOpenAITokenStatsFilter{ + TimeRange: "30d", + StartTime: now.Add(-30 * 24 * time.Hour), + EndTime: now, + } + resp, err := svc.GetOpenAITokenStats(context.Background(), filter) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, repo.captured) + require.Equal(t, 1, repo.captured.Page) + require.Equal(t, 20, repo.captured.PageSize) + require.Equal(t, 0, repo.captured.TopN) +} + 
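+
+// For orientation, the TopN path through the same stub wiring looks like this
+// (illustrative sketch only, reusing the now/repo/svc fixtures above):
+//
+//	svc := &OpsService{opsRepo: repo}
+//	_, _ = svc.GetOpenAITokenStats(context.Background(), &OpsOpenAITokenStatsFilter{
+//		StartTime: now.Add(-time.Hour),
+//		EndTime:   now,
+//		TopN:      10,
+//	})
+//	// repo.captured.TopN == 10; Page and PageSize stay 0 in TopN mode.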
+func TestOpsServiceGetOpenAITokenStats_RepoUnavailable(t *testing.T) { + now := time.Now().UTC() + svc := &OpsService{} + + _, err := svc.GetOpenAITokenStats(context.Background(), &OpsOpenAITokenStatsFilter{ + TimeRange: "1h", + StartTime: now.Add(-time.Hour), + EndTime: now, + TopN: 10, + }) + require.Error(t, err) + require.Equal(t, 503, infraerrors.Code(err)) + require.Equal(t, "OPS_REPO_UNAVAILABLE", infraerrors.Reason(err)) +} + +func int64Ptr(v int64) *int64 { return &v } diff --git a/backend/internal/service/ops_port.go b/backend/internal/service/ops_port.go index bbef4ceb..7a00988c 100644 --- a/backend/internal/service/ops_port.go +++ b/backend/internal/service/ops_port.go @@ -27,6 +27,7 @@ type OpsRepository interface { GetLatencyHistogram(ctx context.Context, filter *OpsDashboardFilter) (*OpsLatencyHistogramResponse, error) GetErrorTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsErrorTrendResponse, error) GetErrorDistribution(ctx context.Context, filter *OpsDashboardFilter) (*OpsErrorDistributionResponse, error) + GetOpenAITokenStats(ctx context.Context, filter *OpsOpenAITokenStatsFilter) (*OpsOpenAITokenStatsResponse, error) InsertSystemMetrics(ctx context.Context, input *OpsInsertSystemMetricsInput) error GetLatestSystemMetrics(ctx context.Context, windowMinutes int) (*OpsSystemMetricsSnapshot, error) diff --git a/backend/internal/service/parse_integral_number_unit.go b/backend/internal/service/parse_integral_number_unit.go new file mode 100644 index 00000000..c9c617b1 --- /dev/null +++ b/backend/internal/service/parse_integral_number_unit.go @@ -0,0 +1,51 @@ +//go:build unit + +package service + +import ( + "encoding/json" + "math" +) + +// parseIntegralNumber 将 JSON 解码后的数字安全转换为 int。 +// 仅接受“整数值”的输入,小数/NaN/Inf/越界值都会返回 false。 +// +// 说明: +// - 该函数当前仅用于 unit 测试中的 map-based 解析逻辑验证,因此放在 unit build tag 下, +// 避免在默认构建中触发 unused lint。 +func parseIntegralNumber(raw any) (int, bool) { + switch v := raw.(type) { + case float64: + if math.IsNaN(v) || math.IsInf(v, 0) || v != math.Trunc(v) { + return 0, false + } + if v > float64(math.MaxInt) || v < float64(math.MinInt) { + return 0, false + } + return int(v), true + case int: + return v, true + case int8: + return int(v), true + case int16: + return int(v), true + case int32: + return int(v), true + case int64: + if v > int64(math.MaxInt) || v < int64(math.MinInt) { + return 0, false + } + return int(v), true + case json.Number: + i64, err := v.Int64() + if err != nil { + return 0, false + } + if i64 > int64(math.MaxInt) || i64 < int64(math.MinInt) { + return 0, false + } + return int(i64), true + default: + return 0, false + } +} diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index b60082b9..9ab3bfd0 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -187,6 +187,9 @@ gateway: # # 注意:开启后会影响所有客户端的行为(不仅限于 VS Code / Codex CLI),请谨慎开启。 force_codex_cli: false + # OpenAI 透传模式是否放行客户端超时头(如 x-stainless-timeout) + # 默认 false:过滤超时头,降低上游提前断流风险。 + openai_passthrough_allow_timeout_headers: false # HTTP upstream connection pool settings (HTTP/2 + multi-proxy scenario defaults) # HTTP 上游连接池配置(HTTP/2 + 多代理场景默认值) # Max idle connections across all hosts diff --git a/frontend/src/api/admin/ops.ts b/frontend/src/api/admin/ops.ts index 9f980a12..7e70aacb 100644 --- a/frontend/src/api/admin/ops.ts +++ b/frontend/src/api/admin/ops.ts @@ -259,6 +259,40 @@ export interface OpsErrorDistributionResponse { items: OpsErrorDistributionItem[] } +export type OpsOpenAITokenStatsTimeRange = 
'30m' | '1h' | '1d' | '15d' | '30d'
+
+export interface OpsOpenAITokenStatsItem {
+  model: string
+  request_count: number
+  avg_tokens_per_sec?: number | null
+  avg_first_token_ms?: number | null
+  total_output_tokens: number
+  avg_duration_ms: number
+  requests_with_first_token: number
+}
+
+export interface OpsOpenAITokenStatsResponse {
+  time_range: OpsOpenAITokenStatsTimeRange
+  start_time: string
+  end_time: string
+  platform?: string
+  group_id?: number | null
+  items: OpsOpenAITokenStatsItem[]
+  total: number
+  page?: number
+  page_size?: number
+  top_n?: number | null
+}
+
+export interface OpsOpenAITokenStatsParams {
+  time_range?: OpsOpenAITokenStatsTimeRange
+  platform?: string
+  group_id?: number | null
+  page?: number
+  page_size?: number
+  top_n?: number
+}
+
 export interface OpsSystemMetricsSnapshot {
   id: number
   created_at: string
@@ -971,6 +1005,17 @@
   return data
 }
 
+export async function getOpenAITokenStats(
+  params: OpsOpenAITokenStatsParams,
+  options: OpsRequestOptions = {}
+): Promise<OpsOpenAITokenStatsResponse> {
+  const { data } = await apiClient.get<OpsOpenAITokenStatsResponse>('/admin/ops/dashboard/openai-token-stats', {
+    params,
+    signal: options.signal
+  })
+  return data
+}
+
 export type OpsErrorListView = 'errors' | 'excluded' | 'all'
 
 export type OpsErrorListQueryParams = {
@@ -1188,6 +1233,7 @@
   getLatencyHistogram,
   getErrorTrend,
   getErrorDistribution,
+  getOpenAITokenStats,
   getConcurrencyStats,
   getUserConcurrencyStats,
   getAccountAvailabilityStats,
diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts
index 3b22ae33..862904e3 100644
--- a/frontend/src/i18n/locales/en.ts
+++ b/frontend/src/i18n/locales/en.ts
@@ -2508,11 +2508,33 @@ export default {
     '5m': 'Last 5 minutes',
     '30m': 'Last 30 minutes',
     '1h': 'Last 1 hour',
+    '1d': 'Last 1 day',
+    '15d': 'Last 15 days',
     '6h': 'Last 6 hours',
     '24h': 'Last 24 hours',
     '7d': 'Last 7 days',
     '30d': 'Last 30 days'
   },
+  openaiTokenStats: {
+    title: 'OpenAI Token Request Stats',
+    viewModeTopN: 'TopN',
+    viewModePagination: 'Pagination',
+    prevPage: 'Previous',
+    nextPage: 'Next',
+    pageInfo: 'Page {page}/{total}',
+    totalModels: 'Total models: {total}',
+    failedToLoad: 'Failed to load OpenAI token stats',
+    empty: 'No OpenAI token stats for the current filters',
+    table: {
+      model: 'Model',
+      requestCount: 'Requests',
+      avgTokensPerSec: 'Avg Tokens/sec',
+      avgFirstTokenMs: 'Avg First Token Latency (ms)',
+      totalOutputTokens: 'Total Output Tokens',
+      avgDurationMs: 'Avg Duration (ms)',
+      requestsWithFirstToken: 'Requests With First Token'
+    }
+  },
   fullscreen: {
     enter: 'Enter Fullscreen'
   },
diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts
index 493dcac8..dea8814a 100644
--- a/frontend/src/i18n/locales/zh.ts
+++ b/frontend/src/i18n/locales/zh.ts
@@ -2675,12 +2675,34 @@ export default {
     '5m': '近5分钟',
     '30m': '近30分钟',
     '1h': '近1小时',
+    '1d': '近1天',
+    '15d': '近15天',
     '6h': '近6小时',
     '24h': '近24小时',
     '7d': '近7天',
     '30d': '近30天',
     custom: '自定义'
   },
+  openaiTokenStats: {
+    title: 'OpenAI Token 请求统计',
+    viewModeTopN: 'TopN',
+    viewModePagination: '分页',
+    prevPage: '上一页',
+    nextPage: '下一页',
+    pageInfo: '第 {page}/{total} 页',
+    totalModels: '模型总数:{total}',
+    failedToLoad: '加载 OpenAI Token 统计失败',
+    empty: '当前筛选条件下暂无 OpenAI Token 请求统计数据',
+    table: {
+      model: '模型',
+      requestCount: '请求数',
+      avgTokensPerSec: '平均 Tokens/秒',
+      avgFirstTokenMs: '平均首 Token 延迟(ms)',
+      totalOutputTokens: '输出 Token 总数',
+      avgDurationMs: '平均时长(ms)',
+      requestsWithFirstToken: '首 Token 样本数'
+    }
+  },
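+  // The pageInfo/totalModels entries above rely on vue-i18n interpolation
+  // placeholders ({page}, {total}) supplied by the stats card at runtime.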
customTimeRange: { startTime: '开始时间', endTime: '结束时间' diff --git a/frontend/src/views/admin/ops/OpsDashboard.vue b/frontend/src/views/admin/ops/OpsDashboard.vue index 927fee94..fa9d41f1 100644 --- a/frontend/src/views/admin/ops/OpsDashboard.vue +++ b/frontend/src/views/admin/ops/OpsDashboard.vue @@ -84,6 +84,15 @@ /> + +
<OpsOpenAITokenStatsCard
+        :platform-filter="platformFilter"
+        :group-id-filter="groupIdFilter"
+        :refresh-token="refreshToken"
+      />
+
@@ -148,6 +157,7 @@
 import OpsLatencyChart from './components/OpsLatencyChart.vue'
 import OpsThroughputTrendChart from './components/OpsThroughputTrendChart.vue'
 import OpsSwitchRateTrendChart from './components/OpsSwitchRateTrendChart.vue'
 import OpsAlertEventsCard from './components/OpsAlertEventsCard.vue'
+import OpsOpenAITokenStatsCard from './components/OpsOpenAITokenStatsCard.vue'
 import OpsRequestDetailsModal, { type OpsRequestDetailsPreset } from './components/OpsRequestDetailsModal.vue'
 import OpsSettingsDialog from './components/OpsSettingsDialog.vue'
 import OpsAlertRulesCard from './components/OpsAlertRulesCard.vue'
diff --git a/frontend/src/views/admin/ops/components/OpsOpenAITokenStatsCard.vue b/frontend/src/views/admin/ops/components/OpsOpenAITokenStatsCard.vue
new file mode 100644
index 00000000..9b6f08be
--- /dev/null
+++ b/frontend/src/views/admin/ops/components/OpsOpenAITokenStatsCard.vue
@@ -0,0 +1,246 @@
+
+
+
diff --git a/frontend/src/views/admin/ops/components/__tests__/OpsOpenAITokenStatsCard.spec.ts b/frontend/src/views/admin/ops/components/__tests__/OpsOpenAITokenStatsCard.spec.ts
new file mode 100644
index 00000000..3e95f460
--- /dev/null
+++ b/frontend/src/views/admin/ops/components/__tests__/OpsOpenAITokenStatsCard.spec.ts
@@ -0,0 +1,215 @@
+import { describe, it, expect, beforeEach, vi } from 'vitest'
+import { defineComponent } from 'vue'
+import { flushPromises, mount } from '@vue/test-utils'
+import OpsOpenAITokenStatsCard from '../OpsOpenAITokenStatsCard.vue'
+
+const mockGetOpenAITokenStats = vi.fn()
+
+vi.mock('@/api/admin/ops', () => ({
+  opsAPI: {
+    getOpenAITokenStats: (...args: any[]) => mockGetOpenAITokenStats(...args),
+  },
+}))
+
+vi.mock('vue-i18n', async (importOriginal) => {
+  const actual = await importOriginal<typeof import('vue-i18n')>()
+  return {
+    ...actual,
+    useI18n: () => ({
+      t: (key: string, params?: Record<string, unknown>) => {
+        if (key === 'admin.ops.openaiTokenStats.pageInfo' && params) {
+          return `第 ${params.page}/${params.total} 页`
+        }
+        return key
+      },
+    }),
+  }
+})
+
+const SelectStub = defineComponent({
+  name: 'SelectControlStub',
+  props: {
+    modelValue: {
+      type: [String, Number],
+      default: '',
+    },
+  },
+  emits: ['update:modelValue'],
+  template: '
<select />',
+})
+
+const EmptyStateStub = defineComponent({
+  name: 'EmptyState',
+  props: {
+    title: { type: String, default: '' },
+    description: { type: String, default: '' },
+  },
+  template: '
<div class="empty-state">{{ title }}|{{ description }}</div>
',
+})
+
+const sampleResponse = {
+  time_range: '30d' as const,
+  start_time: '2026-01-01T00:00:00Z',
+  end_time: '2026-01-31T00:00:00Z',
+  platform: 'openai',
+  group_id: 7,
+  items: [
+    {
+      model: 'gpt-4o-mini',
+      request_count: 12,
+      avg_tokens_per_sec: 22.5,
+      avg_first_token_ms: 123.45,
+      total_output_tokens: 1234,
+      avg_duration_ms: 321,
+      requests_with_first_token: 10,
+    },
+  ],
+  total: 40,
+  page: 1,
+  page_size: 20,
+  top_n: null,
+}
+
+describe('OpsOpenAITokenStatsCard', () => {
+  beforeEach(() => {
+    vi.clearAllMocks()
+  })
+
+  it('默认加载并透传 platform/group 过滤,支持时间窗口切换', async () => {
+    mockGetOpenAITokenStats.mockResolvedValue(sampleResponse)
+
+    const wrapper = mount(OpsOpenAITokenStatsCard, {
+      props: {
+        platformFilter: 'openai',
+        groupIdFilter: 7,
+        refreshToken: 0,
+      },
+      global: {
+        stubs: {
+          Select: SelectStub,
+          EmptyState: EmptyStateStub,
+        },
+      },
+    })
+
+    await flushPromises()
+    expect(mockGetOpenAITokenStats).toHaveBeenCalledWith(
+      expect.objectContaining({
+        time_range: '30d',
+        platform: 'openai',
+        group_id: 7,
+        top_n: 20,
+      })
+    )
+
+    const selects = wrapper.findAllComponents(SelectStub)
+    await selects[0].vm.$emit('update:modelValue', '1h')
+    await flushPromises()
+
+    expect(mockGetOpenAITokenStats).toHaveBeenCalledWith(
+      expect.objectContaining({
+        time_range: '1h',
+        platform: 'openai',
+        group_id: 7,
+      })
+    )
+  })
+
+  it('支持分页与 TopN 模式切换并按参数请求', async () => {
+    mockGetOpenAITokenStats.mockImplementation(async (params: Record<string, any>) => ({
+      ...sampleResponse,
+      time_range: params.time_range ?? '30d',
+      page: params.page ?? 1,
+      page_size: params.page_size ?? 20,
+      top_n: params.top_n ?? null,
+      total: 40,
+    }))
+
+    const wrapper = mount(OpsOpenAITokenStatsCard, {
+      props: {
+        refreshToken: 0,
+      },
+      global: {
+        stubs: {
+          Select: SelectStub,
+          EmptyState: EmptyStateStub,
+        },
+      },
+    })
+    await flushPromises()
+
+    let selects = wrapper.findAllComponents(SelectStub)
+    await selects[1].vm.$emit('update:modelValue', 'pagination')
+    await flushPromises()
+
+    expect(mockGetOpenAITokenStats).toHaveBeenCalledWith(
+      expect.objectContaining({
+        page: 1,
+        page_size: 20,
+      })
+    )
+
+    const buttons = wrapper.findAll('button')
+    expect(buttons.length).toBeGreaterThanOrEqual(2)
+    await buttons[1].trigger('click')
+    await flushPromises()
+
+    expect(mockGetOpenAITokenStats).toHaveBeenCalledWith(
+      expect.objectContaining({
+        page: 2,
+        page_size: 20,
+      })
+    )
+
+    selects = wrapper.findAllComponents(SelectStub)
+    await selects[1].vm.$emit('update:modelValue', 'topn')
+    await flushPromises()
+    selects = wrapper.findAllComponents(SelectStub)
+    await selects[2].vm.$emit('update:modelValue', 50)
+    await flushPromises()
+
+    expect(mockGetOpenAITokenStats).toHaveBeenCalledWith(
+      expect.objectContaining({
+        top_n: 50,
+      })
+    )
+  })
+
+  it('接口返回空数据时显示空态', async () => {
+    mockGetOpenAITokenStats.mockResolvedValue({
+      ...sampleResponse,
+      items: [],
+      total: 0,
+    })
+
+    const wrapper = mount(OpsOpenAITokenStatsCard, {
+      props: { refreshToken: 0 },
+      global: {
+        stubs: {
+          Select: SelectStub,
+          EmptyState: EmptyStateStub,
+        },
+      },
+    })
+    await flushPromises()
+
+    expect(wrapper.find('.empty-state').exists()).toBe(true)
+  })
+
+  it('接口异常时显示错误提示', async () => {
+    mockGetOpenAITokenStats.mockRejectedValue(new Error('加载失败'))
+
+    const wrapper = mount(OpsOpenAITokenStatsCard, {
+      props: { refreshToken: 0 },
+      global: {
+        stubs: {
+          Select: SelectStub,
+          EmptyState: EmptyStateStub,
+        },
+      },
+    })
+    await flushPromises()
+
+    expect(wrapper.text()).toContain('加载失败')
+  })
+})
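
For reviewers who want to exercise the new dashboard endpoint by hand, a minimal Go client sketch follows. The base URL, the bearer-token header, and the `data` envelope are assumptions about a typical deployment and the `response.Success` wrapper, not guarantees made by this diff; the query parameters and JSON field names are taken from `parseOpsOpenAITokenStatsFilter` and the `OpsOpenAITokenStats*` JSON tags above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// Field and JSON tag names mirror OpsOpenAITokenStatsItem/Response in this diff.
type tokenStatsItem struct {
	Model             string   `json:"model"`
	RequestCount      int64    `json:"request_count"`
	AvgTokensPerSec   *float64 `json:"avg_tokens_per_sec"` // nil when no sample had duration_ms > 0 and output_tokens > 0
	AvgFirstTokenMs   *float64 `json:"avg_first_token_ms"` // nil when first_token_ms was never recorded
	TotalOutputTokens int64    `json:"total_output_tokens"`
}

type tokenStatsResponse struct {
	TimeRange string           `json:"time_range"`
	Total     int64            `json:"total"`
	Items     []tokenStatsItem `json:"items"`
}

func main() {
	// Assumptions: host/port and the admin bearer token depend on your deployment.
	const baseURL = "http://localhost:8080"
	const adminToken = "REPLACE_ME"

	q := url.Values{}
	q.Set("time_range", "1d") // one of 30m/1h/1d/15d/30d; anything else is a 400
	q.Set("top_n", "10")      // TopN mode; mutually exclusive with page/page_size

	req, err := http.NewRequest(http.MethodGet,
		baseURL+"/api/v1/admin/ops/dashboard/openai-token-stats?"+q.Encode(), nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+adminToken)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Assumption: response.Success wraps the payload in a "data" envelope.
	var envelope struct {
		Data tokenStatsResponse `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
		panic(err)
	}
	for _, it := range envelope.Data.Items {
		tps := "n/a"
		if it.AvgTokensPerSec != nil {
			tps = fmt.Sprintf("%.2f", *it.AvgTokensPerSec)
		}
		fmt.Printf("%-24s requests=%-6d avg_tps=%s\n", it.Model, it.RequestCount, tps)
	}
}
```

Swapping `top_n` for `page`/`page_size` exercises the pagination branch instead; mixing the two should return the `OPS_PAGINATION_CONFLICT` 400 described above.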