From 26298c4a5fe58c43f908fb0262c05ef38422da6b Mon Sep 17 00:00:00 2001
From: cyhhao
Date: Mon, 19 Jan 2026 13:53:39 +0800
Subject: [PATCH] fix(openai): emit OpenAI-compatible SSE error events

---
 .../service/openai_gateway_service.go      | 19 ++++++++++++++++---
 .../service/openai_gateway_service_test.go |  8 ++++----
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go
index 87ad37a6..66ac1601 100644
--- a/backend/internal/service/openai_gateway_service.go
+++ b/backend/internal/service/openai_gateway_service.go
@@ -1067,7 +1067,9 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp
 	// Record when upstream data was last received, used to throttle keepalive sends
 	lastDataAt := time.Now()
 
-	// Send the error event only once to avoid corrupting the protocol with multiple writes (best-effort client notification on write failure)
+	// Send the error event only once to avoid corrupting the protocol with multiple writes.
+	// Note: OpenAI `/v1/responses` streaming events must conform to the OpenAI Responses schema;
+	// otherwise downstream SDKs (e.g. OpenCode) will fail on type validation.
 	errorEventSent := false
 	clientDisconnected := false // keep draining upstream after the client disconnects so usage can still be collected
 	sendErrorEvent := func(reason string) {
@@ -1075,8 +1077,19 @@
 			return
 		}
 		errorEventSent = true
-		_, _ = fmt.Fprintf(w, "event: error\ndata: {\"error\":\"%s\"}\n\n", reason)
-		flusher.Flush()
+		payload := map[string]any{
+			"type":            "error",
+			"sequence_number": 0,
+			"error": map[string]any{
+				"type":    "upstream_error",
+				"message": reason,
+				"code":    reason,
+			},
+		}
+		if b, err := json.Marshal(payload); err == nil {
+			_, _ = fmt.Fprintf(w, "data: %s\n\n", b)
+			flusher.Flush()
+		}
 	}
 
 	needModelReplace := originalModel != mappedModel
diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go
index 3ec37544..57b73245 100644
--- a/backend/internal/service/openai_gateway_service_test.go
+++ b/backend/internal/service/openai_gateway_service_test.go
@@ -188,8 +188,8 @@ func TestOpenAIStreamingTimeout(t *testing.T) {
 	if err == nil || !strings.Contains(err.Error(), "stream data interval timeout") {
 		t.Fatalf("expected stream timeout error, got %v", err)
 	}
-	if !strings.Contains(rec.Body.String(), "stream_timeout") {
-		t.Fatalf("expected stream_timeout SSE error, got %q", rec.Body.String())
+	if !strings.Contains(rec.Body.String(), "\"type\":\"error\"") || !strings.Contains(rec.Body.String(), "stream_timeout") {
+		t.Fatalf("expected OpenAI-compatible error SSE event, got %q", rec.Body.String())
 	}
 }
 
@@ -305,8 +305,8 @@ func TestOpenAIStreamingTooLong(t *testing.T) {
 	if !errors.Is(err, bufio.ErrTooLong) {
 		t.Fatalf("expected ErrTooLong, got %v", err)
 	}
-	if !strings.Contains(rec.Body.String(), "response_too_large") {
-		t.Fatalf("expected response_too_large SSE error, got %q", rec.Body.String())
+	if !strings.Contains(rec.Body.String(), "\"type\":\"error\"") || !strings.Contains(rec.Body.String(), "response_too_large") {
+		t.Fatalf("expected OpenAI-compatible error SSE event, got %q", rec.Body.String())
 	}
 }
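
For reference, a sketch of the SSE frame the new sendErrorEvent writes on a stream timeout (assuming reason == "stream_timeout"; field order reflects encoding/json sorting map keys, and sequence_number is hard-coded to 0 in this version):

data: {"error":{"code":"stream_timeout","message":"stream_timeout","type":"upstream_error"},"sequence_number":0,"type":"error"}

The updated tests only assert that the body contains "type":"error" plus the reason string, so they do not depend on this exact field order.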