perf(backend): 使用 gjson/sjson 优化热路径 JSON 处理

将 API 网关热路径中的 json.Unmarshal+json.Marshal 替换为 gjson 零拷贝查询和 sjson 精准写入:
- unwrapV1InternalResponse 性能提升 22x(4009ns→182ns),内存分配减少 28.5x
- unwrapGeminiResponse、extractGeminiUsage、estimateGeminiCountTokens、ParseGeminiRateLimitResetTime 改为接收 []byte 使用 gjson 提取
- ParseGatewayRequest 的 model/stream/metadata/thinking/max_tokens 改用 gjson 类型安全提取
- Handler 层(sora/openai)改用 gjson 提取字段、sjson 注入/修改字段,移除 map[string]any 中间变量
- Sora Client 响应解析改用 gjson ForEach 遍历,减少内存分配
- 新增约 100 个单元测试用例,所有改动函数覆盖率 >85%

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
yangjianbo
2026-02-10 08:59:30 +08:00
parent 29ca1290b3
commit 58912d4ac5
14 changed files with 1686 additions and 324 deletions

View File

@@ -26,6 +26,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/util/urlvalidator"
"github.com/gin-gonic/gin"
"github.com/tidwall/gjson"
)
const geminiStickySessionTTL = time.Hour
@@ -929,7 +930,8 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
if err != nil {
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to read upstream stream")
}
claudeResp, usageObj2 := convertGeminiToClaudeMessage(collected, originalModel)
collectedBytes, _ := json.Marshal(collected)
claudeResp, usageObj2 := convertGeminiToClaudeMessage(collected, originalModel, collectedBytes)
c.JSON(http.StatusOK, claudeResp)
usage = usageObj2
if usageObj != nil && (usageObj.InputTokens > 0 || usageObj.OutputTokens > 0) {
@@ -1726,12 +1728,17 @@ func (s *GeminiMessagesCompatService) handleNonStreamingResponse(c *gin.Context,
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to read upstream response")
}
geminiResp, err := unwrapGeminiResponse(body)
unwrappedBody, err := unwrapGeminiResponse(body)
if err != nil {
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response")
}
claudeResp, usage := convertGeminiToClaudeMessage(geminiResp, originalModel)
var geminiResp map[string]any
if err := json.Unmarshal(unwrappedBody, &geminiResp); err != nil {
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response")
}
claudeResp, usage := convertGeminiToClaudeMessage(geminiResp, originalModel, unwrappedBody)
c.JSON(http.StatusOK, claudeResp)
return usage, nil
@@ -1804,11 +1811,16 @@ func (s *GeminiMessagesCompatService) handleStreamingResponse(c *gin.Context, re
continue
}
geminiResp, err := unwrapGeminiResponse([]byte(payload))
unwrappedBytes, err := unwrapGeminiResponse([]byte(payload))
if err != nil {
continue
}
var geminiResp map[string]any
if err := json.Unmarshal(unwrappedBytes, &geminiResp); err != nil {
continue
}
if fr := extractGeminiFinishReason(geminiResp); fr != "" {
finishReason = fr
}
@@ -1935,7 +1947,7 @@ func (s *GeminiMessagesCompatService) handleStreamingResponse(c *gin.Context, re
}
}
if u := extractGeminiUsage(geminiResp); u != nil {
if u := extractGeminiUsage(unwrappedBytes); u != nil {
usage = *u
}
@@ -2026,11 +2038,7 @@ func unwrapIfNeeded(isOAuth bool, raw []byte) []byte {
if err != nil {
return raw
}
b, err := json.Marshal(inner)
if err != nil {
return raw
}
return b
return inner
}
func collectGeminiSSE(body io.Reader, isOAuth bool) (map[string]any, *ClaudeUsage, error) {
@@ -2054,17 +2062,20 @@ func collectGeminiSSE(body io.Reader, isOAuth bool) (map[string]any, *ClaudeUsag
}
default:
var parsed map[string]any
var rawBytes []byte
if isOAuth {
inner, err := unwrapGeminiResponse([]byte(payload))
if err == nil && inner != nil {
parsed = inner
innerBytes, err := unwrapGeminiResponse([]byte(payload))
if err == nil {
rawBytes = innerBytes
_ = json.Unmarshal(innerBytes, &parsed)
}
} else {
_ = json.Unmarshal([]byte(payload), &parsed)
rawBytes = []byte(payload)
_ = json.Unmarshal(rawBytes, &parsed)
}
if parsed != nil {
last = parsed
if u := extractGeminiUsage(parsed); u != nil {
if u := extractGeminiUsage(rawBytes); u != nil {
usage = u
}
if parts := extractGeminiParts(parsed); len(parts) > 0 {
@@ -2193,53 +2204,27 @@ func isGeminiInsufficientScope(headers http.Header, body []byte) bool {
}
func estimateGeminiCountTokens(reqBody []byte) int {
var obj map[string]any
if err := json.Unmarshal(reqBody, &obj); err != nil {
return 0
}
var texts []string
total := 0
// systemInstruction.parts[].text
if si, ok := obj["systemInstruction"].(map[string]any); ok {
if parts, ok := si["parts"].([]any); ok {
for _, p := range parts {
if pm, ok := p.(map[string]any); ok {
if t, ok := pm["text"].(string); ok && strings.TrimSpace(t) != "" {
texts = append(texts, t)
}
}
}
gjson.GetBytes(reqBody, "systemInstruction.parts").ForEach(func(_, part gjson.Result) bool {
if t := strings.TrimSpace(part.Get("text").String()); t != "" {
total += estimateTokensForText(t)
}
}
return true
})
// contents[].parts[].text
if contents, ok := obj["contents"].([]any); ok {
for _, c := range contents {
cm, ok := c.(map[string]any)
if !ok {
continue
gjson.GetBytes(reqBody, "contents").ForEach(func(_, content gjson.Result) bool {
content.Get("parts").ForEach(func(_, part gjson.Result) bool {
if t := strings.TrimSpace(part.Get("text").String()); t != "" {
total += estimateTokensForText(t)
}
parts, ok := cm["parts"].([]any)
if !ok {
continue
}
for _, p := range parts {
pm, ok := p.(map[string]any)
if !ok {
continue
}
if t, ok := pm["text"].(string); ok && strings.TrimSpace(t) != "" {
texts = append(texts, t)
}
}
}
}
return true
})
return true
})
total := 0
for _, t := range texts {
total += estimateTokensForText(t)
}
if total < 0 {
return 0
}
@@ -2293,10 +2278,11 @@ func (s *GeminiMessagesCompatService) handleNativeNonStreamingResponse(c *gin.Co
var parsed map[string]any
if isOAuth {
parsed, err = unwrapGeminiResponse(respBody)
if err == nil && parsed != nil {
respBody, _ = json.Marshal(parsed)
unwrappedBody, uwErr := unwrapGeminiResponse(respBody)
if uwErr == nil {
respBody = unwrappedBody
}
_ = json.Unmarshal(respBody, &parsed)
} else {
_ = json.Unmarshal(respBody, &parsed)
}
@@ -2309,10 +2295,8 @@ func (s *GeminiMessagesCompatService) handleNativeNonStreamingResponse(c *gin.Co
}
c.Data(resp.StatusCode, contentType, respBody)
if parsed != nil {
if u := extractGeminiUsage(parsed); u != nil {
return u, nil
}
if u := extractGeminiUsage(respBody); u != nil {
return u, nil
}
return &ClaudeUsage{}, nil
}
@@ -2365,23 +2349,19 @@ func (s *GeminiMessagesCompatService) handleNativeStreamingResponse(c *gin.Conte
var rawToWrite string
rawToWrite = payload
var parsed map[string]any
var rawBytes []byte
if isOAuth {
inner, err := unwrapGeminiResponse([]byte(payload))
if err == nil && inner != nil {
parsed = inner
if b, err := json.Marshal(inner); err == nil {
rawToWrite = string(b)
}
innerBytes, err := unwrapGeminiResponse([]byte(payload))
if err == nil {
rawToWrite = string(innerBytes)
rawBytes = innerBytes
}
} else {
_ = json.Unmarshal([]byte(payload), &parsed)
rawBytes = []byte(payload)
}
if parsed != nil {
if u := extractGeminiUsage(parsed); u != nil {
usage = u
}
if u := extractGeminiUsage(rawBytes); u != nil {
usage = u
}
if firstTokenMs == nil {
@@ -2484,19 +2464,18 @@ func (s *GeminiMessagesCompatService) ForwardAIStudioGET(ctx context.Context, ac
}, nil
}
func unwrapGeminiResponse(raw []byte) (map[string]any, error) {
var outer map[string]any
if err := json.Unmarshal(raw, &outer); err != nil {
return nil, err
// unwrapGeminiResponse 解包 Gemini OAuth 响应中的 response 字段
// 使用 gjson 零拷贝提取,避免完整 Unmarshal+Marshal
func unwrapGeminiResponse(raw []byte) ([]byte, error) {
result := gjson.GetBytes(raw, "response")
if result.Exists() && result.Type == gjson.JSON {
return []byte(result.Raw), nil
}
if resp, ok := outer["response"].(map[string]any); ok && resp != nil {
return resp, nil
}
return outer, nil
return raw, nil
}
func convertGeminiToClaudeMessage(geminiResp map[string]any, originalModel string) (map[string]any, *ClaudeUsage) {
usage := extractGeminiUsage(geminiResp)
func convertGeminiToClaudeMessage(geminiResp map[string]any, originalModel string, rawData []byte) (map[string]any, *ClaudeUsage) {
usage := extractGeminiUsage(rawData)
if usage == nil {
usage = &ClaudeUsage{}
}
@@ -2560,14 +2539,14 @@ func convertGeminiToClaudeMessage(geminiResp map[string]any, originalModel strin
return resp, usage
}
func extractGeminiUsage(geminiResp map[string]any) *ClaudeUsage {
usageMeta, ok := geminiResp["usageMetadata"].(map[string]any)
if !ok || usageMeta == nil {
func extractGeminiUsage(data []byte) *ClaudeUsage {
usage := gjson.GetBytes(data, "usageMetadata")
if !usage.Exists() {
return nil
}
prompt, _ := asInt(usageMeta["promptTokenCount"])
cand, _ := asInt(usageMeta["candidatesTokenCount"])
cached, _ := asInt(usageMeta["cachedContentTokenCount"])
prompt := int(usage.Get("promptTokenCount").Int())
cand := int(usage.Get("candidatesTokenCount").Int())
cached := int(usage.Get("cachedContentTokenCount").Int())
// 注意Gemini 的 promptTokenCount 包含 cachedContentTokenCount
// 但 Claude 的 input_tokens 不包含 cache_read_input_tokens需要减去
return &ClaudeUsage{
@@ -2646,39 +2625,35 @@ func (s *GeminiMessagesCompatService) handleGeminiUpstreamError(ctx context.Cont
// ParseGeminiRateLimitResetTime 解析 Gemini 格式的 429 响应,返回重置时间的 Unix 时间戳
func ParseGeminiRateLimitResetTime(body []byte) *int64 {
// Try to parse metadata.quotaResetDelay like "12.345s"
var parsed map[string]any
if err := json.Unmarshal(body, &parsed); err == nil {
if errObj, ok := parsed["error"].(map[string]any); ok {
if msg, ok := errObj["message"].(string); ok {
if looksLikeGeminiDailyQuota(msg) {
if ts := nextGeminiDailyResetUnix(); ts != nil {
return ts
}
}
}
if details, ok := errObj["details"].([]any); ok {
for _, d := range details {
dm, ok := d.(map[string]any)
if !ok {
continue
}
if meta, ok := dm["metadata"].(map[string]any); ok {
if v, ok := meta["quotaResetDelay"].(string); ok {
if dur, err := time.ParseDuration(v); err == nil {
// Use ceil to avoid undercounting fractional seconds (e.g. 10.1s should not become 10s),
// which can affect scheduling decisions around thresholds (like 10s).
ts := time.Now().Unix() + int64(math.Ceil(dur.Seconds()))
return &ts
}
}
}
}
}
// 第一阶段gjson 结构化提取
errMsg := gjson.GetBytes(body, "error.message").String()
if looksLikeGeminiDailyQuota(errMsg) {
if ts := nextGeminiDailyResetUnix(); ts != nil {
return ts
}
}
// Match "Please retry in Xs"
// 遍历 error.details 查找 quotaResetDelay
var found *int64
gjson.GetBytes(body, "error.details").ForEach(func(_, detail gjson.Result) bool {
v := detail.Get("metadata.quotaResetDelay").String()
if v == "" {
return true
}
if dur, err := time.ParseDuration(v); err == nil {
// Use ceil to avoid undercounting fractional seconds (e.g. 10.1s should not become 10s),
// which can affect scheduling decisions around thresholds (like 10s).
ts := time.Now().Unix() + int64(math.Ceil(dur.Seconds()))
found = &ts
return false
}
return true
})
if found != nil {
return found
}
// 第二阶段regex 回退匹配 "Please retry in Xs"
matches := retryInRegex.FindStringSubmatch(string(body))
if len(matches) == 2 {
if dur, err := time.ParseDuration(matches[1] + "s"); err == nil {