diff --git a/backend/internal/pkg/antigravity/stream_transformer.go b/backend/internal/pkg/antigravity/stream_transformer.go index 677435ad..deed5f92 100644 --- a/backend/internal/pkg/antigravity/stream_transformer.go +++ b/backend/internal/pkg/antigravity/stream_transformer.go @@ -119,23 +119,33 @@ func (p *StreamingProcessor) ProcessLine(line string) []byte { return result.Bytes() } -// Finish 结束处理,返回最终事件和用量 +// Finish 结束处理,返回最终事件和用量。 +// 若整个流未收到任何可解析的上游数据(messageStartSent == false), +// 则不补发任何结束事件,防止客户端收到没有 message_start 的残缺流。 func (p *StreamingProcessor) Finish() ([]byte, *ClaudeUsage) { - var result bytes.Buffer - - if !p.messageStopSent { - _, _ = result.Write(p.emitFinish("")) - } - usage := &ClaudeUsage{ InputTokens: p.inputTokens, OutputTokens: p.outputTokens, CacheReadInputTokens: p.cacheReadTokens, } + if !p.messageStartSent { + return nil, usage + } + + var result bytes.Buffer + if !p.messageStopSent { + _, _ = result.Write(p.emitFinish("")) + } + return result.Bytes(), usage } +// MessageStartSent 报告流中是否已发出过 message_start 事件(即是否收到过有效的上游数据) +func (p *StreamingProcessor) MessageStartSent() bool { + return p.messageStartSent +} + // emitMessageStart 发送 message_start 事件 func (p *StreamingProcessor) emitMessageStart(v1Resp *V1InternalResponse) []byte { if p.messageStartSent { diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 96ff3354..fca6462b 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -3696,6 +3696,15 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context finalEvents, agUsage := processor.Finish() if len(finalEvents) > 0 { cw.Write(finalEvents) + } else if !processor.MessageStartSent() && !cw.Disconnected() { + // 整个流未收到任何可解析的上游数据(全部 SSE 行均无法被 JSON 解析), + // 触发 failover 在同账号重试,避免向客户端发出缺少 message_start 的残缺流 + logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Claude-Stream] empty stream response (no valid events parsed), triggering failover") + return nil, &UpstreamFailoverError{ + StatusCode: http.StatusBadGateway, + ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + RetryableOnSameAccount: true, + } } return &antigravityStreamResult{usage: convertUsage(agUsage), firstTokenMs: firstTokenMs, clientDisconnect: cw.Disconnected()}, nil } diff --git a/backend/internal/service/antigravity_gateway_service_test.go b/backend/internal/service/antigravity_gateway_service_test.go index 84b65adc..60963838 100644 --- a/backend/internal/service/antigravity_gateway_service_test.go +++ b/backend/internal/service/antigravity_gateway_service_test.go @@ -998,6 +998,46 @@ func TestHandleClaudeStreamingResponse_ClientDisconnect(t *testing.T) { require.True(t, result.clientDisconnect) } +// TestHandleClaudeStreamingResponse_EmptyStream +// 验证:上游只返回无法解析的 SSE 行时,触发 UpstreamFailoverError 而不是向客户端发出残缺流 +func TestHandleClaudeStreamingResponse_EmptyStream(t *testing.T) { + gin.SetMode(gin.TestMode) + svc := newAntigravityTestService(&config.Config{ + Gateway: config.GatewayConfig{MaxLineSize: defaultMaxLineSize}, + }) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/", nil) + + pr, pw := io.Pipe() + resp := &http.Response{StatusCode: http.StatusOK, Body: pr, Header: http.Header{}} + + go func() { + defer func() { _ = pw.Close() }() + // 所有行均为无法 JSON 解析的内容,ProcessLine 全部返回 nil + fmt.Fprintln(pw, "data: not-valid-json") + fmt.Fprintln(pw, "") + fmt.Fprintln(pw, "data: also-invalid") + fmt.Fprintln(pw, "") + }() + + _, err := svc.handleClaudeStreamingResponse(c, resp, time.Now(), "claude-sonnet-4-5") + _ = pr.Close() + + // 应当返回 UpstreamFailoverError 而非 nil,以便上层触发 failover + require.Error(t, err) + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr) + require.True(t, failoverErr.RetryableOnSameAccount) + + // 客户端不应收到任何 SSE 事件(既无 message_start 也无 message_stop) + body := rec.Body.String() + require.NotContains(t, body, "event: message_start") + require.NotContains(t, body, "event: message_stop") + require.NotContains(t, body, "event: message_delta") +} + // TestHandleClaudeStreamingResponse_ContextCanceled // 验证:context 取消时不注入错误事件 func TestHandleClaudeStreamingResponse_ContextCanceled(t *testing.T) {