From 9e515ea7c4f6eba392201b2f32474153987a50e6 Mon Sep 17 00:00:00 2001 From: Elysia <1628615876@qq.com> Date: Tue, 7 Apr 2026 22:49:14 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=9D=9E=E6=B5=81=E5=BC=8F=E5=93=8D?= =?UTF-8?q?=E5=BA=94=E8=B7=AF=E5=BE=84=E6=89=A9=E5=B1=95SSE=E6=A3=80?= =?UTF-8?q?=E6=B5=8B=E8=87=B3=E6=89=80=E6=9C=89=E8=B4=A6=E5=8F=B7=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=20(#1493)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当上游返回SSE格式响应(如sub2api链路)时,API Key账号的非流式路径 未检测SSE,导致终态事件中空output直接透传给客户端。 - 将Content-Type SSE检测从仅OAuth扩展至所有账号类型 - 重命名handleOAuthSSEToJSON为handleSSEToJSON(无OAuth专属逻辑) - 为透传路径新增handlePassthroughSSEToJSON,支持SSE转JSON及空output重建 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../service/openai_gateway_service.go | 75 ++++++++++++++++++- .../service/openai_gateway_service_test.go | 12 +-- 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 65e70408..5ecb4ebc 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -3007,6 +3007,14 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( return nil, err } + // Detect SSE responses from upstream and convert to JSON. + // Some upstreams (e.g. other sub2api instances) may return SSE even when + // stream=false was requested. Without this conversion the client would + // receive raw SSE text or a terminal event with empty output. + if isEventStreamResponse(resp.Header) { + return s.handlePassthroughSSEToJSON(resp, c, body) + } + usage := &OpenAIUsage{} usageParsed := false if len(body) > 0 { @@ -3030,6 +3038,56 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( return usage, nil } +// handlePassthroughSSEToJSON converts an SSE response body into a JSON +// response for the passthrough path. It mirrors handleSSEToJSON but skips +// model replacement (passthrough does not remap models). +func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c *gin.Context, body []byte) (*OpenAIUsage, error) { + bodyText := string(body) + finalResponse, ok := extractCodexFinalResponse(bodyText) + + usage := &OpenAIUsage{} + if ok { + if parsedUsage, parsed := extractOpenAIUsageFromJSONBytes(finalResponse); parsed { + *usage = parsedUsage + } + // When the terminal event has an empty output array, reconstruct + // output from accumulated delta events so the client gets full content. + if len(gjson.GetBytes(finalResponse, "output").Array()) == 0 { + if outputJSON, reconstructed := reconstructResponseOutputFromSSE(bodyText); reconstructed { + if patched, err := sjson.SetRawBytes(finalResponse, "output", outputJSON); err == nil { + finalResponse = patched + } + } + } + body = finalResponse + // Correct tool calls in final response + body = s.correctToolCallsInResponseBody(body) + } else { + terminalType, terminalPayload, terminalOK := extractOpenAISSETerminalEvent(bodyText) + if terminalOK && terminalType == "response.failed" { + msg := extractOpenAISSEErrorMessage(terminalPayload) + if msg == "" { + msg = "Upstream compact response failed" + } + return nil, s.writeOpenAINonStreamingProtocolError(resp, c, msg) + } + usage = s.parseSSEUsageFromBody(bodyText) + } + + writeOpenAIPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter) + + contentType := "application/json; charset=utf-8" + if !ok { + contentType = resp.Header.Get("Content-Type") + if contentType == "" { + contentType = "text/event-stream" + } + } + c.Data(resp.StatusCode, contentType, body) + + return usage, nil +} + func writeOpenAIPassthroughResponseHeaders(dst http.Header, src http.Header, filter *responseheaders.CompiledHeaderFilter) { if dst == nil || src == nil { return @@ -3858,10 +3916,21 @@ func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, r return nil, err } + // Detect SSE responses for ALL account types via Content-Type header. + // Some OpenAI-compatible upstreams (including other sub2api instances) + // may return SSE even when stream=false was requested. + if isEventStreamResponse(resp.Header) { + return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel) + } + // For OAuth accounts, also fall back to a body-content heuristic because + // the upstream may omit the Content-Type header while still sending SSE. + // This heuristic is NOT applied to API-key accounts to avoid false + // positives on JSON responses that coincidentally contain "data:" or + // "event:" in their text content. if account.Type == AccountTypeOAuth { bodyLooksLikeSSE := bytes.Contains(body, []byte("data:")) || bytes.Contains(body, []byte("event:")) - if isEventStreamResponse(resp.Header) || bodyLooksLikeSSE { - return s.handleOAuthSSEToJSON(resp, c, body, originalModel, mappedModel) + if bodyLooksLikeSSE { + return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel) } } @@ -3895,7 +3964,7 @@ func isEventStreamResponse(header http.Header) bool { return strings.Contains(contentType, "text/event-stream") } -func (s *OpenAIGatewayService) handleOAuthSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel, mappedModel string) (*OpenAIUsage, error) { +func (s *OpenAIGatewayService) handleSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel, mappedModel string) (*OpenAIUsage, error) { bodyText := string(body) finalResponse, ok := extractCodexFinalResponse(bodyText) diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 9e2f33f2..65880961 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -1797,7 +1797,7 @@ func TestExtractCodexFinalResponse_SampleReplay(t *testing.T) { require.Contains(t, string(finalResp), `"input_tokens":11`) } -func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) { +func TestHandleSSEToJSON_CompletedEventReturnsJSON(t *testing.T) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() c, _ := gin.CreateTestContext(rec) @@ -1814,7 +1814,7 @@ func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) { `data: [DONE]`, }, "\n")) - usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") + usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") require.NoError(t, err) require.NotNil(t, usage) require.Equal(t, 7, usage.InputTokens) @@ -1826,7 +1826,7 @@ func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) { require.NotContains(t, rec.Body.String(), "data:") } -func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) { +func TestHandleSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() c, _ := gin.CreateTestContext(rec) @@ -1842,7 +1842,7 @@ func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) { `data: [DONE]`, }, "\n")) - usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") + usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") require.NoError(t, err) require.NotNil(t, usage) require.Equal(t, 0, usage.InputTokens) @@ -1850,7 +1850,7 @@ func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) { require.Contains(t, rec.Body.String(), `data: {"type":"response.in_progress"`) } -func TestHandleOAuthSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) { +func TestHandleSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() c, _ := gin.CreateTestContext(rec) @@ -1866,7 +1866,7 @@ func TestHandleOAuthSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) { `data: [DONE]`, }, "\n")) - usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") + usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o") require.Nil(t, usage) require.Error(t, err) require.Equal(t, http.StatusBadGateway, rec.Code)