diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index 9883d007..7454451a 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -652,6 +652,7 @@ func (h *AccountHandler) Delete(c *gin.Context) { type TestAccountRequest struct { ModelID string `json:"model_id"` Prompt string `json:"prompt"` + Mode string `json:"mode"` } type SyncFromCRSRequest struct { @@ -682,7 +683,7 @@ func (h *AccountHandler) Test(c *gin.Context) { _ = c.ShouldBindJSON(&req) // Use AccountTestService to test the account with SSE streaming - if err := h.accountTestService.TestAccountConnection(c, accountID, req.ModelID, req.Prompt); err != nil { + if err := h.accountTestService.TestAccountConnection(c, accountID, req.ModelID, req.Prompt, req.Mode); err != nil { // Error already sent via SSE, just log return } diff --git a/backend/internal/handler/openai_chat_completions.go b/backend/internal/handler/openai_chat_completions.go index 3c4e6251..f395970a 100644 --- a/backend/internal/handler/openai_chat_completions.go +++ b/backend/internal/handler/openai_chat_completions.go @@ -130,6 +130,7 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) { reqModel, failedAccountIDs, service.OpenAIUpstreamTransportAny, + false, ) if err != nil { reqLog.Warn("openai_chat_completions.account_select_failed", @@ -153,6 +154,7 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) { defaultModel, failedAccountIDs, service.OpenAIUpstreamTransportAny, + false, ) if err == nil && selection != nil { c.Set("openai_chat_completions_fallback_model", defaultModel) diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index 1c975573..7676ffa3 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -238,6 +238,7 @@ func (h 
*OpenAIGatewayHandler) Responses(c *gin.Context) { // Generate session hash (header first; fallback to prompt_cache_key) sessionHash := h.gatewayService.GenerateSessionHash(c, sessionHashBody) + requireCompact := isOpenAIRemoteCompactPath(c) maxAccountSwitches := h.maxAccountSwitches switchCount := 0 @@ -256,6 +257,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) { reqModel, failedAccountIDs, service.OpenAIUpstreamTransportAny, + requireCompact, ) if err != nil { reqLog.Warn("openai.account_select_failed", @@ -263,6 +265,10 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) { zap.Int("excluded_account_count", len(failedAccountIDs)), ) if len(failedAccountIDs) == 0 { + if errors.Is(err, service.ErrNoAvailableCompactAccounts) { + h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "compact_not_supported", "No available OpenAI accounts support /responses/compact", streamStarted) + return + } h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted) return } @@ -644,6 +650,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) { currentRoutingModel, failedAccountIDs, service.OpenAIUpstreamTransportAny, + false, ) if err != nil { reqLog.Warn("openai_messages.account_select_failed", @@ -1167,6 +1174,7 @@ func (h *OpenAIGatewayHandler) ResponsesWebSocket(c *gin.Context) { reqModel, nil, service.OpenAIUpstreamTransportResponsesWebsocketV2, + false, ) if err != nil { reqLog.Warn("openai.websocket_account_select_failed", zap.Error(err)) diff --git a/backend/internal/repository/account_repo_compact_extra_test.go b/backend/internal/repository/account_repo_compact_extra_test.go new file mode 100644 index 00000000..604f392e --- /dev/null +++ b/backend/internal/repository/account_repo_compact_extra_test.go @@ -0,0 +1,14 @@ +package repository + +import "testing" + +func TestShouldEnqueueSchedulerOutboxForExtraUpdates_CompactCapabilityKeysAreRelevant(t *testing.T) { + updates 
const (
	// OpenAICompactModeAuto follows compact-probe results when deciding compact eligibility.
	OpenAICompactModeAuto = "auto"
	// OpenAICompactModeForceOn always treats the account as compact-supported.
	OpenAICompactModeForceOn = "force_on"
	// OpenAICompactModeForceOff always treats the account as compact-unsupported.
	OpenAICompactModeForceOff = "force_off"
)

// normalizeOpenAICompactMode trims and lower-cases mode and returns it when it
// names one of the two forced modes. Every other input, including the empty
// string, collapses to OpenAICompactModeAuto.
func normalizeOpenAICompactMode(mode string) string {
	cleaned := strings.ToLower(strings.TrimSpace(mode))
	if cleaned == OpenAICompactModeForceOn || cleaned == OpenAICompactModeForceOff {
		return cleaned
	}
	return OpenAICompactModeAuto
}

// stringMappingFromRaw converts a loosely typed mapping into a fresh
// map[string]string.
//
// Accepted inputs are map[string]string (copied as-is) and map[string]any
// (entries whose value is not a string are dropped). The result is nil when
// raw has any other type, when it is empty, or when no string entries remain
// after filtering. The returned map never aliases the input.
func stringMappingFromRaw(raw any) map[string]string {
	if typed, ok := raw.(map[string]string); ok {
		if len(typed) == 0 {
			return nil
		}
		clone := make(map[string]string, len(typed))
		for k, v := range typed {
			clone[k] = v
		}
		return clone
	}

	loose, ok := raw.(map[string]any)
	if !ok || len(loose) == 0 {
		return nil
	}
	converted := make(map[string]string, len(loose))
	for k, v := range loose {
		// Non-string values are skipped rather than stringified.
		if s, isString := v.(string); isString {
			converted[k] = s
		}
	}
	if len(converted) == 0 {
		return nil
	}
	return converted
}
*Account) ResolveMappedModel(requestedModel string) (mappedModel string, return requestedModel, false } +// GetOpenAICompactMode returns the compact routing mode for an OpenAI account. +// Missing or invalid values fall back to "auto". +func (a *Account) GetOpenAICompactMode() string { + if a == nil || !a.IsOpenAI() || a.Extra == nil { + return OpenAICompactModeAuto + } + mode, _ := a.Extra["openai_compact_mode"].(string) + return normalizeOpenAICompactMode(mode) +} + +// OpenAICompactSupportKnown reports whether compact capability is known for this +// account and, when known, whether it is supported. +func (a *Account) OpenAICompactSupportKnown() (supported bool, known bool) { + if a == nil || !a.IsOpenAI() { + return false, false + } + + switch a.GetOpenAICompactMode() { + case OpenAICompactModeForceOn: + return true, true + case OpenAICompactModeForceOff: + return false, true + } + + if a.Extra == nil { + return false, false + } + supported, ok := a.Extra["openai_compact_supported"].(bool) + if !ok { + return false, false + } + return supported, true +} + +// AllowsOpenAICompact reports whether the account may be considered for compact +// requests. Unknown capability remains allowed to avoid breaking older accounts +// before an explicit probe has been run. +func (a *Account) AllowsOpenAICompact() bool { + if a == nil || !a.IsOpenAI() { + return false + } + supported, known := a.OpenAICompactSupportKnown() + if !known { + return true + } + return supported +} + +// GetCompactModelMapping returns compact-only model remapping configuration. +// This mapping is intended for /responses/compact only and does not affect +// normal /responses traffic. 
+func (a *Account) GetCompactModelMapping() map[string]string { + if a == nil || a.Credentials == nil { + return nil + } + return stringMappingFromRaw(a.Credentials["compact_model_mapping"]) +} + +// ResolveCompactMappedModel resolves compact-only model remapping and reports +// whether a compact-specific mapping rule matched. +func (a *Account) ResolveCompactMappedModel(requestedModel string) (mappedModel string, matched bool) { + mapping := a.GetCompactModelMapping() + if len(mapping) == 0 { + return requestedModel, false + } + if mappedModel, matched := resolveRequestedModelInMapping(mapping, requestedModel); matched { + return mappedModel, true + } + return requestedModel, false +} + func (a *Account) GetBaseURL() string { if a.Type != AccountTypeAPIKey { return "" diff --git a/backend/internal/service/account_openai_compact_test.go b/backend/internal/service/account_openai_compact_test.go new file mode 100644 index 00000000..442b00da --- /dev/null +++ b/backend/internal/service/account_openai_compact_test.go @@ -0,0 +1,369 @@ +package service + +import "testing" + +func TestAccountGetOpenAICompactMode(t *testing.T) { + tests := []struct { + name string + account *Account + want string + }{ + { + name: "nil account defaults to auto", + want: OpenAICompactModeAuto, + }, + { + name: "non openai account defaults to auto", + account: &Account{ + Platform: PlatformAnthropic, + Extra: map[string]any{"openai_compact_mode": OpenAICompactModeForceOn}, + }, + want: OpenAICompactModeAuto, + }, + { + name: "missing extra defaults to auto", + account: &Account{ + Platform: PlatformOpenAI, + }, + want: OpenAICompactModeAuto, + }, + { + name: "invalid mode falls back to auto", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_mode": " invalid "}, + }, + want: OpenAICompactModeAuto, + }, + { + name: "force on is normalized", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_mode": " FORCE_ON "}, + }, 
+ want: OpenAICompactModeForceOn, + }, + { + name: "force off is normalized", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_mode": "force_off"}, + }, + want: OpenAICompactModeForceOff, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.account.GetOpenAICompactMode(); got != tt.want { + t.Fatalf("GetOpenAICompactMode() = %q, want %q", got, tt.want) + } + }) + } +} + +func TestAccountOpenAICompactSupportKnown(t *testing.T) { + tests := []struct { + name string + account *Account + wantSupported bool + wantKnown bool + }{ + { + name: "nil account is unknown", + wantSupported: false, + wantKnown: false, + }, + { + name: "non openai account is unknown", + account: &Account{ + Platform: PlatformAnthropic, + Extra: map[string]any{"openai_compact_supported": true}, + }, + wantSupported: false, + wantKnown: false, + }, + { + name: "force on overrides probe state", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{ + "openai_compact_mode": OpenAICompactModeForceOn, + "openai_compact_supported": false, + }, + }, + wantSupported: true, + wantKnown: true, + }, + { + name: "force off overrides probe state", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{ + "openai_compact_mode": OpenAICompactModeForceOff, + "openai_compact_supported": true, + }, + }, + wantSupported: false, + wantKnown: true, + }, + { + name: "auto true is known supported", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_supported": true}, + }, + wantSupported: true, + wantKnown: true, + }, + { + name: "auto false is known unsupported", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_supported": false}, + }, + wantSupported: false, + wantKnown: true, + }, + { + name: "auto without probe state remains unknown", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{}, + }, + 
wantSupported: false, + wantKnown: false, + }, + { + name: "invalid probe field remains unknown", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_supported": "true"}, + }, + wantSupported: false, + wantKnown: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotSupported, gotKnown := tt.account.OpenAICompactSupportKnown() + if gotSupported != tt.wantSupported || gotKnown != tt.wantKnown { + t.Fatalf("OpenAICompactSupportKnown() = (%v, %v), want (%v, %v)", gotSupported, gotKnown, tt.wantSupported, tt.wantKnown) + } + }) + } +} + +func TestAccountAllowsOpenAICompact(t *testing.T) { + tests := []struct { + name string + account *Account + want bool + }{ + { + name: "nil account does not allow compact", + want: false, + }, + { + name: "non openai account does not allow compact", + account: &Account{ + Platform: PlatformAnthropic, + }, + want: false, + }, + { + name: "unknown openai account remains allowed", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{}, + }, + want: true, + }, + { + name: "supported openai account is allowed", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_supported": true}, + }, + want: true, + }, + { + name: "unsupported openai account is rejected", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_supported": false}, + }, + want: false, + }, + { + name: "force on is allowed", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_mode": OpenAICompactModeForceOn}, + }, + want: true, + }, + { + name: "force off is rejected", + account: &Account{ + Platform: PlatformOpenAI, + Extra: map[string]any{"openai_compact_mode": OpenAICompactModeForceOff}, + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.account.AllowsOpenAICompact(); got != tt.want { + 
t.Fatalf("AllowsOpenAICompact() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestAccountGetCompactModelMapping(t *testing.T) { + tests := []struct { + name string + account *Account + want map[string]string + }{ + { + name: "nil account returns nil", + want: nil, + }, + { + name: "missing credentials returns nil", + account: &Account{ + Platform: PlatformOpenAI, + }, + want: nil, + }, + { + name: "map any is converted", + account: &Account{ + Credentials: map[string]any{ + "compact_model_mapping": map[string]any{ + "gpt-5.4": "gpt-5.4-openai-compact", + "invalid": 1, + }, + }, + }, + want: map[string]string{ + "gpt-5.4": "gpt-5.4-openai-compact", + }, + }, + { + name: "map string string is copied", + account: &Account{ + Credentials: map[string]any{ + "compact_model_mapping": map[string]string{ + "gpt-*": "compact-*", + }, + }, + }, + want: map[string]string{ + "gpt-*": "compact-*", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.account.GetCompactModelMapping() + if !equalStringMap(got, tt.want) { + t.Fatalf("GetCompactModelMapping() = %#v, want %#v", got, tt.want) + } + }) + } +} + +func TestAccountResolveCompactMappedModel(t *testing.T) { + tests := []struct { + name string + credentials map[string]any + requestedModel string + expectedModel string + expectedMatch bool + }{ + { + name: "no compact mapping reports unmatched", + credentials: nil, + requestedModel: "gpt-5.4", + expectedModel: "gpt-5.4", + expectedMatch: false, + }, + { + name: "exact compact mapping matches", + credentials: map[string]any{ + "compact_model_mapping": map[string]any{ + "gpt-5.4": "gpt-5.4-openai-compact", + }, + }, + requestedModel: "gpt-5.4", + expectedModel: "gpt-5.4-openai-compact", + expectedMatch: true, + }, + { + name: "exact passthrough counts as match", + credentials: map[string]any{ + "compact_model_mapping": map[string]any{ + "gpt-5.4": "gpt-5.4", + }, + }, + requestedModel: "gpt-5.4", + expectedModel: "gpt-5.4", + 
// equalStringMap reports whether left and right hold exactly the same
// key/value pairs. A nil map and an empty map compare as equal.
func equalStringMap(left, right map[string]string) bool {
	if sameSize := len(left) == len(right); !sameSize {
		return false
	}
	// With equal sizes, checking that every entry of one side exists on the
	// other with the same value is enough to prove equality.
	for key, got := range left {
		want, present := right[key]
		if !(present && want == got) {
			return false
		}
	}
	return true
}
accountID int64, modelID string, prompt string) error { +// mode is optional - "compact" routes OpenAI accounts to the /responses/compact probe path +func (s *AccountTestService) TestAccountConnection(c *gin.Context, accountID int64, modelID string, prompt string, mode string) error { ctx := c.Request.Context() // Get account @@ -176,7 +177,7 @@ func (s *AccountTestService) TestAccountConnection(c *gin.Context, accountID int // Route to platform-specific test method if account.IsOpenAI() { - return s.testOpenAIAccountConnection(c, account, modelID, prompt) + return s.testOpenAIAccountConnection(c, account, modelID, prompt, normalizeAccountTestMode(mode)) } if account.IsGemini() { @@ -416,9 +417,10 @@ func (s *AccountTestService) testBedrockAccountConnection(c *gin.Context, ctx co } // testOpenAIAccountConnection tests an OpenAI account's connection -func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account *Account, modelID string, prompt string) error { +func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account *Account, modelID string, prompt string, mode string) error { ctx := c.Request.Context() _ = prompt + mode = normalizeAccountTestMode(mode) // Default to openai.DefaultTestModel for OpenAI testing testModelID := modelID @@ -426,14 +428,12 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account testModelID = openai.DefaultTestModel } - // For API Key accounts with model mapping, map the model - if account.Type == "apikey" { - mapping := account.GetModelMapping() - if len(mapping) > 0 { - if mappedModel, exists := mapping[testModelID]; exists { - testModelID = mappedModel - } - } + // Align test routing with gateway behavior: OpenAI accounts apply normal + // account model mapping, and compact mode applies compact-only mapping on top. 
+ testModelID = account.GetMappedModel(testModelID) + if mode == AccountTestModeCompact { + testModelID = resolveOpenAICompactForwardModel(account, testModelID) + return s.testOpenAICompactConnection(c, account, testModelID) } // Route to image generation test if an image model is selected @@ -553,6 +553,121 @@ func (s *AccountTestService) testOpenAIAccountConnection(c *gin.Context, account return s.processOpenAIStream(c, resp.Body) } +// testOpenAICompactConnection probes /responses/compact and persists the +// resulting capability state on the account. +func (s *AccountTestService) testOpenAICompactConnection(c *gin.Context, account *Account, testModelID string) error { + ctx := c.Request.Context() + + authToken := "" + apiURL := "" + isOAuth := false + chatgptAccountID := "" + + switch { + case account.IsOAuth(): + isOAuth = true + authToken = account.GetOpenAIAccessToken() + if authToken == "" { + return s.sendErrorAndEnd(c, "No access token available") + } + apiURL = chatgptCodexAPIURL + "/compact" + chatgptAccountID = account.GetChatGPTAccountID() + case account.Type == AccountTypeAPIKey: + authToken = account.GetOpenAIApiKey() + if authToken == "" { + return s.sendErrorAndEnd(c, "No API key available") + } + baseURL := account.GetOpenAIBaseURL() + if baseURL == "" { + baseURL = "https://api.openai.com" + } + normalizedBaseURL, err := s.validateUpstreamBaseURL(baseURL) + if err != nil { + return s.sendErrorAndEnd(c, fmt.Sprintf("Invalid base URL: %s", err.Error())) + } + apiURL = appendOpenAIResponsesRequestPathSuffix(buildOpenAIResponsesURL(normalizedBaseURL), "/compact") + default: + return s.sendErrorAndEnd(c, fmt.Sprintf("Unsupported account type: %s", account.Type)) + } + + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("X-Accel-Buffering", "no") + c.Writer.Flush() + + payloadBytes, _ := 
json.Marshal(createOpenAICompactProbePayload(testModelID)) + s.sendEvent(c, TestEvent{Type: "test_start", Model: testModelID}) + + req, err := http.NewRequestWithContext(ctx, "POST", apiURL, bytes.NewReader(payloadBytes)) + if err != nil { + return s.sendErrorAndEnd(c, "Failed to create request") + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + req.Header.Set("Authorization", "Bearer "+authToken) + req.Header.Set("OpenAI-Beta", "responses=experimental") + req.Header.Set("Originator", "codex_cli_rs") + req.Header.Set("User-Agent", codexCLIUserAgent) + req.Header.Set("Version", codexCLIVersion) + probeSessionID := compactProbeSessionID(account.ID) + req.Header.Set("Session_ID", probeSessionID) + req.Header.Set("Conversation_ID", probeSessionID) + + if isOAuth { + req.Host = "chatgpt.com" + if chatgptAccountID != "" { + req.Header.Set("chatgpt-account-id", chatgptAccountID) + } + } + + proxyURL := "" + if account.ProxyID != nil && account.Proxy != nil { + proxyURL = account.Proxy.URL() + } + + resp, err := s.httpUpstream.DoWithTLS(req, proxyURL, account.ID, account.Concurrency, s.tlsFPProfileService.ResolveTLSProfile(account)) + if err != nil { + if s.accountRepo != nil { + updates := buildOpenAICompactProbeExtraUpdates(nil, nil, err, time.Now()) + _ = s.accountRepo.UpdateExtra(ctx, account.ID, updates) + mergeAccountExtra(account, updates) + } + return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error())) + } + defer func() { _ = resp.Body.Close() }() + + body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + + if s.accountRepo != nil { + updates := buildOpenAICompactProbeExtraUpdates(resp, body, nil, time.Now()) + if codexUpdates, err := extractOpenAICodexProbeUpdates(resp); err == nil && len(codexUpdates) > 0 { + updates = mergeExtraUpdates(updates, codexUpdates) + } + if len(updates) > 0 { + _ = s.accountRepo.UpdateExtra(ctx, account.ID, updates) + mergeAccountExtra(account, updates) + } 
+ // 探测如返回 429,主动同步限流状态,避免后续短时间内继续选中。 + if resp.StatusCode == http.StatusTooManyRequests { + s.reconcileOpenAI429State(ctx, account, resp.Header, body) + } + } + + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusUnauthorized && s.accountRepo != nil { + errMsg := fmt.Sprintf("Authentication failed (401): %s", string(body)) + _ = s.accountRepo.SetError(ctx, account.ID, errMsg) + } + return s.sendErrorAndEnd(c, fmt.Sprintf("API returned %d: %s", resp.StatusCode, string(body))) + } + + s.sendEvent(c, TestEvent{Type: "content", Text: "Compact probe succeeded"}) + s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) + return nil +} + func (s *AccountTestService) reconcileOpenAI429State(ctx context.Context, account *Account, headers http.Header, body []byte) { if s == nil || s.accountRepo == nil || account == nil { return @@ -1297,7 +1412,7 @@ func (s *AccountTestService) RunTestBackground(ctx context.Context, accountID in ginCtx, _ := gin.CreateTestContext(w) ginCtx.Request = (&http.Request{}).WithContext(ctx) - testErr := s.TestAccountConnection(ginCtx, accountID, modelID, "") + testErr := s.TestAccountConnection(ginCtx, accountID, modelID, "", AccountTestModeDefault) finishedAt := time.Now() body := w.Body.String() diff --git a/backend/internal/service/account_test_service_openai_compact_test.go b/backend/internal/service/account_test_service_openai_compact_test.go new file mode 100644 index 00000000..9eb98fdc --- /dev/null +++ b/backend/internal/service/account_test_service_openai_compact_test.go @@ -0,0 +1,199 @@ +package service + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" +) + +func TestAccountTestService_TestAccountConnection_OpenAICompactOAuthSuccessPersistsSupport(t *testing.T) { + gin.SetMode(gin.TestMode) + + updateCalls := make(chan 
map[string]any, 1) + account := Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + repo := &snapshotUpdateAccountRepo{ + stubOpenAIAccountRepo: stubOpenAIAccountRepo{accounts: []Account{account}}, + updateExtraCalls: updateCalls, + } + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-probe"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"cmp_probe","status":"completed"}`)), + }} + svc := &AccountTestService{ + accountRepo: repo, + httpUpstream: upstream, + } + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/api/v1/admin/accounts/1/test", bytes.NewReader(nil)) + + err := svc.TestAccountConnection(c, account.ID, "gpt-5.4", "", AccountTestModeCompact) + require.NoError(t, err) + + require.Equal(t, chatgptCodexAPIURL+"/compact", upstream.lastReq.URL.String()) + require.Equal(t, "chatgpt.com", upstream.lastReq.Host) + require.Equal(t, "application/json", upstream.lastReq.Header.Get("Accept")) + require.Equal(t, codexCLIVersion, upstream.lastReq.Header.Get("Version")) + require.NotEmpty(t, upstream.lastReq.Header.Get("Session_Id")) + require.Equal(t, codexCLIUserAgent, upstream.lastReq.Header.Get("User-Agent")) + require.Equal(t, "chatgpt-acc", upstream.lastReq.Header.Get("chatgpt-account-id")) + require.Equal(t, "gpt-5.4", gjson.GetBytes(upstream.lastBody, "model").String()) + + updates := <-updateCalls + require.Equal(t, true, updates["openai_compact_supported"]) + require.Equal(t, http.StatusOK, updates["openai_compact_last_status"]) + require.Contains(t, rec.Body.String(), `"type":"test_complete"`) +} + +func 
TestAccountTestService_TestAccountConnection_OpenAICompactOAuth404MarksUnsupported(t *testing.T) { + gin.SetMode(gin.TestMode) + + updateCalls := make(chan map[string]any, 1) + account := Account{ + ID: 2, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + } + repo := &snapshotUpdateAccountRepo{ + stubOpenAIAccountRepo: stubOpenAIAccountRepo{accounts: []Account{account}}, + updateExtraCalls: updateCalls, + } + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusNotFound, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`404 page not found`)), + }} + svc := &AccountTestService{ + accountRepo: repo, + httpUpstream: upstream, + } + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/api/v1/admin/accounts/2/test", bytes.NewReader(nil)) + + err := svc.TestAccountConnection(c, account.ID, "gpt-5.4", "", AccountTestModeCompact) + require.Error(t, err) + + updates := <-updateCalls + require.Equal(t, false, updates["openai_compact_supported"]) + require.Equal(t, http.StatusNotFound, updates["openai_compact_last_status"]) + require.Contains(t, rec.Body.String(), `"type":"error"`) +} + +func TestAccountTestService_TestAccountConnection_OpenAICompactAPIKeyUsesCompactPath(t *testing.T) { + gin.SetMode(gin.TestMode) + + updateCalls := make(chan map[string]any, 1) + account := Account{ + ID: 3, + Name: "openai-apikey", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + "base_url": "https://example.com/v1", + "compact_model_mapping": map[string]any{"gpt-5.4": "gpt-5.4-openai-compact"}, + }, + } + repo := 
&snapshotUpdateAccountRepo{ + stubOpenAIAccountRepo: stubOpenAIAccountRepo{accounts: []Account{account}}, + updateExtraCalls: updateCalls, + } + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"cmp_probe_apikey","status":"completed"}`)), + }} + svc := &AccountTestService{ + accountRepo: repo, + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/api/v1/admin/accounts/3/test", bytes.NewReader(nil)) + + err := svc.TestAccountConnection(c, account.ID, "gpt-5.4", "", AccountTestModeCompact) + require.NoError(t, err) + + require.Equal(t, "https://example.com/v1/responses/compact", upstream.lastReq.URL.String()) + require.Equal(t, "gpt-5.4-openai-compact", gjson.GetBytes(upstream.lastBody, "model").String()) + updates := <-updateCalls + require.Equal(t, true, updates["openai_compact_supported"]) +} + +func TestAccountTestService_TestAccountConnection_OpenAICompactAPIKeyDefaultBaseURLUsesV1Path(t *testing.T) { + gin.SetMode(gin.TestMode) + + updateCalls := make(chan map[string]any, 1) + account := Account{ + ID: 4, + Name: "openai-apikey-default", + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Credentials: map[string]any{ + "api_key": "sk-test", + }, + } + repo := &snapshotUpdateAccountRepo{ + stubOpenAIAccountRepo: stubOpenAIAccountRepo{accounts: []Account{account}}, + updateExtraCalls: updateCalls, + } + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: 
io.NopCloser(strings.NewReader(`{"id":"cmp_probe_apikey_default","status":"completed"}`)), + }} + svc := &AccountTestService{ + accountRepo: repo, + httpUpstream: upstream, + cfg: &config.Config{Security: config.SecurityConfig{URLAllowlist: config.URLAllowlistConfig{Enabled: false}}}, + } + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/api/v1/admin/accounts/4/test", bytes.NewReader(nil)) + + err := svc.TestAccountConnection(c, account.ID, "gpt-5.4", "", AccountTestModeCompact) + require.NoError(t, err) + require.Equal(t, "https://api.openai.com/v1/responses/compact", upstream.lastReq.URL.String()) + <-updateCalls +} diff --git a/backend/internal/service/account_test_service_openai_test.go b/backend/internal/service/account_test_service_openai_test.go index c1e42b4f..7202799d 100644 --- a/backend/internal/service/account_test_service_openai_test.go +++ b/backend/internal/service/account_test_service_openai_test.go @@ -117,7 +117,7 @@ func TestAccountTestService_OpenAISuccessPersistsSnapshotFromHeaders(t *testing. 
Credentials: map[string]any{"access_token": "test-token"}, } - err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "") + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") require.NoError(t, err) require.NotEmpty(t, repo.updatedExtra) require.Equal(t, 42.0, repo.updatedExtra["codex_5h_used_percent"]) @@ -149,7 +149,7 @@ func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimitState(t *testin Credentials: map[string]any{"access_token": "test-token"}, } - err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "") + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") require.Error(t, err) require.NotEmpty(t, repo.updatedExtra) require.Equal(t, 100.0, repo.updatedExtra["codex_5h_used_percent"]) @@ -180,7 +180,7 @@ func TestAccountTestService_OpenAI429BodyOnlyPersistsRateLimitAndClearsStaleErro Credentials: map[string]any{"access_token": "test-token"}, } - err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "") + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") require.Error(t, err) require.Equal(t, account.ID, repo.rateLimitedID) require.NotNil(t, repo.rateLimitedAt) @@ -209,7 +209,7 @@ func TestAccountTestService_OpenAI429ActiveAccountDoesNotClearError(t *testing.T Credentials: map[string]any{"access_token": "test-token"}, } - err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "") + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") require.Error(t, err) require.Equal(t, account.ID, repo.rateLimitedID) require.NotNil(t, repo.rateLimitedAt) @@ -237,7 +237,7 @@ func TestAccountTestService_OpenAI429WithoutResetSignalDoesNotMutateRuntimeState Credentials: map[string]any{"access_token": "test-token"}, } - err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "") + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") require.Error(t, err) require.Zero(t, repo.rateLimitedID) require.Nil(t, repo.rateLimitedAt) 
@@ -265,7 +265,7 @@ func TestAccountTestService_OpenAI401SetsPermanentErrorOnly(t *testing.T) { Credentials: map[string]any{"access_token": "test-token"}, } - err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "") + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") require.Error(t, err) require.Equal(t, account.ID, repo.setErrorID) require.Contains(t, repo.setErrorMsg, "Authentication failed (401)") diff --git a/backend/internal/service/openai_account_scheduler.go b/backend/internal/service/openai_account_scheduler.go index 808f1229..7a0a6636 100644 --- a/backend/internal/service/openai_account_scheduler.go +++ b/backend/internal/service/openai_account_scheduler.go @@ -3,7 +3,6 @@ package service import ( "container/heap" "context" - "errors" "fmt" "hash/fnv" "math" @@ -45,6 +44,7 @@ type OpenAIAccountScheduleRequest struct { RequestedModel string RequiredTransport OpenAIUpstreamTransport RequiredImageCapability OpenAIImagesCapability + RequireCompact bool ExcludedIDs map[int64]struct{} } @@ -258,12 +258,16 @@ func (s *defaultOpenAIAccountScheduler) Select( previousResponseID, req.RequestedModel, req.ExcludedIDs, + req.RequireCompact, ) if err != nil { return nil, decision, err } if selection != nil && selection.Account != nil { if !s.isAccountTransportCompatible(selection.Account, req.RequiredTransport) { + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } selection = nil } } @@ -348,8 +352,8 @@ func (s *defaultOpenAIAccountScheduler) selectBySessionHash( _ = s.service.deleteStickySessionAccountID(ctx, req.GroupID, sessionHash) return nil, nil } - account = s.service.recheckSelectedOpenAIAccountFromDB(ctx, account, req.RequestedModel) - if account == nil { + account = s.service.recheckSelectedOpenAIAccountFromDB(ctx, account, req.RequestedModel, req.RequireCompact) + if account == nil || !s.isAccountTransportCompatible(account, req.RequiredTransport) { _ = s.service.deleteStickySessionAccountID(ctx, req.GroupID, 
sessionHash) return nil, nil } @@ -590,7 +594,7 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( return nil, 0, 0, 0, err } if len(accounts) == 0 { - return nil, 0, 0, 0, errors.New("no available OpenAI accounts") + return nil, 0, 0, 0, noAvailableOpenAISelectionError(req.RequestedModel, false) } // require_privacy_set: 获取分组信息 @@ -630,7 +634,7 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( }) } if len(filtered) == 0 { - return nil, 0, 0, 0, errors.New("no available OpenAI accounts") + return nil, 0, 0, 0, noAvailableOpenAISelectionError(req.RequestedModel, false) } loadMap := map[int64]*AccountLoadInfo{} @@ -640,45 +644,14 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( } } - minPriority, maxPriority := filtered[0].Priority, filtered[0].Priority - maxWaiting := 1 - loadRateSum := 0.0 - loadRateSumSquares := 0.0 - minTTFT, maxTTFT := 0.0, 0.0 - hasTTFTSample := false - candidates := make([]openAIAccountCandidateScore, 0, len(filtered)) + allCandidates := make([]openAIAccountCandidateScore, 0, len(filtered)) for _, account := range filtered { loadInfo := loadMap[account.ID] if loadInfo == nil { loadInfo = &AccountLoadInfo{AccountID: account.ID} } - if account.Priority < minPriority { - minPriority = account.Priority - } - if account.Priority > maxPriority { - maxPriority = account.Priority - } - if loadInfo.WaitingCount > maxWaiting { - maxWaiting = loadInfo.WaitingCount - } errorRate, ttft, hasTTFT := s.stats.snapshot(account.ID) - if hasTTFT && ttft > 0 { - if !hasTTFTSample { - minTTFT, maxTTFT = ttft, ttft - hasTTFTSample = true - } else { - if ttft < minTTFT { - minTTFT = ttft - } - if ttft > maxTTFT { - maxTTFT = ttft - } - } - } - loadRate := float64(loadInfo.LoadRate) - loadRateSum += loadRate - loadRateSumSquares += loadRate * loadRate - candidates = append(candidates, openAIAccountCandidateScore{ + allCandidates = append(allCandidates, openAIAccountCandidateScore{ account: account, loadInfo: loadInfo, 
errorRate: errorRate, @@ -686,53 +659,183 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( hasTTFT: hasTTFT, }) } - loadSkew := calcLoadSkewByMoments(loadRateSum, loadRateSumSquares, len(candidates)) - weights := s.service.openAIWSSchedulerWeights() - for i := range candidates { - item := &candidates[i] - priorityFactor := 1.0 - if maxPriority > minPriority { - priorityFactor = 1 - float64(item.account.Priority-minPriority)/float64(maxPriority-minPriority) + // Compact 模式下把明确不支持 compact 的账号拆出,仅在 schedulerSnapshot 启用 + // 时作为最后兜底(snapshot 可能已陈旧)。 + candidates := allCandidates + staleSnapshotCompactRetry := make([]openAIAccountCandidateScore, 0, len(allCandidates)) + if req.RequireCompact { + candidates = make([]openAIAccountCandidateScore, 0, len(allCandidates)) + for _, candidate := range allCandidates { + if openAICompactSupportTier(candidate.account) == 0 { + staleSnapshotCompactRetry = append(staleSnapshotCompactRetry, candidate) + continue + } + candidates = append(candidates, candidate) } - loadFactor := 1 - clamp01(float64(item.loadInfo.LoadRate)/100.0) - queueFactor := 1 - clamp01(float64(item.loadInfo.WaitingCount)/float64(maxWaiting)) - errorFactor := 1 - clamp01(item.errorRate) - ttftFactor := 0.5 - if item.hasTTFT && hasTTFTSample && maxTTFT > minTTFT { - ttftFactor = 1 - clamp01((item.ttft-minTTFT)/(maxTTFT-minTTFT)) + if len(candidates) == 0 && len(staleSnapshotCompactRetry) == 0 { + return nil, 0, 0, 0, ErrNoAvailableCompactAccounts } - - item.score = weights.Priority*priorityFactor + - weights.Load*loadFactor + - weights.Queue*queueFactor + - weights.ErrorRate*errorFactor + - weights.TTFT*ttftFactor } - topK := s.service.openAIWSLBTopK() - if topK > len(candidates) { - topK = len(candidates) - } - if topK <= 0 { - topK = 1 - } - rankedCandidates := selectTopKOpenAICandidates(candidates, topK) - selectionOrder := buildOpenAIWeightedSelectionOrder(rankedCandidates, req) + candidateCount := len(candidates) + loadSkew := 0.0 + if 
len(candidates) > 0 { + minPriority, maxPriority := candidates[0].account.Priority, candidates[0].account.Priority + maxWaiting := 1 + loadRateSum := 0.0 + loadRateSumSquares := 0.0 + minTTFT, maxTTFT := 0.0, 0.0 + hasTTFTSample := false + for _, candidate := range candidates { + if candidate.account.Priority < minPriority { + minPriority = candidate.account.Priority + } + if candidate.account.Priority > maxPriority { + maxPriority = candidate.account.Priority + } + if candidate.loadInfo.WaitingCount > maxWaiting { + maxWaiting = candidate.loadInfo.WaitingCount + } + if candidate.hasTTFT && candidate.ttft > 0 { + if !hasTTFTSample { + minTTFT, maxTTFT = candidate.ttft, candidate.ttft + hasTTFTSample = true + } else { + if candidate.ttft < minTTFT { + minTTFT = candidate.ttft + } + if candidate.ttft > maxTTFT { + maxTTFT = candidate.ttft + } + } + } + loadRate := float64(candidate.loadInfo.LoadRate) + loadRateSum += loadRate + loadRateSumSquares += loadRate * loadRate + } + loadSkew = calcLoadSkewByMoments(loadRateSum, loadRateSumSquares, len(candidates)) + weights := s.service.openAIWSSchedulerWeights() + for i := range candidates { + item := &candidates[i] + priorityFactor := 1.0 + if maxPriority > minPriority { + priorityFactor = 1 - float64(item.account.Priority-minPriority)/float64(maxPriority-minPriority) + } + loadFactor := 1 - clamp01(float64(item.loadInfo.LoadRate)/100.0) + queueFactor := 1 - clamp01(float64(item.loadInfo.WaitingCount)/float64(maxWaiting)) + errorFactor := 1 - clamp01(item.errorRate) + ttftFactor := 0.5 + if item.hasTTFT && hasTTFTSample && maxTTFT > minTTFT { + ttftFactor = 1 - clamp01((item.ttft-minTTFT)/(maxTTFT-minTTFT)) + } + + item.score = weights.Priority*priorityFactor + + weights.Load*loadFactor + + weights.Queue*queueFactor + + weights.ErrorRate*errorFactor + + weights.TTFT*ttftFactor + } + } + + topK := 0 + if len(candidates) > 0 { + topK = s.service.openAIWSLBTopK() + if topK > len(candidates) { + topK = len(candidates) + } + if 
topK <= 0 { + topK = 1 + } + } + + buildSelectionOrder := func(pool []openAIAccountCandidateScore) []openAIAccountCandidateScore { + if len(pool) == 0 || topK <= 0 { + return nil + } + groupTopK := topK + if groupTopK > len(pool) { + groupTopK = len(pool) + } + ranked := selectTopKOpenAICandidates(pool, groupTopK) + return buildOpenAIWeightedSelectionOrder(ranked, req) + } + sortCompactRetryCandidates := func(pool []openAIAccountCandidateScore) []openAIAccountCandidateScore { + if len(pool) == 0 { + return nil + } + ordered := append([]openAIAccountCandidateScore(nil), pool...) + sort.SliceStable(ordered, func(i, j int) bool { + a, b := ordered[i], ordered[j] + if a.account.Priority != b.account.Priority { + return a.account.Priority < b.account.Priority + } + if a.loadInfo.LoadRate != b.loadInfo.LoadRate { + return a.loadInfo.LoadRate < b.loadInfo.LoadRate + } + if a.loadInfo.WaitingCount != b.loadInfo.WaitingCount { + return a.loadInfo.WaitingCount < b.loadInfo.WaitingCount + } + switch { + case a.account.LastUsedAt == nil && b.account.LastUsedAt != nil: + return true + case a.account.LastUsedAt != nil && b.account.LastUsedAt == nil: + return false + case a.account.LastUsedAt == nil && b.account.LastUsedAt == nil: + return false + default: + return a.account.LastUsedAt.Before(*b.account.LastUsedAt) + } + }) + return ordered + } + + selectionOrder := make([]openAIAccountCandidateScore, 0, len(allCandidates)) + if req.RequireCompact { + supported := make([]openAIAccountCandidateScore, 0, len(candidates)) + unknown := make([]openAIAccountCandidateScore, 0, len(candidates)) + for _, candidate := range candidates { + switch openAICompactSupportTier(candidate.account) { + case 2: + supported = append(supported, candidate) + case 1: + unknown = append(unknown, candidate) + } + } + if len(supported) == 0 && len(unknown) == 0 && s.service.schedulerSnapshot == nil { + return nil, candidateCount, topK, loadSkew, ErrNoAvailableCompactAccounts + } + selectionOrder = 
append(selectionOrder, buildSelectionOrder(supported)...) + selectionOrder = append(selectionOrder, buildSelectionOrder(unknown)...) + if len(staleSnapshotCompactRetry) > 0 && s.service.schedulerSnapshot != nil { + selectionOrder = append(selectionOrder, sortCompactRetryCandidates(staleSnapshotCompactRetry)...) + } + } else { + selectionOrder = buildSelectionOrder(candidates) + } + if len(selectionOrder) == 0 { + return nil, candidateCount, topK, loadSkew, noAvailableOpenAISelectionError(req.RequestedModel, req.RequireCompact && len(allCandidates) > 0) + } + + compactBlocked := false for i := 0; i < len(selectionOrder); i++ { candidate := selectionOrder[i] - fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, candidate.account, req.RequestedModel) + fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, candidate.account, req.RequestedModel, false) if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) || !s.isAccountRequestCompatible(fresh, req) { continue } - fresh = s.service.recheckSelectedOpenAIAccountFromDB(ctx, fresh, req.RequestedModel) + fresh = s.service.recheckSelectedOpenAIAccountFromDB(ctx, fresh, req.RequestedModel, false) if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) || !s.isAccountRequestCompatible(fresh, req) { continue } + if req.RequireCompact && openAICompactSupportTier(fresh) == 0 { + compactBlocked = true + continue + } result, acquireErr := s.service.tryAcquireAccountSlot(ctx, fresh.ID, fresh.Concurrency) if acquireErr != nil { - return nil, len(candidates), topK, loadSkew, acquireErr + return nil, candidateCount, topK, loadSkew, acquireErr } if result != nil && result.Acquired { if req.SessionHash != "" { @@ -742,17 +845,25 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( Account: fresh, Acquired: true, ReleaseFunc: result.ReleaseFunc, - }, len(candidates), topK, loadSkew, nil + }, candidateCount, topK, loadSkew, nil } } cfg := 
s.service.schedulingConfig() // WaitPlan.MaxConcurrency 使用 Concurrency(非 EffectiveLoadFactor),因为 WaitPlan 控制的是 Redis 实际并发槽位等待。 for _, candidate := range selectionOrder { - fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, candidate.account, req.RequestedModel) + fresh := s.service.resolveFreshSchedulableOpenAIAccount(ctx, candidate.account, req.RequestedModel, false) if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) || !s.isAccountRequestCompatible(fresh, req) { continue } + fresh = s.service.recheckSelectedOpenAIAccountFromDB(ctx, fresh, req.RequestedModel, false) + if fresh == nil || !s.isAccountTransportCompatible(fresh, req.RequiredTransport) || !s.isAccountRequestCompatible(fresh, req) { + continue + } + if req.RequireCompact && openAICompactSupportTier(fresh) == 0 { + compactBlocked = true + continue + } return &AccountSelectionResult{ Account: fresh, WaitPlan: &AccountWaitPlan{ @@ -761,10 +872,10 @@ func (s *defaultOpenAIAccountScheduler) selectByLoadBalance( Timeout: cfg.FallbackWaitTimeout, MaxWaiting: cfg.FallbackMaxWaiting, }, - }, len(candidates), topK, loadSkew, nil + }, candidateCount, topK, loadSkew, nil } - return nil, len(candidates), topK, loadSkew, ErrNoAvailableAccounts + return nil, candidateCount, topK, loadSkew, noAvailableOpenAISelectionError(req.RequestedModel, compactBlocked) } func (s *defaultOpenAIAccountScheduler) isAccountTransportCompatible(account *Account, requiredTransport OpenAIUpstreamTransport) bool { @@ -905,8 +1016,9 @@ func (s *OpenAIGatewayService) SelectAccountWithScheduler( requestedModel string, excludedIDs map[int64]struct{}, requiredTransport OpenAIUpstreamTransport, + requireCompact bool, ) (*AccountSelectionResult, OpenAIAccountScheduleDecision, error) { - return s.selectAccountWithScheduler(ctx, groupID, previousResponseID, sessionHash, requestedModel, excludedIDs, requiredTransport, "") + return s.selectAccountWithScheduler(ctx, groupID, previousResponseID, sessionHash, 
requestedModel, excludedIDs, requiredTransport, "", requireCompact) } func (s *OpenAIGatewayService) SelectAccountWithSchedulerForImages( @@ -917,13 +1029,13 @@ func (s *OpenAIGatewayService) SelectAccountWithSchedulerForImages( excludedIDs map[int64]struct{}, requiredCapability OpenAIImagesCapability, ) (*AccountSelectionResult, OpenAIAccountScheduleDecision, error) { - selection, decision, err := s.selectAccountWithScheduler(ctx, groupID, "", sessionHash, requestedModel, excludedIDs, OpenAIUpstreamTransportHTTPSSE, requiredCapability) + selection, decision, err := s.selectAccountWithScheduler(ctx, groupID, "", sessionHash, requestedModel, excludedIDs, OpenAIUpstreamTransportHTTPSSE, requiredCapability, false) if err == nil && selection != nil && selection.Account != nil { return selection, decision, nil } // 如果要求 native 能力(如指定了模型)但没有可用的 APIKey 账号,回退到 basic(OAuth 账号) if requiredCapability == OpenAIImagesCapabilityNative { - return s.selectAccountWithScheduler(ctx, groupID, "", sessionHash, requestedModel, excludedIDs, OpenAIUpstreamTransportHTTPSSE, OpenAIImagesCapabilityBasic) + return s.selectAccountWithScheduler(ctx, groupID, "", sessionHash, requestedModel, excludedIDs, OpenAIUpstreamTransportHTTPSSE, OpenAIImagesCapabilityBasic, false) } return selection, decision, err } @@ -937,6 +1049,7 @@ func (s *OpenAIGatewayService) selectAccountWithScheduler( excludedIDs map[int64]struct{}, requiredTransport OpenAIUpstreamTransport, requiredImageCapability OpenAIImagesCapability, + requireCompact bool, ) (*AccountSelectionResult, OpenAIAccountScheduleDecision, error) { decision := OpenAIAccountScheduleDecision{} scheduler := s.getOpenAIAccountScheduler(ctx) @@ -945,7 +1058,7 @@ func (s *OpenAIGatewayService) selectAccountWithScheduler( if requiredTransport == OpenAIUpstreamTransportAny || requiredTransport == OpenAIUpstreamTransportHTTPSSE { effectiveExcludedIDs := cloneExcludedAccountIDs(excludedIDs) for { - selection, err := s.SelectAccountWithLoadAwareness(ctx, 
groupID, sessionHash, requestedModel, effectiveExcludedIDs) + selection, err := s.selectAccountWithLoadAwareness(ctx, groupID, sessionHash, requestedModel, effectiveExcludedIDs, requireCompact) if err != nil { return nil, decision, err } @@ -970,7 +1083,7 @@ func (s *OpenAIGatewayService) selectAccountWithScheduler( effectiveExcludedIDs := cloneExcludedAccountIDs(excludedIDs) for { - selection, err := s.SelectAccountWithLoadAwareness(ctx, groupID, sessionHash, requestedModel, effectiveExcludedIDs) + selection, err := s.selectAccountWithLoadAwareness(ctx, groupID, sessionHash, requestedModel, effectiveExcludedIDs, requireCompact) if err != nil { return nil, decision, err } @@ -1008,6 +1121,7 @@ func (s *OpenAIGatewayService) selectAccountWithScheduler( RequestedModel: requestedModel, RequiredTransport: requiredTransport, RequiredImageCapability: requiredImageCapability, + RequireCompact: requireCompact, ExcludedIDs: excludedIDs, }) } diff --git a/backend/internal/service/openai_account_scheduler_compact_test.go b/backend/internal/service/openai_account_scheduler_compact_test.go new file mode 100644 index 00000000..f7e08a20 --- /dev/null +++ b/backend/internal/service/openai_account_scheduler_compact_test.go @@ -0,0 +1,195 @@ +package service + +import ( + "context" + "errors" + "testing" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/stretchr/testify/require" +) + +// TestOpenAIGatewayService_SelectAccountWithScheduler_CompactPrefersSupportedOverUnknown +// 验证 compact 调度时显式支持 (tier=2) 优先于未探测 (tier=1)。 +func TestOpenAIGatewayService_SelectAccountWithScheduler_CompactPrefersSupportedOverUnknown(t *testing.T) { + resetOpenAIAdvancedSchedulerSettingCacheForTest() + + ctx := context.Background() + groupID := int64(91001) + accounts := []Account{ + { + ID: 71001, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{}, // unknown + }, + { + ID: 71002, + 
Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{"openai_compact_supported": true}, // tier=2 + }, + } + cfg := &config.Config{} + cfg.Gateway.Scheduling.LoadBatchEnabled = false + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: &schedulerTestGatewayCache{}, + cfg: cfg, + concurrencyService: NewConcurrencyService(schedulerTestConcurrencyCache{}), + } + + selection, _, err := svc.SelectAccountWithScheduler( + ctx, + &groupID, + "", + "", + "gpt-5.4", + nil, + OpenAIUpstreamTransportAny, + true, + ) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(71002), selection.Account.ID, "compact-supported account should win over unknown") +} + +// TestOpenAIGatewayService_SelectAccountWithScheduler_CompactRejectsExplicitlyUnsupported +// 验证 force_off / 已探测不支持 (tier=0) 的账号不会被 compact 请求选中。 +func TestOpenAIGatewayService_SelectAccountWithScheduler_CompactRejectsExplicitlyUnsupported(t *testing.T) { + resetOpenAIAdvancedSchedulerSettingCacheForTest() + + ctx := context.Background() + groupID := int64(91002) + accounts := []Account{ + { + ID: 71010, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{"openai_compact_mode": OpenAICompactModeForceOff}, + }, + { + ID: 71011, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{"openai_compact_supported": false}, + }, + } + cfg := &config.Config{} + cfg.Gateway.Scheduling.LoadBatchEnabled = false + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: &schedulerTestGatewayCache{}, + cfg: cfg, + concurrencyService: 
NewConcurrencyService(schedulerTestConcurrencyCache{}), + } + + selection, _, err := svc.SelectAccountWithScheduler( + ctx, + &groupID, + "", + "", + "gpt-5.4", + nil, + OpenAIUpstreamTransportAny, + true, + ) + require.Error(t, err) + require.True(t, errors.Is(err, ErrNoAvailableCompactAccounts), "compact-only accounts should rejected explicitly unsupported and return compact error") + require.Nil(t, selection) +} + +// TestOpenAIGatewayService_SelectAccountWithScheduler_CompactFallsBackToUnknown +// 验证当没有"已知支持"账号时,compact 请求会回退到"未探测"账号。 +func TestOpenAIGatewayService_SelectAccountWithScheduler_CompactFallsBackToUnknown(t *testing.T) { + resetOpenAIAdvancedSchedulerSettingCacheForTest() + + ctx := context.Background() + groupID := int64(91003) + accounts := []Account{ + { + ID: 71020, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{"openai_compact_supported": false}, // tier=0 + }, + { + ID: 71021, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + Extra: map[string]any{}, // unknown -> tier=1 + }, + } + cfg := &config.Config{} + cfg.Gateway.Scheduling.LoadBatchEnabled = false + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: &schedulerTestGatewayCache{}, + cfg: cfg, + concurrencyService: NewConcurrencyService(schedulerTestConcurrencyCache{}), + } + + selection, _, err := svc.SelectAccountWithScheduler( + ctx, + &groupID, + "", + "", + "gpt-5.4", + nil, + OpenAIUpstreamTransportAny, + true, + ) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(71021), selection.Account.ID, "unknown account should be picked when no supported account available") +} + +// TestOpenAICompactSupportTier 验证 tier 分类逻辑。 +func TestOpenAICompactSupportTier(t *testing.T) { + 
tests := []struct { + name string + account *Account + want int + }{ + {name: "nil", account: nil, want: 0}, + {name: "non openai", account: &Account{Platform: PlatformAnthropic}, want: 0}, + {name: "openai unknown", account: &Account{Platform: PlatformOpenAI, Extra: map[string]any{}}, want: 1}, + {name: "openai supported", account: &Account{Platform: PlatformOpenAI, Extra: map[string]any{"openai_compact_supported": true}}, want: 2}, + {name: "openai unsupported", account: &Account{Platform: PlatformOpenAI, Extra: map[string]any{"openai_compact_supported": false}}, want: 0}, + {name: "force on", account: &Account{Platform: PlatformOpenAI, Extra: map[string]any{"openai_compact_mode": OpenAICompactModeForceOn}}, want: 2}, + {name: "force off overrides probe true", account: &Account{Platform: PlatformOpenAI, Extra: map[string]any{"openai_compact_mode": OpenAICompactModeForceOff, "openai_compact_supported": true}}, want: 0}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := openAICompactSupportTier(tt.account); got != tt.want { + t.Fatalf("openAICompactSupportTier(...) 
= %d, want %d", got, tt.want) + } + }) + } +} diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index b02370cb..0950ee54 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -289,6 +289,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_DefaultDisabledUsesLega "gpt-5.1", nil, OpenAIUpstreamTransportAny, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -343,6 +344,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_DefaultDisabled_Require "gpt-5.1", nil, OpenAIUpstreamTransportResponsesWebsocketV2, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -384,6 +386,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_DefaultDisabled_Require "gpt-5.1", nil, OpenAIUpstreamTransportResponsesWebsocketV2, + false, ) require.ErrorContains(t, err, "no available OpenAI accounts") require.Nil(t, selection) @@ -445,6 +448,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_EnabledUsesAdvancedPrev "gpt-5.1", nil, OpenAIUpstreamTransportAny, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -486,7 +490,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyRateLimite concurrencyService: NewConcurrencyService(schedulerTestConcurrencyCache{}), } - selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_rate_limited", "gpt-5.1", nil, OpenAIUpstreamTransportAny) + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_rate_limited", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) require.NoError(t, err) require.NotNil(t, selection) require.NotNil(t, selection.Account) @@ -540,7 +544,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyDBRuntimeR concurrencyService: 
NewConcurrencyService(schedulerTestConcurrencyCache{}), } - selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_db_runtime_recheck", "gpt-5.1", nil, OpenAIUpstreamTransportAny) + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_db_runtime_recheck", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) require.NoError(t, err) require.NotNil(t, selection) require.NotNil(t, selection.Account) @@ -616,6 +620,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_PreviousResponseSticky( "gpt-5.1", nil, OpenAIUpstreamTransportAny, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -662,6 +667,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionSticky(t *testin "gpt-5.1", nil, OpenAIUpstreamTransportAny, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -740,6 +746,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyBusyKeepsS "gpt-5.1", nil, OpenAIUpstreamTransportAny, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -788,6 +795,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionSticky_ForceHTTP "gpt-5.1", nil, OpenAIUpstreamTransportAny, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -857,6 +865,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_RequiredWSV2_SkipsStick "gpt-5.1", nil, OpenAIUpstreamTransportResponsesWebsocketV2, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -900,6 +909,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_RequiredWSV2_NoAvailabl "gpt-5.1", nil, OpenAIUpstreamTransportResponsesWebsocketV2, + false, ) require.Error(t, err) require.Nil(t, selection) @@ -976,6 +986,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_LoadBalanceTopKFallback "gpt-5.1", nil, OpenAIUpstreamTransportAny, + false, ) require.NoError(t, err) require.NotNil(t, selection) @@ -1014,7 +1025,7 @@ func 
TestOpenAIGatewayService_OpenAIAccountSchedulerMetrics(t *testing.T) { concurrencyService: NewConcurrencyService(schedulerTestConcurrencyCache{}), } - selection, _, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_metrics", "gpt-5.1", nil, OpenAIUpstreamTransportAny) + selection, _, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_metrics", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) require.NoError(t, err) require.NotNil(t, selection) svc.ReportOpenAIAccountScheduleResult(account.ID, true, intPtrForTest(120)) @@ -1218,6 +1229,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_LoadBalanceDistributesA "gpt-5.1", nil, OpenAIUpstreamTransportAny, + false, ) require.NoError(t, err) require.NotNil(t, selection) diff --git a/backend/internal/service/openai_account_scheduler_ws_snapshot_test.go b/backend/internal/service/openai_account_scheduler_ws_snapshot_test.go index ddafc6eb..8d63e68e 100644 --- a/backend/internal/service/openai_account_scheduler_ws_snapshot_test.go +++ b/backend/internal/service/openai_account_scheduler_ws_snapshot_test.go @@ -54,6 +54,7 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_UsesWSPassthroughSnapsh "gpt-5.1", nil, OpenAIUpstreamTransportResponsesWebsocketV2, + false, ) require.NoError(t, err) require.NotNil(t, selection) diff --git a/backend/internal/service/openai_compact_model_mapping_test.go b/backend/internal/service/openai_compact_model_mapping_test.go new file mode 100644 index 00000000..fc408e64 --- /dev/null +++ b/backend/internal/service/openai_compact_model_mapping_test.go @@ -0,0 +1,135 @@ +package service + +import ( + "bytes" + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" +) + +func TestOpenAIGatewayService_Forward_CompactOnlyModelMappingOverridesOAuthUpstreamModel(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := 
httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","stream":false,"instructions":"compact-test","input":"hello"}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-compact-map"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"resp_123","status":"completed","model":"gpt-5.4-openai-compact","output":[],"usage":{"input_tokens":1,"output_tokens":1}}`)), + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 1, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + "compact_model_mapping": map[string]any{"gpt-5.4": "gpt-5.4-openai-compact"}, + }, + Status: StatusActive, + Schedulable: true, + } + + result, err := svc.Forward(context.Background(), c, account, body) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "gpt-5.4", result.Model) + require.Equal(t, "gpt-5.4-openai-compact", result.UpstreamModel) + require.Equal(t, "gpt-5.4-openai-compact", gjson.GetBytes(upstream.lastBody, "model").String()) +} + +func TestOpenAIGatewayService_Forward_NonCompactRequestIgnoresCompactOnlyModelMapping(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.4","stream":false,"instructions":"normal-test","input":"hello"}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: 
http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-normal-map"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"resp_124","status":"completed","model":"gpt-5.4","output":[],"usage":{"input_tokens":1,"output_tokens":1}}`)), + }} + + svc := &OpenAIGatewayService{httpUpstream: upstream} + account := &Account{ + ID: 2, + Name: "openai-oauth", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + "compact_model_mapping": map[string]any{"gpt-5.4": "gpt-5.4-openai-compact"}, + }, + Status: StatusActive, + Schedulable: true, + } + + result, err := svc.Forward(context.Background(), c, account, body) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "gpt-5.4", result.Model) + require.Equal(t, "gpt-5.4", result.UpstreamModel) + require.Equal(t, "gpt-5.4", gjson.GetBytes(upstream.lastBody, "model").String()) +} + +func TestOpenAIGatewayService_OAuthPassthrough_CompactOnlyModelMappingOverridesUpstreamModel(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("Content-Type", "application/json") + + originalBody := []byte(`{"model":"gpt-5.4","stream":true,"store":true,"instructions":"compact-pass","input":[{"type":"text","text":"compact me"}]}`) + upstream := &httpUpstreamRecorder{resp: &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-compact-pass-map"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"cmp_124","model":"gpt-5.4-openai-compact","usage":{"input_tokens":2,"output_tokens":3}}`)), + }} + + svc := &OpenAIGatewayService{httpUpstream: 
upstream} + account := &Account{ + ID: 3, + Name: "openai-oauth-pass", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + "compact_model_mapping": map[string]any{"gpt-5.4": "gpt-5.4-openai-compact"}, + }, + Extra: map[string]any{"openai_passthrough": true}, + Status: StatusActive, + Schedulable: true, + } + + result, err := svc.Forward(context.Background(), c, account, originalBody) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "gpt-5.4", result.Model) + require.Equal(t, "gpt-5.4-openai-compact", result.UpstreamModel) + require.Equal(t, "gpt-5.4-openai-compact", gjson.GetBytes(upstream.lastBody, "model").String()) + require.Equal(t, "gpt-5.4", gjson.GetBytes(rec.Body.Bytes(), "model").String()) +} diff --git a/backend/internal/service/openai_compact_probe.go b/backend/internal/service/openai_compact_probe.go new file mode 100644 index 00000000..e8deff2d --- /dev/null +++ b/backend/internal/service/openai_compact_probe.go @@ -0,0 +1,120 @@ +package service + +import ( + "net/http" + "strconv" + "strings" + "time" +) + +const ( + // AccountTestModeDefault drives the standard /responses connection test. + AccountTestModeDefault = "default" + // AccountTestModeCompact drives the /responses/compact compact-probe test. 
+ AccountTestModeCompact = "compact" +) + +func normalizeAccountTestMode(mode string) string { + switch strings.ToLower(strings.TrimSpace(mode)) { + case AccountTestModeCompact: + return AccountTestModeCompact + default: + return AccountTestModeDefault + } +} + +func createOpenAICompactProbePayload(model string) map[string]any { + return map[string]any{ + "model": strings.TrimSpace(model), + "instructions": "You are a helpful coding assistant.", + "input": []any{ + map[string]any{ + "type": "message", + "role": "user", + "content": "Respond with OK.", + }, + }, + } +} + +func shouldMarkOpenAICompactUnsupported(status int, body []byte) bool { + switch status { + case http.StatusNotFound, http.StatusMethodNotAllowed, http.StatusNotImplemented: + return true + case http.StatusBadRequest, http.StatusForbidden, http.StatusUnprocessableEntity: + lower := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(body) + " " + string(body))) + if strings.Contains(lower, "compact") { + for _, keyword := range []string{ + "unsupported", + "not support", + "does not support", + "not available", + "disabled", + } { + if strings.Contains(lower, keyword) { + return true + } + } + } + } + return false +} + +func buildOpenAICompactProbeExtraUpdates(resp *http.Response, body []byte, probeErr error, now time.Time) map[string]any { + updates := map[string]any{ + "openai_compact_checked_at": now.Format(time.RFC3339), + "openai_compact_last_status": nil, + } + + if resp != nil { + updates["openai_compact_last_status"] = resp.StatusCode + } + + switch { + case probeErr != nil: + updates["openai_compact_last_error"] = truncateString(sanitizeUpstreamErrorMessage(probeErr.Error()), 2048) + case resp == nil: + updates["openai_compact_last_error"] = "compact probe failed" + default: + errMsg := strings.TrimSpace(extractUpstreamErrorMessage(body)) + if errMsg == "" && len(body) > 0 { + errMsg = strings.TrimSpace(string(body)) + } + if errMsg == "" && (resp.StatusCode < 200 || 
resp.StatusCode >= 300) { + errMsg = "HTTP " + strconv.Itoa(resp.StatusCode) + } + errMsg = truncateString(sanitizeUpstreamErrorMessage(errMsg), 2048) + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + updates["openai_compact_supported"] = true + updates["openai_compact_last_error"] = "" + } else { + if shouldMarkOpenAICompactUnsupported(resp.StatusCode, body) { + updates["openai_compact_supported"] = false + } + updates["openai_compact_last_error"] = errMsg + } + } + + return updates +} + +func mergeExtraUpdates(base map[string]any, more map[string]any) map[string]any { + if len(base) == 0 && len(more) == 0 { + return nil + } + out := make(map[string]any, len(base)+len(more)) + for key, value := range base { + out[key] = value + } + for key, value := range more { + out[key] = value + } + return out +} + +func compactProbeSessionID(accountID int64) string { + if accountID <= 0 { + return "probe_compact" + } + return "probe_compact_" + strconv.FormatInt(accountID, 10) +} diff --git a/backend/internal/service/openai_compact_probe_test.go b/backend/internal/service/openai_compact_probe_test.go new file mode 100644 index 00000000..fe3ba0e8 --- /dev/null +++ b/backend/internal/service/openai_compact_probe_test.go @@ -0,0 +1,122 @@ +package service + +import ( + "errors" + "net/http" + "testing" + "time" +) + +func TestNormalizeAccountTestMode(t *testing.T) { + tests := []struct { + input string + want string + }{ + {input: "", want: AccountTestModeDefault}, + {input: "default", want: AccountTestModeDefault}, + {input: " compact ", want: AccountTestModeCompact}, + {input: "COMPACT", want: AccountTestModeCompact}, + {input: "unknown", want: AccountTestModeDefault}, + } + + for _, tt := range tests { + if got := normalizeAccountTestMode(tt.input); got != tt.want { + t.Fatalf("normalizeAccountTestMode(%q) = %q, want %q", tt.input, got, tt.want) + } + } +} + +func TestBuildOpenAICompactProbeExtraUpdates_SuccessMarksSupported(t *testing.T) { + now := time.Date(2026, 4, 
10, 10, 0, 0, 0, time.UTC) + updates := buildOpenAICompactProbeExtraUpdates(&http.Response{StatusCode: http.StatusOK}, []byte(`{"id":"cmp_1"}`), nil, now) + + if got := updates["openai_compact_supported"]; got != true { + t.Fatalf("openai_compact_supported = %v, want true", got) + } + if got := updates["openai_compact_last_status"]; got != http.StatusOK { + t.Fatalf("openai_compact_last_status = %v, want %d", got, http.StatusOK) + } + if got := updates["openai_compact_last_error"]; got != "" { + t.Fatalf("openai_compact_last_error = %v, want empty string", got) + } + if got := updates["openai_compact_checked_at"]; got != now.Format(time.RFC3339) { + t.Fatalf("openai_compact_checked_at = %v, want %s", got, now.Format(time.RFC3339)) + } +} + +func TestBuildOpenAICompactProbeExtraUpdates_404MarksUnsupported(t *testing.T) { + now := time.Date(2026, 4, 10, 10, 0, 0, 0, time.UTC) + body := []byte(`404 page not found`) + updates := buildOpenAICompactProbeExtraUpdates(&http.Response{StatusCode: http.StatusNotFound}, body, nil, now) + + if got := updates["openai_compact_supported"]; got != false { + t.Fatalf("openai_compact_supported = %v, want false", got) + } + if got := updates["openai_compact_last_status"]; got != http.StatusNotFound { + t.Fatalf("openai_compact_last_status = %v, want %d", got, http.StatusNotFound) + } +} + +func TestBuildOpenAICompactProbeExtraUpdates_502DoesNotMarkUnsupported(t *testing.T) { + now := time.Date(2026, 4, 10, 10, 0, 0, 0, time.UTC) + updates := buildOpenAICompactProbeExtraUpdates(&http.Response{StatusCode: http.StatusBadGateway}, []byte(`Upstream request failed`), nil, now) + + if _, exists := updates["openai_compact_supported"]; exists { + t.Fatalf("did not expect openai_compact_supported for 502 response") + } + if got := updates["openai_compact_last_status"]; got != http.StatusBadGateway { + t.Fatalf("openai_compact_last_status = %v, want %d", got, http.StatusBadGateway) + } +} + +func 
TestBuildOpenAICompactProbeExtraUpdates_RequestErrorDoesNotMarkUnsupported(t *testing.T) { + now := time.Date(2026, 4, 10, 10, 0, 0, 0, time.UTC) + updates := buildOpenAICompactProbeExtraUpdates(nil, nil, errors.New("dial tcp timeout"), now) + + if _, exists := updates["openai_compact_supported"]; exists { + t.Fatalf("did not expect openai_compact_supported for request error") + } + if got, exists := updates["openai_compact_last_status"]; !exists || got != nil { + t.Fatalf("openai_compact_last_status = %v, want nil key", got) + } + if got := updates["openai_compact_last_error"]; got == "" { + t.Fatalf("expected openai_compact_last_error to be populated") + } +} + +func TestBuildOpenAICompactProbeExtraUpdates_NoResponseClearsLastStatus(t *testing.T) { + now := time.Date(2026, 4, 10, 10, 0, 0, 0, time.UTC) + updates := buildOpenAICompactProbeExtraUpdates(nil, nil, nil, now) + + if got, exists := updates["openai_compact_last_status"]; !exists || got != nil { + t.Fatalf("openai_compact_last_status = %v, want nil key", got) + } + if got := updates["openai_compact_last_error"]; got != "compact probe failed" { + t.Fatalf("openai_compact_last_error = %v, want compact probe failed", got) + } +} + +func TestBuildOpenAICompactProbeExtraUpdates_UnknownModelDoesNotMarkUnsupported(t *testing.T) { + now := time.Date(2026, 4, 10, 10, 0, 0, 0, time.UTC) + body := []byte(`{"error":{"message":"unknown model gpt-5.4-openai-compact"}}`) + updates := buildOpenAICompactProbeExtraUpdates(&http.Response{StatusCode: http.StatusBadRequest}, body, nil, now) + + if _, exists := updates["openai_compact_supported"]; exists { + t.Fatalf("did not expect openai_compact_supported for unknown-model diagnostics") + } + if got := updates["openai_compact_last_status"]; got != http.StatusBadRequest { + t.Fatalf("openai_compact_last_status = %v, want %d", got, http.StatusBadRequest) + } +} + +func TestBuildOpenAICompactProbeExtraUpdates_EmptyFailureBodyFallsBackToHTTPStatus(t *testing.T) { + now := 
time.Date(2026, 4, 10, 10, 0, 0, 0, time.UTC) + updates := buildOpenAICompactProbeExtraUpdates(&http.Response{StatusCode: http.StatusServiceUnavailable}, nil, nil, now) + + if got := updates["openai_compact_last_status"]; got != http.StatusServiceUnavailable { + t.Fatalf("openai_compact_last_status = %v, want %d", got, http.StatusServiceUnavailable) + } + if got := updates["openai_compact_last_error"]; got != "HTTP 503" { + t.Fatalf("openai_compact_last_error = %v, want HTTP 503", got) + } +} diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 2d05c3ea..86b79216 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -306,6 +306,10 @@ func (t *accountWriteThrottle) Allow(id int64, now time.Time) bool { var defaultOpenAICodexSnapshotPersistThrottle = newAccountWriteThrottle(openAICodexSnapshotPersistMinInterval) +// ErrNoAvailableCompactAccounts indicates the request needs /responses/compact +// support but no compatible account is available. 
+var ErrNoAvailableCompactAccounts = errors.New("no available OpenAI accounts support /responses/compact") + // OpenAIGatewayService handles OpenAI API gateway operations type OpenAIGatewayService struct { accountRepo AccountRepository @@ -442,11 +446,11 @@ func (s *OpenAIGatewayService) checkChannelPricingRestriction(ctx context.Contex return s.channelService.IsModelRestricted(ctx, *groupID, billingModel) } -func (s *OpenAIGatewayService) isUpstreamModelRestrictedByChannel(ctx context.Context, groupID int64, account *Account, requestedModel string) bool { +func (s *OpenAIGatewayService) isUpstreamModelRestrictedByChannel(ctx context.Context, groupID int64, account *Account, requestedModel string, requireCompact bool) bool { if s.channelService == nil { return false } - upstreamModel := resolveOpenAIForwardModel(account, requestedModel, "") + upstreamModel := resolveOpenAIAccountUpstreamModelForRequest(account, requestedModel, requireCompact) if upstreamModel == "" { return false } @@ -1208,10 +1212,94 @@ func (s *OpenAIGatewayService) SelectAccountForModel(ctx context.Context, groupI // SelectAccountForModelWithExclusions selects an account supporting the requested model while excluding specified accounts. 
// SelectAccountForModelWithExclusions 选择支持指定模型的账号,同时排除指定的账号。 func (s *OpenAIGatewayService) SelectAccountForModelWithExclusions(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}) (*Account, error) { - return s.selectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, excludedIDs, 0) + return s.selectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, excludedIDs, false, 0) } -func (s *OpenAIGatewayService) selectAccountForModelWithExclusions(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, stickyAccountID int64) (*Account, error) { +// noAvailableOpenAISelectionError builds the standard "no account available" error +// while preserving the compact-specific error when applicable. +func noAvailableOpenAISelectionError(requestedModel string, compactBlocked bool) error { + if compactBlocked { + return ErrNoAvailableCompactAccounts + } + if requestedModel != "" { + return fmt.Errorf("no available OpenAI accounts supporting model: %s", requestedModel) + } + return errors.New("no available OpenAI accounts") +} + +// openAICompactSupportTier classifies an OpenAI account by compact capability. +// 0 = explicitly unsupported, 1 = unknown / not yet probed, 2 = explicitly supported. +func openAICompactSupportTier(account *Account) int { + if account == nil || !account.IsOpenAI() { + return 0 + } + supported, known := account.OpenAICompactSupportKnown() + if !known { + return 1 + } + if supported { + return 2 + } + return 0 +} + +// isOpenAIAccountEligibleForRequest centralises the schedulable / OpenAI / model / +// compact-support checks used during account selection. 
+func isOpenAIAccountEligibleForRequest(account *Account, requestedModel string, requireCompact bool) bool { + if account == nil || !account.IsSchedulable() || !account.IsOpenAI() { + return false + } + if requestedModel != "" && !account.IsModelSupported(requestedModel) { + return false + } + if requireCompact && openAICompactSupportTier(account) == 0 { + return false + } + return true +} + +// prioritizeOpenAICompactAccounts re-orders a slice so that accounts with known +// compact support are tried first, followed by unknown, then explicitly unsupported. +// The relative order within each tier is preserved. +func prioritizeOpenAICompactAccounts(accounts []*Account) []*Account { + if len(accounts) == 0 { + return nil + } + supported := make([]*Account, 0, len(accounts)) + unknown := make([]*Account, 0, len(accounts)) + unsupported := make([]*Account, 0, len(accounts)) + for _, account := range accounts { + switch openAICompactSupportTier(account) { + case 2: + supported = append(supported, account) + case 1: + unknown = append(unknown, account) + default: + unsupported = append(unsupported, account) + } + } + out := make([]*Account, 0, len(accounts)) + out = append(out, supported...) + out = append(out, unknown...) + out = append(out, unsupported...) + return out +} + +// resolveOpenAIAccountUpstreamModelForRequest resolves the upstream model that +// would be sent for a given request, honouring compact-only mappings when the +// caller is on the /responses/compact path. 
+func resolveOpenAIAccountUpstreamModelForRequest(account *Account, requestedModel string, requireCompact bool) string { + upstreamModel := resolveOpenAIForwardModel(account, requestedModel, "") + if upstreamModel == "" { + return "" + } + if requireCompact { + return resolveOpenAICompactForwardModel(account, upstreamModel) + } + return upstreamModel +} + +func (s *OpenAIGatewayService) selectAccountForModelWithExclusions(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, requireCompact bool, stickyAccountID int64) (*Account, error) { if s.checkChannelPricingRestriction(ctx, groupID, requestedModel) { slog.Warn("channel pricing restriction blocked request", "group_id", derefGroupID(groupID), @@ -1221,7 +1309,7 @@ func (s *OpenAIGatewayService) selectAccountForModelWithExclusions(ctx context.C // 1. 尝试粘性会话命中 // Try sticky session hit - if account := s.tryStickySessionHit(ctx, groupID, sessionHash, requestedModel, excludedIDs, stickyAccountID); account != nil { + if account := s.tryStickySessionHit(ctx, groupID, sessionHash, requestedModel, excludedIDs, requireCompact, stickyAccountID); account != nil { return account, nil } @@ -1234,13 +1322,10 @@ func (s *OpenAIGatewayService) selectAccountForModelWithExclusions(ctx context.C // 3. 按优先级 + LRU 选择最佳账号 // Select by priority + LRU - selected := s.selectBestAccount(ctx, groupID, accounts, requestedModel, excludedIDs) + selected, compactBlocked := s.selectBestAccount(ctx, groupID, accounts, requestedModel, excludedIDs, requireCompact) if selected == nil { - if requestedModel != "" { - return nil, fmt.Errorf("no available OpenAI accounts supporting model: %s", requestedModel) - } - return nil, errors.New("no available OpenAI accounts") + return nil, noAvailableOpenAISelectionError(requestedModel, compactBlocked) } // 4. 
设置粘性会话绑定 @@ -1257,7 +1342,7 @@ func (s *OpenAIGatewayService) selectAccountForModelWithExclusions(ctx context.C // // tryStickySessionHit attempts to get account from sticky session. // Returns account if hit and usable; clears session and returns nil if account is unavailable. -func (s *OpenAIGatewayService) tryStickySessionHit(ctx context.Context, groupID *int64, sessionHash, requestedModel string, excludedIDs map[int64]struct{}, stickyAccountID int64) *Account { +func (s *OpenAIGatewayService) tryStickySessionHit(ctx context.Context, groupID *int64, sessionHash, requestedModel string, excludedIDs map[int64]struct{}, requireCompact bool, stickyAccountID int64) *Account { if sessionHash == "" { return nil } @@ -1289,19 +1374,16 @@ func (s *OpenAIGatewayService) tryStickySessionHit(ctx context.Context, groupID // 验证账号是否可用于当前请求 // Verify account is usable for current request - if !account.IsSchedulable() || !account.IsOpenAI() { + if !isOpenAIAccountEligibleForRequest(account, requestedModel, false) { return nil } - if requestedModel != "" && !account.IsModelSupported(requestedModel) { - return nil - } - account = s.recheckSelectedOpenAIAccountFromDB(ctx, account, requestedModel) + account = s.recheckSelectedOpenAIAccountFromDB(ctx, account, requestedModel, requireCompact) if account == nil { _ = s.deleteStickySessionAccountID(ctx, groupID, sessionHash) return nil } if groupID != nil && s.needsUpstreamChannelRestrictionCheck(ctx, groupID) && - s.isUpstreamModelRestrictedByChannel(ctx, *groupID, account, requestedModel) { + s.isUpstreamModelRestrictedByChannel(ctx, *groupID, account, requestedModel, requireCompact) { _ = s.deleteStickySessionAccountID(ctx, groupID, sessionHash) return nil } @@ -1316,9 +1398,13 @@ func (s *OpenAIGatewayService) tryStickySessionHit(ctx context.Context, groupID // 返回 nil 表示无可用账号。 // // selectBestAccount selects the best account from candidates (priority + LRU). -// Returns nil if no available account. 
-func (s *OpenAIGatewayService) selectBestAccount(ctx context.Context, groupID *int64, accounts []Account, requestedModel string, excludedIDs map[int64]struct{}) *Account { +// Returns nil if no available account. The second return reports whether at +// least one candidate was filtered out solely because it lacks compact support +// (only meaningful when requireCompact=true). +func (s *OpenAIGatewayService) selectBestAccount(ctx context.Context, groupID *int64, accounts []Account, requestedModel string, excludedIDs map[int64]struct{}, requireCompact bool) (*Account, bool) { var selected *Account + selectedCompactTier := -1 + compactBlocked := false needsUpstreamCheck := s.needsUpstreamChannelRestrictionCheck(ctx, groupID) for i := range accounts { @@ -1330,31 +1416,50 @@ func (s *OpenAIGatewayService) selectBestAccount(ctx context.Context, groupID *i continue } - fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel) + fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel, false) if fresh == nil { continue } - fresh = s.recheckSelectedOpenAIAccountFromDB(ctx, fresh, requestedModel) + fresh = s.recheckSelectedOpenAIAccountFromDB(ctx, fresh, requestedModel, false) if fresh == nil { continue } - if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, fresh, requestedModel) { + if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, fresh, requestedModel, requireCompact) { continue } + compactTier := 0 + if requireCompact { + compactTier = openAICompactSupportTier(fresh) + if compactTier == 0 { + compactBlocked = true + continue + } + } // 选择优先级最高且最久未使用的账号 // Select highest priority and least recently used if selected == nil { selected = fresh + selectedCompactTier = compactTier + continue + } + + // compact 模式下高 tier 优先;同 tier 内才比较 priority/LRU。 + if requireCompact && compactTier != selectedCompactTier { + if compactTier > selectedCompactTier { + selected = fresh + selectedCompactTier = 
compactTier + } continue } if s.isBetterAccount(fresh, selected) { selected = fresh + selectedCompactTier = compactTier } } - return selected + return selected, compactBlocked } // isBetterAccount 判断 candidate 是否比 current 更优。 @@ -1392,6 +1497,10 @@ func (s *OpenAIGatewayService) isBetterAccount(candidate, current *Account) bool // SelectAccountWithLoadAwareness selects an account with load-awareness and wait plan. func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}) (*AccountSelectionResult, error) { + return s.selectAccountWithLoadAwareness(ctx, groupID, sessionHash, requestedModel, excludedIDs, false) +} + +func (s *OpenAIGatewayService) selectAccountWithLoadAwareness(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, requireCompact bool) (*AccountSelectionResult, error) { if s.checkChannelPricingRestriction(ctx, groupID, requestedModel) { slog.Warn("channel pricing restriction blocked request", "group_id", derefGroupID(groupID), @@ -1408,7 +1517,7 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex } } if s.concurrencyService == nil || !cfg.LoadBatchEnabled { - account, err := s.selectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, excludedIDs, stickyAccountID) + account, err := s.selectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, excludedIDs, requireCompact, stickyAccountID) if err != nil { return nil, err } @@ -1461,12 +1570,11 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex if clearSticky { _ = s.deleteStickySessionAccountID(ctx, groupID, sessionHash) } - if !clearSticky && account.IsSchedulable() && account.IsOpenAI() && - (requestedModel == "" || account.IsModelSupported(requestedModel)) { - account = s.recheckSelectedOpenAIAccountFromDB(ctx, account, 
requestedModel) + if !clearSticky && isOpenAIAccountEligibleForRequest(account, requestedModel, false) { + account = s.recheckSelectedOpenAIAccountFromDB(ctx, account, requestedModel, requireCompact) if account == nil { _ = s.deleteStickySessionAccountID(ctx, groupID, sessionHash) - } else if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, account, requestedModel) { + } else if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, account, requestedModel, requireCompact) { _ = s.deleteStickySessionAccountID(ctx, groupID, sessionHash) } else { result, err := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency) @@ -1491,6 +1599,7 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex } // ============ Layer 2: Load-aware selection ============ + baseCandidateCount := 0 candidates := make([]*Account, 0, len(accounts)) for i := range accounts { acc := &accounts[i] @@ -1506,9 +1615,10 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex if requestedModel != "" && !acc.IsModelSupported(requestedModel) { continue } - if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, acc, requestedModel) { + if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, acc, requestedModel, requireCompact) { continue } + baseCandidateCount++ candidates = append(candidates, acc) } @@ -1528,12 +1638,19 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex if err != nil { ordered := append([]*Account(nil), candidates...) 
sortAccountsByPriorityAndLastUsed(ordered, false) + if requireCompact { + ordered = prioritizeOpenAICompactAccounts(ordered) + } for _, acc := range ordered { - fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel) + fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel, false) if fresh == nil { continue } - if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, fresh, requestedModel) { + fresh = s.recheckSelectedOpenAIAccountFromDB(ctx, fresh, requestedModel, requireCompact) + if fresh == nil { + continue + } + if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, fresh, requestedModel, requireCompact) { continue } result, err := s.tryAcquireAccountSlot(ctx, fresh.ID, fresh.Concurrency) @@ -1581,12 +1698,35 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex }) shuffleWithinSortGroups(available) - for _, item := range available { - fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, item.account, requestedModel) + selectionOrder := make([]accountWithLoad, 0, len(available)) + if requireCompact { + appendTier := func(out []accountWithLoad, tier int) []accountWithLoad { + for _, item := range available { + if openAICompactSupportTier(item.account) == tier { + out = append(out, item) + } + } + return out + } + selectionOrder = appendTier(selectionOrder, 2) + selectionOrder = appendTier(selectionOrder, 1) + // tier 0 候选作为兜底追加:DB recheck 时若发现 cache tier 0 实际 + // 已升级为 1/2(探测刚跑完,cache 尚未刷新),仍可正常命中。 + selectionOrder = appendTier(selectionOrder, 0) + } else { + selectionOrder = append(selectionOrder, available...) 
+ } + + for _, item := range selectionOrder { + fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, item.account, requestedModel, false) if fresh == nil { continue } - if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, fresh, requestedModel) { + fresh = s.recheckSelectedOpenAIAccountFromDB(ctx, fresh, requestedModel, requireCompact) + if fresh == nil { + continue + } + if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, fresh, requestedModel, requireCompact) { continue } result, err := s.tryAcquireAccountSlot(ctx, fresh.ID, fresh.Concurrency) @@ -1602,12 +1742,19 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex // ============ Layer 3: Fallback wait ============ sortAccountsByPriorityAndLastUsed(candidates, false) + if requireCompact { + candidates = prioritizeOpenAICompactAccounts(candidates) + } for _, acc := range candidates { - fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel) + fresh := s.resolveFreshSchedulableOpenAIAccount(ctx, acc, requestedModel, false) if fresh == nil { continue } - if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, fresh, requestedModel) { + fresh = s.recheckSelectedOpenAIAccountFromDB(ctx, fresh, requestedModel, requireCompact) + if fresh == nil { + continue + } + if needsUpstreamCheck && s.isUpstreamModelRestrictedByChannel(ctx, *groupID, fresh, requestedModel, requireCompact) { continue } return s.newSelectionResult(ctx, fresh, false, nil, &AccountWaitPlan{ @@ -1618,6 +1765,9 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex }) } + if requireCompact && baseCandidateCount > 0 { + return nil, ErrNoAvailableCompactAccounts + } return nil, ErrNoAvailableAccounts } @@ -1648,7 +1798,7 @@ func (s *OpenAIGatewayService) tryAcquireAccountSlot(ctx context.Context, accoun return s.concurrencyService.AcquireAccountSlot(ctx, accountID, maxConcurrency) } -func (s 
*OpenAIGatewayService) resolveFreshSchedulableOpenAIAccount(ctx context.Context, account *Account, requestedModel string) *Account { +func (s *OpenAIGatewayService) resolveFreshSchedulableOpenAIAccount(ctx context.Context, account *Account, requestedModel string, requireCompact bool) *Account { if account == nil { return nil } @@ -1662,20 +1812,20 @@ func (s *OpenAIGatewayService) resolveFreshSchedulableOpenAIAccount(ctx context. fresh = current } - if !fresh.IsSchedulable() || !fresh.IsOpenAI() { - return nil - } - if requestedModel != "" && !fresh.IsModelSupported(requestedModel) { + if !isOpenAIAccountEligibleForRequest(fresh, requestedModel, requireCompact) { return nil } return fresh } -func (s *OpenAIGatewayService) recheckSelectedOpenAIAccountFromDB(ctx context.Context, account *Account, requestedModel string) *Account { +func (s *OpenAIGatewayService) recheckSelectedOpenAIAccountFromDB(ctx context.Context, account *Account, requestedModel string, requireCompact bool) *Account { if account == nil { return nil } if s.schedulerSnapshot == nil || s.accountRepo == nil { + if !isOpenAIAccountEligibleForRequest(account, requestedModel, requireCompact) { + return nil + } return account } @@ -1683,10 +1833,7 @@ func (s *OpenAIGatewayService) recheckSelectedOpenAIAccountFromDB(ctx context.Co if err != nil || latest == nil { return nil } - if !latest.IsSchedulable() || !latest.IsOpenAI() { - return nil - } - if requestedModel != "" && !latest.IsModelSupported(requestedModel) { + if !isOpenAIAccountEligibleForRequest(latest, requestedModel, requireCompact) { return nil } return latest @@ -2007,17 +2154,35 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco return nil, err } + // Compact-only model 映射:仅在 /responses/compact 路径生效,且优先级高于 + // OAuth 模型规范化(避免 OAuth 规范化覆盖 compact-only 自定义模型)。 + isCompactRequest := isOpenAIResponsesCompactPath(c) + compactMapped := false + if isCompactRequest { + compactMappedModel := 
resolveOpenAICompactForwardModel(account, billingModel) + if compactMappedModel != "" && compactMappedModel != billingModel { + compactMapped = true + upstreamModel = compactMappedModel + reqBody["model"] = compactMappedModel + bodyModified = true + markPatchSet("model", compactMappedModel) + logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Compact model mapping applied: %s -> %s (account: %s, isCodexCLI: %v)", billingModel, compactMappedModel, account.Name, isCodexCLI) + } + } + // OpenAI OAuth 账号走 ChatGPT internal Codex endpoint,需要将模型名规范化为 // 上游可识别的 Codex/GPT 系列。API Key 账号则应保留原始/映射后的模型名, // 以兼容自定义 base_url 的 OpenAI-compatible 上游。 if model, ok := reqBody["model"].(string); ok { - upstreamModel = normalizeOpenAIModelForUpstream(account, model) - if upstreamModel != "" && upstreamModel != model { - logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Upstream model resolved: %s -> %s (account: %s, type: %s, isCodexCLI: %v)", - model, upstreamModel, account.Name, account.Type, isCodexCLI) - reqBody["model"] = upstreamModel - bodyModified = true - markPatchSet("model", upstreamModel) + if !compactMapped { + upstreamModel = normalizeOpenAIModelForUpstream(account, model) + if upstreamModel != "" && upstreamModel != model { + logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Upstream model resolved: %s -> %s (account: %s, type: %s, isCodexCLI: %v)", + model, upstreamModel, account.Name, account.Type, isCodexCLI) + reqBody["model"] = upstreamModel + bodyModified = true + markPatchSet("model", upstreamModel) + } } // 移除 gpt-5.2-codex 以下的版本 verbosity 参数 @@ -2040,7 +2205,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco } if account.Type == AccountTypeOAuth { - codexResult := applyCodexOAuthTransform(reqBody, isCodexCLI, isOpenAIResponsesCompactPath(c)) + codexResult := applyCodexOAuthTransform(reqBody, isCodexCLI, isCompactRequest) if codexResult.Modified { bodyModified = true disablePatch() @@ -2515,6 +2680,19 @@ func (s 
*OpenAIGatewayService) forwardOpenAIPassthrough( reqStream bool, startTime time.Time, ) (*OpenAIForwardResult, error) { + upstreamPassthroughModel := "" + if isOpenAIResponsesCompactPath(c) { + compactMappedModel := resolveOpenAICompactForwardModel(account, reqModel) + if compactMappedModel != "" && compactMappedModel != reqModel { + nextBody, setErr := sjson.SetBytes(body, "model", compactMappedModel) + if setErr != nil { + return nil, fmt.Errorf("set compact passthrough model: %w", setErr) + } + body = nextBody + upstreamPassthroughModel = compactMappedModel + } + } + if account != nil && account.Type == AccountTypeOAuth { if rejectReason := detectOpenAIPassthroughInstructionsRejectReason(reqModel, body); rejectReason != "" { rejectMsg := "OpenAI codex passthrough requires a non-empty instructions field" @@ -2640,14 +2818,14 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( var usage *OpenAIUsage var firstTokenMs *int if reqStream { - result, err := s.handleStreamingResponsePassthrough(ctx, resp, c, account, startTime) + result, err := s.handleStreamingResponsePassthrough(ctx, resp, c, account, startTime, reqModel, upstreamPassthroughModel) if err != nil { return nil, err } usage = result.usage firstTokenMs = result.firstTokenMs } else { - usage, err = s.handleNonStreamingResponsePassthrough(ctx, resp, c) + usage, err = s.handleNonStreamingResponsePassthrough(ctx, resp, c, reqModel, upstreamPassthroughModel) if err != nil { return nil, err } @@ -2665,6 +2843,7 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( RequestID: resp.Header.Get("x-request-id"), Usage: *usage, Model: reqModel, + UpstreamModel: upstreamPassthroughModel, ServiceTier: extractOpenAIServiceTierFromBody(body), ReasoningEffort: reasoningEffort, Stream: reqStream, @@ -2974,6 +3153,8 @@ func (s *OpenAIGatewayService) handleStreamingResponsePassthrough( c *gin.Context, account *Account, startTime time.Time, + originalModel string, + mappedModel string, ) 
(*openaiStreamingResultPassthrough, error) { writeOpenAIPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) @@ -3008,11 +3189,20 @@ func (s *OpenAIGatewayService) handleStreamingResponsePassthrough( scanner.Buffer(scanBuf[:0], maxLineSize) defer putSSEScannerBuf64K(scanBuf) + needModelReplace := strings.TrimSpace(originalModel) != "" && strings.TrimSpace(mappedModel) != "" && strings.TrimSpace(originalModel) != strings.TrimSpace(mappedModel) + for scanner.Scan() { line := scanner.Text() if data, ok := extractOpenAISSEDataLine(line); ok { dataBytes := []byte(data) trimmedData := strings.TrimSpace(data) + if needModelReplace && strings.Contains(data, mappedModel) { + line = s.replaceModelInSSELine(line, mappedModel, originalModel) + if replacedData, replaced := extractOpenAISSEDataLine(line); replaced { + dataBytes = []byte(replacedData) + trimmedData = strings.TrimSpace(replacedData) + } + } if trimmedData == "[DONE]" { sawDone = true } @@ -3073,6 +3263,8 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( ctx context.Context, resp *http.Response, c *gin.Context, + originalModel string, + mappedModel string, ) (*OpenAIUsage, error) { body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError) if err != nil { @@ -3084,7 +3276,7 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( // stream=false was requested. Without this conversion the client would // receive raw SSE text or a terminal event with empty output. 
if isEventStreamResponse(resp.Header) { - return s.handlePassthroughSSEToJSON(resp, c, body) + return s.handlePassthroughSSEToJSON(resp, c, body, originalModel, mappedModel) } usage := &OpenAIUsage{} @@ -3106,14 +3298,18 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( if contentType == "" { contentType = "application/json" } + if originalModel != "" && mappedModel != "" && originalModel != mappedModel { + body = s.replaceModelInResponseBody(body, mappedModel, originalModel) + } c.Data(resp.StatusCode, contentType, body) return usage, nil } // handlePassthroughSSEToJSON converts an SSE response body into a JSON -// response for the passthrough path. It mirrors handleSSEToJSON but skips -// model replacement (passthrough does not remap models). -func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c *gin.Context, body []byte) (*OpenAIUsage, error) { +// response for the passthrough path. It mirrors handleSSEToJSON while +// preserving passthrough payloads, except compact-only model remapping may +// rewrite model fields back to the original requested model. 
+func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel string, mappedModel string) (*OpenAIUsage, error) { bodyText := string(body) finalResponse, ok := extractCodexFinalResponse(bodyText) @@ -3132,6 +3328,9 @@ func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c } } body = finalResponse + if originalModel != "" && mappedModel != "" && originalModel != mappedModel { + body = s.replaceModelInResponseBody(body, mappedModel, originalModel) + } // Correct tool calls in final response body = s.correctToolCallsInResponseBody(body) } else { @@ -3144,6 +3343,10 @@ func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c return nil, s.writeOpenAINonStreamingProtocolError(resp, c, msg) } usage = s.parseSSEUsageFromBody(bodyText) + if originalModel != "" && mappedModel != "" && originalModel != mappedModel { + bodyText = s.replaceModelInSSEBody(bodyText, mappedModel, originalModel) + } + body = []byte(bodyText) } writeOpenAIPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index ed7c78a3..8b7945bc 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -1107,7 +1107,7 @@ func TestOpenAIStreamingPassthroughMissingTerminalEventReturnsIncompleteError(t _, _ = pw.Write([]byte("data: {\"type\":\"response.in_progress\",\"response\":{}}\n\n")) }() - _, err := svc.handleStreamingResponsePassthrough(c.Request.Context(), resp, c, &Account{ID: 1}, time.Now()) + _, err := svc.handleStreamingResponsePassthrough(c.Request.Context(), resp, c, &Account{ID: 1}, time.Now(), "", "") _ = pr.Close() if err == nil || !strings.Contains(err.Error(), "missing terminal event") { t.Fatalf("expected missing terminal event error, got %v", err) @@ 
-1139,7 +1139,7 @@ func TestOpenAIStreamingPassthroughResponseDoneWithoutDoneMarkerStillSucceeds(t _, _ = pw.Write([]byte("data: {\"type\":\"response.done\",\"response\":{\"usage\":{\"input_tokens\":2,\"output_tokens\":3,\"input_tokens_details\":{\"cached_tokens\":1}}}}\n\n")) }() - result, err := svc.handleStreamingResponsePassthrough(c.Request.Context(), resp, c, &Account{ID: 1}, time.Now()) + result, err := svc.handleStreamingResponsePassthrough(c.Request.Context(), resp, c, &Account{ID: 1}, time.Now(), "", "") _ = pr.Close() require.NoError(t, err) require.NotNil(t, result) diff --git a/backend/internal/service/openai_model_mapping.go b/backend/internal/service/openai_model_mapping.go index 993c0b13..f332633c 100644 --- a/backend/internal/service/openai_model_mapping.go +++ b/backend/internal/service/openai_model_mapping.go @@ -39,3 +39,22 @@ func isExplicitCodexModel(model string) bool { } return false } + +// resolveOpenAICompactForwardModel determines the compact-only upstream model +// for /responses/compact requests. It never affects normal /responses traffic. +// When no compact-specific mapping matches, the input model is returned as-is. 
+func resolveOpenAICompactForwardModel(account *Account, model string) string { + trimmedModel := strings.TrimSpace(model) + if trimmedModel == "" || account == nil { + return trimmedModel + } + + mappedModel, matched := account.ResolveCompactMappedModel(trimmedModel) + if !matched { + return trimmedModel + } + if trimmedMapped := strings.TrimSpace(mappedModel); trimmedMapped != "" { + return trimmedMapped + } + return trimmedModel +} diff --git a/backend/internal/service/openai_model_mapping_test.go b/backend/internal/service/openai_model_mapping_test.go index 21a2e9a0..4802c089 100644 --- a/backend/internal/service/openai_model_mapping_test.go +++ b/backend/internal/service/openai_model_mapping_test.go @@ -130,6 +130,74 @@ func TestResolveOpenAIForwardModel_PreventsClaudeModelFromFallingBackToGpt54(t * } } +func TestResolveOpenAICompactForwardModel(t *testing.T) { + tests := []struct { + name string + account *Account + model string + expectedModel string + }{ + { + name: "nil account keeps original model", + account: nil, + model: "gpt-5.4", + expectedModel: "gpt-5.4", + }, + { + name: "missing compact mapping keeps original model", + account: &Account{ + Credentials: map[string]any{}, + }, + model: "gpt-5.4", + expectedModel: "gpt-5.4", + }, + { + name: "exact compact mapping overrides model", + account: &Account{ + Credentials: map[string]any{ + "compact_model_mapping": map[string]any{ + "gpt-5.4": "gpt-5.4-openai-compact", + }, + }, + }, + model: "gpt-5.4", + expectedModel: "gpt-5.4-openai-compact", + }, + { + name: "wildcard compact mapping overrides model", + account: &Account{ + Credentials: map[string]any{ + "compact_model_mapping": map[string]any{ + "gpt-5.*": "gpt-5-openai-compact", + }, + }, + }, + model: "gpt-5.4", + expectedModel: "gpt-5-openai-compact", + }, + { + name: "passthrough compact mapping remains unchanged", + account: &Account{ + Credentials: map[string]any{ + "compact_model_mapping": map[string]any{ + "gpt-5.4": "gpt-5.4", + }, + }, + }, 
+ model: "gpt-5.4", + expectedModel: "gpt-5.4", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := resolveOpenAICompactForwardModel(tt.account, tt.model); got != tt.expectedModel { + t.Fatalf("resolveOpenAICompactForwardModel(...) = %q, want %q", got, tt.expectedModel) + } + }) + } +} + func TestNormalizeCodexModel(t *testing.T) { cases := map[string]string{ "gpt-5.3-codex-spark": "gpt-5.3-codex-spark", diff --git a/backend/internal/service/openai_ws_account_sticky_test.go b/backend/internal/service/openai_ws_account_sticky_test.go index a5b97ca9..4005a921 100644 --- a/backend/internal/service/openai_ws_account_sticky_test.go +++ b/backend/internal/service/openai_ws_account_sticky_test.go @@ -37,7 +37,7 @@ func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_Hit(t *testing.T require.NoError(t, store.BindResponseAccount(ctx, groupID, "resp_prev_1", account.ID, time.Hour)) - selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_1", "gpt-5.1", nil) + selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_1", "gpt-5.1", nil, false) require.NoError(t, err) require.NotNil(t, selection) require.NotNil(t, selection.Account) @@ -77,7 +77,7 @@ func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_RateLimitedMiss( require.NoError(t, store.BindResponseAccount(ctx, groupID, "resp_prev_rl", account.ID, time.Hour)) - selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_rl", "gpt-5.1", nil) + selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_rl", "gpt-5.1", nil, false) require.NoError(t, err) require.Nil(t, selection, "限额中的账号不应继续命中 previous_response_id 粘连") boundAccountID, getErr := store.GetResponseAccount(ctx, groupID, "resp_prev_rl") @@ -129,7 +129,7 @@ func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_DBRuntimeRecheck require.NoError(t, store.BindResponseAccount(ctx, groupID, 
"resp_prev_db_rl", dbAccount.ID, time.Hour)) - selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_db_rl", "gpt-5.1", nil) + selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_db_rl", "gpt-5.1", nil, false) require.NoError(t, err) require.Nil(t, selection, "DB 中已限流的账号不应继续命中 previous_response_id 粘连") boundAccountID, getErr := store.GetResponseAccount(ctx, groupID, "resp_prev_db_rl") @@ -164,7 +164,7 @@ func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_Excluded(t *test require.NoError(t, store.BindResponseAccount(ctx, groupID, "resp_prev_2", account.ID, time.Hour)) - selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_2", "gpt-5.1", map[int64]struct{}{account.ID: {}}) + selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_2", "gpt-5.1", map[int64]struct{}{account.ID: {}}, false) require.NoError(t, err) require.Nil(t, selection) } @@ -197,7 +197,7 @@ func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_ForceHTTPIgnored require.NoError(t, store.BindResponseAccount(ctx, groupID, "resp_prev_force_http", account.ID, time.Hour)) - selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_force_http", "gpt-5.1", nil) + selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_force_http", "gpt-5.1", nil, false) require.NoError(t, err) require.Nil(t, selection, "force_http 场景应忽略 previous_response_id 粘连") } @@ -258,7 +258,7 @@ func TestOpenAIGatewayService_SelectAccountByPreviousResponseID_BusyKeepsSticky( require.NoError(t, store.BindResponseAccount(ctx, groupID, "resp_prev_busy", 21, time.Hour)) - selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_busy", "gpt-5.1", nil) + selection, err := svc.SelectAccountByPreviousResponseID(ctx, &groupID, "resp_prev_busy", "gpt-5.1", nil, false) require.NoError(t, err) require.NotNil(t, selection) require.NotNil(t, 
selection.Account) diff --git a/backend/internal/service/openai_ws_forwarder.go b/backend/internal/service/openai_ws_forwarder.go index 83849bf3..8c0222e2 100644 --- a/backend/internal/service/openai_ws_forwarder.go +++ b/backend/internal/service/openai_ws_forwarder.go @@ -3800,6 +3800,7 @@ func (s *OpenAIGatewayService) SelectAccountByPreviousResponseID( previousResponseID string, requestedModel string, excludedIDs map[int64]struct{}, + requireCompact bool, ) (*AccountSelectionResult, error) { if s == nil { return nil, nil @@ -3840,11 +3841,16 @@ func (s *OpenAIGatewayService) SelectAccountByPreviousResponseID( if requestedModel != "" && !account.IsModelSupported(requestedModel) { return nil, nil } - account = s.recheckSelectedOpenAIAccountFromDB(ctx, account, requestedModel) + account = s.recheckSelectedOpenAIAccountFromDB(ctx, account, requestedModel, requireCompact) if account == nil { _ = store.DeleteResponseAccount(ctx, derefGroupID(groupID), responseID) return nil, nil } + // 兜底:若上游 compact 能力刚被探测为不支持,但 sticky 还在,需要主动放弃。 + if requireCompact && openAICompactSupportTier(account) == 0 { + _ = store.DeleteResponseAccount(ctx, derefGroupID(groupID), responseID) + return nil, nil + } result, acquireErr := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency) if acquireErr == nil && result.Acquired { diff --git a/frontend/src/components/account/AccountTestModal.vue b/frontend/src/components/account/AccountTestModal.vue index 2e3db61b..236f85f1 100644 --- a/frontend/src/components/account/AccountTestModal.vue +++ b/frontend/src/components/account/AccountTestModal.vue @@ -55,6 +55,17 @@ /> +
+ +