feat: antigravity 配额域限流 + SSE 上限 (#222)
* fix: 添加 gemini-3-flash 前缀映射支持 gemini-3-flash-preview
* feat(antigravity): 增强请求参数和注入 Antigravity 身份 system prompt
* feat: antigravity 配额域限流
* chore: 调整 SSE 单行上限到 25MB
* chore: 提升 SSE 单行上限到 40MB
This commit is contained in:
@@ -673,7 +673,7 @@ func setDefaults() {
|
|||||||
viper.SetDefault("gateway.concurrency_slot_ttl_minutes", 30) // 并发槽位过期时间(支持超长请求)
|
viper.SetDefault("gateway.concurrency_slot_ttl_minutes", 30) // 并发槽位过期时间(支持超长请求)
|
||||||
viper.SetDefault("gateway.stream_data_interval_timeout", 180)
|
viper.SetDefault("gateway.stream_data_interval_timeout", 180)
|
||||||
viper.SetDefault("gateway.stream_keepalive_interval", 10)
|
viper.SetDefault("gateway.stream_keepalive_interval", 10)
|
||||||
viper.SetDefault("gateway.max_line_size", 10*1024*1024)
|
viper.SetDefault("gateway.max_line_size", 40*1024*1024)
|
||||||
viper.SetDefault("gateway.scheduling.sticky_session_max_waiting", 3)
|
viper.SetDefault("gateway.scheduling.sticky_session_max_waiting", 3)
|
||||||
viper.SetDefault("gateway.scheduling.sticky_session_wait_timeout", 45*time.Second)
|
viper.SetDefault("gateway.scheduling.sticky_session_wait_timeout", 45*time.Second)
|
||||||
viper.SetDefault("gateway.scheduling.fallback_wait_timeout", 30*time.Second)
|
viper.SetDefault("gateway.scheduling.fallback_wait_timeout", 30*time.Second)
|
||||||
|
|||||||
@@ -675,6 +675,40 @@ func (r *accountRepository) SetRateLimited(ctx context.Context, id int64, resetA
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *accountRepository) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope service.AntigravityQuotaScope, resetAt time.Time) error {
|
||||||
|
now := time.Now().UTC()
|
||||||
|
payload := map[string]string{
|
||||||
|
"rate_limited_at": now.Format(time.RFC3339),
|
||||||
|
"rate_limit_reset_at": resetAt.UTC().Format(time.RFC3339),
|
||||||
|
}
|
||||||
|
raw, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
path := "{antigravity_quota_scopes," + string(scope) + "}"
|
||||||
|
client := clientFromContext(ctx, r.client)
|
||||||
|
result, err := client.ExecContext(
|
||||||
|
ctx,
|
||||||
|
"UPDATE accounts SET extra = jsonb_set(COALESCE(extra, '{}'::jsonb), $1::text[], $2::jsonb, true), updated_at = NOW() WHERE id = $3 AND deleted_at IS NULL",
|
||||||
|
path,
|
||||||
|
raw,
|
||||||
|
id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
affected, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if affected == 0 {
|
||||||
|
return service.ErrAccountNotFound
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *accountRepository) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
|
func (r *accountRepository) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
|
||||||
_, err := r.client.Account.Update().
|
_, err := r.client.Account.Update().
|
||||||
Where(dbaccount.IDEQ(id)).
|
Where(dbaccount.IDEQ(id)).
|
||||||
@@ -718,6 +752,27 @@ func (r *accountRepository) ClearRateLimit(ctx context.Context, id int64) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *accountRepository) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error {
|
||||||
|
client := clientFromContext(ctx, r.client)
|
||||||
|
result, err := client.ExecContext(
|
||||||
|
ctx,
|
||||||
|
"UPDATE accounts SET extra = COALESCE(extra, '{}'::jsonb) - 'antigravity_quota_scopes', updated_at = NOW() WHERE id = $1 AND deleted_at IS NULL",
|
||||||
|
id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
affected, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if affected == 0 {
|
||||||
|
return service.ErrAccountNotFound
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *accountRepository) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
|
func (r *accountRepository) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
|
||||||
builder := r.client.Account.Update().
|
builder := r.client.Account.Update().
|
||||||
Where(dbaccount.IDEQ(id)).
|
Where(dbaccount.IDEQ(id)).
|
||||||
|
|||||||
@@ -49,10 +49,12 @@ type AccountRepository interface {
|
|||||||
ListSchedulableByGroupIDAndPlatforms(ctx context.Context, groupID int64, platforms []string) ([]Account, error)
|
ListSchedulableByGroupIDAndPlatforms(ctx context.Context, groupID int64, platforms []string) ([]Account, error)
|
||||||
|
|
||||||
SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error
|
SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error
|
||||||
|
SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error
|
||||||
SetOverloaded(ctx context.Context, id int64, until time.Time) error
|
SetOverloaded(ctx context.Context, id int64, until time.Time) error
|
||||||
SetTempUnschedulable(ctx context.Context, id int64, until time.Time, reason string) error
|
SetTempUnschedulable(ctx context.Context, id int64, until time.Time, reason string) error
|
||||||
ClearTempUnschedulable(ctx context.Context, id int64) error
|
ClearTempUnschedulable(ctx context.Context, id int64) error
|
||||||
ClearRateLimit(ctx context.Context, id int64) error
|
ClearRateLimit(ctx context.Context, id int64) error
|
||||||
|
ClearAntigravityQuotaScopes(ctx context.Context, id int64) error
|
||||||
UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error
|
UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error
|
||||||
UpdateExtra(ctx context.Context, id int64, updates map[string]any) error
|
UpdateExtra(ctx context.Context, id int64, updates map[string]any) error
|
||||||
BulkUpdate(ctx context.Context, ids []int64, updates AccountBulkUpdate) (int64, error)
|
BulkUpdate(ctx context.Context, ids []int64, updates AccountBulkUpdate) (int64, error)
|
||||||
|
|||||||
@@ -139,6 +139,10 @@ func (s *accountRepoStub) SetRateLimited(ctx context.Context, id int64, resetAt
|
|||||||
panic("unexpected SetRateLimited call")
|
panic("unexpected SetRateLimited call")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetAntigravityQuotaScopeLimit is not expected to be called on this stub; it
// panics unconditionally so an unexpected call is surfaced immediately.
func (s *accountRepoStub) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error {
|
||||||
|
panic("unexpected SetAntigravityQuotaScopeLimit call")
|
||||||
|
}
|
||||||
|
|
||||||
func (s *accountRepoStub) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
|
func (s *accountRepoStub) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
|
||||||
panic("unexpected SetOverloaded call")
|
panic("unexpected SetOverloaded call")
|
||||||
}
|
}
|
||||||
@@ -155,6 +159,10 @@ func (s *accountRepoStub) ClearRateLimit(ctx context.Context, id int64) error {
|
|||||||
panic("unexpected ClearRateLimit call")
|
panic("unexpected ClearRateLimit call")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClearAntigravityQuotaScopes is not expected to be called on this stub; it
// panics unconditionally so an unexpected call is surfaced immediately.
func (s *accountRepoStub) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error {
|
||||||
|
panic("unexpected ClearAntigravityQuotaScopes call")
|
||||||
|
}
|
||||||
|
|
||||||
func (s *accountRepoStub) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
|
func (s *accountRepoStub) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
|
||||||
panic("unexpected UpdateSessionWindow call")
|
panic("unexpected UpdateSessionWindow call")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ var antigravityPrefixMapping = []struct {
|
|||||||
// 长前缀优先
|
// 长前缀优先
|
||||||
{"gemini-2.5-flash-image", "gemini-3-pro-image"}, // gemini-2.5-flash-image → 3-pro-image
|
{"gemini-2.5-flash-image", "gemini-3-pro-image"}, // gemini-2.5-flash-image → 3-pro-image
|
||||||
{"gemini-3-pro-image", "gemini-3-pro-image"}, // gemini-3-pro-image-preview 等
|
{"gemini-3-pro-image", "gemini-3-pro-image"}, // gemini-3-pro-image-preview 等
|
||||||
|
{"gemini-3-flash", "gemini-3-flash"}, // gemini-3-flash-preview 等 → gemini-3-flash
|
||||||
{"claude-3-5-sonnet", "claude-sonnet-4-5"}, // 旧版 claude-3-5-sonnet-xxx
|
{"claude-3-5-sonnet", "claude-sonnet-4-5"}, // 旧版 claude-3-5-sonnet-xxx
|
||||||
{"claude-sonnet-4-5", "claude-sonnet-4-5"}, // claude-sonnet-4-5-xxx
|
{"claude-sonnet-4-5", "claude-sonnet-4-5"}, // claude-sonnet-4-5-xxx
|
||||||
{"claude-haiku-4-5", "claude-sonnet-4-5"}, // claude-haiku-4-5-xxx → sonnet
|
{"claude-haiku-4-5", "claude-sonnet-4-5"}, // claude-haiku-4-5-xxx → sonnet
|
||||||
@@ -502,6 +503,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
|||||||
|
|
||||||
originalModel := claudeReq.Model
|
originalModel := claudeReq.Model
|
||||||
mappedModel := s.getMappedModel(account, claudeReq.Model)
|
mappedModel := s.getMappedModel(account, claudeReq.Model)
|
||||||
|
quotaScope, _ := resolveAntigravityQuotaScope(originalModel)
|
||||||
|
|
||||||
// 获取 access_token
|
// 获取 access_token
|
||||||
if s.tokenProvider == nil {
|
if s.tokenProvider == nil {
|
||||||
@@ -603,7 +605,7 @@ urlFallbackLoop:
|
|||||||
}
|
}
|
||||||
// 所有重试都失败,标记限流状态
|
// 所有重试都失败,标记限流状态
|
||||||
if resp.StatusCode == 429 {
|
if resp.StatusCode == 429 {
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody)
|
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
||||||
}
|
}
|
||||||
// 最后一次尝试也失败
|
// 最后一次尝试也失败
|
||||||
resp = &http.Response{
|
resp = &http.Response{
|
||||||
@@ -696,7 +698,7 @@ urlFallbackLoop:
|
|||||||
|
|
||||||
// 处理错误响应(重试后仍失败或不触发重试)
|
// 处理错误响应(重试后仍失败或不触发重试)
|
||||||
if resp.StatusCode >= 400 {
|
if resp.StatusCode >= 400 {
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody)
|
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
||||||
|
|
||||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
||||||
@@ -1021,6 +1023,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
|||||||
if len(body) == 0 {
|
if len(body) == 0 {
|
||||||
return nil, s.writeGoogleError(c, http.StatusBadRequest, "Request body is empty")
|
return nil, s.writeGoogleError(c, http.StatusBadRequest, "Request body is empty")
|
||||||
}
|
}
|
||||||
|
quotaScope, _ := resolveAntigravityQuotaScope(originalModel)
|
||||||
|
|
||||||
// 解析请求以获取 image_size(用于图片计费)
|
// 解析请求以获取 image_size(用于图片计费)
|
||||||
imageSize := s.extractImageSize(body)
|
imageSize := s.extractImageSize(body)
|
||||||
@@ -1146,7 +1149,7 @@ urlFallbackLoop:
|
|||||||
}
|
}
|
||||||
// 所有重试都失败,标记限流状态
|
// 所有重试都失败,标记限流状态
|
||||||
if resp.StatusCode == 429 {
|
if resp.StatusCode == 429 {
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody)
|
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
||||||
}
|
}
|
||||||
resp = &http.Response{
|
resp = &http.Response{
|
||||||
StatusCode: resp.StatusCode,
|
StatusCode: resp.StatusCode,
|
||||||
@@ -1200,7 +1203,7 @@ urlFallbackLoop:
|
|||||||
goto handleSuccess
|
goto handleSuccess
|
||||||
}
|
}
|
||||||
|
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody)
|
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
||||||
|
|
||||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
|
||||||
@@ -1314,7 +1317,7 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte) {
|
func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope) {
|
||||||
// 429 使用 Gemini 格式解析(从 body 解析重置时间)
|
// 429 使用 Gemini 格式解析(从 body 解析重置时间)
|
||||||
if statusCode == 429 {
|
if statusCode == 429 {
|
||||||
resetAt := ParseGeminiRateLimitResetTime(body)
|
resetAt := ParseGeminiRateLimitResetTime(body)
|
||||||
@@ -1325,13 +1328,23 @@ func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, pre
|
|||||||
defaultDur = 5 * time.Minute
|
defaultDur = 5 * time.Minute
|
||||||
}
|
}
|
||||||
ra := time.Now().Add(defaultDur)
|
ra := time.Now().Add(defaultDur)
|
||||||
log.Printf("%s status=429 rate_limited reset_in=%v (fallback)", prefix, defaultDur)
|
log.Printf("%s status=429 rate_limited scope=%s reset_in=%v (fallback)", prefix, quotaScope, defaultDur)
|
||||||
_ = s.accountRepo.SetRateLimited(ctx, account.ID, ra)
|
if quotaScope == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, ra); err != nil {
|
||||||
|
log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resetTime := time.Unix(*resetAt, 0)
|
resetTime := time.Unix(*resetAt, 0)
|
||||||
log.Printf("%s status=429 rate_limited reset_at=%v reset_in=%v", prefix, resetTime.Format("15:04:05"), time.Until(resetTime).Truncate(time.Second))
|
log.Printf("%s status=429 rate_limited scope=%s reset_at=%v reset_in=%v", prefix, quotaScope, resetTime.Format("15:04:05"), time.Until(resetTime).Truncate(time.Second))
|
||||||
_ = s.accountRepo.SetRateLimited(ctx, account.ID, resetTime)
|
if quotaScope == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, resetTime); err != nil {
|
||||||
|
log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 其他错误码继续使用 rateLimitService
|
// 其他错误码继续使用 rateLimitService
|
||||||
|
|||||||
88
backend/internal/service/antigravity_quota_scope.go
Normal file
88
backend/internal/service/antigravity_quota_scope.go
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// antigravityQuotaScopesKey is the key in Account.Extra under which the
// per-scope quota entries are looked up.
const antigravityQuotaScopesKey = "antigravity_quota_scopes"
|
||||||
|
|
||||||
|
// AntigravityQuotaScope identifies an Antigravity quota scope.
|
||||||
|
type AntigravityQuotaScope string
|
||||||
|
|
||||||
|
const (
|
||||||
|
// AntigravityQuotaScopeClaude is the scope resolved for model names starting with "claude-".
AntigravityQuotaScopeClaude AntigravityQuotaScope = "claude"
|
||||||
|
// AntigravityQuotaScopeGeminiText is the scope resolved for "gemini-" models that are not image generation models.
AntigravityQuotaScopeGeminiText AntigravityQuotaScope = "gemini_text"
|
||||||
|
// AntigravityQuotaScopeGeminiImage is the scope resolved for "gemini-" models that are image generation models.
AntigravityQuotaScopeGeminiImage AntigravityQuotaScope = "gemini_image"
|
||||||
|
)
|
||||||
|
|
||||||
|
// resolveAntigravityQuotaScope 根据模型名称解析配额域
|
||||||
|
func resolveAntigravityQuotaScope(requestedModel string) (AntigravityQuotaScope, bool) {
|
||||||
|
model := normalizeAntigravityModelName(requestedModel)
|
||||||
|
if model == "" {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case strings.HasPrefix(model, "claude-"):
|
||||||
|
return AntigravityQuotaScopeClaude, true
|
||||||
|
case strings.HasPrefix(model, "gemini-"):
|
||||||
|
if isImageGenerationModel(model) {
|
||||||
|
return AntigravityQuotaScopeGeminiImage, true
|
||||||
|
}
|
||||||
|
return AntigravityQuotaScopeGeminiText, true
|
||||||
|
default:
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// normalizeAntigravityModelName trims surrounding whitespace from model,
// lowercases it, and strips a single leading "models/" prefix.
func normalizeAntigravityModelName(model string) string {
	trimmed := strings.TrimSpace(model)
	return strings.TrimPrefix(strings.ToLower(trimmed), "models/")
}
|
||||||
|
|
||||||
|
// IsSchedulableForModel 结合 Antigravity 配额域限流判断是否可调度
|
||||||
|
func (a *Account) IsSchedulableForModel(requestedModel string) bool {
|
||||||
|
if a == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !a.IsSchedulable() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if a.Platform != PlatformAntigravity {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
scope, ok := resolveAntigravityQuotaScope(requestedModel)
|
||||||
|
if !ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
resetAt := a.antigravityQuotaScopeResetAt(scope)
|
||||||
|
if resetAt == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
return !now.Before(*resetAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Account) antigravityQuotaScopeResetAt(scope AntigravityQuotaScope) *time.Time {
|
||||||
|
if a == nil || a.Extra == nil || scope == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rawScopes, ok := a.Extra[antigravityQuotaScopesKey].(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rawScope, ok := rawScopes[string(scope)].(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
resetAtRaw, ok := rawScope["rate_limit_reset_at"].(string)
|
||||||
|
if !ok || strings.TrimSpace(resetAtRaw) == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
resetAt, err := time.Parse(time.RFC3339, resetAtRaw)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &resetAt
|
||||||
|
}
|
||||||
@@ -136,6 +136,9 @@ func (m *mockAccountRepoForPlatform) ListSchedulableByGroupIDAndPlatforms(ctx co
|
|||||||
func (m *mockAccountRepoForPlatform) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error {
|
func (m *mockAccountRepoForPlatform) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
// SetAntigravityQuotaScopeLimit is a no-op in this mock; it ignores its
// arguments and always returns nil.
func (m *mockAccountRepoForPlatform) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func (m *mockAccountRepoForPlatform) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
|
func (m *mockAccountRepoForPlatform) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -148,6 +151,9 @@ func (m *mockAccountRepoForPlatform) ClearTempUnschedulable(ctx context.Context,
|
|||||||
func (m *mockAccountRepoForPlatform) ClearRateLimit(ctx context.Context, id int64) error {
|
func (m *mockAccountRepoForPlatform) ClearRateLimit(ctx context.Context, id int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
// ClearAntigravityQuotaScopes is a no-op in this mock; it ignores its
// arguments and always returns nil.
func (m *mockAccountRepoForPlatform) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func (m *mockAccountRepoForPlatform) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
|
func (m *mockAccountRepoForPlatform) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ const (
|
|||||||
claudeAPIURL = "https://api.anthropic.com/v1/messages?beta=true"
|
claudeAPIURL = "https://api.anthropic.com/v1/messages?beta=true"
|
||||||
claudeAPICountTokensURL = "https://api.anthropic.com/v1/messages/count_tokens?beta=true"
|
claudeAPICountTokensURL = "https://api.anthropic.com/v1/messages/count_tokens?beta=true"
|
||||||
stickySessionTTL = time.Hour // 粘性会话TTL
|
stickySessionTTL = time.Hour // 粘性会话TTL
|
||||||
defaultMaxLineSize = 10 * 1024 * 1024
|
defaultMaxLineSize = 40 * 1024 * 1024
|
||||||
claudeCodeSystemPrompt = "You are Claude Code, Anthropic's official CLI for Claude."
|
claudeCodeSystemPrompt = "You are Claude Code, Anthropic's official CLI for Claude."
|
||||||
maxCacheControlBlocks = 4 // Anthropic API 允许的最大 cache_control 块数量
|
maxCacheControlBlocks = 4 // Anthropic API 允许的最大 cache_control 块数量
|
||||||
)
|
)
|
||||||
@@ -481,7 +481,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
|
|||||||
account, err := s.accountRepo.GetByID(ctx, accountID)
|
account, err := s.accountRepo.GetByID(ctx, accountID)
|
||||||
if err == nil && s.isAccountInGroup(account, groupID) &&
|
if err == nil && s.isAccountInGroup(account, groupID) &&
|
||||||
s.isAccountAllowedForPlatform(account, platform, useMixed) &&
|
s.isAccountAllowedForPlatform(account, platform, useMixed) &&
|
||||||
account.IsSchedulable() &&
|
account.IsSchedulableForModel(requestedModel) &&
|
||||||
(requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) {
|
(requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) {
|
||||||
result, err := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency)
|
result, err := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency)
|
||||||
if err == nil && result.Acquired {
|
if err == nil && result.Acquired {
|
||||||
@@ -519,6 +519,9 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
|
|||||||
if !s.isAccountAllowedForPlatform(acc, platform, useMixed) {
|
if !s.isAccountAllowedForPlatform(acc, platform, useMixed) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if !acc.IsSchedulableForModel(requestedModel) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) {
|
if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -812,7 +815,7 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context,
|
|||||||
if _, excluded := excludedIDs[accountID]; !excluded {
|
if _, excluded := excludedIDs[accountID]; !excluded {
|
||||||
account, err := s.accountRepo.GetByID(ctx, accountID)
|
account, err := s.accountRepo.GetByID(ctx, accountID)
|
||||||
// 检查账号分组归属和平台匹配(确保粘性会话不会跨分组或跨平台)
|
// 检查账号分组归属和平台匹配(确保粘性会话不会跨分组或跨平台)
|
||||||
if err == nil && s.isAccountInGroup(account, groupID) && account.Platform == platform && account.IsSchedulable() && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) {
|
if err == nil && s.isAccountInGroup(account, groupID) && account.Platform == platform && account.IsSchedulableForModel(requestedModel) && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) {
|
||||||
if err := s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL); err != nil {
|
if err := s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL); err != nil {
|
||||||
log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err)
|
log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err)
|
||||||
}
|
}
|
||||||
@@ -844,6 +847,9 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context,
|
|||||||
if _, excluded := excludedIDs[acc.ID]; excluded {
|
if _, excluded := excludedIDs[acc.ID]; excluded {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if !acc.IsSchedulableForModel(requestedModel) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) {
|
if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -901,7 +907,7 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g
|
|||||||
if _, excluded := excludedIDs[accountID]; !excluded {
|
if _, excluded := excludedIDs[accountID]; !excluded {
|
||||||
account, err := s.accountRepo.GetByID(ctx, accountID)
|
account, err := s.accountRepo.GetByID(ctx, accountID)
|
||||||
// 检查账号分组归属和有效性:原生平台直接匹配,antigravity 需要启用混合调度
|
// 检查账号分组归属和有效性:原生平台直接匹配,antigravity 需要启用混合调度
|
||||||
if err == nil && s.isAccountInGroup(account, groupID) && account.IsSchedulable() && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) {
|
if err == nil && s.isAccountInGroup(account, groupID) && account.IsSchedulableForModel(requestedModel) && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) {
|
||||||
if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) {
|
if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) {
|
||||||
if err := s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL); err != nil {
|
if err := s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL); err != nil {
|
||||||
log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err)
|
log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err)
|
||||||
@@ -936,6 +942,9 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g
|
|||||||
if acc.Platform == PlatformAntigravity && !acc.IsMixedSchedulingEnabled() {
|
if acc.Platform == PlatformAntigravity && !acc.IsMixedSchedulingEnabled() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if !acc.IsSchedulableForModel(requestedModel) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) {
|
if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -114,7 +114,7 @@ func (s *GeminiMessagesCompatService) SelectAccountForModelWithExclusions(ctx co
|
|||||||
if _, excluded := excludedIDs[accountID]; !excluded {
|
if _, excluded := excludedIDs[accountID]; !excluded {
|
||||||
account, err := s.accountRepo.GetByID(ctx, accountID)
|
account, err := s.accountRepo.GetByID(ctx, accountID)
|
||||||
// 检查账号是否有效:原生平台直接匹配,antigravity 需要启用混合调度
|
// 检查账号是否有效:原生平台直接匹配,antigravity 需要启用混合调度
|
||||||
if err == nil && account.IsSchedulable() && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) {
|
if err == nil && account.IsSchedulableForModel(requestedModel) && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) {
|
||||||
valid := false
|
valid := false
|
||||||
if account.Platform == platform {
|
if account.Platform == platform {
|
||||||
valid = true
|
valid = true
|
||||||
@@ -172,6 +172,9 @@ func (s *GeminiMessagesCompatService) SelectAccountForModelWithExclusions(ctx co
|
|||||||
if useMixedScheduling && acc.Platform == PlatformAntigravity && !acc.IsMixedSchedulingEnabled() {
|
if useMixedScheduling && acc.Platform == PlatformAntigravity && !acc.IsMixedSchedulingEnabled() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if !acc.IsSchedulableForModel(requestedModel) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) {
|
if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -121,6 +121,9 @@ func (m *mockAccountRepoForGemini) ListSchedulableByGroupIDAndPlatforms(ctx cont
|
|||||||
func (m *mockAccountRepoForGemini) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error {
|
func (m *mockAccountRepoForGemini) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
// SetAntigravityQuotaScopeLimit is a no-op in this mock; it ignores its
// arguments and always returns nil.
func (m *mockAccountRepoForGemini) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func (m *mockAccountRepoForGemini) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
|
func (m *mockAccountRepoForGemini) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -131,6 +134,9 @@ func (m *mockAccountRepoForGemini) ClearTempUnschedulable(ctx context.Context, i
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (m *mockAccountRepoForGemini) ClearRateLimit(ctx context.Context, id int64) error { return nil }
|
func (m *mockAccountRepoForGemini) ClearRateLimit(ctx context.Context, id int64) error { return nil }
|
||||||
|
// ClearAntigravityQuotaScopes is a no-op in this mock; it ignores its
// arguments and always returns nil.
func (m *mockAccountRepoForGemini) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
func (m *mockAccountRepoForGemini) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
|
func (m *mockAccountRepoForGemini) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -345,7 +345,7 @@ func (s *RateLimitService) UpdateSessionWindow(ctx context.Context, account *Acc
|
|||||||
|
|
||||||
// 如果状态为allowed且之前有限流,说明窗口已重置,清除限流状态
|
// 如果状态为allowed且之前有限流,说明窗口已重置,清除限流状态
|
||||||
if status == "allowed" && account.IsRateLimited() {
|
if status == "allowed" && account.IsRateLimited() {
|
||||||
if err := s.accountRepo.ClearRateLimit(ctx, account.ID); err != nil {
|
if err := s.ClearRateLimit(ctx, account.ID); err != nil {
|
||||||
log.Printf("ClearRateLimit failed for account %d: %v", account.ID, err)
|
log.Printf("ClearRateLimit failed for account %d: %v", account.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -353,7 +353,10 @@ func (s *RateLimitService) UpdateSessionWindow(ctx context.Context, account *Acc
|
|||||||
|
|
||||||
// ClearRateLimit 清除账号的限流状态
|
// ClearRateLimit 清除账号的限流状态
|
||||||
func (s *RateLimitService) ClearRateLimit(ctx context.Context, accountID int64) error {
|
func (s *RateLimitService) ClearRateLimit(ctx context.Context, accountID int64) error {
|
||||||
return s.accountRepo.ClearRateLimit(ctx, accountID)
|
if err := s.accountRepo.ClearRateLimit(ctx, accountID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.accountRepo.ClearAntigravityQuotaScopes(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *RateLimitService) ClearTempUnschedulable(ctx context.Context, accountID int64) error {
|
func (s *RateLimitService) ClearTempUnschedulable(ctx context.Context, accountID int64) error {
|
||||||
|
|||||||
@@ -154,9 +154,9 @@ gateway:
|
|||||||
# Stream keepalive interval (seconds), 0=disable
|
# Stream keepalive interval (seconds), 0=disable
|
||||||
# 流式 keepalive 间隔(秒),0=禁用
|
# 流式 keepalive 间隔(秒),0=禁用
|
||||||
stream_keepalive_interval: 10
|
stream_keepalive_interval: 10
|
||||||
# SSE max line size in bytes (default: 10MB)
|
# SSE max line size in bytes (default: 40MB)
|
||||||
# SSE 单行最大字节数(默认 10MB)
|
# SSE 单行最大字节数(默认 40MB)
|
||||||
max_line_size: 10485760
|
max_line_size: 41943040
|
||||||
# Log upstream error response body summary (safe/truncated; does not log request content)
|
# Log upstream error response body summary (safe/truncated; does not log request content)
|
||||||
# 记录上游错误响应体摘要(安全/截断;不记录请求内容)
|
# 记录上游错误响应体摘要(安全/截断;不记录请求内容)
|
||||||
log_upstream_error_body: false
|
log_upstream_error_body: false
|
||||||
|
|||||||
@@ -154,9 +154,9 @@ gateway:
|
|||||||
# Stream keepalive interval (seconds), 0=disable
|
# Stream keepalive interval (seconds), 0=disable
|
||||||
# 流式 keepalive 间隔(秒),0=禁用
|
# 流式 keepalive 间隔(秒),0=禁用
|
||||||
stream_keepalive_interval: 10
|
stream_keepalive_interval: 10
|
||||||
# SSE max line size in bytes (default: 10MB)
|
# SSE max line size in bytes (default: 40MB)
|
||||||
# SSE 单行最大字节数(默认 10MB)
|
# SSE 单行最大字节数(默认 40MB)
|
||||||
max_line_size: 10485760
|
max_line_size: 41943040
|
||||||
# Log upstream error response body summary (safe/truncated; does not log request content)
|
# Log upstream error response body summary (safe/truncated; does not log request content)
|
||||||
# 记录上游错误响应体摘要(安全/截断;不记录请求内容)
|
# 记录上游错误响应体摘要(安全/截断;不记录请求内容)
|
||||||
log_upstream_error_body: false
|
log_upstream_error_body: false
|
||||||
|
|||||||
Reference in New Issue
Block a user