fix(openai): emit OpenAI-compatible SSE error events
This commit is contained in:
@@ -1067,7 +1067,9 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp
|
|||||||
// 记录上次收到上游数据的时间,用于控制 keepalive 发送频率
|
// 记录上次收到上游数据的时间,用于控制 keepalive 发送频率
|
||||||
lastDataAt := time.Now()
|
lastDataAt := time.Now()
|
||||||
|
|
||||||
// 仅发送一次错误事件,避免多次写入导致协议混乱(写失败时尽力通知客户端)
|
// 仅发送一次错误事件,避免多次写入导致协议混乱。
|
||||||
|
// 注意:OpenAI `/v1/responses` streaming 事件必须符合 OpenAI Responses schema;
|
||||||
|
// 否则下游 SDK(例如 OpenCode)会因为类型校验失败而报错。
|
||||||
errorEventSent := false
|
errorEventSent := false
|
||||||
clientDisconnected := false // 客户端断开后继续 drain 上游以收集 usage
|
clientDisconnected := false // 客户端断开后继续 drain 上游以收集 usage
|
||||||
sendErrorEvent := func(reason string) {
|
sendErrorEvent := func(reason string) {
|
||||||
@@ -1075,8 +1077,19 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
errorEventSent = true
|
errorEventSent = true
|
||||||
_, _ = fmt.Fprintf(w, "event: error\ndata: {\"error\":\"%s\"}\n\n", reason)
|
payload := map[string]any{
|
||||||
flusher.Flush()
|
"type": "error",
|
||||||
|
"sequence_number": 0,
|
||||||
|
"error": map[string]any{
|
||||||
|
"type": "upstream_error",
|
||||||
|
"message": reason,
|
||||||
|
"code": reason,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if b, err := json.Marshal(payload); err == nil {
|
||||||
|
_, _ = fmt.Fprintf(w, "data: %s\n\n", b)
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
needModelReplace := originalModel != mappedModel
|
needModelReplace := originalModel != mappedModel
|
||||||
|
|||||||
@@ -188,8 +188,8 @@ func TestOpenAIStreamingTimeout(t *testing.T) {
|
|||||||
if err == nil || !strings.Contains(err.Error(), "stream data interval timeout") {
|
if err == nil || !strings.Contains(err.Error(), "stream data interval timeout") {
|
||||||
t.Fatalf("expected stream timeout error, got %v", err)
|
t.Fatalf("expected stream timeout error, got %v", err)
|
||||||
}
|
}
|
||||||
if !strings.Contains(rec.Body.String(), "stream_timeout") {
|
if !strings.Contains(rec.Body.String(), "\"type\":\"error\"") || !strings.Contains(rec.Body.String(), "stream_timeout") {
|
||||||
t.Fatalf("expected stream_timeout SSE error, got %q", rec.Body.String())
|
t.Fatalf("expected OpenAI-compatible error SSE event, got %q", rec.Body.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -305,8 +305,8 @@ func TestOpenAIStreamingTooLong(t *testing.T) {
|
|||||||
if !errors.Is(err, bufio.ErrTooLong) {
|
if !errors.Is(err, bufio.ErrTooLong) {
|
||||||
t.Fatalf("expected ErrTooLong, got %v", err)
|
t.Fatalf("expected ErrTooLong, got %v", err)
|
||||||
}
|
}
|
||||||
if !strings.Contains(rec.Body.String(), "response_too_large") {
|
if !strings.Contains(rec.Body.String(), "\"type\":\"error\"") || !strings.Contains(rec.Body.String(), "response_too_large") {
|
||||||
t.Fatalf("expected response_too_large SSE error, got %q", rec.Body.String())
|
t.Fatalf("expected OpenAI-compatible error SSE event, got %q", rec.Body.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user