Files
sub2api/backend/internal/repository/user_rpm_cache.go
james-6-23 dc5d42addc feat(rpm): RPM 限流模块优化
P0:
- rpm_override 嵌入 Auth Cache Snapshot,消除每请求 DB 查询 (snapshot v6→v7)
- 429 RPM 响应返回 Retry-After 头(当前分钟剩余秒数)

P1:
- ClearAll 按钮直连 DELETE API,带 loading 防重复
- 新增 GET /admin/users/:id/rpm-status 管理员 RPM 用量查询端点

优化:
- checkRPM 从级联互斥改为并行取最严,user.rpm_limit 作为全局硬上限始终生效
- Override/Group 变更后自动失效 auth cache
- fail-open 语义不变,Redis 故障不阻塞业务
2026-04-23 16:34:37 +08:00

109 lines
3.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package repository
import (
"context"
"fmt"
"time"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/redis/go-redis/v9"
)
// 用户/分组级 RPM 计数器 Redis 实现。
//
// 设计说明:
// - key 形式rpm:ug:{uid}:{gid}:{minute}、rpm:u:{uid}:{minute}
// - 时间来源rdb.Time()Redis 服务端时间),避免多实例时钟漂移。
// - 原子操作TxPipeline (MULTI/EXEC) 执行 INCR+EXPIRE兼容 Redis Cluster。
// - TTL120s覆盖当前分钟窗口 + 少量冗余。
// - 返回值语义超限判断由调用方billing_cache_service.checkRPM与 RPMLimit 比较完成。
const (
userGroupRPMKeyPrefix = "rpm:ug:"
userRPMKeyPrefix = "rpm:u:"
userRPMKeyTTL = 120 * time.Second
)
type userRPMCacheImpl struct {
rdb *redis.Client
}
// NewUserRPMCache 创建用户/分组级 RPM 计数器。
func NewUserRPMCache(rdb *redis.Client) service.UserRPMCache {
return &userRPMCacheImpl{rdb: rdb}
}
// minuteTS 获取当前 Redis 服务端分钟时间戳。
func (c *userRPMCacheImpl) minuteTS(ctx context.Context) (int64, error) {
t, err := c.rdb.Time(ctx).Result()
if err != nil {
return 0, fmt.Errorf("redis TIME: %w", err)
}
return t.Unix() / 60, nil
}
// atomicIncr 原子 INCR+EXPIRE。
func (c *userRPMCacheImpl) atomicIncr(ctx context.Context, key string) (int, error) {
pipe := c.rdb.TxPipeline()
incr := pipe.Incr(ctx, key)
pipe.Expire(ctx, key, userRPMKeyTTL)
if _, err := pipe.Exec(ctx); err != nil {
return 0, fmt.Errorf("user rpm increment: %w", err)
}
return int(incr.Val()), nil
}
// IncrementUserGroupRPM 递增 (user, group) 分钟计数。
func (c *userRPMCacheImpl) IncrementUserGroupRPM(ctx context.Context, userID, groupID int64) (int, error) {
minute, err := c.minuteTS(ctx)
if err != nil {
return 0, err
}
key := fmt.Sprintf("%s%d:%d:%d", userGroupRPMKeyPrefix, userID, groupID, minute)
return c.atomicIncr(ctx, key)
}
// IncrementUserRPM 递增用户分钟计数。
func (c *userRPMCacheImpl) IncrementUserRPM(ctx context.Context, userID int64) (int, error) {
minute, err := c.minuteTS(ctx)
if err != nil {
return 0, err
}
key := fmt.Sprintf("%s%d:%d", userRPMKeyPrefix, userID, minute)
return c.atomicIncr(ctx, key)
}
// GetUserGroupRPM 获取 (user, group) 当前分钟已用 RPM只读
func (c *userRPMCacheImpl) GetUserGroupRPM(ctx context.Context, userID, groupID int64) (int, error) {
minute, err := c.minuteTS(ctx)
if err != nil {
return 0, err
}
key := fmt.Sprintf("%s%d:%d:%d", userGroupRPMKeyPrefix, userID, groupID, minute)
val, err := c.rdb.Get(ctx, key).Int()
if err == redis.Nil {
return 0, nil
}
if err != nil {
return 0, fmt.Errorf("user group rpm get: %w", err)
}
return val, nil
}
// GetUserRPM 获取用户当前分钟已用 RPM只读
func (c *userRPMCacheImpl) GetUserRPM(ctx context.Context, userID int64) (int, error) {
minute, err := c.minuteTS(ctx)
if err != nil {
return 0, err
}
key := fmt.Sprintf("%s%d:%d", userRPMKeyPrefix, userID, minute)
val, err := c.rdb.Get(ctx, key).Int()
if err == redis.Nil {
return 0, nil
}
if err != nil {
return 0, fmt.Errorf("user rpm get: %w", err)
}
return val, nil
}