From 681a357e0721242a09f7d6933cefdf8b102880e5 Mon Sep 17 00:00:00 2001 From: ianshaw Date: Thu, 1 Jan 2026 19:47:26 -0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20SSE/JSON=20?= =?UTF-8?q?=E8=BD=AC=E4=B9=89=E5=92=8C=20nil=20=E5=AE=89=E5=85=A8=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 基于 Codex 审查建议修复关键安全问题。 SSE/JSON 转义修复: - handleStreamingAwareError: 使用 json.Marshal 替代字符串拼接 - sendMockWarmupStream: 使用 json.Marshal 生成 message_start 事件 - 防止错误消息中的特殊字符导致无效 JSON Nil 安全检查: - SelectAccountWithLoadAwareness: 粘性会话层添加 s.cache != nil 检查 - BindStickySession: 添加 s.cache == nil 检查 - 防止 cache 未初始化时的运行时 panic 影响: - 提升 SSE 错误处理的健壮性 - 避免客户端 JSON 解析失败 - 增强代码防御性编程 --- backend/internal/handler/gateway_handler.go | 37 +++++++++++++++++++-- backend/internal/service/gateway_service.go | 4 +-- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 7eb7007e..bbc9c181 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -586,8 +586,20 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e // Stream already started, send error as SSE event then close flusher, ok := c.Writer.(http.Flusher) if ok { - // Send error event in SSE format - errorEvent := fmt.Sprintf(`data: {"type": "error", "error": {"type": "%s", "message": "%s"}}`+"\n\n", errType, message) + // Send error event in SSE format with proper JSON marshaling + errorData := map[string]any{ + "type": "error", + "error": map[string]string{ + "type": errType, + "message": message, + }, + } + jsonBytes, err := json.Marshal(errorData) + if err != nil { + _ = c.Error(err) + return + } + errorEvent := fmt.Sprintf("data: %s\n\n", string(jsonBytes)) if _, err := fmt.Fprint(c.Writer, errorEvent); err != nil { _ = c.Error(err) } @@ -737,8 +749,27 @@ func sendMockWarmupStream(c *gin.Context, model string) { c.Header("Connection", "keep-alive") c.Header("X-Accel-Buffering", "no") + // Build message_start event with proper JSON marshaling + messageStart := map[string]any{ + "type": "message_start", + "message": map[string]any{ + "id": "msg_mock_warmup", + "type": "message", + "role": "assistant", + "model": model, + "content": []any{}, + "stop_reason": nil, + "stop_sequence": nil, + "usage": map[string]int{ + "input_tokens": 10, + "output_tokens": 0, + }, + }, + } + messageStartJSON, _ := json.Marshal(messageStart) + events := []string{ - `event: message_start` + "\n" + `data: {"message":{"content":[],"id":"msg_mock_warmup","model":"` + model + `","role":"assistant","stop_reason":null,"stop_sequence":null,"type":"message","usage":{"input_tokens":10,"output_tokens":0}},"type":"message_start"}`, + `event: message_start` + "\n" + `data: ` + string(messageStartJSON), `event: content_block_start` + "\n" + `data: {"content_block":{"text":"","type":"text"},"index":0,"type":"content_block_start"}`, `event: content_block_delta` + "\n" + `data: {"delta":{"text":"New","type":"text_delta"},"index":0,"type":"content_block_delta"}`, `event: content_block_delta` + "\n" + `data: {"delta":{"text":" Conversation","type":"text_delta"},"index":0,"type":"content_block_delta"}`, diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 3932c35c..bd6f59f7 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -204,7 +204,7 @@ func (s *GatewayService) GenerateSessionHash(parsed *ParsedRequest) string { // BindStickySession sets session -> account binding with standard TTL. func (s *GatewayService) BindStickySession(ctx context.Context, sessionHash string, accountID int64) error { - if sessionHash == "" || accountID <= 0 { + if sessionHash == "" || accountID <= 0 || s.cache == nil { return nil } return s.cache.SetSessionAccountID(ctx, sessionHash, accountID, stickySessionTTL) @@ -429,7 +429,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro } // ============ Layer 1: 粘性会话优先 ============ - if sessionHash != "" { + if sessionHash != "" && s.cache != nil { accountID, err := s.cache.GetSessionAccountID(ctx, sessionHash) if err == nil && accountID > 0 && !isExcluded(accountID) { account, err := s.accountRepo.GetByID(ctx, accountID)