fix(sse): 修复非标准 SSE 格式解析问题
部分上游 API 返回的 SSE 格式不符合标准规范:
- 标准格式: `data: {...}`(冒号后有空格)
- 非标准格式: `data:{...}`(冒号后无空格)
使用预编译正则 `^data:\s*` 统一处理两种格式。
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -21,6 +22,10 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// sseDataPrefix matches SSE data lines with optional whitespace after colon.
|
||||||
|
// Some upstream APIs return non-standard "data:" without space (should be "data: ").
|
||||||
|
var sseDataPrefix = regexp.MustCompile(`^data:\s*`)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
testClaudeAPIURL = "https://api.anthropic.com/v1/messages"
|
testClaudeAPIURL = "https://api.anthropic.com/v1/messages"
|
||||||
testOpenAIAPIURL = "https://api.openai.com/v1/responses"
|
testOpenAIAPIURL = "https://api.openai.com/v1/responses"
|
||||||
@@ -412,11 +417,11 @@ func (s *AccountTestService) processClaudeStream(c *gin.Context, body io.Reader)
|
|||||||
}
|
}
|
||||||
|
|
||||||
line = strings.TrimSpace(line)
|
line = strings.TrimSpace(line)
|
||||||
if line == "" || !strings.HasPrefix(line, "data: ") {
|
if line == "" || !sseDataPrefix.MatchString(line) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
jsonStr := strings.TrimPrefix(line, "data: ")
|
jsonStr := sseDataPrefix.ReplaceAllString(line, "")
|
||||||
if jsonStr == "[DONE]" {
|
if jsonStr == "[DONE]" {
|
||||||
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
|
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
|
||||||
return nil
|
return nil
|
||||||
@@ -466,11 +471,11 @@ func (s *AccountTestService) processOpenAIStream(c *gin.Context, body io.Reader)
|
|||||||
}
|
}
|
||||||
|
|
||||||
line = strings.TrimSpace(line)
|
line = strings.TrimSpace(line)
|
||||||
if line == "" || !strings.HasPrefix(line, "data: ") {
|
if line == "" || !sseDataPrefix.MatchString(line) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
jsonStr := strings.TrimPrefix(line, "data: ")
|
jsonStr := sseDataPrefix.ReplaceAllString(line, "")
|
||||||
if jsonStr == "[DONE]" {
|
if jsonStr == "[DONE]" {
|
||||||
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
|
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -31,6 +31,10 @@ const (
|
|||||||
stickySessionTTL = time.Hour // 粘性会话TTL
|
stickySessionTTL = time.Hour // 粘性会话TTL
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// sseDataRe matches SSE data lines with optional whitespace after colon.
|
||||||
|
// Some upstream APIs return non-standard "data:" without space (should be "data: ").
|
||||||
|
var sseDataRe = regexp.MustCompile(`^data:\s*`)
|
||||||
|
|
||||||
// allowedHeaders 白名单headers(参考CRS项目)
|
// allowedHeaders 白名单headers(参考CRS项目)
|
||||||
var allowedHeaders = map[string]bool{
|
var allowedHeaders = map[string]bool{
|
||||||
"accept": true,
|
"accept": true,
|
||||||
@@ -749,26 +753,33 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
|
|||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
line := scanner.Text()
|
line := scanner.Text()
|
||||||
|
|
||||||
// 如果有模型映射,替换响应中的model字段
|
// Extract data from SSE line (supports both "data: " and "data:" formats)
|
||||||
if needModelReplace && strings.HasPrefix(line, "data: ") {
|
if sseDataRe.MatchString(line) {
|
||||||
line = s.replaceModelInSSELine(line, mappedModel, originalModel)
|
data := sseDataRe.ReplaceAllString(line, "")
|
||||||
}
|
|
||||||
|
|
||||||
// 转发行
|
// 如果有模型映射,替换响应中的model字段
|
||||||
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
|
if needModelReplace {
|
||||||
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
line = s.replaceModelInSSELine(line, mappedModel, originalModel)
|
||||||
}
|
}
|
||||||
flusher.Flush()
|
|
||||||
|
// 转发行
|
||||||
|
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
|
||||||
|
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
||||||
|
}
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
// 解析usage数据
|
|
||||||
if strings.HasPrefix(line, "data: ") {
|
|
||||||
data := line[6:]
|
|
||||||
// 记录首字时间:第一个有效的 content_block_delta 或 message_start
|
// 记录首字时间:第一个有效的 content_block_delta 或 message_start
|
||||||
if firstTokenMs == nil && data != "" && data != "[DONE]" {
|
if firstTokenMs == nil && data != "" && data != "[DONE]" {
|
||||||
ms := int(time.Since(startTime).Milliseconds())
|
ms := int(time.Since(startTime).Milliseconds())
|
||||||
firstTokenMs = &ms
|
firstTokenMs = &ms
|
||||||
}
|
}
|
||||||
s.parseSSEUsage(data, usage)
|
s.parseSSEUsage(data, usage)
|
||||||
|
} else {
|
||||||
|
// 非 data 行直接转发
|
||||||
|
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
|
||||||
|
return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
||||||
|
}
|
||||||
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -781,7 +792,10 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http
|
|||||||
|
|
||||||
// replaceModelInSSELine 替换SSE数据行中的model字段
|
// replaceModelInSSELine 替换SSE数据行中的model字段
|
||||||
func (s *GatewayService) replaceModelInSSELine(line, fromModel, toModel string) string {
|
func (s *GatewayService) replaceModelInSSELine(line, fromModel, toModel string) string {
|
||||||
data := line[6:] // 去掉 "data: " 前缀
|
if !sseDataRe.MatchString(line) {
|
||||||
|
return line
|
||||||
|
}
|
||||||
|
data := sseDataRe.ReplaceAllString(line, "")
|
||||||
if data == "" || data == "[DONE]" {
|
if data == "" || data == "[DONE]" {
|
||||||
return line
|
return line
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -28,6 +29,10 @@ const (
|
|||||||
openaiStickySessionTTL = time.Hour // 粘性会话TTL
|
openaiStickySessionTTL = time.Hour // 粘性会话TTL
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// openaiSSEDataRe matches SSE data lines with optional whitespace after colon.
|
||||||
|
// Some upstream APIs return non-standard "data:" without space (should be "data: ").
|
||||||
|
var openaiSSEDataRe = regexp.MustCompile(`^data:\s*`)
|
||||||
|
|
||||||
// OpenAI allowed headers whitelist (for non-OAuth accounts)
|
// OpenAI allowed headers whitelist (for non-OAuth accounts)
|
||||||
var openaiAllowedHeaders = map[string]bool{
|
var openaiAllowedHeaders = map[string]bool{
|
||||||
"accept-language": true,
|
"accept-language": true,
|
||||||
@@ -464,26 +469,33 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp
|
|||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
line := scanner.Text()
|
line := scanner.Text()
|
||||||
|
|
||||||
// Replace model in response if needed
|
// Extract data from SSE line (supports both "data: " and "data:" formats)
|
||||||
if needModelReplace && strings.HasPrefix(line, "data: ") {
|
if openaiSSEDataRe.MatchString(line) {
|
||||||
line = s.replaceModelInSSELine(line, mappedModel, originalModel)
|
data := openaiSSEDataRe.ReplaceAllString(line, "")
|
||||||
}
|
|
||||||
|
|
||||||
// Forward line
|
// Replace model in response if needed
|
||||||
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
|
if needModelReplace {
|
||||||
return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
line = s.replaceModelInSSELine(line, mappedModel, originalModel)
|
||||||
}
|
}
|
||||||
flusher.Flush()
|
|
||||||
|
// Forward line
|
||||||
|
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
|
||||||
|
return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
||||||
|
}
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
// Parse usage data
|
|
||||||
if strings.HasPrefix(line, "data: ") {
|
|
||||||
data := line[6:]
|
|
||||||
// Record first token time
|
// Record first token time
|
||||||
if firstTokenMs == nil && data != "" && data != "[DONE]" {
|
if firstTokenMs == nil && data != "" && data != "[DONE]" {
|
||||||
ms := int(time.Since(startTime).Milliseconds())
|
ms := int(time.Since(startTime).Milliseconds())
|
||||||
firstTokenMs = &ms
|
firstTokenMs = &ms
|
||||||
}
|
}
|
||||||
s.parseSSEUsage(data, usage)
|
s.parseSSEUsage(data, usage)
|
||||||
|
} else {
|
||||||
|
// Forward non-data lines as-is
|
||||||
|
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
|
||||||
|
return &openaiStreamingResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
||||||
|
}
|
||||||
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -495,7 +507,10 @@ func (s *OpenAIGatewayService) handleStreamingResponse(ctx context.Context, resp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string {
|
func (s *OpenAIGatewayService) replaceModelInSSELine(line, fromModel, toModel string) string {
|
||||||
data := line[6:]
|
if !openaiSSEDataRe.MatchString(line) {
|
||||||
|
return line
|
||||||
|
}
|
||||||
|
data := openaiSSEDataRe.ReplaceAllString(line, "")
|
||||||
if data == "" || data == "[DONE]" {
|
if data == "" || data == "[DONE]" {
|
||||||
return line
|
return line
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user