merge: 合并 upstream/main 并保留本地图片计费功能
This commit is contained in:
@@ -9,8 +9,10 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
mathrand "math/rand"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
|
||||
@@ -255,6 +257,16 @@ func (s *AntigravityGatewayService) buildClaudeTestRequest(projectID, mappedMode
|
||||
return antigravity.TransformClaudeToGemini(claudeReq, projectID, mappedModel)
|
||||
}
|
||||
|
||||
func (s *AntigravityGatewayService) getClaudeTransformOptions(ctx context.Context) antigravity.TransformOptions {
|
||||
opts := antigravity.DefaultTransformOptions()
|
||||
if s.settingService == nil {
|
||||
return opts
|
||||
}
|
||||
opts.EnableIdentityPatch = s.settingService.IsIdentityPatchEnabled(ctx)
|
||||
opts.IdentityPatch = s.settingService.GetIdentityPatchPrompt(ctx)
|
||||
return opts
|
||||
}
|
||||
|
||||
// extractGeminiResponseText 从 Gemini 响应中提取文本
|
||||
func extractGeminiResponseText(respBody []byte) string {
|
||||
var resp map[string]any
|
||||
@@ -380,7 +392,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
}
|
||||
|
||||
// 转换 Claude 请求为 Gemini 格式
|
||||
geminiBody, err := antigravity.TransformClaudeToGemini(&claudeReq, projectID, mappedModel)
|
||||
geminiBody, err := antigravity.TransformClaudeToGeminiWithOptions(&claudeReq, projectID, mappedModel, s.getClaudeTransformOptions(ctx))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("transform request: %w", err)
|
||||
}
|
||||
@@ -394,6 +406,14 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
// 重试循环
|
||||
var resp *http.Response
|
||||
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
|
||||
// 检查 context 是否已取消(客户端断开连接)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("%s status=context_canceled error=%v", prefix, ctx.Err())
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
upstreamReq, err := antigravity.NewAPIRequest(ctx, action, accessToken, geminiBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -403,7 +423,10 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
if err != nil {
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
|
||||
sleepAntigravityBackoff(attempt)
|
||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
||||
log.Printf("%s status=context_canceled_during_backoff", prefix)
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
|
||||
@@ -416,7 +439,10 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries)
|
||||
sleepAntigravityBackoff(attempt)
|
||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
||||
log.Printf("%s status=context_canceled_during_backoff", prefix)
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
// 所有重试都失败,标记限流状态
|
||||
@@ -443,35 +469,70 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
// Antigravity /v1internal 链路在部分场景会对 thought/thinking signature 做严格校验,
|
||||
// 当历史消息携带的 signature 不合法时会直接 400;去除 thinking 后可继续完成请求。
|
||||
if resp.StatusCode == http.StatusBadRequest && isSignatureRelatedError(respBody) {
|
||||
retryClaudeReq := claudeReq
|
||||
retryClaudeReq.Messages = append([]antigravity.ClaudeMessage(nil), claudeReq.Messages...)
|
||||
// Conservative two-stage fallback:
|
||||
// 1) Disable top-level thinking + thinking->text
|
||||
// 2) Only if still signature-related 400: also downgrade tool_use/tool_result to text.
|
||||
|
||||
stripped, stripErr := stripThinkingFromClaudeRequest(&retryClaudeReq)
|
||||
if stripErr == nil && stripped {
|
||||
log.Printf("Antigravity account %d: detected signature-related 400, retrying once without thinking blocks", account.ID)
|
||||
retryStages := []struct {
|
||||
name string
|
||||
strip func(*antigravity.ClaudeRequest) (bool, error)
|
||||
}{
|
||||
{name: "thinking-only", strip: stripThinkingFromClaudeRequest},
|
||||
{name: "thinking+tools", strip: stripSignatureSensitiveBlocksFromClaudeRequest},
|
||||
}
|
||||
|
||||
retryGeminiBody, txErr := antigravity.TransformClaudeToGemini(&retryClaudeReq, projectID, mappedModel)
|
||||
if txErr == nil {
|
||||
retryReq, buildErr := antigravity.NewAPIRequest(ctx, action, accessToken, retryGeminiBody)
|
||||
if buildErr == nil {
|
||||
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
||||
if retryErr == nil {
|
||||
// Retry success: continue normal success flow with the new response.
|
||||
if retryResp.StatusCode < 400 {
|
||||
_ = resp.Body.Close()
|
||||
resp = retryResp
|
||||
respBody = nil
|
||||
} else {
|
||||
// Retry still errored: replace error context with retry response.
|
||||
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
|
||||
_ = retryResp.Body.Close()
|
||||
respBody = retryBody
|
||||
resp = retryResp
|
||||
}
|
||||
} else {
|
||||
log.Printf("Antigravity account %d: signature retry request failed: %v", account.ID, retryErr)
|
||||
}
|
||||
for _, stage := range retryStages {
|
||||
retryClaudeReq := claudeReq
|
||||
retryClaudeReq.Messages = append([]antigravity.ClaudeMessage(nil), claudeReq.Messages...)
|
||||
|
||||
stripped, stripErr := stage.strip(&retryClaudeReq)
|
||||
if stripErr != nil || !stripped {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Antigravity account %d: detected signature-related 400, retrying once (%s)", account.ID, stage.name)
|
||||
|
||||
retryGeminiBody, txErr := antigravity.TransformClaudeToGeminiWithOptions(&retryClaudeReq, projectID, mappedModel, s.getClaudeTransformOptions(ctx))
|
||||
if txErr != nil {
|
||||
continue
|
||||
}
|
||||
retryReq, buildErr := antigravity.NewAPIRequest(ctx, action, accessToken, retryGeminiBody)
|
||||
if buildErr != nil {
|
||||
continue
|
||||
}
|
||||
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
||||
if retryErr != nil {
|
||||
log.Printf("Antigravity account %d: signature retry request failed (%s): %v", account.ID, stage.name, retryErr)
|
||||
continue
|
||||
}
|
||||
|
||||
if retryResp.StatusCode < 400 {
|
||||
_ = resp.Body.Close()
|
||||
resp = retryResp
|
||||
respBody = nil
|
||||
break
|
||||
}
|
||||
|
||||
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
|
||||
_ = retryResp.Body.Close()
|
||||
|
||||
// If this stage fixed the signature issue, we stop; otherwise we may try the next stage.
|
||||
if retryResp.StatusCode != http.StatusBadRequest || !isSignatureRelatedError(retryBody) {
|
||||
respBody = retryBody
|
||||
resp = &http.Response{
|
||||
StatusCode: retryResp.StatusCode,
|
||||
Header: retryResp.Header.Clone(),
|
||||
Body: io.NopCloser(bytes.NewReader(retryBody)),
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// Still signature-related; capture context and allow next stage.
|
||||
respBody = retryBody
|
||||
resp = &http.Response{
|
||||
StatusCode: retryResp.StatusCode,
|
||||
Header: retryResp.Header.Clone(),
|
||||
Body: io.NopCloser(bytes.NewReader(retryBody)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -528,7 +589,17 @@ func isSignatureRelatedError(respBody []byte) bool {
|
||||
}
|
||||
|
||||
// Keep this intentionally broad: different upstreams may use "signature" or "thought_signature".
|
||||
return strings.Contains(msg, "thought_signature") || strings.Contains(msg, "signature")
|
||||
if strings.Contains(msg, "thought_signature") || strings.Contains(msg, "signature") {
|
||||
return true
|
||||
}
|
||||
|
||||
// Also detect thinking block structural errors:
|
||||
// "Expected `thinking` or `redacted_thinking`, but found `text`"
|
||||
if strings.Contains(msg, "expected") && (strings.Contains(msg, "thinking") || strings.Contains(msg, "redacted_thinking")) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func extractAntigravityErrorMessage(body []byte) string {
|
||||
@@ -555,7 +626,7 @@ func extractAntigravityErrorMessage(body []byte) string {
|
||||
// stripThinkingFromClaudeRequest converts thinking blocks to text blocks in a Claude Messages request.
|
||||
// This preserves the thinking content while avoiding signature validation errors.
|
||||
// Note: redacted_thinking blocks are removed because they cannot be converted to text.
|
||||
// It also disables top-level `thinking` to prevent dummy-thought injection during retry.
|
||||
// It also disables top-level `thinking` to avoid upstream structural constraints for thinking mode.
|
||||
func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error) {
|
||||
if req == nil {
|
||||
return false, nil
|
||||
@@ -585,6 +656,92 @@ func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error
|
||||
continue
|
||||
}
|
||||
|
||||
filtered := make([]map[string]any, 0, len(blocks))
|
||||
modifiedAny := false
|
||||
for _, block := range blocks {
|
||||
t, _ := block["type"].(string)
|
||||
switch t {
|
||||
case "thinking":
|
||||
thinkingText, _ := block["thinking"].(string)
|
||||
if thinkingText != "" {
|
||||
filtered = append(filtered, map[string]any{
|
||||
"type": "text",
|
||||
"text": thinkingText,
|
||||
})
|
||||
}
|
||||
modifiedAny = true
|
||||
case "redacted_thinking":
|
||||
modifiedAny = true
|
||||
case "":
|
||||
if thinkingText, hasThinking := block["thinking"].(string); hasThinking {
|
||||
if thinkingText != "" {
|
||||
filtered = append(filtered, map[string]any{
|
||||
"type": "text",
|
||||
"text": thinkingText,
|
||||
})
|
||||
}
|
||||
modifiedAny = true
|
||||
} else {
|
||||
filtered = append(filtered, block)
|
||||
}
|
||||
default:
|
||||
filtered = append(filtered, block)
|
||||
}
|
||||
}
|
||||
|
||||
if !modifiedAny {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(filtered) == 0 {
|
||||
filtered = append(filtered, map[string]any{
|
||||
"type": "text",
|
||||
"text": "(content removed)",
|
||||
})
|
||||
}
|
||||
|
||||
newRaw, err := json.Marshal(filtered)
|
||||
if err != nil {
|
||||
return changed, err
|
||||
}
|
||||
req.Messages[i].Content = newRaw
|
||||
changed = true
|
||||
}
|
||||
|
||||
return changed, nil
|
||||
}
|
||||
|
||||
// stripSignatureSensitiveBlocksFromClaudeRequest is a stronger retry degradation that additionally converts
|
||||
// tool blocks to plain text. Use this only after a thinking-only retry still fails with signature errors.
|
||||
func stripSignatureSensitiveBlocksFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error) {
|
||||
if req == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
changed := false
|
||||
if req.Thinking != nil {
|
||||
req.Thinking = nil
|
||||
changed = true
|
||||
}
|
||||
|
||||
for i := range req.Messages {
|
||||
raw := req.Messages[i].Content
|
||||
if len(raw) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// If content is a string, nothing to strip.
|
||||
var str string
|
||||
if json.Unmarshal(raw, &str) == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Otherwise treat as an array of blocks and convert signature-sensitive blocks to text.
|
||||
var blocks []map[string]any
|
||||
if err := json.Unmarshal(raw, &blocks); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
filtered := make([]map[string]any, 0, len(blocks))
|
||||
modifiedAny := false
|
||||
for _, block := range blocks {
|
||||
@@ -603,6 +760,49 @@ func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error
|
||||
case "redacted_thinking":
|
||||
// Remove redacted_thinking (cannot convert encrypted content)
|
||||
modifiedAny = true
|
||||
case "tool_use":
|
||||
// Convert tool_use to text to avoid upstream signature/thought_signature validation errors.
|
||||
// This is a retry-only degradation path, so we prioritise request validity over tool semantics.
|
||||
name, _ := block["name"].(string)
|
||||
id, _ := block["id"].(string)
|
||||
input := block["input"]
|
||||
inputJSON, _ := json.Marshal(input)
|
||||
text := "(tool_use)"
|
||||
if name != "" {
|
||||
text += " name=" + name
|
||||
}
|
||||
if id != "" {
|
||||
text += " id=" + id
|
||||
}
|
||||
if len(inputJSON) > 0 && string(inputJSON) != "null" {
|
||||
text += " input=" + string(inputJSON)
|
||||
}
|
||||
filtered = append(filtered, map[string]any{
|
||||
"type": "text",
|
||||
"text": text,
|
||||
})
|
||||
modifiedAny = true
|
||||
case "tool_result":
|
||||
// Convert tool_result to text so it stays consistent when tool_use is downgraded.
|
||||
toolUseID, _ := block["tool_use_id"].(string)
|
||||
isError, _ := block["is_error"].(bool)
|
||||
content := block["content"]
|
||||
contentJSON, _ := json.Marshal(content)
|
||||
text := "(tool_result)"
|
||||
if toolUseID != "" {
|
||||
text += " tool_use_id=" + toolUseID
|
||||
}
|
||||
if isError {
|
||||
text += " is_error=true"
|
||||
}
|
||||
if len(contentJSON) > 0 && string(contentJSON) != "null" {
|
||||
text += "\n" + string(contentJSON)
|
||||
}
|
||||
filtered = append(filtered, map[string]any{
|
||||
"type": "text",
|
||||
"text": text,
|
||||
})
|
||||
modifiedAny = true
|
||||
case "":
|
||||
// Handle untyped block with "thinking" field
|
||||
if thinkingText, hasThinking := block["thinking"].(string); hasThinking {
|
||||
@@ -625,6 +825,14 @@ func stripThinkingFromClaudeRequest(req *antigravity.ClaudeRequest) (bool, error
|
||||
continue
|
||||
}
|
||||
|
||||
if len(filtered) == 0 {
|
||||
// Keep request valid: upstream rejects empty content arrays.
|
||||
filtered = append(filtered, map[string]any{
|
||||
"type": "text",
|
||||
"text": "(content removed)",
|
||||
})
|
||||
}
|
||||
|
||||
newRaw, err := json.Marshal(filtered)
|
||||
if err != nil {
|
||||
return changed, err
|
||||
@@ -711,6 +919,14 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
// 重试循环
|
||||
var resp *http.Response
|
||||
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
|
||||
// 检查 context 是否已取消(客户端断开连接)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("%s status=context_canceled error=%v", prefix, ctx.Err())
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
upstreamReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, wrappedBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -720,7 +936,10 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
if err != nil {
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
|
||||
sleepAntigravityBackoff(attempt)
|
||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
||||
log.Printf("%s status=context_canceled_during_backoff", prefix)
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
|
||||
@@ -733,7 +952,10 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries)
|
||||
sleepAntigravityBackoff(attempt)
|
||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
||||
log.Printf("%s status=context_canceled_during_backoff", prefix)
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
// 所有重试都失败,标记限流状态
|
||||
@@ -750,11 +972,18 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
|
||||
break
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
defer func() {
|
||||
if resp != nil && resp.Body != nil {
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// 处理错误响应
|
||||
if resp.StatusCode >= 400 {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
// 尽早关闭原始响应体,释放连接;后续逻辑仍可能需要读取 body,因此用内存副本重新包装。
|
||||
_ = resp.Body.Close()
|
||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
|
||||
// 模型兜底:模型不存在且开启 fallback 时,自动用 fallback 模型重试一次
|
||||
if s.settingService != nil && s.settingService.IsModelFallbackEnabled(ctx) &&
|
||||
@@ -763,15 +992,13 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
if fallbackModel != "" && fallbackModel != mappedModel {
|
||||
log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)", mappedModel, fallbackModel, account.Name)
|
||||
|
||||
// 关闭原始响应,释放连接(respBody 已读取到内存)
|
||||
_ = resp.Body.Close()
|
||||
|
||||
fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, body)
|
||||
if err == nil {
|
||||
fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackWrapped)
|
||||
if err == nil {
|
||||
fallbackResp, err := s.httpUpstream.Do(fallbackReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err == nil && fallbackResp.StatusCode < 400 {
|
||||
_ = resp.Body.Close()
|
||||
resp = fallbackResp
|
||||
} else if fallbackResp != nil {
|
||||
_ = fallbackResp.Body.Close()
|
||||
@@ -872,8 +1099,28 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int)
|
||||
}
|
||||
}
|
||||
|
||||
func sleepAntigravityBackoff(attempt int) {
|
||||
sleepGeminiBackoff(attempt) // 复用 Gemini 的退避逻辑
|
||||
// sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待
|
||||
// 返回 true 表示正常完成等待,false 表示 context 已取消
|
||||
func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
|
||||
delay := geminiRetryBaseDelay * time.Duration(1<<uint(attempt-1))
|
||||
if delay > geminiRetryMaxDelay {
|
||||
delay = geminiRetryMaxDelay
|
||||
}
|
||||
|
||||
// +/- 20% jitter
|
||||
r := mathrand.New(mathrand.NewSource(time.Now().UnixNano()))
|
||||
jitter := time.Duration(float64(delay) * 0.2 * (r.Float64()*2 - 1))
|
||||
sleepFor := delay + jitter
|
||||
if sleepFor < 0 {
|
||||
sleepFor = 0
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-time.After(sleepFor):
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte) {
|
||||
@@ -928,57 +1175,145 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
|
||||
return nil, errors.New("streaming not supported")
|
||||
}
|
||||
|
||||
reader := bufio.NewReader(resp.Body)
|
||||
// 使用 Scanner 并限制单行大小,避免 ReadString 无上限导致 OOM
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
maxLineSize := defaultMaxLineSize
|
||||
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 {
|
||||
maxLineSize = s.settingService.cfg.Gateway.MaxLineSize
|
||||
}
|
||||
scanner.Buffer(make([]byte, 64*1024), maxLineSize)
|
||||
usage := &ClaudeUsage{}
|
||||
var firstTokenMs *int
|
||||
|
||||
type scanEvent struct {
|
||||
line string
|
||||
err error
|
||||
}
|
||||
// 独立 goroutine 读取上游,避免读取阻塞影响超时处理
|
||||
events := make(chan scanEvent, 16)
|
||||
done := make(chan struct{})
|
||||
sendEvent := func(ev scanEvent) bool {
|
||||
select {
|
||||
case events <- ev:
|
||||
return true
|
||||
case <-done:
|
||||
return false
|
||||
}
|
||||
}
|
||||
var lastReadAt int64
|
||||
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
|
||||
go func() {
|
||||
defer close(events)
|
||||
for scanner.Scan() {
|
||||
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
|
||||
if !sendEvent(scanEvent{line: scanner.Text()}) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
_ = sendEvent(scanEvent{err: err})
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
|
||||
// 上游数据间隔超时保护(防止上游挂起长期占用连接)
|
||||
streamInterval := time.Duration(0)
|
||||
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamDataIntervalTimeout > 0 {
|
||||
streamInterval = time.Duration(s.settingService.cfg.Gateway.StreamDataIntervalTimeout) * time.Second
|
||||
}
|
||||
var intervalTicker *time.Ticker
|
||||
if streamInterval > 0 {
|
||||
intervalTicker = time.NewTicker(streamInterval)
|
||||
defer intervalTicker.Stop()
|
||||
}
|
||||
var intervalCh <-chan time.Time
|
||||
if intervalTicker != nil {
|
||||
intervalCh = intervalTicker.C
|
||||
}
|
||||
|
||||
// 仅发送一次错误事件,避免多次写入导致协议混乱
|
||||
errorEventSent := false
|
||||
sendErrorEvent := func(reason string) {
|
||||
if errorEventSent {
|
||||
return
|
||||
}
|
||||
errorEventSent = true
|
||||
_, _ = fmt.Fprintf(c.Writer, "event: error\ndata: {\"error\":\"%s\"}\n\n", reason)
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
for {
|
||||
line, err := reader.ReadString('\n')
|
||||
if len(line) > 0 {
|
||||
select {
|
||||
case ev, ok := <-events:
|
||||
if !ok {
|
||||
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil
|
||||
}
|
||||
if ev.err != nil {
|
||||
if errors.Is(ev.err, bufio.ErrTooLong) {
|
||||
log.Printf("SSE line too long (antigravity): max_size=%d error=%v", maxLineSize, ev.err)
|
||||
sendErrorEvent("response_too_large")
|
||||
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, ev.err
|
||||
}
|
||||
sendErrorEvent("stream_read_error")
|
||||
return nil, ev.err
|
||||
}
|
||||
|
||||
line := ev.line
|
||||
trimmed := strings.TrimRight(line, "\r\n")
|
||||
if strings.HasPrefix(trimmed, "data:") {
|
||||
payload := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:"))
|
||||
if payload == "" || payload == "[DONE]" {
|
||||
_, _ = io.WriteString(c.Writer, line)
|
||||
flusher.Flush()
|
||||
} else {
|
||||
// 解包 v1internal 响应
|
||||
inner, parseErr := s.unwrapV1InternalResponse([]byte(payload))
|
||||
if parseErr == nil && inner != nil {
|
||||
payload = string(inner)
|
||||
}
|
||||
|
||||
// 解析 usage
|
||||
var parsed map[string]any
|
||||
if json.Unmarshal(inner, &parsed) == nil {
|
||||
if u := extractGeminiUsage(parsed); u != nil {
|
||||
usage = u
|
||||
}
|
||||
}
|
||||
|
||||
if firstTokenMs == nil {
|
||||
ms := int(time.Since(startTime).Milliseconds())
|
||||
firstTokenMs = &ms
|
||||
}
|
||||
|
||||
_, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", payload)
|
||||
if _, err := fmt.Fprintf(c.Writer, "%s\n", line); err != nil {
|
||||
sendErrorEvent("write_failed")
|
||||
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
||||
}
|
||||
flusher.Flush()
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
_, _ = io.WriteString(c.Writer, line)
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// 解包 v1internal 响应
|
||||
inner, parseErr := s.unwrapV1InternalResponse([]byte(payload))
|
||||
if parseErr == nil && inner != nil {
|
||||
payload = string(inner)
|
||||
}
|
||||
|
||||
// 解析 usage
|
||||
var parsed map[string]any
|
||||
if json.Unmarshal(inner, &parsed) == nil {
|
||||
if u := extractGeminiUsage(parsed); u != nil {
|
||||
usage = u
|
||||
}
|
||||
}
|
||||
|
||||
if firstTokenMs == nil {
|
||||
ms := int(time.Since(startTime).Milliseconds())
|
||||
firstTokenMs = &ms
|
||||
}
|
||||
|
||||
if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", payload); err != nil {
|
||||
sendErrorEvent("write_failed")
|
||||
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
||||
}
|
||||
flusher.Flush()
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := fmt.Fprintf(c.Writer, "%s\n", line); err != nil {
|
||||
sendErrorEvent("write_failed")
|
||||
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, err
|
||||
}
|
||||
flusher.Flush()
|
||||
|
||||
case <-intervalCh:
|
||||
lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt))
|
||||
if time.Since(lastRead) < streamInterval {
|
||||
continue
|
||||
}
|
||||
log.Printf("Stream data interval timeout (antigravity)")
|
||||
sendErrorEvent("stream_timeout")
|
||||
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout")
|
||||
}
|
||||
}
|
||||
|
||||
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil
|
||||
}
|
||||
|
||||
func (s *AntigravityGatewayService) handleGeminiNonStreamingResponse(c *gin.Context, resp *http.Response) (*ClaudeUsage, error) {
|
||||
@@ -1117,7 +1452,13 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
|
||||
|
||||
processor := antigravity.NewStreamingProcessor(originalModel)
|
||||
var firstTokenMs *int
|
||||
reader := bufio.NewReader(resp.Body)
|
||||
// 使用 Scanner 并限制单行大小,避免 ReadString 无上限导致 OOM
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
maxLineSize := defaultMaxLineSize
|
||||
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 {
|
||||
maxLineSize = s.settingService.cfg.Gateway.MaxLineSize
|
||||
}
|
||||
scanner.Buffer(make([]byte, 64*1024), maxLineSize)
|
||||
|
||||
// 辅助函数:转换 antigravity.ClaudeUsage 到 service.ClaudeUsage
|
||||
convertUsage := func(agUsage *antigravity.ClaudeUsage) *ClaudeUsage {
|
||||
@@ -1132,13 +1473,85 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return nil, fmt.Errorf("stream read error: %w", err)
|
||||
type scanEvent struct {
|
||||
line string
|
||||
err error
|
||||
}
|
||||
// 独立 goroutine 读取上游,避免读取阻塞影响超时处理
|
||||
events := make(chan scanEvent, 16)
|
||||
done := make(chan struct{})
|
||||
sendEvent := func(ev scanEvent) bool {
|
||||
select {
|
||||
case events <- ev:
|
||||
return true
|
||||
case <-done:
|
||||
return false
|
||||
}
|
||||
}
|
||||
var lastReadAt int64
|
||||
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
|
||||
go func() {
|
||||
defer close(events)
|
||||
for scanner.Scan() {
|
||||
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
|
||||
if !sendEvent(scanEvent{line: scanner.Text()}) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
_ = sendEvent(scanEvent{err: err})
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
|
||||
if len(line) > 0 {
|
||||
streamInterval := time.Duration(0)
|
||||
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamDataIntervalTimeout > 0 {
|
||||
streamInterval = time.Duration(s.settingService.cfg.Gateway.StreamDataIntervalTimeout) * time.Second
|
||||
}
|
||||
var intervalTicker *time.Ticker
|
||||
if streamInterval > 0 {
|
||||
intervalTicker = time.NewTicker(streamInterval)
|
||||
defer intervalTicker.Stop()
|
||||
}
|
||||
var intervalCh <-chan time.Time
|
||||
if intervalTicker != nil {
|
||||
intervalCh = intervalTicker.C
|
||||
}
|
||||
|
||||
// 仅发送一次错误事件,避免多次写入导致协议混乱
|
||||
errorEventSent := false
|
||||
sendErrorEvent := func(reason string) {
|
||||
if errorEventSent {
|
||||
return
|
||||
}
|
||||
errorEventSent = true
|
||||
_, _ = fmt.Fprintf(c.Writer, "event: error\ndata: {\"error\":\"%s\"}\n\n", reason)
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case ev, ok := <-events:
|
||||
if !ok {
|
||||
// 发送结束事件
|
||||
finalEvents, agUsage := processor.Finish()
|
||||
if len(finalEvents) > 0 {
|
||||
_, _ = c.Writer.Write(finalEvents)
|
||||
flusher.Flush()
|
||||
}
|
||||
return &antigravityStreamResult{usage: convertUsage(agUsage), firstTokenMs: firstTokenMs}, nil
|
||||
}
|
||||
if ev.err != nil {
|
||||
if errors.Is(ev.err, bufio.ErrTooLong) {
|
||||
log.Printf("SSE line too long (antigravity): max_size=%d error=%v", maxLineSize, ev.err)
|
||||
sendErrorEvent("response_too_large")
|
||||
return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, ev.err
|
||||
}
|
||||
sendErrorEvent("stream_read_error")
|
||||
return nil, fmt.Errorf("stream read error: %w", ev.err)
|
||||
}
|
||||
|
||||
line := ev.line
|
||||
// 处理 SSE 行,转换为 Claude 格式
|
||||
claudeEvents := processor.ProcessLine(strings.TrimRight(line, "\r\n"))
|
||||
|
||||
@@ -1153,25 +1566,23 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
|
||||
if len(finalEvents) > 0 {
|
||||
_, _ = c.Writer.Write(finalEvents)
|
||||
}
|
||||
sendErrorEvent("write_failed")
|
||||
return &antigravityStreamResult{usage: convertUsage(agUsage), firstTokenMs: firstTokenMs}, writeErr
|
||||
}
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
case <-intervalCh:
|
||||
lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt))
|
||||
if time.Since(lastRead) < streamInterval {
|
||||
continue
|
||||
}
|
||||
log.Printf("Stream data interval timeout (antigravity)")
|
||||
sendErrorEvent("stream_timeout")
|
||||
return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout")
|
||||
}
|
||||
}
|
||||
|
||||
// 发送结束事件
|
||||
finalEvents, agUsage := processor.Finish()
|
||||
if len(finalEvents) > 0 {
|
||||
_, _ = c.Writer.Write(finalEvents)
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
return &antigravityStreamResult{usage: convertUsage(agUsage), firstTokenMs: firstTokenMs}, nil
|
||||
}
|
||||
|
||||
// extractImageSize 从 Gemini 请求中提取 image_size 参数
|
||||
|
||||
Reference in New Issue
Block a user