Merge pull request #829 from JIA-ss/fix/usage-query-rate-limit

fix(usage): 修复用量查询 429 重试风暴,增加负缓存、请求去重与随机延迟
This commit is contained in:
Wesley Liddick
2026-03-07 08:32:29 +08:00
committed by GitHub

View File

@@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"math/rand/v2"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
@@ -17,6 +18,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/pkg/timezone" "github.com/Wei-Shaw/sub2api/internal/pkg/timezone"
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats" "github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
) )
type UsageLogRepository interface { type UsageLogRepository interface {
@@ -75,8 +77,10 @@ type accountWindowStatsBatchReader interface {
} }
// apiUsageCache 缓存从 Anthropic API 获取的使用率数据utilization, resets_at // apiUsageCache 缓存从 Anthropic API 获取的使用率数据utilization, resets_at
// 同时支持缓存错误响应(负缓存),防止 429 等错误导致的重试风暴
type apiUsageCache struct { type apiUsageCache struct {
response *ClaudeUsageResponse response *ClaudeUsageResponse
err error // 非 nil 表示缓存的错误(负缓存)
timestamp time.Time timestamp time.Time
} }
@@ -94,6 +98,8 @@ type antigravityUsageCache struct {
const ( const (
apiCacheTTL = 3 * time.Minute apiCacheTTL = 3 * time.Minute
apiErrorCacheTTL = 1 * time.Minute // 负缓存 TTL429 等错误缓存 1 分钟
apiQueryMaxJitter = 800 * time.Millisecond // 用量查询最大随机延迟
windowStatsCacheTTL = 1 * time.Minute windowStatsCacheTTL = 1 * time.Minute
openAIProbeCacheTTL = 10 * time.Minute openAIProbeCacheTTL = 10 * time.Minute
openAICodexProbeVersion = "0.104.0" openAICodexProbeVersion = "0.104.0"
@@ -101,10 +107,11 @@ const (
// UsageCache 封装账户使用量相关的缓存 // UsageCache 封装账户使用量相关的缓存
type UsageCache struct { type UsageCache struct {
apiCache sync.Map // accountID -> *apiUsageCache apiCache sync.Map // accountID -> *apiUsageCache
windowStatsCache sync.Map // accountID -> *windowStatsCache windowStatsCache sync.Map // accountID -> *windowStatsCache
antigravityCache sync.Map // accountID -> *antigravityUsageCache antigravityCache sync.Map // accountID -> *antigravityUsageCache
openAIProbeCache sync.Map // accountID -> time.Time apiFlight singleflight.Group // 防止同一账号的并发请求击穿缓存
openAIProbeCache sync.Map // accountID -> time.Time
} }
// NewUsageCache 创建 UsageCache 实例 // NewUsageCache 创建 UsageCache 实例
@@ -261,24 +268,65 @@ func (s *AccountUsageService) GetUsage(ctx context.Context, accountID int64) (*U
if account.CanGetUsage() { if account.CanGetUsage() {
var apiResp *ClaudeUsageResponse var apiResp *ClaudeUsageResponse
// 1. 检查 API 缓存10 分钟) // 1. 检查缓存(成功响应 3 分钟 / 错误响应 1 分钟)
if cached, ok := s.cache.apiCache.Load(accountID); ok { if cached, ok := s.cache.apiCache.Load(accountID); ok {
if cache, ok := cached.(*apiUsageCache); ok && time.Since(cache.timestamp) < apiCacheTTL { if cache, ok := cached.(*apiUsageCache); ok {
apiResp = cache.response age := time.Since(cache.timestamp)
if cache.err != nil && age < apiErrorCacheTTL {
// 负缓存命中:返回缓存的错误,避免重试风暴
return nil, cache.err
}
if cache.response != nil && age < apiCacheTTL {
apiResp = cache.response
}
} }
} }
// 2. 如果没有缓存,从 API 获取 // 2. 如果没有有效缓存,通过 singleflight 从 API 获取(防止并发击穿)
if apiResp == nil { if apiResp == nil {
apiResp, err = s.fetchOAuthUsageRaw(ctx, account) // 随机延迟:打散多账号并发请求,避免同一时刻大量相同 TLS 指纹请求
if err != nil { // 触发上游反滥用检测。延迟范围 0~800ms仅在缓存未命中时生效。
return nil, err jitter := time.Duration(rand.Int64N(int64(apiQueryMaxJitter)))
select {
case <-time.After(jitter):
case <-ctx.Done():
return nil, ctx.Err()
} }
// 缓存 API 响应
s.cache.apiCache.Store(accountID, &apiUsageCache{ flightKey := fmt.Sprintf("usage:%d", accountID)
response: apiResp, result, flightErr, _ := s.cache.apiFlight.Do(flightKey, func() (any, error) {
timestamp: time.Now(), // 再次检查缓存(可能在等待 singleflight 期间被其他请求填充)
if cached, ok := s.cache.apiCache.Load(accountID); ok {
if cache, ok := cached.(*apiUsageCache); ok {
age := time.Since(cache.timestamp)
if cache.err != nil && age < apiErrorCacheTTL {
return nil, cache.err
}
if cache.response != nil && age < apiCacheTTL {
return cache.response, nil
}
}
}
resp, fetchErr := s.fetchOAuthUsageRaw(ctx, account)
if fetchErr != nil {
// 负缓存:缓存错误响应,防止后续请求重复触发 429
s.cache.apiCache.Store(accountID, &apiUsageCache{
err: fetchErr,
timestamp: time.Now(),
})
return nil, fetchErr
}
// 缓存成功响应
s.cache.apiCache.Store(accountID, &apiUsageCache{
response: resp,
timestamp: time.Now(),
})
return resp, nil
}) })
if flightErr != nil {
return nil, flightErr
}
apiResp, _ = result.(*ClaudeUsageResponse)
} }
// 3. 构建 UsageInfo每次都重新计算 RemainingSeconds // 3. 构建 UsageInfo每次都重新计算 RemainingSeconds