fix issue #791
This commit is contained in:
@@ -119,23 +119,33 @@ func (p *StreamingProcessor) ProcessLine(line string) []byte {
|
|||||||
return result.Bytes()
|
return result.Bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finish 结束处理,返回最终事件和用量
|
// Finish 结束处理,返回最终事件和用量。
|
||||||
|
// 若整个流未收到任何可解析的上游数据(messageStartSent == false),
|
||||||
|
// 则不补发任何结束事件,防止客户端收到没有 message_start 的残缺流。
|
||||||
func (p *StreamingProcessor) Finish() ([]byte, *ClaudeUsage) {
|
func (p *StreamingProcessor) Finish() ([]byte, *ClaudeUsage) {
|
||||||
var result bytes.Buffer
|
|
||||||
|
|
||||||
if !p.messageStopSent {
|
|
||||||
_, _ = result.Write(p.emitFinish(""))
|
|
||||||
}
|
|
||||||
|
|
||||||
usage := &ClaudeUsage{
|
usage := &ClaudeUsage{
|
||||||
InputTokens: p.inputTokens,
|
InputTokens: p.inputTokens,
|
||||||
OutputTokens: p.outputTokens,
|
OutputTokens: p.outputTokens,
|
||||||
CacheReadInputTokens: p.cacheReadTokens,
|
CacheReadInputTokens: p.cacheReadTokens,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !p.messageStartSent {
|
||||||
|
return nil, usage
|
||||||
|
}
|
||||||
|
|
||||||
|
var result bytes.Buffer
|
||||||
|
if !p.messageStopSent {
|
||||||
|
_, _ = result.Write(p.emitFinish(""))
|
||||||
|
}
|
||||||
|
|
||||||
return result.Bytes(), usage
|
return result.Bytes(), usage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MessageStartSent 报告流中是否已发出过 message_start 事件(即是否收到过有效的上游数据)
|
||||||
|
func (p *StreamingProcessor) MessageStartSent() bool {
|
||||||
|
return p.messageStartSent
|
||||||
|
}
|
||||||
|
|
||||||
// emitMessageStart 发送 message_start 事件
|
// emitMessageStart 发送 message_start 事件
|
||||||
func (p *StreamingProcessor) emitMessageStart(v1Resp *V1InternalResponse) []byte {
|
func (p *StreamingProcessor) emitMessageStart(v1Resp *V1InternalResponse) []byte {
|
||||||
if p.messageStartSent {
|
if p.messageStartSent {
|
||||||
|
|||||||
@@ -3696,6 +3696,15 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
|
|||||||
finalEvents, agUsage := processor.Finish()
|
finalEvents, agUsage := processor.Finish()
|
||||||
if len(finalEvents) > 0 {
|
if len(finalEvents) > 0 {
|
||||||
cw.Write(finalEvents)
|
cw.Write(finalEvents)
|
||||||
|
} else if !processor.MessageStartSent() && !cw.Disconnected() {
|
||||||
|
// 整个流未收到任何可解析的上游数据(全部 SSE 行均无法被 JSON 解析),
|
||||||
|
// 触发 failover 在同账号重试,避免向客户端发出缺少 message_start 的残缺流
|
||||||
|
logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Claude-Stream] empty stream response (no valid events parsed), triggering failover")
|
||||||
|
return nil, &UpstreamFailoverError{
|
||||||
|
StatusCode: http.StatusBadGateway,
|
||||||
|
ResponseBody: []byte(`{"error":"empty stream response from upstream"}`),
|
||||||
|
RetryableOnSameAccount: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return &antigravityStreamResult{usage: convertUsage(agUsage), firstTokenMs: firstTokenMs, clientDisconnect: cw.Disconnected()}, nil
|
return &antigravityStreamResult{usage: convertUsage(agUsage), firstTokenMs: firstTokenMs, clientDisconnect: cw.Disconnected()}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -998,6 +998,46 @@ func TestHandleClaudeStreamingResponse_ClientDisconnect(t *testing.T) {
|
|||||||
require.True(t, result.clientDisconnect)
|
require.True(t, result.clientDisconnect)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestHandleClaudeStreamingResponse_EmptyStream
|
||||||
|
// 验证:上游只返回无法解析的 SSE 行时,触发 UpstreamFailoverError 而不是向客户端发出残缺流
|
||||||
|
func TestHandleClaudeStreamingResponse_EmptyStream(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
svc := newAntigravityTestService(&config.Config{
|
||||||
|
Gateway: config.GatewayConfig{MaxLineSize: defaultMaxLineSize},
|
||||||
|
})
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/", nil)
|
||||||
|
|
||||||
|
pr, pw := io.Pipe()
|
||||||
|
resp := &http.Response{StatusCode: http.StatusOK, Body: pr, Header: http.Header{}}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer func() { _ = pw.Close() }()
|
||||||
|
// 所有行均为无法 JSON 解析的内容,ProcessLine 全部返回 nil
|
||||||
|
fmt.Fprintln(pw, "data: not-valid-json")
|
||||||
|
fmt.Fprintln(pw, "")
|
||||||
|
fmt.Fprintln(pw, "data: also-invalid")
|
||||||
|
fmt.Fprintln(pw, "")
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err := svc.handleClaudeStreamingResponse(c, resp, time.Now(), "claude-sonnet-4-5")
|
||||||
|
_ = pr.Close()
|
||||||
|
|
||||||
|
// 应当返回 UpstreamFailoverError 而非 nil,以便上层触发 failover
|
||||||
|
require.Error(t, err)
|
||||||
|
var failoverErr *UpstreamFailoverError
|
||||||
|
require.ErrorAs(t, err, &failoverErr)
|
||||||
|
require.True(t, failoverErr.RetryableOnSameAccount)
|
||||||
|
|
||||||
|
// 客户端不应收到任何 SSE 事件(既无 message_start 也无 message_stop)
|
||||||
|
body := rec.Body.String()
|
||||||
|
require.NotContains(t, body, "event: message_start")
|
||||||
|
require.NotContains(t, body, "event: message_stop")
|
||||||
|
require.NotContains(t, body, "event: message_delta")
|
||||||
|
}
|
||||||
|
|
||||||
// TestHandleClaudeStreamingResponse_ContextCanceled
|
// TestHandleClaudeStreamingResponse_ContextCanceled
|
||||||
// 验证:context 取消时不注入错误事件
|
// 验证:context 取消时不注入错误事件
|
||||||
func TestHandleClaudeStreamingResponse_ContextCanceled(t *testing.T) {
|
func TestHandleClaudeStreamingResponse_ContextCanceled(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user