diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index 782aa95c..48e600c0 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -10,6 +10,7 @@ import ( "io" "log" "net/http" + "regexp" "strconv" "strings" "time" @@ -21,6 +22,10 @@ import ( "github.com/google/uuid" ) +// sseDataPrefix matches SSE data lines with optional whitespace after colon. +// Some upstream APIs return non-standard "data:" without space (should be "data: "). +var sseDataPrefix = regexp.MustCompile(`^data:\s*`) + const ( testClaudeAPIURL = "https://api.anthropic.com/v1/messages" testOpenAIAPIURL = "https://api.openai.com/v1/responses" @@ -412,11 +417,11 @@ func (s *AccountTestService) processClaudeStream(c *gin.Context, body io.Reader) } line = strings.TrimSpace(line) - if line == "" || !strings.HasPrefix(line, "data: ") { + if line == "" || !sseDataPrefix.MatchString(line) { continue } - jsonStr := strings.TrimPrefix(line, "data: ") + jsonStr := sseDataPrefix.ReplaceAllString(line, "") if jsonStr == "[DONE]" { s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) return nil @@ -466,11 +471,11 @@ func (s *AccountTestService) processOpenAIStream(c *gin.Context, body io.Reader) } line = strings.TrimSpace(line) - if line == "" || !strings.HasPrefix(line, "data: ") { + if line == "" || !sseDataPrefix.MatchString(line) { continue } - jsonStr := strings.TrimPrefix(line, "data: ") + jsonStr := sseDataPrefix.ReplaceAllString(line, "") if jsonStr == "[DONE]" { s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) return nil diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 5b5ad7c8..c80931a3 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -31,6 +31,10 @@ const ( stickySessionTTL = time.Hour // 粘性会话TTL ) +// sseDataRe matches SSE data lines with optional whitespace after colon. +// Some upstream APIs return non-standard "data:" without space (should be "data: "). +var sseDataRe = regexp.MustCompile(`^data:\s*`) + // allowedHeaders 白名单headers(参考CRS项目) var allowedHeaders = map[string]bool{ "accept": true, @@ -749,26 +753,33 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http for scanner.Scan() { line := scanner.Text() - // 如果有模型映射,替换响应中的model字段 - if needModelReplace && strings.HasPrefix(line, "data: ") { - line = s.replaceModelInSSELine(line, mappedModel, originalModel) - } + // Extract data from SSE line (supports both "data: " and "data:" formats) + if sseDataRe.MatchString(line) { + data := sseDataRe.ReplaceAllString(line, "") - // 转发行 - if _, err := fmt.Fprintf(w, "%s\n", line); err != nil { - return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err - } - flusher.Flush() + // 如果有模型映射,替换响应中的model字段 + if needModelReplace { + line = s.replaceModelInSSELine(line, mappedModel, originalModel) + } + + // 转发行 + if _, err := fmt.Fprintf(w, "%s\n", line); err != nil { + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err + } + flusher.Flush() - // 解析usage数据 - if strings.HasPrefix(line, "data: ") { - data := line[6:] // 记录首字时间:第一个有效的 content_block_delta 或 message_start if firstTokenMs == nil && data != "" && data != "[DONE]" { ms := int(time.Since(startTime).Milliseconds()) firstTokenMs = &ms } s.parseSSEUsage(data, usage) + } else { + // 非 data 行直接转发 + if _, err := fmt.Fprintf(w, "%s\n", line); err != nil { + return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err + } + flusher.Flush() } } @@ -781,7 +792,10 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http // replaceModelInSSELine 替换SSE数据行中的model字段 func (s *GatewayService) replaceModelInSSELine(line, fromModel, toModel string) string { - data := line[6:] // 去掉 "data: " 前缀 + if !sseDataRe.MatchString(line) { + return line + } + data := sseDataRe.ReplaceAllString(line, "") if data == "" || data == "[DONE]" { return line } diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index ca3c2c36..164b6a59 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "net/http" + "regexp" "strconv" "strings" "time" @@ -28,6 +29,10 @@ const ( openaiStickySessionTTL = time.Hour // 粘性会话TTL ) +// openaiSSEDataRe matches SSE data lines with optional whitespace after colon. +// Some upstream APIs return non-standard "data:" without space (should be "data: "). +var openaiSSEDataRe = regexp.MustCompile(`^data:\s*`) + // OpenAI allowed headers whitelist (for non-OAuth accounts) var openaiAllowedHeaders = map[string]bool{ "accept-language": true, @@ -464,26 +469,33 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp for scanner.Scan() { line := scanner.Text() - // Replace model in response if needed - if needModelReplace && strings.HasPrefix(line, "data: ") { - line = s.replaceModelInSSELine(line, mappedModel, originalModel) - } + // Extract data from SSE line (supports both "data: " and "data:" formats) + if openaiSSEDataRe.MatchString(line) { + data := openaiSSEDataRe.ReplaceAllString(line, "") - // Forward line - if _, err := fmt.Fprintf(w, "%s\n", line); err != nil { - return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err - } - flusher.Flush() + // Replace model in response if needed + if needModelReplace { + line = s.replaceModelInSSELine(line, mappedModel, originalModel) + } + + // Forward line + if _, err := fmt.Fprintf(w, "%s\n", line); err != nil { + return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err + } + flusher.Flush() - // Parse usage data - if strings.HasPrefix(line, "data: ") { - data := line[6:] // Record first token time if firstTokenMs == nil && data != "" && data != "[DONE]" { ms := int(time.Since(startTime).Milliseconds()) firstTokenMs = &ms } s.parseSSEUsage(data, usage) + } else { + // Forward non-data lines as-is + if _, err := fmt.Fprintf(w, "%s\n", line); err != nil { + return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err + } + flusher.Flush() } } @@ -495,7 +507,10 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp } func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string { - data := line[6:] + if !openaiSSEDataRe.MatchString(line) { + return line + } + data := openaiSSEDataRe.ReplaceAllString(line, "") if data == "" || data == "[DONE]" { return line }