fix: Anthropic 429 限流使用精确的窗口重置时间而非聚合最大值

当账号仅触发 5h 窗口限流时,旧逻辑从聚合头
anthropic-ratelimit-unified-reset 读取重置时间,该值为所有窗口的
最大值(即 7d 重置时间),导致账号被标记为不可调度约 6 天。

新增 calculateAnthropic429ResetTime 函数,解析 Anthropic 的
per-window 头(5h-utilization/reset、7d-utilization/reset、
surpassed-threshold),判断实际触发的窗口并使用对应的重置时间:
- 仅 5h 超标 → 使用 5h-reset(约 5 小时)
- 仅 7d 超标 → 使用 7d-reset
- 两者均超标 → 使用 7d-reset(较长冷却)
- per-window 头不存在 → 回退到聚合头(向后兼容)
This commit is contained in:
shaw
2026-02-14 00:21:56 +08:00
parent 471943269c
commit e681431454
2 changed files with 331 additions and 2 deletions

View File

@@ -381,10 +381,31 @@ func (s *RateLimitService) handle429(ctx context.Context, account *Account, head
}
}
// 2. 尝试从响应头解析重置时间Anthropic
// 2. Anthropic 平台:尝试解析 per-window 头5h / 7d选择实际触发的窗口
if result := calculateAnthropic429ResetTime(headers); result != nil {
if err := s.accountRepo.SetRateLimited(ctx, account.ID, result.resetAt); err != nil {
slog.Warn("rate_limit_set_failed", "account_id", account.ID, "error", err)
return
}
// 更新 session window优先使用 5h-reset 头精确计算,否则从 resetAt 反推
windowEnd := result.resetAt
if result.fiveHourReset != nil {
windowEnd = *result.fiveHourReset
}
windowStart := windowEnd.Add(-5 * time.Hour)
if err := s.accountRepo.UpdateSessionWindow(ctx, account.ID, &windowStart, &windowEnd, "rejected"); err != nil {
slog.Warn("rate_limit_update_session_window_failed", "account_id", account.ID, "error", err)
}
slog.Info("anthropic_account_rate_limited", "account_id", account.ID, "reset_at", result.resetAt, "reset_in", time.Until(result.resetAt).Truncate(time.Second))
return
}
// 3. 尝试从响应头解析重置时间Anthropic 聚合头,向后兼容)
resetTimestamp := headers.Get("anthropic-ratelimit-unified-reset")
// 3. 如果响应头没有尝试从响应体解析OpenAI usage_limit_reached, Gemini
// 4. 如果响应头没有尝试从响应体解析OpenAI usage_limit_reached, Gemini
if resetTimestamp == "" {
switch account.Platform {
case PlatformOpenAI:
@@ -497,6 +518,112 @@ func (s *RateLimitService) calculateOpenAI429ResetTime(headers http.Header) *tim
return nil
}
// anthropic429Result holds the parsed Anthropic 429 rate-limit information.
type anthropic429Result struct {
resetAt time.Time // The correct reset time to use for SetRateLimited
fiveHourReset *time.Time // 5h window reset timestamp (for session window calculation), nil if not available
}
// calculateAnthropic429ResetTime parses Anthropic's per-window rate-limit headers
// to determine which window (5h or 7d) actually triggered the 429.
//
// Headers used:
// - anthropic-ratelimit-unified-5h-utilization / anthropic-ratelimit-unified-5h-surpassed-threshold
// - anthropic-ratelimit-unified-5h-reset
// - anthropic-ratelimit-unified-7d-utilization / anthropic-ratelimit-unified-7d-surpassed-threshold
// - anthropic-ratelimit-unified-7d-reset
//
// Returns nil when the per-window headers are absent (caller should fall back to
// the aggregated anthropic-ratelimit-unified-reset header).
func calculateAnthropic429ResetTime(headers http.Header) *anthropic429Result {
reset5hStr := headers.Get("anthropic-ratelimit-unified-5h-reset")
reset7dStr := headers.Get("anthropic-ratelimit-unified-7d-reset")
if reset5hStr == "" && reset7dStr == "" {
return nil
}
var reset5h, reset7d *time.Time
if ts, err := strconv.ParseInt(reset5hStr, 10, 64); err == nil {
t := time.Unix(ts, 0)
reset5h = &t
}
if ts, err := strconv.ParseInt(reset7dStr, 10, 64); err == nil {
t := time.Unix(ts, 0)
reset7d = &t
}
is5hExceeded := isAnthropicWindowExceeded(headers, "5h")
is7dExceeded := isAnthropicWindowExceeded(headers, "7d")
slog.Info("anthropic_429_window_analysis",
"is_5h_exceeded", is5hExceeded,
"is_7d_exceeded", is7dExceeded,
"reset_5h", reset5hStr,
"reset_7d", reset7dStr,
)
// Select the correct reset time based on which window(s) are exceeded.
var chosen *time.Time
switch {
case is5hExceeded && is7dExceeded:
// Both exceeded → prefer 7d (longer cooldown), fall back to 5h
chosen = reset7d
if chosen == nil {
chosen = reset5h
}
case is5hExceeded:
chosen = reset5h
case is7dExceeded:
chosen = reset7d
default:
// Neither flag clearly exceeded — pick the sooner reset as best guess
chosen = pickSooner(reset5h, reset7d)
}
if chosen == nil {
return nil
}
return &anthropic429Result{resetAt: *chosen, fiveHourReset: reset5h}
}
// isAnthropicWindowExceeded checks whether a given Anthropic rate-limit window
// (e.g. "5h" or "7d") has been exceeded, using utilization and surpassed-threshold headers.
func isAnthropicWindowExceeded(headers http.Header, window string) bool {
prefix := "anthropic-ratelimit-unified-" + window + "-"
// Check surpassed-threshold first (most explicit signal)
if st := headers.Get(prefix + "surpassed-threshold"); strings.EqualFold(st, "true") {
return true
}
// Fall back to utilization >= 1.0
if utilStr := headers.Get(prefix + "utilization"); utilStr != "" {
if util, err := strconv.ParseFloat(utilStr, 64); err == nil && util >= 1.0-1e-9 {
// Use a small epsilon to handle floating point: treat 0.9999999... as >= 1.0
return true
}
}
return false
}
// pickSooner returns whichever of the two time pointers is earlier.
// If only one is non-nil, it is returned. If both are nil, returns nil.
func pickSooner(a, b *time.Time) *time.Time {
switch {
case a != nil && b != nil:
if a.Before(*b) {
return a
}
return b
case a != nil:
return a
default:
return b
}
}
// parseOpenAIRateLimitResetTime 解析 OpenAI 格式的 429 响应,返回重置时间的 Unix 时间戳
// OpenAI 的 usage_limit_reached 错误格式:
//

View File

@@ -0,0 +1,202 @@
package service
import (
"net/http"
"testing"
"time"
)
func TestCalculateAnthropic429ResetTime_Only5hExceeded(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-5h-utilization", "1.02")
headers.Set("anthropic-ratelimit-unified-5h-reset", "1770998400")
headers.Set("anthropic-ratelimit-unified-7d-utilization", "0.32")
headers.Set("anthropic-ratelimit-unified-7d-reset", "1771549200")
result := calculateAnthropic429ResetTime(headers)
assertAnthropicResult(t, result, 1770998400)
if result.fiveHourReset == nil || !result.fiveHourReset.Equal(time.Unix(1770998400, 0)) {
t.Errorf("expected fiveHourReset=1770998400, got %v", result.fiveHourReset)
}
}
func TestCalculateAnthropic429ResetTime_Only7dExceeded(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-5h-utilization", "0.50")
headers.Set("anthropic-ratelimit-unified-5h-reset", "1770998400")
headers.Set("anthropic-ratelimit-unified-7d-utilization", "1.05")
headers.Set("anthropic-ratelimit-unified-7d-reset", "1771549200")
result := calculateAnthropic429ResetTime(headers)
assertAnthropicResult(t, result, 1771549200)
// fiveHourReset should still be populated for session window calculation
if result.fiveHourReset == nil || !result.fiveHourReset.Equal(time.Unix(1770998400, 0)) {
t.Errorf("expected fiveHourReset=1770998400, got %v", result.fiveHourReset)
}
}
func TestCalculateAnthropic429ResetTime_BothExceeded(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-5h-utilization", "1.10")
headers.Set("anthropic-ratelimit-unified-5h-reset", "1770998400")
headers.Set("anthropic-ratelimit-unified-7d-utilization", "1.02")
headers.Set("anthropic-ratelimit-unified-7d-reset", "1771549200")
result := calculateAnthropic429ResetTime(headers)
assertAnthropicResult(t, result, 1771549200)
}
func TestCalculateAnthropic429ResetTime_NoPerWindowHeaders(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-reset", "1771549200")
result := calculateAnthropic429ResetTime(headers)
if result != nil {
t.Errorf("expected nil result when no per-window headers, got resetAt=%v", result.resetAt)
}
}
func TestCalculateAnthropic429ResetTime_NoHeaders(t *testing.T) {
result := calculateAnthropic429ResetTime(http.Header{})
if result != nil {
t.Errorf("expected nil result for empty headers, got resetAt=%v", result.resetAt)
}
}
func TestCalculateAnthropic429ResetTime_SurpassedThreshold(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-5h-surpassed-threshold", "true")
headers.Set("anthropic-ratelimit-unified-5h-reset", "1770998400")
headers.Set("anthropic-ratelimit-unified-7d-surpassed-threshold", "false")
headers.Set("anthropic-ratelimit-unified-7d-reset", "1771549200")
result := calculateAnthropic429ResetTime(headers)
assertAnthropicResult(t, result, 1770998400)
}
func TestCalculateAnthropic429ResetTime_UtilizationExactlyOne(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-5h-utilization", "1.0")
headers.Set("anthropic-ratelimit-unified-5h-reset", "1770998400")
headers.Set("anthropic-ratelimit-unified-7d-utilization", "0.5")
headers.Set("anthropic-ratelimit-unified-7d-reset", "1771549200")
result := calculateAnthropic429ResetTime(headers)
assertAnthropicResult(t, result, 1770998400)
}
func TestCalculateAnthropic429ResetTime_NeitherExceeded_UsesShorter(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-5h-utilization", "0.95")
headers.Set("anthropic-ratelimit-unified-5h-reset", "1770998400") // sooner
headers.Set("anthropic-ratelimit-unified-7d-utilization", "0.80")
headers.Set("anthropic-ratelimit-unified-7d-reset", "1771549200") // later
result := calculateAnthropic429ResetTime(headers)
assertAnthropicResult(t, result, 1770998400)
}
func TestCalculateAnthropic429ResetTime_Only5hResetHeader(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-5h-utilization", "1.05")
headers.Set("anthropic-ratelimit-unified-5h-reset", "1770998400")
result := calculateAnthropic429ResetTime(headers)
assertAnthropicResult(t, result, 1770998400)
}
func TestCalculateAnthropic429ResetTime_Only7dResetHeader(t *testing.T) {
headers := http.Header{}
headers.Set("anthropic-ratelimit-unified-7d-utilization", "1.03")
headers.Set("anthropic-ratelimit-unified-7d-reset", "1771549200")
result := calculateAnthropic429ResetTime(headers)
assertAnthropicResult(t, result, 1771549200)
if result.fiveHourReset != nil {
t.Errorf("expected fiveHourReset=nil when no 5h headers, got %v", result.fiveHourReset)
}
}
func TestIsAnthropicWindowExceeded(t *testing.T) {
tests := []struct {
name string
headers http.Header
window string
expected bool
}{
{
name: "utilization above 1.0",
headers: makeHeader("anthropic-ratelimit-unified-5h-utilization", "1.02"),
window: "5h",
expected: true,
},
{
name: "utilization exactly 1.0",
headers: makeHeader("anthropic-ratelimit-unified-5h-utilization", "1.0"),
window: "5h",
expected: true,
},
{
name: "utilization below 1.0",
headers: makeHeader("anthropic-ratelimit-unified-5h-utilization", "0.99"),
window: "5h",
expected: false,
},
{
name: "surpassed-threshold true",
headers: makeHeader("anthropic-ratelimit-unified-7d-surpassed-threshold", "true"),
window: "7d",
expected: true,
},
{
name: "surpassed-threshold True (case insensitive)",
headers: makeHeader("anthropic-ratelimit-unified-7d-surpassed-threshold", "True"),
window: "7d",
expected: true,
},
{
name: "surpassed-threshold false",
headers: makeHeader("anthropic-ratelimit-unified-7d-surpassed-threshold", "false"),
window: "7d",
expected: false,
},
{
name: "no headers",
headers: http.Header{},
window: "5h",
expected: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := isAnthropicWindowExceeded(tc.headers, tc.window)
if got != tc.expected {
t.Errorf("expected %v, got %v", tc.expected, got)
}
})
}
}
// assertAnthropicResult is a test helper that verifies the result is non-nil and
// has the expected resetAt unix timestamp.
func assertAnthropicResult(t *testing.T, result *anthropic429Result, wantUnix int64) {
t.Helper()
if result == nil {
t.Fatal("expected non-nil result")
return // unreachable, but satisfies staticcheck SA5011
}
want := time.Unix(wantUnix, 0)
if !result.resetAt.Equal(want) {
t.Errorf("expected resetAt=%v, got %v", want, result.resetAt)
}
}
func makeHeader(key, value string) http.Header {
h := http.Header{}
h.Set(key, value)
return h
}