From 8987e0ba67c0d1c6dfbb86cf61088ee3c2af5df5 Mon Sep 17 00:00:00 2001 From: hungryboy1025 Date: Sat, 25 Apr 2026 16:56:50 +0800 Subject: [PATCH] fix(openai): tighten responses stream account tests --- .../internal/service/account_test_service.go | 28 ++++++++++++--- .../account_test_service_openai_test.go | 25 +++++++++++++ backend/internal/service/gateway_service.go | 2 +- .../service/openai_gateway_service.go | 5 +-- .../service/openai_gateway_service_test.go | 35 +++++++++++++++++++ 5 files changed, 87 insertions(+), 8 deletions(-) diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index d78dcd79..07646474 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -1145,13 +1145,17 @@ func (s *AccountTestService) processClaudeStream(c *gin.Context, body io.Reader) // processOpenAIStream processes the SSE stream from OpenAI Responses API func (s *AccountTestService) processOpenAIStream(c *gin.Context, body io.Reader) error { reader := bufio.NewReader(body) + seenCompleted := false for { line, err := reader.ReadString('\n') if err != nil { if err == io.EOF { - s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) - return nil + if seenCompleted { + s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) + return nil + } + return s.sendErrorAndEnd(c, "Stream ended before response.completed") } return s.sendErrorAndEnd(c, fmt.Sprintf("Stream read error: %s", err.Error())) } @@ -1163,8 +1167,11 @@ func (s *AccountTestService) processOpenAIStream(c *gin.Context, body io.Reader) jsonStr := sseDataPrefix.ReplaceAllString(line, "") if jsonStr == "[DONE]" { - s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) - return nil + if seenCompleted { + s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) + return nil + } + return s.sendErrorAndEnd(c, "Stream ended before response.completed") } var data map[string]any @@ -1180,9 +1187,20 @@ func (s *AccountTestService) processOpenAIStream(c *gin.Context, body io.Reader) if delta, ok := data["delta"].(string); ok && delta != "" { s.sendEvent(c, TestEvent{Type: "content", Text: delta}) } - case "response.completed": + case "response.completed", "response.done": + seenCompleted = true s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) return nil + case "response.failed": + errorMsg := "OpenAI response failed" + if responseData, ok := data["response"].(map[string]any); ok { + if errData, ok := responseData["error"].(map[string]any); ok { + if msg, ok := errData["message"].(string); ok && msg != "" { + errorMsg = msg + } + } + } + return s.sendErrorAndEnd(c, errorMsg) case "error": errorMsg := "Unknown error" if errData, ok := data["error"].(map[string]any); ok { diff --git a/backend/internal/service/account_test_service_openai_test.go b/backend/internal/service/account_test_service_openai_test.go index 7202799d..56204be3 100644 --- a/backend/internal/service/account_test_service_openai_test.go +++ b/backend/internal/service/account_test_service_openai_test.go @@ -125,6 +125,31 @@ func TestAccountTestService_OpenAISuccessPersistsSnapshotFromHeaders(t *testing. require.Contains(t, recorder.Body.String(), "test_complete") } +func TestAccountTestService_OpenAIStreamEOFBeforeCompletedFails(t *testing.T) { + gin.SetMode(gin.TestMode) + ctx, recorder := newTestContext() + + resp := newJSONResponse(http.StatusOK, "") + resp.Body = io.NopCloser(strings.NewReader(`data: {"type":"response.output_text.delta","delta":"hi"} + +`)) + + upstream := &queuedHTTPUpstream{responses: []*http.Response{resp}} + svc := &AccountTestService{httpUpstream: upstream} + account := &Account{ + ID: 90, + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{"access_token": "test-token"}, + } + + err := svc.testOpenAIAccountConnection(ctx, account, "gpt-5.4", "", "") + require.Error(t, err) + require.Contains(t, recorder.Body.String(), "response.completed") + require.NotContains(t, recorder.Body.String(), `"success":true`) +} + func TestAccountTestService_OpenAI429PersistsSnapshotAndRateLimitState(t *testing.T) { gin.SetMode(gin.TestMode) ctx, _ := newTestContext() diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 1713e561..ffd66fc7 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -119,7 +119,7 @@ func openAIStreamEventIsTerminal(data string) bool { return true } switch gjson.Get(trimmed, "type").String() { - case "response.completed", "response.done", "response.failed": + case "response.completed", "response.done", "response.failed", "response.incomplete", "response.cancelled", "response.canceled": return true default: return false diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 75a92f6e..50e00c01 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -4372,7 +4372,8 @@ func (s *OpenAIGatewayService) parseSSEUsageBytes(data []byte, usage *OpenAIUsag return } eventType := gjson.GetBytes(data, "type").String() - if eventType != "response.completed" && eventType != "response.done" { + if eventType != "response.completed" && eventType != "response.done" && + eventType != "response.incomplete" && eventType != "response.cancelled" && eventType != "response.canceled" { return } @@ -4519,7 +4520,7 @@ func extractOpenAISSETerminalEvent(body string) (string, []byte, bool) { } eventType := strings.TrimSpace(gjson.Get(data, "type").String()) switch eventType { - case "response.completed", "response.done", "response.failed": + case "response.completed", "response.done", "response.failed", "response.incomplete", "response.cancelled", "response.canceled": return eventType, []byte(data), true } } diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 0cf2392d..154b7908 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -1336,6 +1336,41 @@ func TestOpenAIStreamingPassthroughResponseDoneWithoutDoneMarkerStillSucceeds(t require.Equal(t, 1, result.usage.CacheReadInputTokens) } +func TestOpenAIStreamingPassthroughResponseIncompleteWithoutDoneMarkerStillSucceeds(t *testing.T) { + gin.SetMode(gin.TestMode) + cfg := &config.Config{ + Gateway: config.GatewayConfig{ + MaxLineSize: defaultMaxLineSize, + }, + } + svc := &OpenAIGatewayService{cfg: cfg} + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/", nil) + + pr, pw := io.Pipe() + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: pr, + Header: http.Header{}, + } + + go func() { + defer func() { _ = pw.Close() }() + _, _ = pw.Write([]byte("data: {\"type\":\"response.incomplete\",\"response\":{\"usage\":{\"input_tokens\":2,\"output_tokens\":3,\"input_tokens_details\":{\"cached_tokens\":1}}}}\n\n")) + }() + + result, err := svc.handleStreamingResponsePassthrough(c.Request.Context(), resp, c, &Account{ID: 1}, time.Now(), "", "") + _ = pr.Close() + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.usage) + require.Equal(t, 2, result.usage.InputTokens) + require.Equal(t, 3, result.usage.OutputTokens) + require.Equal(t, 1, result.usage.CacheReadInputTokens) +} + func TestOpenAIStreamingTooLong(t *testing.T) { gin.SetMode(gin.TestMode) cfg := &config.Config{