diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go index f54a4a02..903d5b31 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go @@ -876,3 +876,182 @@ func TestChatCompletionsStreamRoundTrip(t *testing.T) { assert.Equal(t, "resp_rt", c.ID) } } + +// --------------------------------------------------------------------------- +// BufferedResponseAccumulator tests +// --------------------------------------------------------------------------- + +func TestBufferedResponseAccumulator_TextOnly(t *testing.T) { + acc := NewBufferedResponseAccumulator() + + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: "Hello"}) + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: ", world!"}) + + assert.True(t, acc.HasContent()) + + output := acc.BuildOutput() + require.Len(t, output, 1) + assert.Equal(t, "message", output[0].Type) + assert.Equal(t, "assistant", output[0].Role) + require.Len(t, output[0].Content, 1) + assert.Equal(t, "output_text", output[0].Content[0].Type) + assert.Equal(t, "Hello, world!", output[0].Content[0].Text) +} + +func TestBufferedResponseAccumulator_ToolCalls(t *testing.T) { + acc := NewBufferedResponseAccumulator() + + // Add function call at output_index=1 + acc.ProcessEvent(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 1, + Item: &ResponsesOutput{ + Type: "function_call", + CallID: "call_abc", + Name: "get_weather", + }, + }) + acc.ProcessEvent(&ResponsesStreamEvent{ + Type: "response.function_call_arguments.delta", + OutputIndex: 1, + Delta: `{"city":`, + }) + acc.ProcessEvent(&ResponsesStreamEvent{ + Type: "response.function_call_arguments.delta", + OutputIndex: 1, + Delta: `"NYC"}`, + }) + + assert.True(t, acc.HasContent()) + + output := acc.BuildOutput() + require.Len(t, output, 1) + assert.Equal(t, "function_call", output[0].Type) + assert.Equal(t, "call_abc", output[0].CallID) + assert.Equal(t, "get_weather", output[0].Name) + assert.Equal(t, `{"city":"NYC"}`, output[0].Arguments) +} + +func TestBufferedResponseAccumulator_Reasoning(t *testing.T) { + acc := NewBufferedResponseAccumulator() + + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.reasoning_summary_text.delta", Delta: "Step 1: "}) + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.reasoning_summary_text.delta", Delta: "think about it"}) + + assert.True(t, acc.HasContent()) + + output := acc.BuildOutput() + require.Len(t, output, 1) + assert.Equal(t, "reasoning", output[0].Type) + require.Len(t, output[0].Summary, 1) + assert.Equal(t, "summary_text", output[0].Summary[0].Type) + assert.Equal(t, "Step 1: think about it", output[0].Summary[0].Text) +} + +func TestBufferedResponseAccumulator_Mixed(t *testing.T) { + acc := NewBufferedResponseAccumulator() + + // Reasoning first + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.reasoning_summary_text.delta", Delta: "I thought about it."}) + + // Then text + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: "The answer is 42."}) + + // Then a tool call + acc.ProcessEvent(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 2, + Item: &ResponsesOutput{ + Type: "function_call", + CallID: "call_1", + Name: "verify", + }, + }) + acc.ProcessEvent(&ResponsesStreamEvent{ + Type: "response.function_call_arguments.delta", + OutputIndex: 2, + Delta: `{}`, + }) + + assert.True(t, acc.HasContent()) + + output := acc.BuildOutput() + // Order: reasoning → message → function_calls + require.Len(t, output, 3) + assert.Equal(t, "reasoning", output[0].Type) + assert.Equal(t, "message", output[1].Type) + assert.Equal(t, "function_call", output[2].Type) + assert.Equal(t, "The answer is 42.", output[1].Content[0].Text) + assert.Equal(t, "verify", output[2].Name) +} + +func TestBufferedResponseAccumulator_SupplementEmptyOutput(t *testing.T) { + acc := NewBufferedResponseAccumulator() + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: "Hello"}) + + resp := &ResponsesResponse{ + ID: "resp_1", + Status: "completed", + Output: nil, // empty output + Usage: &ResponsesUsage{InputTokens: 10, OutputTokens: 5}, + } + + acc.SupplementResponseOutput(resp) + + require.Len(t, resp.Output, 1) + assert.Equal(t, "message", resp.Output[0].Type) + assert.Equal(t, "Hello", resp.Output[0].Content[0].Text) + // Usage should be untouched + assert.Equal(t, 10, resp.Usage.InputTokens) +} + +func TestBufferedResponseAccumulator_NoSupplementWhenOutputExists(t *testing.T) { + acc := NewBufferedResponseAccumulator() + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: "from deltas"}) + + resp := &ResponsesResponse{ + ID: "resp_2", + Status: "completed", + Output: []ResponsesOutput{ + { + Type: "message", + Content: []ResponsesContentPart{ + {Type: "output_text", Text: "from terminal event"}, + }, + }, + }, + } + + acc.SupplementResponseOutput(resp) + + // Output should NOT be overwritten + require.Len(t, resp.Output, 1) + assert.Equal(t, "from terminal event", resp.Output[0].Content[0].Text) +} + +func TestBufferedResponseAccumulator_EmptyDeltas(t *testing.T) { + acc := NewBufferedResponseAccumulator() + + // Process events with empty delta — should not accumulate + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.output_text.delta", Delta: ""}) + acc.ProcessEvent(&ResponsesStreamEvent{Type: "response.created"}) + + assert.False(t, acc.HasContent()) + + resp := &ResponsesResponse{ID: "resp_3", Status: "completed"} + acc.SupplementResponseOutput(resp) + assert.Nil(t, resp.Output) +} + +func TestBufferedResponseAccumulator_IgnoresNonFunctionCallItems(t *testing.T) { + acc := NewBufferedResponseAccumulator() + + // output_item.added with type "message" should be ignored + acc.ProcessEvent(&ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "message"}, + }) + + assert.False(t, acc.HasContent()) +} diff --git a/backend/internal/pkg/apicompat/responses_to_chatcompletions.go b/backend/internal/pkg/apicompat/responses_to_chatcompletions.go index 688a68eb..61b3bf9c 100644 --- a/backend/internal/pkg/apicompat/responses_to_chatcompletions.go +++ b/backend/internal/pkg/apicompat/responses_to_chatcompletions.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "strings" "time" ) @@ -372,3 +373,119 @@ func generateChatCmplID() string { _, _ = rand.Read(b) return "chatcmpl-" + hex.EncodeToString(b) } + +// --------------------------------------------------------------------------- +// BufferedResponseAccumulator: accumulates SSE delta events for non-streaming +// paths where the terminal event may have empty output. +// --------------------------------------------------------------------------- + +type bufferedFuncCall struct { + CallID string + Name string + Args strings.Builder +} + +// BufferedResponseAccumulator collects content from Responses SSE delta events +// so that non-streaming handlers can reconstruct output when the terminal event +// (response.completed / response.done) carries an empty output array. +type BufferedResponseAccumulator struct { + text strings.Builder + reasoning strings.Builder + funcCalls []bufferedFuncCall + outputIndexToFuncIdx map[int]int +} + +// NewBufferedResponseAccumulator returns an initialised accumulator. +func NewBufferedResponseAccumulator() *BufferedResponseAccumulator { + return &BufferedResponseAccumulator{ + outputIndexToFuncIdx: make(map[int]int), + } +} + +// ProcessEvent inspects a single Responses SSE event and accumulates any +// content it carries. Only delta events that contribute to the final output +// are handled; all other event types are silently ignored. +func (a *BufferedResponseAccumulator) ProcessEvent(event *ResponsesStreamEvent) { + switch event.Type { + case "response.output_text.delta": + if event.Delta != "" { + _, _ = a.text.WriteString(event.Delta) + } + case "response.output_item.added": + if event.Item != nil && event.Item.Type == "function_call" { + idx := len(a.funcCalls) + a.outputIndexToFuncIdx[event.OutputIndex] = idx + a.funcCalls = append(a.funcCalls, bufferedFuncCall{ + CallID: event.Item.CallID, + Name: event.Item.Name, + }) + } + case "response.function_call_arguments.delta": + if event.Delta != "" { + if idx, ok := a.outputIndexToFuncIdx[event.OutputIndex]; ok { + _, _ = a.funcCalls[idx].Args.WriteString(event.Delta) + } + } + case "response.reasoning_summary_text.delta": + if event.Delta != "" { + _, _ = a.reasoning.WriteString(event.Delta) + } + } +} + +// HasContent reports whether any content has been accumulated. +func (a *BufferedResponseAccumulator) HasContent() bool { + return a.text.Len() > 0 || len(a.funcCalls) > 0 || a.reasoning.Len() > 0 +} + +// BuildOutput constructs a []ResponsesOutput from the accumulated delta +// content. The order matches what ResponsesToChatCompletions expects: +// reasoning → message → function_calls. +func (a *BufferedResponseAccumulator) BuildOutput() []ResponsesOutput { + var out []ResponsesOutput + + if a.reasoning.Len() > 0 { + out = append(out, ResponsesOutput{ + Type: "reasoning", + Summary: []ResponsesSummary{{ + Type: "summary_text", + Text: a.reasoning.String(), + }}, + }) + } + + if a.text.Len() > 0 { + out = append(out, ResponsesOutput{ + Type: "message", + Role: "assistant", + Content: []ResponsesContentPart{{ + Type: "output_text", + Text: a.text.String(), + }}, + }) + } + + for i := range a.funcCalls { + out = append(out, ResponsesOutput{ + Type: "function_call", + CallID: a.funcCalls[i].CallID, + Name: a.funcCalls[i].Name, + Arguments: a.funcCalls[i].Args.String(), + }) + } + + return out +} + +// SupplementResponseOutput fills resp.Output from accumulated delta content +// when the terminal event delivered an empty output array. If resp.Output is +// already populated, this is a no-op (preserves backward compatibility). +func (a *BufferedResponseAccumulator) SupplementResponseOutput(resp *ResponsesResponse) { + if resp == nil || len(resp.Output) > 0 { + return + } + if !a.HasContent() { + return + } + resp.Output = a.BuildOutput() +} diff --git a/backend/internal/service/openai_gateway_chat_completions.go b/backend/internal/service/openai_gateway_chat_completions.go index be076cc0..4f24749b 100644 --- a/backend/internal/service/openai_gateway_chat_completions.go +++ b/backend/internal/service/openai_gateway_chat_completions.go @@ -244,6 +244,7 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse( var finalResponse *apicompat.ResponsesResponse var usage OpenAIUsage + acc := apicompat.NewBufferedResponseAccumulator() for scanner.Scan() { line := scanner.Text() @@ -261,7 +262,11 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse( continue } - if (event.Type == "response.completed" || event.Type == "response.incomplete" || event.Type == "response.failed") && + // Accumulate delta content for fallback when terminal output is empty. + acc.ProcessEvent(&event) + + if (event.Type == "response.completed" || event.Type == "response.done" || + event.Type == "response.incomplete" || event.Type == "response.failed") && event.Response != nil { finalResponse = event.Response if event.Response.Usage != nil { @@ -290,6 +295,10 @@ func (s *OpenAIGatewayService) handleChatBufferedStreamingResponse( return nil, fmt.Errorf("upstream stream ended without terminal event") } + // When the terminal event has an empty output array, reconstruct from + // accumulated delta events so the client receives the full content. + acc.SupplementResponseOutput(finalResponse) + chatResp := apicompat.ResponsesToChatCompletions(finalResponse, originalModel) if s.responseHeaderFilter != nil { diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index f03f44d5..a0e1d239 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -21,6 +21,7 @@ import ( "time" "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/pkg/apicompat" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "github.com/Wei-Shaw/sub2api/internal/pkg/openai" "github.com/Wei-Shaw/sub2api/internal/util/responseheaders" @@ -3901,6 +3902,16 @@ func (s *OpenAIGatewayService) handleOAuthSSEToJSON(resp *http.Response, c *gin. if parsedUsage, parsed := extractOpenAIUsageFromJSONBytes(finalResponse); parsed { *usage = parsedUsage } + // When the terminal event has an empty output array, reconstruct + // output from accumulated delta events so the client gets full content. + // gjson Array() returns empty slice for null, missing, or empty arrays. + if len(gjson.GetBytes(finalResponse, "output").Array()) == 0 { + if outputJSON, reconstructed := reconstructResponseOutputFromSSE(bodyText); reconstructed { + if patched, err := sjson.SetRawBytes(finalResponse, "output", outputJSON); err == nil { + finalResponse = patched + } + } + } body = finalResponse if originalModel != mappedModel { body = s.replaceModelInResponseBody(body, mappedModel, originalModel) @@ -4002,6 +4013,34 @@ func extractCodexFinalResponse(body string) ([]byte, bool) { return nil, false } +// reconstructResponseOutputFromSSE scans raw SSE body text for delta events and +// returns a JSON-encoded output array reconstructed from accumulated deltas. +// Returns (nil, false) if no content was found in deltas. +func reconstructResponseOutputFromSSE(bodyText string) ([]byte, bool) { + acc := apicompat.NewBufferedResponseAccumulator() + lines := strings.Split(bodyText, "\n") + for _, line := range lines { + data, ok := extractOpenAISSEDataLine(line) + if !ok || data == "" || data == "[DONE]" { + continue + } + var event apicompat.ResponsesStreamEvent + if err := json.Unmarshal([]byte(data), &event); err != nil { + continue + } + acc.ProcessEvent(&event) + } + if !acc.HasContent() { + return nil, false + } + output := acc.BuildOutput() + outputJSON, err := json.Marshal(output) + if err != nil { + return nil, false + } + return outputJSON, true +} + func (s *OpenAIGatewayService) parseSSEUsageFromBody(body string) *OpenAIUsage { usage := &OpenAIUsage{} lines := strings.Split(body, "\n")