From 909b8a8f9c2e314a884b47291da0f78b9f159aa7 Mon Sep 17 00:00:00 2001
From: lynoot
Date: Fri, 23 Jan 2026 13:49:37 +0000
Subject: [PATCH] fix(gateway): aggregate all text chunks in non-streaming
 Gemini responses

Previously, collectGeminiSSE() returned only the last chunk received
from the upstream streaming response when converting it to a
non-streaming response. This caused incomplete responses: only the
final text fragment was returned to clients.

For example, a request asking to "count from 1 to 10" would return
only "\n" (the last chunk) instead of "1\n2\n3\n...\n10\n".

This was especially problematic for JSON structured output, where the
opening brace "{" from the first chunk was lost, resulting in invalid
JSON like:

    colors": ["red", "blue"]}
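As an illustration (the exact chunk boundaries here are assumed, not
taken from a captured stream), a structured-output answer arriving as
two SSE chunks

    data: {"candidates":[{"content":{"parts":[{"text":"{\""}]}}]}
    data: {"candidates":[{"content":{"parts":[{"text":"colors\": [\"red\", \"blue\"]}"}]}]}
    data: [DONE]

previously collapsed to the second chunk's text alone; with this
change the fragments are concatenated back into the valid JSON
{"colors": ["red", "blue"]}.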
The fix:
- Collect all text parts from each SSE chunk into a slice
- Merge all collected text parts into the final response
- Reuse the same pattern as handleGeminiStreamToNonStreaming in
  antigravity_gateway_service.go

Fixes: non-streaming responses returning incomplete text
Fixes: structured output (JSON schema) returning invalid JSON
---
 .../service/gemini_messages_compat_service.go | 88 ++++++++++++++++++-
 1 file changed, 86 insertions(+), 2 deletions(-)

diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go
index 7234540f..396c4829 100644
--- a/backend/internal/service/gemini_messages_compat_service.go
+++ b/backend/internal/service/gemini_messages_compat_service.go
@@ -1972,6 +1972,7 @@ func collectGeminiSSE(body io.Reader, isOAuth bool) (map[string]any, *ClaudeUsag
 
 	var last map[string]any
 	var lastWithParts map[string]any
+	var collectedTextParts []string // Collect all text parts for aggregation
 	usage := &ClaudeUsage{}
 
 	for {
@@ -1983,7 +1984,7 @@ func collectGeminiSSE(body io.Reader, isOAuth bool) (map[string]any, *ClaudeUsag
 		switch payload {
 		case "", "[DONE]":
 			if payload == "[DONE]" {
-				return pickGeminiCollectResult(last, lastWithParts), usage, nil
+				return mergeCollectedTextParts(pickGeminiCollectResult(last, lastWithParts), collectedTextParts), usage, nil
 			}
 		default:
 			var parsed map[string]any
@@ -2002,6 +2003,12 @@ func collectGeminiSSE(body io.Reader, isOAuth bool) (map[string]any, *ClaudeUsag
 			}
 			if parts := extractGeminiParts(parsed); len(parts) > 0 {
 				lastWithParts = parsed
+				// Collect text from each part for aggregation
+				for _, part := range parts {
+					if text, ok := part["text"].(string); ok && text != "" {
+						collectedTextParts = append(collectedTextParts, text)
+					}
+				}
 			}
 		}
 	}
@@ -2016,7 +2023,7 @@ func collectGeminiSSE(body io.Reader, isOAuth bool) (map[string]any, *ClaudeUsag
 		}
 	}
 
-	return pickGeminiCollectResult(last, lastWithParts), usage, nil
+	return mergeCollectedTextParts(pickGeminiCollectResult(last, lastWithParts), collectedTextParts), usage, nil
 }
 
 func pickGeminiCollectResult(last map[string]any, lastWithParts map[string]any) map[string]any {
@@ -2029,6 +2036,83 @@ func pickGeminiCollectResult(last map[string]any, lastWithParts map[string]any)
 	}
 	return map[string]any{}
 }
 
+// mergeCollectedTextParts merges all collected text chunks into the final response.
+// This fixes the issue where non-streaming responses only returned the last chunk
+// instead of the complete aggregated text.
+func mergeCollectedTextParts(response map[string]any, textParts []string) map[string]any {
+	if len(textParts) == 0 {
+		return response
+	}
+
+	// Join all text parts
+	mergedText := strings.Join(textParts, "")
+
+	// Shallow copy of the top-level response map (nested maps stay shared)
+	result := make(map[string]any)
+	for k, v := range response {
+		result[k] = v
+	}
+
+	// Get or create candidates
+	candidates, ok := result["candidates"].([]any)
+	if !ok || len(candidates) == 0 {
+		candidates = []any{map[string]any{}}
+	}
+
+	// Get first candidate
+	candidate, ok := candidates[0].(map[string]any)
+	if !ok {
+		candidate = make(map[string]any)
+		candidates[0] = candidate
+	}
+
+	// Get or create content
+	content, ok := candidate["content"].(map[string]any)
+	if !ok {
+		content = map[string]any{"role": "model"}
+		candidate["content"] = content
+	}
+
+	// Get existing parts
+	existingParts, ok := content["parts"].([]any)
+	if !ok {
+		existingParts = []any{}
+	}
+
+	// Replace the first text part with the merged text, or prepend a new one
+	newParts := make([]any, 0, len(existingParts)+1)
+	textUpdated := false
+
+	for _, p := range existingParts {
+		pm, ok := p.(map[string]any)
+		if !ok {
+			newParts = append(newParts, p)
+			continue
+		}
+		if _, hasText := pm["text"]; hasText && !textUpdated {
+			// Replace with merged text
+			newPart := make(map[string]any)
+			for k, v := range pm {
+				newPart[k] = v
+			}
+			newPart["text"] = mergedText
+			newParts = append(newParts, newPart)
+			textUpdated = true
+		} else {
+			newParts = append(newParts, pm)
+		}
+	}
+
+	if !textUpdated {
+		newParts = append([]any{map[string]any{"text": mergedText}}, newParts...)
+	}
+
+	content["parts"] = newParts
+	result["candidates"] = candidates
+
+	return result
+}
+
 type geminiNativeStreamResult struct {
 	usage        *ClaudeUsage
 	firstTokenMs *int