fix: 非流式响应路径扩展SSE检测至所有账号类型 (#1493)
当上游返回SSE格式响应(如sub2api链路)时,API Key账号的非流式路径 未检测SSE,导致终态事件中空output直接透传给客户端。 - 将Content-Type SSE检测从仅OAuth扩展至所有账号类型 - 重命名handleOAuthSSEToJSON为handleSSEToJSON(无OAuth专属逻辑) - 为透传路径新增handlePassthroughSSEToJSON,支持SSE转JSON及空output重建 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3007,6 +3007,14 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Detect SSE responses from upstream and convert to JSON.
|
||||
// Some upstreams (e.g. other sub2api instances) may return SSE even when
|
||||
// stream=false was requested. Without this conversion the client would
|
||||
// receive raw SSE text or a terminal event with empty output.
|
||||
if isEventStreamResponse(resp.Header) {
|
||||
return s.handlePassthroughSSEToJSON(resp, c, body)
|
||||
}
|
||||
|
||||
usage := &OpenAIUsage{}
|
||||
usageParsed := false
|
||||
if len(body) > 0 {
|
||||
@@ -3030,6 +3038,56 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough(
|
||||
return usage, nil
|
||||
}
|
||||
|
||||
// handlePassthroughSSEToJSON converts an SSE response body into a JSON
|
||||
// response for the passthrough path. It mirrors handleSSEToJSON but skips
|
||||
// model replacement (passthrough does not remap models).
|
||||
func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c *gin.Context, body []byte) (*OpenAIUsage, error) {
|
||||
bodyText := string(body)
|
||||
finalResponse, ok := extractCodexFinalResponse(bodyText)
|
||||
|
||||
usage := &OpenAIUsage{}
|
||||
if ok {
|
||||
if parsedUsage, parsed := extractOpenAIUsageFromJSONBytes(finalResponse); parsed {
|
||||
*usage = parsedUsage
|
||||
}
|
||||
// When the terminal event has an empty output array, reconstruct
|
||||
// output from accumulated delta events so the client gets full content.
|
||||
if len(gjson.GetBytes(finalResponse, "output").Array()) == 0 {
|
||||
if outputJSON, reconstructed := reconstructResponseOutputFromSSE(bodyText); reconstructed {
|
||||
if patched, err := sjson.SetRawBytes(finalResponse, "output", outputJSON); err == nil {
|
||||
finalResponse = patched
|
||||
}
|
||||
}
|
||||
}
|
||||
body = finalResponse
|
||||
// Correct tool calls in final response
|
||||
body = s.correctToolCallsInResponseBody(body)
|
||||
} else {
|
||||
terminalType, terminalPayload, terminalOK := extractOpenAISSETerminalEvent(bodyText)
|
||||
if terminalOK && terminalType == "response.failed" {
|
||||
msg := extractOpenAISSEErrorMessage(terminalPayload)
|
||||
if msg == "" {
|
||||
msg = "Upstream compact response failed"
|
||||
}
|
||||
return nil, s.writeOpenAINonStreamingProtocolError(resp, c, msg)
|
||||
}
|
||||
usage = s.parseSSEUsageFromBody(bodyText)
|
||||
}
|
||||
|
||||
writeOpenAIPassthroughResponseHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||
|
||||
contentType := "application/json; charset=utf-8"
|
||||
if !ok {
|
||||
contentType = resp.Header.Get("Content-Type")
|
||||
if contentType == "" {
|
||||
contentType = "text/event-stream"
|
||||
}
|
||||
}
|
||||
c.Data(resp.StatusCode, contentType, body)
|
||||
|
||||
return usage, nil
|
||||
}
|
||||
|
||||
func writeOpenAIPassthroughResponseHeaders(dst http.Header, src http.Header, filter *responseheaders.CompiledHeaderFilter) {
|
||||
if dst == nil || src == nil {
|
||||
return
|
||||
@@ -3858,10 +3916,21 @@ func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, r
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Detect SSE responses for ALL account types via Content-Type header.
|
||||
// Some OpenAI-compatible upstreams (including other sub2api instances)
|
||||
// may return SSE even when stream=false was requested.
|
||||
if isEventStreamResponse(resp.Header) {
|
||||
return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel)
|
||||
}
|
||||
// For OAuth accounts, also fall back to a body-content heuristic because
|
||||
// the upstream may omit the Content-Type header while still sending SSE.
|
||||
// This heuristic is NOT applied to API-key accounts to avoid false
|
||||
// positives on JSON responses that coincidentally contain "data:" or
|
||||
// "event:" in their text content.
|
||||
if account.Type == AccountTypeOAuth {
|
||||
bodyLooksLikeSSE := bytes.Contains(body, []byte("data:")) || bytes.Contains(body, []byte("event:"))
|
||||
if isEventStreamResponse(resp.Header) || bodyLooksLikeSSE {
|
||||
return s.handleOAuthSSEToJSON(resp, c, body, originalModel, mappedModel)
|
||||
if bodyLooksLikeSSE {
|
||||
return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3895,7 +3964,7 @@ func isEventStreamResponse(header http.Header) bool {
|
||||
return strings.Contains(contentType, "text/event-stream")
|
||||
}
|
||||
|
||||
func (s *OpenAIGatewayService) handleOAuthSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel, mappedModel string) (*OpenAIUsage, error) {
|
||||
func (s *OpenAIGatewayService) handleSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel, mappedModel string) (*OpenAIUsage, error) {
|
||||
bodyText := string(body)
|
||||
finalResponse, ok := extractCodexFinalResponse(bodyText)
|
||||
|
||||
|
||||
@@ -1797,7 +1797,7 @@ func TestExtractCodexFinalResponse_SampleReplay(t *testing.T) {
|
||||
require.Contains(t, string(finalResp), `"input_tokens":11`)
|
||||
}
|
||||
|
||||
func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) {
|
||||
func TestHandleSSEToJSON_CompletedEventReturnsJSON(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
@@ -1814,7 +1814,7 @@ func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) {
|
||||
`data: [DONE]`,
|
||||
}, "\n"))
|
||||
|
||||
usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
|
||||
usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, usage)
|
||||
require.Equal(t, 7, usage.InputTokens)
|
||||
@@ -1826,7 +1826,7 @@ func TestHandleOAuthSSEToJSON_CompletedEventReturnsJSON(t *testing.T) {
|
||||
require.NotContains(t, rec.Body.String(), "data:")
|
||||
}
|
||||
|
||||
func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) {
|
||||
func TestHandleSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
@@ -1842,7 +1842,7 @@ func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) {
|
||||
`data: [DONE]`,
|
||||
}, "\n"))
|
||||
|
||||
usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
|
||||
usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, usage)
|
||||
require.Equal(t, 0, usage.InputTokens)
|
||||
@@ -1850,7 +1850,7 @@ func TestHandleOAuthSSEToJSON_NoFinalResponseKeepsSSEBody(t *testing.T) {
|
||||
require.Contains(t, rec.Body.String(), `data: {"type":"response.in_progress"`)
|
||||
}
|
||||
|
||||
func TestHandleOAuthSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) {
|
||||
func TestHandleSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
@@ -1866,7 +1866,7 @@ func TestHandleOAuthSSEToJSON_ResponseFailedReturnsProtocolError(t *testing.T) {
|
||||
`data: [DONE]`,
|
||||
}, "\n"))
|
||||
|
||||
usage, err := svc.handleOAuthSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
|
||||
usage, err := svc.handleSSEToJSON(resp, c, body, "gpt-4o", "gpt-4o")
|
||||
require.Nil(t, usage)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, http.StatusBadGateway, rec.Code)
|
||||
|
||||
Reference in New Issue
Block a user