Merge PR #42: fix(sse): 修复非标准 SSE 格式解析问题

This commit is contained in:
shaw
2025-12-26 21:31:34 +08:00
3 changed files with 64 additions and 30 deletions

View File

@@ -10,6 +10,7 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -20,6 +21,10 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
) )
// sseDataPrefix matches SSE data lines with optional whitespace after colon.
// Some upstream APIs return non-standard "data:" without space (should be "data: ").
var sseDataPrefix = regexp.MustCompile(`^data:\s*`)
const ( const (
testClaudeAPIURL = "https://api.anthropic.com/v1/messages" testClaudeAPIURL = "https://api.anthropic.com/v1/messages"
testOpenAIAPIURL = "https://api.openai.com/v1/responses" testOpenAIAPIURL = "https://api.openai.com/v1/responses"
@@ -411,11 +416,11 @@ func (s *AccountTestService) processClaudeStream(c *gin.Context, body io.Reader)
} }
line = strings.TrimSpace(line) line = strings.TrimSpace(line)
if line == "" || !strings.HasPrefix(line, "data: ") { if line == "" || !sseDataPrefix.MatchString(line) {
continue continue
} }
jsonStr := strings.TrimPrefix(line, "data: ") jsonStr := sseDataPrefix.ReplaceAllString(line, "")
if jsonStr == "[DONE]" { if jsonStr == "[DONE]" {
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
return nil return nil
@@ -465,11 +470,11 @@ func (s *AccountTestService) processOpenAIStream(c *gin.Context, body io.Reader)
} }
line = strings.TrimSpace(line) line = strings.TrimSpace(line)
if line == "" || !strings.HasPrefix(line, "data: ") { if line == "" || !sseDataPrefix.MatchString(line) {
continue continue
} }
jsonStr := strings.TrimPrefix(line, "data: ") jsonStr := sseDataPrefix.ReplaceAllString(line, "")
if jsonStr == "[DONE]" { if jsonStr == "[DONE]" {
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
return nil return nil

View File

@@ -30,6 +30,10 @@ const (
stickySessionTTL = time.Hour // 粘性会话TTL stickySessionTTL = time.Hour // 粘性会话TTL
) )
// sseDataRe matches SSE data lines with optional whitespace after colon.
// Some upstream APIs return non-standard "data:" without space (should be "data: ").
var sseDataRe = regexp.MustCompile(`^data:\s*`)
// allowedHeaders 白名单headers参考CRS项目 // allowedHeaders 白名单headers参考CRS项目
var allowedHeaders = map[string]bool{ var allowedHeaders = map[string]bool{
"accept": true, "accept": true,
@@ -745,26 +749,33 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
// 如果有模型映射替换响应中的model字段 // Extract data from SSE line (supports both "data: " and "data:" formats)
if needModelReplace && strings.HasPrefix(line, "data: ") { if sseDataRe.MatchString(line) {
line = s.replaceModelInSSELine(line, mappedModel, originalModel) data := sseDataRe.ReplaceAllString(line, "")
}
// 转发行 // 如果有模型映射替换响应中的model字段
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil { if needModelReplace {
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err line = s.replaceModelInSSELine(line, mappedModel, originalModel)
} }
flusher.Flush()
// 转发行
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
}
flusher.Flush()
// 解析usage数据
if strings.HasPrefix(line, "data: ") {
data := line[6:]
// 记录首字时间:第一个有效的 content_block_delta 或 message_start // 记录首字时间:第一个有效的 content_block_delta 或 message_start
if firstTokenMs == nil && data != "" && data != "[DONE]" { if firstTokenMs == nil && data != "" && data != "[DONE]" {
ms := int(time.Since(startTime).Milliseconds()) ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms firstTokenMs = &ms
} }
s.parseSSEUsage(data, usage) s.parseSSEUsage(data, usage)
} else {
// 非 data 行直接转发
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
}
flusher.Flush()
} }
} }
@@ -777,7 +788,10 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
// replaceModelInSSELine 替换SSE数据行中的model字段 // replaceModelInSSELine 替换SSE数据行中的model字段
func (s *GatewayService) replaceModelInSSELine(line, fromModel, toModel string) string { func (s *GatewayService) replaceModelInSSELine(line, fromModel, toModel string) string {
data := line[6:] // 去掉 "data: " 前缀 if !sseDataRe.MatchString(line) {
return line
}
data := sseDataRe.ReplaceAllString(line, "")
if data == "" || data == "[DONE]" { if data == "" || data == "[DONE]" {
return line return line
} }

View File

@@ -11,6 +11,7 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -27,6 +28,10 @@ const (
openaiStickySessionTTL = time.Hour // 粘性会话TTL openaiStickySessionTTL = time.Hour // 粘性会话TTL
) )
// openaiSSEDataRe matches SSE data lines with optional whitespace after colon.
// Some upstream APIs return non-standard "data:" without space (should be "data: ").
var openaiSSEDataRe = regexp.MustCompile(`^data:\s*`)
// OpenAI allowed headers whitelist (for non-OAuth accounts) // OpenAI allowed headers whitelist (for non-OAuth accounts)
var openaiAllowedHeaders = map[string]bool{ var openaiAllowedHeaders = map[string]bool{
"accept-language": true, "accept-language": true,
@@ -463,26 +468,33 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
// Replace model in response if needed // Extract data from SSE line (supports both "data: " and "data:" formats)
if needModelReplace && strings.HasPrefix(line, "data: ") { if openaiSSEDataRe.MatchString(line) {
line = s.replaceModelInSSELine(line, mappedModel, originalModel) data := openaiSSEDataRe.ReplaceAllString(line, "")
}
// Forward line // Replace model in response if needed
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil { if needModelReplace {
return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err line = s.replaceModelInSSELine(line, mappedModel, originalModel)
} }
flusher.Flush()
// Forward line
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
}
flusher.Flush()
// Parse usage data
if strings.HasPrefix(line, "data: ") {
data := line[6:]
// Record first token time // Record first token time
if firstTokenMs == nil && data != "" && data != "[DONE]" { if firstTokenMs == nil && data != "" && data != "[DONE]" {
ms := int(time.Since(startTime).Milliseconds()) ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms firstTokenMs = &ms
} }
s.parseSSEUsage(data, usage) s.parseSSEUsage(data, usage)
} else {
// Forward non-data lines as-is
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
}
flusher.Flush()
} }
} }
@@ -494,7 +506,10 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp
} }
func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string { func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string {
data := line[6:] if !openaiSSEDataRe.MatchString(line) {
return line
}
data := openaiSSEDataRe.ReplaceAllString(line, "")
if data == "" || data == "[DONE]" { if data == "" || data == "[DONE]" {
return line return line
} }