diff --git a/backend/internal/repository/gateway_cache.go b/backend/internal/repository/gateway_cache.go index 9365252a..46ae0c16 100644 --- a/backend/internal/repository/gateway_cache.go +++ b/backend/internal/repository/gateway_cache.go @@ -238,3 +238,41 @@ func (c *gatewayCache) SaveGeminiSession(ctx context.Context, groupID int64, pre return c.rdb.Eval(ctx, geminiTrieSaveScript, []string{trieKey}, digestChain, value, ttlSeconds).Err() } + +// ============ Anthropic 会话 Fallback 方法 (复用 Trie 实现) ============ + +// FindAnthropicSession 查找 Anthropic 会话(复用 Gemini Trie Lua 脚本) +func (c *gatewayCache) FindAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain string) (uuid string, accountID int64, found bool) { + if digestChain == "" { + return "", 0, false + } + + trieKey := service.BuildAnthropicTrieKey(groupID, prefixHash) + ttlSeconds := int(service.AnthropicSessionTTL().Seconds()) + + result, err := c.rdb.Eval(ctx, geminiTrieFindScript, []string{trieKey}, digestChain, ttlSeconds).Result() + if err != nil || result == nil { + return "", 0, false + } + + value, ok := result.(string) + if !ok || value == "" { + return "", 0, false + } + + uuid, accountID, ok = service.ParseGeminiSessionValue(value) + return uuid, accountID, ok +} + +// SaveAnthropicSession 保存 Anthropic 会话(复用 Gemini Trie Lua 脚本) +func (c *gatewayCache) SaveAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain, uuid string, accountID int64) error { + if digestChain == "" { + return nil + } + + trieKey := service.BuildAnthropicTrieKey(groupID, prefixHash) + value := service.FormatGeminiSessionValue(uuid, accountID) + ttlSeconds := int(service.AnthropicSessionTTL().Seconds()) + + return c.rdb.Eval(ctx, geminiTrieSaveScript, []string{trieKey}, digestChain, value, ttlSeconds).Err() +} diff --git a/backend/internal/service/anthropic_session.go b/backend/internal/service/anthropic_session.go new file mode 100644 index 00000000..2d86ed35 --- /dev/null +++ b/backend/internal/service/anthropic_session.go @@ -0,0 +1,89 @@ +package service + +import ( + "encoding/json" + "strconv" + "strings" + "time" +) + +// Anthropic 会话 Fallback 相关常量 +const ( + // anthropicSessionTTLSeconds Anthropic 会话缓存 TTL(5 分钟) + anthropicSessionTTLSeconds = 300 + + // anthropicTrieKeyPrefix Anthropic Trie 会话 key 前缀 + anthropicTrieKeyPrefix = "anthropic:trie:" + + // anthropicDigestSessionKeyPrefix Anthropic 摘要 fallback 会话 key 前缀 + anthropicDigestSessionKeyPrefix = "anthropic:digest:" +) + +// AnthropicSessionTTL 返回 Anthropic 会话缓存 TTL +func AnthropicSessionTTL() time.Duration { + return anthropicSessionTTLSeconds * time.Second +} + +// BuildAnthropicDigestChain 根据 Anthropic 请求生成摘要链 +// 格式: s:-u:-a:-u:-... +// s = system, u = user, a = assistant +func BuildAnthropicDigestChain(parsed *ParsedRequest) string { + if parsed == nil { + return "" + } + + var parts []string + + // 1. system prompt + if parsed.System != nil { + systemData, _ := json.Marshal(parsed.System) + if len(systemData) > 0 && string(systemData) != "null" { + parts = append(parts, "s:"+shortHash(systemData)) + } + } + + // 2. messages + for _, msg := range parsed.Messages { + msgMap, ok := msg.(map[string]any) + if !ok { + continue + } + role, _ := msgMap["role"].(string) + prefix := rolePrefix(role) + content := msgMap["content"] + contentData, _ := json.Marshal(content) + parts = append(parts, prefix+":"+shortHash(contentData)) + } + + return strings.Join(parts, "-") +} + +// rolePrefix 将 Anthropic 的 role 映射为单字符前缀 +func rolePrefix(role string) string { + switch role { + case "assistant": + return "a" + default: + return "u" + } +} + +// BuildAnthropicTrieKey 构建 Anthropic Trie Redis key +// 格式: anthropic:trie:{groupID}:{prefixHash} +func BuildAnthropicTrieKey(groupID int64, prefixHash string) string { + return anthropicTrieKeyPrefix + strconv.FormatInt(groupID, 10) + ":" + prefixHash +} + +// GenerateAnthropicDigestSessionKey 生成 Anthropic 摘要 fallback 的 sessionKey +// 组合 prefixHash 前 8 位 + uuid 前 8 位,确保不同会话产生不同的 sessionKey +func GenerateAnthropicDigestSessionKey(prefixHash, uuid string) string { + prefix := prefixHash + if len(prefixHash) >= 8 { + prefix = prefixHash[:8] + } + uuidPart := uuid + if len(uuid) >= 8 { + uuidPart = uuid[:8] + } + return anthropicDigestSessionKeyPrefix + prefix + ":" + uuidPart +} diff --git a/backend/internal/service/anthropic_session_test.go b/backend/internal/service/anthropic_session_test.go new file mode 100644 index 00000000..e2f873e7 --- /dev/null +++ b/backend/internal/service/anthropic_session_test.go @@ -0,0 +1,357 @@ +package service + +import ( + "strings" + "testing" +) + +func TestBuildAnthropicDigestChain_NilRequest(t *testing.T) { + result := BuildAnthropicDigestChain(nil) + if result != "" { + t.Errorf("expected empty string for nil request, got: %s", result) + } +} + +func TestBuildAnthropicDigestChain_EmptyMessages(t *testing.T) { + parsed := &ParsedRequest{ + Messages: []any{}, + } + result := BuildAnthropicDigestChain(parsed) + if result != "" { + t.Errorf("expected empty string for empty messages, got: %s", result) + } +} + +func TestBuildAnthropicDigestChain_SingleUserMessage(t *testing.T) { + parsed := &ParsedRequest{ + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + }, + } + result := BuildAnthropicDigestChain(parsed) + parts := splitChain(result) + if len(parts) != 1 { + t.Fatalf("expected 1 part, got %d: %s", len(parts), result) + } + if !strings.HasPrefix(parts[0], "u:") { + t.Errorf("expected prefix 'u:', got: %s", parts[0]) + } +} + +func TestBuildAnthropicDigestChain_UserAndAssistant(t *testing.T) { + parsed := &ParsedRequest{ + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + map[string]any{"role": "assistant", "content": "hi there"}, + }, + } + result := BuildAnthropicDigestChain(parsed) + parts := splitChain(result) + if len(parts) != 2 { + t.Fatalf("expected 2 parts, got %d: %s", len(parts), result) + } + if !strings.HasPrefix(parts[0], "u:") { + t.Errorf("part[0] expected prefix 'u:', got: %s", parts[0]) + } + if !strings.HasPrefix(parts[1], "a:") { + t.Errorf("part[1] expected prefix 'a:', got: %s", parts[1]) + } +} + +func TestBuildAnthropicDigestChain_WithSystemString(t *testing.T) { + parsed := &ParsedRequest{ + System: "You are a helpful assistant", + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + }, + } + result := BuildAnthropicDigestChain(parsed) + parts := splitChain(result) + if len(parts) != 2 { + t.Fatalf("expected 2 parts (s + u), got %d: %s", len(parts), result) + } + if !strings.HasPrefix(parts[0], "s:") { + t.Errorf("part[0] expected prefix 's:', got: %s", parts[0]) + } + if !strings.HasPrefix(parts[1], "u:") { + t.Errorf("part[1] expected prefix 'u:', got: %s", parts[1]) + } +} + +func TestBuildAnthropicDigestChain_WithSystemContentBlocks(t *testing.T) { + parsed := &ParsedRequest{ + System: []any{ + map[string]any{"type": "text", "text": "You are a helpful assistant"}, + }, + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + }, + } + result := BuildAnthropicDigestChain(parsed) + parts := splitChain(result) + if len(parts) != 2 { + t.Fatalf("expected 2 parts (s + u), got %d: %s", len(parts), result) + } + if !strings.HasPrefix(parts[0], "s:") { + t.Errorf("part[0] expected prefix 's:', got: %s", parts[0]) + } +} + +func TestBuildAnthropicDigestChain_ConversationPrefixRelationship(t *testing.T) { + // 核心测试:验证对话增长时链的前缀关系 + // 上一轮的完整链一定是下一轮链的前缀 + system := "You are a helpful assistant" + + // 第 1 轮: system + user + round1 := &ParsedRequest{ + System: system, + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + }, + } + chain1 := BuildAnthropicDigestChain(round1) + + // 第 2 轮: system + user + assistant + user + round2 := &ParsedRequest{ + System: system, + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + map[string]any{"role": "assistant", "content": "hi there"}, + map[string]any{"role": "user", "content": "how are you?"}, + }, + } + chain2 := BuildAnthropicDigestChain(round2) + + // 第 3 轮: system + user + assistant + user + assistant + user + round3 := &ParsedRequest{ + System: system, + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + map[string]any{"role": "assistant", "content": "hi there"}, + map[string]any{"role": "user", "content": "how are you?"}, + map[string]any{"role": "assistant", "content": "I'm doing well"}, + map[string]any{"role": "user", "content": "great"}, + }, + } + chain3 := BuildAnthropicDigestChain(round3) + + t.Logf("Chain1: %s", chain1) + t.Logf("Chain2: %s", chain2) + t.Logf("Chain3: %s", chain3) + + // chain1 是 chain2 的前缀 + if !strings.HasPrefix(chain2, chain1) { + t.Errorf("chain1 should be prefix of chain2:\n chain1: %s\n chain2: %s", chain1, chain2) + } + + // chain2 是 chain3 的前缀 + if !strings.HasPrefix(chain3, chain2) { + t.Errorf("chain2 should be prefix of chain3:\n chain2: %s\n chain3: %s", chain2, chain3) + } + + // chain1 也是 chain3 的前缀(传递性) + if !strings.HasPrefix(chain3, chain1) { + t.Errorf("chain1 should be prefix of chain3:\n chain1: %s\n chain3: %s", chain1, chain3) + } +} + +func TestBuildAnthropicDigestChain_DifferentSystemProducesDifferentChain(t *testing.T) { + parsed1 := &ParsedRequest{ + System: "System A", + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + }, + } + parsed2 := &ParsedRequest{ + System: "System B", + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + }, + } + + chain1 := BuildAnthropicDigestChain(parsed1) + chain2 := BuildAnthropicDigestChain(parsed2) + + if chain1 == chain2 { + t.Error("Different system prompts should produce different chains") + } + + // 但 user 部分的 hash 应该相同 + parts1 := splitChain(chain1) + parts2 := splitChain(chain2) + if parts1[1] != parts2[1] { + t.Error("Same user message should produce same hash regardless of system") + } +} + +func TestBuildAnthropicDigestChain_DifferentContentProducesDifferentChain(t *testing.T) { + parsed1 := &ParsedRequest{ + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + map[string]any{"role": "assistant", "content": "ORIGINAL reply"}, + map[string]any{"role": "user", "content": "next"}, + }, + } + parsed2 := &ParsedRequest{ + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + map[string]any{"role": "assistant", "content": "TAMPERED reply"}, + map[string]any{"role": "user", "content": "next"}, + }, + } + + chain1 := BuildAnthropicDigestChain(parsed1) + chain2 := BuildAnthropicDigestChain(parsed2) + + if chain1 == chain2 { + t.Error("Different content should produce different chains") + } + + parts1 := splitChain(chain1) + parts2 := splitChain(chain2) + // 第一个 user message hash 应该相同 + if parts1[0] != parts2[0] { + t.Error("First user message hash should be the same") + } + // assistant reply hash 应该不同 + if parts1[1] == parts2[1] { + t.Error("Assistant reply hash should differ") + } +} + +func TestBuildAnthropicDigestChain_Deterministic(t *testing.T) { + parsed := &ParsedRequest{ + System: "test system", + Messages: []any{ + map[string]any{"role": "user", "content": "hello"}, + map[string]any{"role": "assistant", "content": "hi"}, + }, + } + + chain1 := BuildAnthropicDigestChain(parsed) + chain2 := BuildAnthropicDigestChain(parsed) + + if chain1 != chain2 { + t.Errorf("BuildAnthropicDigestChain not deterministic: %s vs %s", chain1, chain2) + } +} + +func TestBuildAnthropicTrieKey(t *testing.T) { + tests := []struct { + name string + groupID int64 + prefixHash string + want string + }{ + { + name: "normal", + groupID: 123, + prefixHash: "abcdef12", + want: "anthropic:trie:123:abcdef12", + }, + { + name: "zero group", + groupID: 0, + prefixHash: "xyz", + want: "anthropic:trie:0:xyz", + }, + { + name: "empty prefix", + groupID: 1, + prefixHash: "", + want: "anthropic:trie:1:", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := BuildAnthropicTrieKey(tt.groupID, tt.prefixHash) + if got != tt.want { + t.Errorf("BuildAnthropicTrieKey(%d, %q) = %q, want %q", tt.groupID, tt.prefixHash, got, tt.want) + } + }) + } +} + +func TestGenerateAnthropicDigestSessionKey(t *testing.T) { + tests := []struct { + name string + prefixHash string + uuid string + want string + }{ + { + name: "normal 16 char hash with uuid", + prefixHash: "abcdefgh12345678", + uuid: "550e8400-e29b-41d4-a716-446655440000", + want: "anthropic:digest:abcdefgh:550e8400", + }, + { + name: "exactly 8 chars", + prefixHash: "12345678", + uuid: "abcdefgh", + want: "anthropic:digest:12345678:abcdefgh", + }, + { + name: "short values", + prefixHash: "abc", + uuid: "xyz", + want: "anthropic:digest:abc:xyz", + }, + { + name: "empty values", + prefixHash: "", + uuid: "", + want: "anthropic:digest::", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := GenerateAnthropicDigestSessionKey(tt.prefixHash, tt.uuid) + if got != tt.want { + t.Errorf("GenerateAnthropicDigestSessionKey(%q, %q) = %q, want %q", tt.prefixHash, tt.uuid, got, tt.want) + } + }) + } + + // 验证不同 uuid 产生不同 sessionKey + t.Run("different uuid different key", func(t *testing.T) { + hash := "sameprefix123456" + result1 := GenerateAnthropicDigestSessionKey(hash, "uuid0001-session-a") + result2 := GenerateAnthropicDigestSessionKey(hash, "uuid0002-session-b") + if result1 == result2 { + t.Errorf("Different UUIDs should produce different session keys: %s vs %s", result1, result2) + } + }) +} + +func TestAnthropicSessionTTL(t *testing.T) { + ttl := AnthropicSessionTTL() + if ttl.Seconds() != 300 { + t.Errorf("expected 300 seconds, got: %v", ttl.Seconds()) + } +} + +func TestBuildAnthropicDigestChain_ContentBlocks(t *testing.T) { + // 测试 content 为 content blocks 数组的情况 + parsed := &ParsedRequest{ + Messages: []any{ + map[string]any{ + "role": "user", + "content": []any{ + map[string]any{"type": "text", "text": "describe this image"}, + map[string]any{"type": "image", "source": map[string]any{"type": "base64"}}, + }, + }, + }, + } + result := BuildAnthropicDigestChain(parsed) + parts := splitChain(result) + if len(parts) != 1 { + t.Fatalf("expected 1 part, got %d: %s", len(parts), result) + } + if !strings.HasPrefix(parts[0], "u:") { + t.Errorf("expected prefix 'u:', got: %s", parts[0]) + } +} diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 3d3c9cca..435cf0de 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -35,7 +35,7 @@ const ( // - 预检查:剩余限流时间 < 此阈值时等待,>= 此阈值时切换账号 antigravityRateLimitThreshold = 7 * time.Second antigravitySmartRetryMinWait = 1 * time.Second // 智能重试最小等待时间 - antigravitySmartRetryMaxAttempts = 3 // 智能重试最大次数 + antigravitySmartRetryMaxAttempts = 1 // 智能重试最大次数(仅重试 1 次,防止重复限流/长期等待) antigravityDefaultRateLimitDuration = 30 * time.Second // 默认限流时间(无 retryDelay 时使用) // Google RPC 状态和类型常量 @@ -247,6 +247,11 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam } } + // 清除粘性会话绑定,避免下次请求仍命中限流账号 + if s.cache != nil && p.sessionHash != "" { + _ = s.cache.DeleteSessionAccountID(p.ctx, p.groupID, p.sessionHash) + } + // 返回账号切换信号,让上层切换账号重试 return &smartRetryResult{ action: smartRetryActionBreakWithResp, @@ -264,27 +269,15 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam // antigravityRetryLoop 执行带 URL fallback 的重试循环 func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) { - // 预检查:如果账号已限流,根据剩余时间决定等待或切换 + // 预检查:如果账号已限流,直接返回切换信号 if p.requestedModel != "" { if remaining := p.account.GetRateLimitRemainingTimeWithContext(p.ctx, p.requestedModel); remaining > 0 { - if remaining < antigravityRateLimitThreshold { - // 限流剩余时间较短,等待后继续 - log.Printf("%s pre_check: rate_limit_wait remaining=%v model=%s account=%d", - p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) - select { - case <-p.ctx.Done(): - return nil, p.ctx.Err() - case <-time.After(remaining): - } - } else { - // 限流剩余时间较长,返回账号切换信号 - log.Printf("%s pre_check: rate_limit_switch remaining=%v model=%s account=%d", - p.prefix, remaining.Truncate(time.Second), p.requestedModel, p.account.ID) - return nil, &AntigravityAccountSwitchError{ - OriginalAccountID: p.account.ID, - RateLimitedModel: p.requestedModel, - IsStickySession: p.isStickySession, - } + log.Printf("%s pre_check: rate_limit_switch remaining=%v model=%s account=%d", + p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) + return nil, &AntigravityAccountSwitchError{ + OriginalAccountID: p.account.ID, + RateLimitedModel: p.requestedModel, + IsStickySession: p.isStickySession, } } } @@ -964,6 +957,16 @@ func isModelNotFoundError(statusCode int, body []byte) bool { } // Forward 转发 Claude 协议请求(Claude → Gemini 转换) +// +// 限流处理流程: +// +// 请求 → antigravityRetryLoop → 预检查(remaining>0? → 切换账号) → 发送上游 +// ├─ 成功 → 正常返回 +// └─ 429/503 → handleSmartRetry +// ├─ retryDelay >= 7s → 设置模型限流 + 清除粘性绑定 → 切换账号 +// └─ retryDelay < 7s → 等待后重试 1 次 +// ├─ 成功 → 正常返回 +// └─ 失败 → 设置模型限流 + 清除粘性绑定 → 切换账号 func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte, isStickySession bool) (*ForwardResult, error) { startTime := time.Now() sessionID := getSessionID(c) @@ -1583,6 +1586,16 @@ func stripSignatureSensitiveBlocksFromClaudeRequest(req *antigravity.ClaudeReque } // ForwardGemini 转发 Gemini 协议请求 +// +// 限流处理流程: +// +// 请求 → antigravityRetryLoop → 预检查(remaining>0? → 切换账号) → 发送上游 +// ├─ 成功 → 正常返回 +// └─ 429/503 → handleSmartRetry +// ├─ retryDelay >= 7s → 设置模型限流 + 清除粘性绑定 → 切换账号 +// └─ retryDelay < 7s → 等待后重试 1 次 +// ├─ 成功 → 正常返回 +// └─ 失败 → 设置模型限流 + 清除粘性绑定 → 切换账号 func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Context, account *Account, originalModel string, action string, stream bool, body []byte, isStickySession bool) (*ForwardResult, error) { startTime := time.Now() sessionID := getSessionID(c) diff --git a/backend/internal/service/antigravity_rate_limit_test.go b/backend/internal/service/antigravity_rate_limit_test.go index 20936356..cd2a7a4a 100644 --- a/backend/internal/service/antigravity_rate_limit_test.go +++ b/backend/internal/service/antigravity_rate_limit_test.go @@ -803,7 +803,7 @@ func TestSetModelRateLimitByModelName_NotConvertToScope(t *testing.T) { require.NotEqual(t, "claude_sonnet", call.modelKey, "should NOT be scope") } -func TestAntigravityRetryLoop_PreCheck_WaitsWhenRemainingBelowThreshold(t *testing.T) { +func TestAntigravityRetryLoop_PreCheck_SwitchesWhenRateLimited(t *testing.T) { upstream := &recordingOKUpstream{} account := &Account{ ID: 1, @@ -815,19 +815,15 @@ func TestAntigravityRetryLoop_PreCheck_WaitsWhenRemainingBelowThreshold(t *testi Extra: map[string]any{ modelRateLimitsKey: map[string]any{ "claude-sonnet-4-5": map[string]any{ - // RFC3339 here is second-precision; keep it safely in the future. "rate_limit_reset_at": time.Now().Add(2 * time.Second).Format(time.RFC3339), }, }, }, } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond) - defer cancel() - svc := &AntigravityGatewayService{} result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{ - ctx: ctx, + ctx: context.Background(), prefix: "[test]", account: account, accessToken: "token", @@ -841,12 +837,16 @@ func TestAntigravityRetryLoop_PreCheck_WaitsWhenRemainingBelowThreshold(t *testi }, }) - require.ErrorIs(t, err, context.DeadlineExceeded) require.Nil(t, result) - require.Equal(t, 0, upstream.calls, "should not call upstream while waiting on pre-check") + var switchErr *AntigravityAccountSwitchError + require.ErrorAs(t, err, &switchErr) + require.Equal(t, account.ID, switchErr.OriginalAccountID) + require.Equal(t, "claude-sonnet-4-5", switchErr.RateLimitedModel) + require.True(t, switchErr.IsStickySession) + require.Equal(t, 0, upstream.calls, "should not call upstream when switching on pre-check") } -func TestAntigravityRetryLoop_PreCheck_SwitchesWhenRemainingAtOrAboveThreshold(t *testing.T) { +func TestAntigravityRetryLoop_PreCheck_SwitchesWhenRemainingLong(t *testing.T) { upstream := &recordingOKUpstream{} account := &Account{ ID: 2, diff --git a/backend/internal/service/antigravity_smart_retry_test.go b/backend/internal/service/antigravity_smart_retry_test.go index 623dfec5..999b408f 100644 --- a/backend/internal/service/antigravity_smart_retry_test.go +++ b/backend/internal/service/antigravity_smart_retry_test.go @@ -13,6 +13,23 @@ import ( "github.com/stretchr/testify/require" ) +// stubSmartRetryCache 用于 handleSmartRetry 测试的 GatewayCache mock +// 仅关注 DeleteSessionAccountID 的调用记录 +type stubSmartRetryCache struct { + GatewayCache // 嵌入接口,未实现的方法 panic(确保只调用预期方法) + deleteCalls []deleteSessionCall +} + +type deleteSessionCall struct { + groupID int64 + sessionHash string +} + +func (c *stubSmartRetryCache) DeleteSessionAccountID(_ context.Context, groupID int64, sessionHash string) error { + c.deleteCalls = append(c.deleteCalls, deleteSessionCall{groupID: groupID, sessionHash: sessionHash}) + return nil +} + // mockSmartRetryUpstream 用于 handleSmartRetry 测试的 mock upstream type mockSmartRetryUpstream struct { responses []*http.Response @@ -198,7 +215,7 @@ func TestHandleSmartRetry_ShortDelay_SmartRetrySuccess(t *testing.T) { // TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError 测试智能重试失败后返回 switchError func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *testing.T) { - // 智能重试后仍然返回 429(需要提供 3 个响应,因为智能重试最多 3 次) + // 智能重试后仍然返回 429(需要提供 1 个响应,因为智能重试最多 1 次) failRespBody := `{ "error": { "status": "RESOURCE_EXHAUSTED", @@ -213,19 +230,9 @@ func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *test Header: http.Header{}, Body: io.NopCloser(strings.NewReader(failRespBody)), } - failResp2 := &http.Response{ - StatusCode: http.StatusTooManyRequests, - Header: http.Header{}, - Body: io.NopCloser(strings.NewReader(failRespBody)), - } - failResp3 := &http.Response{ - StatusCode: http.StatusTooManyRequests, - Header: http.Header{}, - Body: io.NopCloser(strings.NewReader(failRespBody)), - } upstream := &mockSmartRetryUpstream{ - responses: []*http.Response{failResp1, failResp2, failResp3}, - errors: []error{nil, nil, nil}, + responses: []*http.Response{failResp1}, + errors: []error{nil}, } repo := &stubAntigravityAccountRepo{} @@ -236,7 +243,7 @@ func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *test Platform: PlatformAntigravity, } - // 3s < 7s 阈值,应该触发智能重试(最多 3 次) + // 3s < 7s 阈值,应该触发智能重试(最多 1 次) respBody := []byte(`{ "error": { "status": "RESOURCE_EXHAUSTED", @@ -284,7 +291,7 @@ func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *test // 验证模型限流已设置 require.Len(t, repo.modelRateLimitCalls, 1) require.Equal(t, "gemini-3-flash", repo.modelRateLimitCalls[0].modelKey) - require.Len(t, upstream.calls, 3, "should have made three retry calls (max attempts)") + require.Len(t, upstream.calls, 1, "should have made one retry call (max attempts)") } // TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError 测试 503 MODEL_CAPACITY_EXHAUSTED 返回 switchError @@ -556,19 +563,15 @@ func TestAntigravityRetryLoop_HandleSmartRetry_SwitchError_Propagates(t *testing require.True(t, switchErr.IsStickySession) } -// TestHandleSmartRetry_NetworkError_ContinuesRetry 测试网络错误时继续重试 -func TestHandleSmartRetry_NetworkError_ContinuesRetry(t *testing.T) { - // 第一次网络错误,第二次成功 - successResp := &http.Response{ - StatusCode: http.StatusOK, - Header: http.Header{}, - Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)), - } +// TestHandleSmartRetry_NetworkError_ExhaustsRetry 测试网络错误时(maxAttempts=1)直接耗尽重试并切换账号 +func TestHandleSmartRetry_NetworkError_ExhaustsRetry(t *testing.T) { + // 唯一一次重试遇到网络错误(nil response) upstream := &mockSmartRetryUpstream{ - responses: []*http.Response{nil, successResp}, // 第一次返回 nil(模拟网络错误) - errors: []error{nil, nil}, // mock 不返回 error,靠 nil response 触发 + responses: []*http.Response{nil}, // 返回 nil(模拟网络错误) + errors: []error{nil}, // mock 不返回 error,靠 nil response 触发 } + repo := &stubAntigravityAccountRepo{} account := &Account{ ID: 8, Name: "acc-8", @@ -600,6 +603,7 @@ func TestHandleSmartRetry_NetworkError_ContinuesRetry(t *testing.T) { action: "generateContent", body: []byte(`{"input":"test"}`), httpUpstream: upstream, + accountRepo: repo, handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, @@ -612,10 +616,15 @@ func TestHandleSmartRetry_NetworkError_ContinuesRetry(t *testing.T) { require.NotNil(t, result) require.Equal(t, smartRetryActionBreakWithResp, result.action) - require.NotNil(t, result.resp, "should return successful response after network error recovery") - require.Equal(t, http.StatusOK, result.resp.StatusCode) - require.Nil(t, result.switchError, "should not return switchError on success") - require.Len(t, upstream.calls, 2, "should have made two retry calls") + require.Nil(t, result.resp, "should not return resp when switchError is set") + require.NotNil(t, result.switchError, "should return switchError after network error exhausted retry") + require.Equal(t, account.ID, result.switchError.OriginalAccountID) + require.Equal(t, "claude-sonnet-4-5", result.switchError.RateLimitedModel) + require.Len(t, upstream.calls, 1, "should have made one retry call") + + // 验证模型限流已设置 + require.Len(t, repo.modelRateLimitCalls, 1) + require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey) } // TestHandleSmartRetry_NoRetryDelay_UsesDefaultRateLimit 测试无 retryDelay 时使用默认 1 分钟限流 @@ -674,3 +683,617 @@ func TestHandleSmartRetry_NoRetryDelay_UsesDefaultRateLimit(t *testing.T) { require.Len(t, repo.modelRateLimitCalls, 1) require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey) } + +// --------------------------------------------------------------------------- +// 以下测试覆盖本次改动: +// 1. antigravitySmartRetryMaxAttempts = 1(仅重试 1 次) +// 2. 智能重试失败后清除粘性会话绑定(DeleteSessionAccountID) +// --------------------------------------------------------------------------- + +// TestSmartRetryMaxAttempts_VerifyConstant 验证常量值为 1 +func TestSmartRetryMaxAttempts_VerifyConstant(t *testing.T) { + require.Equal(t, 1, antigravitySmartRetryMaxAttempts, + "antigravitySmartRetryMaxAttempts should be 1 to prevent repeated rate limiting") +} + +// TestHandleSmartRetry_ShortDelay_StickySession_FailedRetry_ClearsSession +// 核心场景:粘性会话 + 短延迟重试失败 → 必须清除粘性绑定 +func TestHandleSmartRetry_ShortDelay_StickySession_FailedRetry_ClearsSession(t *testing.T) { + failRespBody := `{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }` + failResp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(failRespBody)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{failResp}, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + cache := &stubSmartRetryCache{} + account := &Account{ + ID: 10, + Name: "acc-10", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + respBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + isStickySession: true, + groupID: 42, + sessionHash: "sticky-hash-abc", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{cache: cache} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + // 验证返回 switchError + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.switchError) + require.True(t, result.switchError.IsStickySession, "switchError should carry IsStickySession=true") + require.Equal(t, account.ID, result.switchError.OriginalAccountID) + + // 核心断言:DeleteSessionAccountID 被调用,且参数正确 + require.Len(t, cache.deleteCalls, 1, "should call DeleteSessionAccountID exactly once") + require.Equal(t, int64(42), cache.deleteCalls[0].groupID) + require.Equal(t, "sticky-hash-abc", cache.deleteCalls[0].sessionHash) + + // 验证仅重试 1 次 + require.Len(t, upstream.calls, 1, "should make exactly 1 retry call (maxAttempts=1)") + + // 验证模型限流已设置 + require.Len(t, repo.modelRateLimitCalls, 1) + require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey) +} + +// TestHandleSmartRetry_ShortDelay_NonStickySession_FailedRetry_NoDeleteSession +// 非粘性会话 + 短延迟重试失败 → 不应调用 DeleteSessionAccountID(sessionHash 为空) +func TestHandleSmartRetry_ShortDelay_NonStickySession_FailedRetry_NoDeleteSession(t *testing.T) { + failRespBody := `{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }` + failResp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(failRespBody)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{failResp}, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + cache := &stubSmartRetryCache{} + account := &Account{ + ID: 11, + Name: "acc-11", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + respBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + isStickySession: false, + groupID: 42, + sessionHash: "", // 非粘性会话,sessionHash 为空 + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{cache: cache} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.switchError) + require.False(t, result.switchError.IsStickySession) + + // 核心断言:sessionHash 为空时不应调用 DeleteSessionAccountID + require.Len(t, cache.deleteCalls, 0, "should NOT call DeleteSessionAccountID when sessionHash is empty") +} + +// TestHandleSmartRetry_ShortDelay_StickySession_FailedRetry_NilCache_NoPanic +// 边界:cache 为 nil 时不应 panic +func TestHandleSmartRetry_ShortDelay_StickySession_FailedRetry_NilCache_NoPanic(t *testing.T) { + failRespBody := `{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }` + failResp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(failRespBody)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{failResp}, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 12, + Name: "acc-12", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + respBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + isStickySession: true, + groupID: 42, + sessionHash: "sticky-hash-nil-cache", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + // cache 为 nil,不应 panic + svc := &AntigravityGatewayService{cache: nil} + require.NotPanics(t, func() { + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.switchError) + require.True(t, result.switchError.IsStickySession) + }) +} + +// TestHandleSmartRetry_ShortDelay_StickySession_SuccessRetry_NoDeleteSession +// 重试成功时不应清除粘性会话(只有失败才清除) +func TestHandleSmartRetry_ShortDelay_StickySession_SuccessRetry_NoDeleteSession(t *testing.T) { + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{successResp}, + errors: []error{nil}, + } + + cache := &stubSmartRetryCache{} + account := &Account{ + ID: 13, + Name: "acc-13", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + respBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-opus-4"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + isStickySession: true, + groupID: 42, + sessionHash: "sticky-hash-success", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{cache: cache} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp, "should return successful response") + require.Equal(t, http.StatusOK, result.resp.StatusCode) + require.Nil(t, result.switchError, "should not return switchError on success") + + // 核心断言:重试成功时不应清除粘性会话 + require.Len(t, cache.deleteCalls, 0, "should NOT call DeleteSessionAccountID on successful retry") +} + +// TestHandleSmartRetry_LongDelay_StickySession_NoDeleteInHandleSmartRetry +// 长延迟路径(情况1)在 handleSmartRetry 中不直接调用 DeleteSessionAccountID +// (清除由 handler 层的 shouldClearStickySession 在下次请求时处理) +func TestHandleSmartRetry_LongDelay_StickySession_NoDeleteInHandleSmartRetry(t *testing.T) { + repo := &stubAntigravityAccountRepo{} + cache := &stubSmartRetryCache{} + account := &Account{ + ID: 14, + Name: "acc-14", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + // 15s >= 7s 阈值 → 走长延迟路径 + respBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "15s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + accountRepo: repo, + isStickySession: true, + groupID: 42, + sessionHash: "sticky-hash-long-delay", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{cache: cache} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.switchError) + require.True(t, result.switchError.IsStickySession) + + // 长延迟路径不在 handleSmartRetry 中调用 DeleteSessionAccountID + // (由上游 handler 的 shouldClearStickySession 处理) + require.Len(t, cache.deleteCalls, 0, + "long delay path should NOT call DeleteSessionAccountID in handleSmartRetry (handled by handler layer)") +} + +// TestHandleSmartRetry_ShortDelay_NetworkError_StickySession_ClearsSession +// 网络错误耗尽重试 + 粘性会话 → 也应清除粘性绑定 +func TestHandleSmartRetry_ShortDelay_NetworkError_StickySession_ClearsSession(t *testing.T) { + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{nil}, // 网络错误 + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + cache := &stubSmartRetryCache{} + account := &Account{ + ID: 15, + Name: "acc-15", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + respBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + isStickySession: true, + groupID: 99, + sessionHash: "sticky-net-error", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{cache: cache} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.NotNil(t, result.switchError) + require.True(t, result.switchError.IsStickySession) + + // 核心断言:网络错误耗尽重试后也应清除粘性绑定 + require.Len(t, cache.deleteCalls, 1, "should call DeleteSessionAccountID after network error exhausts retry") + require.Equal(t, int64(99), cache.deleteCalls[0].groupID) + require.Equal(t, "sticky-net-error", cache.deleteCalls[0].sessionHash) +} + +// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession +// 503 + 短延迟 + 粘性会话 + 重试失败 → 清除粘性绑定 +func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession(t *testing.T) { + failRespBody := `{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"} + ] + } + }` + failResp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(failRespBody)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{failResp}, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + cache := &stubSmartRetryCache{} + account := &Account{ + ID: 16, + Name: "acc-16", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + respBody := []byte(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + isStickySession: true, + groupID: 77, + sessionHash: "sticky-503-short", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{cache: cache} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.NotNil(t, result.switchError) + require.True(t, result.switchError.IsStickySession) + + // 验证粘性绑定被清除 + require.Len(t, cache.deleteCalls, 1) + require.Equal(t, int64(77), cache.deleteCalls[0].groupID) + require.Equal(t, "sticky-503-short", cache.deleteCalls[0].sessionHash) + + // 验证模型限流已设置 + require.Len(t, repo.modelRateLimitCalls, 1) + require.Equal(t, "gemini-3-pro", repo.modelRateLimitCalls[0].modelKey) +} + +// TestAntigravityRetryLoop_SmartRetryFailed_StickySession_SwitchErrorPropagates +// 集成测试:antigravityRetryLoop → handleSmartRetry → switchError 传播 +// 验证 IsStickySession 正确传递到上层,且粘性绑定被清除 +func TestAntigravityRetryLoop_SmartRetryFailed_StickySession_SwitchErrorPropagates(t *testing.T) { + // 初始 429 响应 + initialRespBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-opus-4-6"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`) + initialResp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(initialRespBody)), + } + + // 智能重试也返回 429 + retryRespBody := `{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-opus-4-6"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }` + retryResp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(retryRespBody)), + } + + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{initialResp, retryResp}, + errors: []error{nil, nil}, + } + + repo := &stubAntigravityAccountRepo{} + cache := &stubSmartRetryCache{} + account := &Account{ + ID: 17, + Name: "acc-17", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Schedulable: true, + Status: StatusActive, + Concurrency: 1, + } + + svc := &AntigravityGatewayService{cache: cache} + result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + isStickySession: true, + groupID: 55, + sessionHash: "sticky-loop-test", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + }) + + require.Nil(t, result, "should not return result when switchError") + require.NotNil(t, err, "should return error") + + var switchErr *AntigravityAccountSwitchError + require.ErrorAs(t, err, &switchErr, "error should be AntigravityAccountSwitchError") + require.Equal(t, account.ID, switchErr.OriginalAccountID) + require.Equal(t, "claude-opus-4-6", switchErr.RateLimitedModel) + require.True(t, switchErr.IsStickySession, "IsStickySession must propagate through retryLoop") + + // 验证粘性绑定被清除 + require.Len(t, cache.deleteCalls, 1, "should clear sticky session in handleSmartRetry") + require.Equal(t, int64(55), cache.deleteCalls[0].groupID) + require.Equal(t, "sticky-loop-test", cache.deleteCalls[0].sessionHash) +} \ No newline at end of file diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index b3e60c21..d9c852e0 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -232,6 +232,14 @@ func (m *mockGatewayCacheForPlatform) SaveGeminiSession(ctx context.Context, gro return nil } +func (m *mockGatewayCacheForPlatform) FindAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain string) (uuid string, accountID int64, found bool) { + return "", 0, false +} + +func (m *mockGatewayCacheForPlatform) SaveAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain, uuid string, accountID int64) error { + return nil +} + type mockGroupRepoForGateway struct { groups map[int64]*Group getByIDCalls int diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 32646b11..480f5b67 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -313,6 +313,14 @@ type GatewayCache interface { // SaveGeminiSession 保存 Gemini 会话 // Save Gemini session binding SaveGeminiSession(ctx context.Context, groupID int64, prefixHash, digestChain, uuid string, accountID int64) error + + // FindAnthropicSession 查找 Anthropic 会话(Trie 匹配) + // Find Anthropic session using Trie matching + FindAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain string) (uuid string, accountID int64, found bool) + + // SaveAnthropicSession 保存 Anthropic 会话 + // Save Anthropic session binding + SaveAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain, uuid string, accountID int64) error } // derefGroupID safely dereferences *int64 to int64, returning 0 if nil @@ -323,21 +331,15 @@ func derefGroupID(groupID *int64) int64 { return *groupID } -// stickySessionRateLimitThreshold 定义清除粘性会话的限流时间阈值。 -// 当账号限流剩余时间超过此阈值时,清除粘性会话以便切换到其他账号。 -// 低于此阈值时保持粘性会话,等待短暂限流结束。 -const stickySessionRateLimitThreshold = 10 * time.Second - // shouldClearStickySession 检查账号是否处于不可调度状态,需要清理粘性会话绑定。 // 当账号状态为错误、禁用、不可调度、处于临时不可调度期间, -// 或模型限流剩余时间超过 stickySessionRateLimitThreshold 时,返回 true。 +// 或请求的模型处于限流状态时,返回 true。 // 这确保后续请求不会继续使用不可用的账号。 // // shouldClearStickySession checks if an account is in an unschedulable state // and the sticky session binding should be cleared. // Returns true when account status is error/disabled, schedulable is false, -// within temporary unschedulable period, or model rate limit remaining time -// exceeds stickySessionRateLimitThreshold. +// within temporary unschedulable period, or the requested model is rate-limited. // This ensures subsequent requests won't continue using unavailable accounts. func shouldClearStickySession(account *Account, requestedModel string) bool { if account == nil { @@ -349,8 +351,8 @@ func shouldClearStickySession(account *Account, requestedModel string) bool { if account.TempUnschedulableUntil != nil && time.Now().Before(*account.TempUnschedulableUntil) { return true } - // 检查模型限流和 scope 限流,只在超过阈值时清除粘性会话 - if remaining := account.GetRateLimitRemainingTimeWithContext(context.Background(), requestedModel); remaining > stickySessionRateLimitThreshold { + // 检查模型限流和 scope 限流,有限流即清除粘性会话 + if remaining := account.GetRateLimitRemainingTimeWithContext(context.Background(), requestedModel); remaining > 0 { return true } return false @@ -488,23 +490,25 @@ func (s *GatewayService) GenerateSessionHash(parsed *ParsedRequest) string { return s.hashContent(cacheableContent) } - // 3. Fallback: 使用 system 内容 + // 3. 最后 fallback: 使用 system + 所有消息的完整摘要串 + var combined strings.Builder if parsed.System != nil { systemText := s.extractTextFromSystem(parsed.System) if systemText != "" { - return s.hashContent(systemText) + _, _ = combined.WriteString(systemText) } } - - // 4. 最后 fallback: 使用第一条消息 - if len(parsed.Messages) > 0 { - if firstMsg, ok := parsed.Messages[0].(map[string]any); ok { - msgText := s.extractTextFromContent(firstMsg["content"]) + for _, msg := range parsed.Messages { + if m, ok := msg.(map[string]any); ok { + msgText := s.extractTextFromContent(m["content"]) if msgText != "" { - return s.hashContent(msgText) + _, _ = combined.WriteString(msgText) } } } + if combined.Len() > 0 { + return s.hashContent(combined.String()) + } return "" } @@ -547,6 +551,22 @@ func (s *GatewayService) SaveGeminiSession(ctx context.Context, groupID int64, p return s.cache.SaveGeminiSession(ctx, groupID, prefixHash, digestChain, uuid, accountID) } +// FindAnthropicSession 查找 Anthropic 会话(基于内容摘要链的 Fallback 匹配) +func (s *GatewayService) FindAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain string) (uuid string, accountID int64, found bool) { + if digestChain == "" || s.cache == nil { + return "", 0, false + } + return s.cache.FindAnthropicSession(ctx, groupID, prefixHash, digestChain) +} + +// SaveAnthropicSession 保存 Anthropic 会话 +func (s *GatewayService) SaveAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain, uuid string, accountID int64) error { + if digestChain == "" || s.cache == nil { + return nil + } + return s.cache.SaveAnthropicSession(ctx, groupID, prefixHash, digestChain, uuid, accountID) +} + func (s *GatewayService) extractCacheableContent(parsed *ParsedRequest) string { if parsed == nil { return "" @@ -1110,7 +1130,6 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro result.ReleaseFunc() // 释放槽位 // 继续到负载感知选择 } else { - _ = s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL) if s.debugModelRoutingEnabled() { log.Printf("[ModelRoutingDebug] routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), stickyAccountID) } @@ -1264,7 +1283,6 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if !s.checkAndRegisterSession(ctx, account, sessionHash) { result.ReleaseFunc() // 释放槽位,继续到 Layer 2 } else { - _ = s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL) return &AccountSelectionResult{ Account: account, Acquired: true, @@ -2169,9 +2187,6 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) { - if err := s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL); err != nil { - log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err) - } if s.debugModelRoutingEnabled() { log.Printf("[ModelRoutingDebug] legacy routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), accountID) } @@ -2272,9 +2287,6 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } if !clearSticky && s.isAccountInGroup(account, groupID) && account.Platform == platform && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) { - if err := s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL); err != nil { - log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err) - } return account, nil } } @@ -2383,9 +2395,6 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g } if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) { if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) { - if err := s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL); err != nil { - log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err) - } if s.debugModelRoutingEnabled() { log.Printf("[ModelRoutingDebug] legacy mixed routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), accountID) } @@ -2488,9 +2497,6 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g } if !clearSticky && s.isAccountInGroup(account, groupID) && (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && account.IsSchedulableForModelWithContext(ctx, requestedModel) { if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) { - if err := s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL); err != nil { - log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err) - } return account, nil } } diff --git a/backend/internal/service/gemini_multiplatform_test.go b/backend/internal/service/gemini_multiplatform_test.go index 601e7e2c..0c54dc39 100644 --- a/backend/internal/service/gemini_multiplatform_test.go +++ b/backend/internal/service/gemini_multiplatform_test.go @@ -281,6 +281,14 @@ func (m *mockGatewayCacheForGemini) SaveGeminiSession(ctx context.Context, group return nil } +func (m *mockGatewayCacheForGemini) FindAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain string) (uuid string, accountID int64, found bool) { + return "", 0, false +} + +func (m *mockGatewayCacheForGemini) SaveAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain, uuid string, accountID int64) error { + return nil +} + // TestGeminiMessagesCompatService_SelectAccountForModelWithExclusions_GeminiPlatform 测试 Gemini 单平台选择 func TestGeminiMessagesCompatService_SelectAccountForModelWithExclusions_GeminiPlatform(t *testing.T) { ctx := context.Background() diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 1c2c81ca..159b0afb 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -220,6 +220,14 @@ func (c *stubGatewayCache) SaveGeminiSession(ctx context.Context, groupID int64, return nil } +func (c *stubGatewayCache) FindAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain string) (uuid string, accountID int64, found bool) { + return "", 0, false +} + +func (c *stubGatewayCache) SaveAnthropicSession(ctx context.Context, groupID int64, prefixHash, digestChain, uuid string, accountID int64) error { + return nil +} + func TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulable(t *testing.T) { now := time.Now() resetAt := now.Add(10 * time.Minute) diff --git a/backend/internal/service/sticky_session_test.go b/backend/internal/service/sticky_session_test.go index c70f12fe..e7ef8982 100644 --- a/backend/internal/service/sticky_session_test.go +++ b/backend/internal/service/sticky_session_test.go @@ -23,8 +23,7 @@ import ( // - 临时不可调度且未过期:清理 // - 临时不可调度已过期:不清理 // - 正常可调度状态:不清理 -// - 模型限流超过阈值:清理 -// - 模型限流未超过阈值:不清理 +// - 模型限流(任意时长):清理 // // TestShouldClearStickySession tests the sticky session clearing logic. // Verifies correct behavior for various account states including: @@ -35,9 +34,9 @@ func TestShouldClearStickySession(t *testing.T) { future := now.Add(1 * time.Hour) past := now.Add(-1 * time.Hour) - // 短限流时间(低于阈值,不应清除粘性会话) + // 短限流时间(有限流即清除粘性会话) shortRateLimitReset := now.Add(5 * time.Second).Format(time.RFC3339) - // 长限流时间(超过阈值,应清除粘性会话) + // 长限流时间(有限流即清除粘性会话) longRateLimitReset := now.Add(30 * time.Second).Format(time.RFC3339) tests := []struct { @@ -53,7 +52,7 @@ func TestShouldClearStickySession(t *testing.T) { {name: "temp unschedulable", account: &Account{Status: StatusActive, Schedulable: true, TempUnschedulableUntil: &future}, requestedModel: "", want: true}, {name: "temp unschedulable expired", account: &Account{Status: StatusActive, Schedulable: true, TempUnschedulableUntil: &past}, requestedModel: "", want: false}, {name: "active schedulable", account: &Account{Status: StatusActive, Schedulable: true}, requestedModel: "", want: false}, - // 模型限流测试 + // 模型限流测试:有限流即清除 { name: "model rate limited short duration", account: &Account{ @@ -68,7 +67,7 @@ func TestShouldClearStickySession(t *testing.T) { }, }, requestedModel: "claude-sonnet-4", - want: false, // 低于阈值,不清除 + want: true, // 有限流即清除 }, { name: "model rate limited long duration", @@ -84,7 +83,7 @@ func TestShouldClearStickySession(t *testing.T) { }, }, requestedModel: "claude-sonnet-4", - want: true, // 超过阈值,清除 + want: true, // 有限流即清除 }, { name: "model rate limited different model",