fix(限流): 原子化 Redis 限流并支持故障策略

使用 Lua 脚本原子设置计数与过期,修复 TTL 缺失\n支持 fail-open/fail-close 并对优惠码验证启用 fail-close\n新增单元与集成测试覆盖关键分支\n\n测试:go test ./...
This commit is contained in:
yangjianbo
2026-01-11 22:21:05 +08:00
parent 5f80760a8c
commit 18b8bd43ad
8 changed files with 378 additions and 13 deletions

View File

@@ -1,6 +1,7 @@
package middleware
import (
"context"
"net/http"
"time"
@@ -8,6 +9,33 @@ import (
"github.com/redis/go-redis/v9"
)
// RateLimitFailureMode Redis 故障策略
type RateLimitFailureMode int
const (
RateLimitFailOpen RateLimitFailureMode = iota
RateLimitFailClose
)
// RateLimitOptions 限流可选配置
type RateLimitOptions struct {
FailureMode RateLimitFailureMode
}
var rateLimitScript = redis.NewScript(`
local current = redis.call('INCR', KEYS[1])
local ttl = redis.call('PTTL', KEYS[1])
if current == 1 or ttl == -1 then
redis.call('PEXPIRE', KEYS[1], ARGV[1])
end
return current
`)
// rateLimitRun 允许测试覆写脚本执行逻辑
var rateLimitRun = func(ctx context.Context, client *redis.Client, key string, windowMillis int64) (int64, error) {
return rateLimitScript.Run(ctx, client, []string{key}, windowMillis).Int64()
}
// RateLimiter Redis 速率限制器
type RateLimiter struct {
redis *redis.Client
@@ -27,34 +55,57 @@ func NewRateLimiter(redisClient *redis.Client) *RateLimiter {
// limit: 时间窗口内最大请求数
// window: 时间窗口
func (r *RateLimiter) Limit(key string, limit int, window time.Duration) gin.HandlerFunc {
return r.LimitWithOptions(key, limit, window, RateLimitOptions{})
}
// LimitWithOptions 返回速率限制中间件(带可选配置)
func (r *RateLimiter) LimitWithOptions(key string, limit int, window time.Duration, opts RateLimitOptions) gin.HandlerFunc {
failureMode := opts.FailureMode
if failureMode != RateLimitFailClose {
failureMode = RateLimitFailOpen
}
return func(c *gin.Context) {
ip := c.ClientIP()
redisKey := r.prefix + key + ":" + ip
ctx := c.Request.Context()
// 使用 INCR 原子操作增加计数
count, err := r.redis.Incr(ctx, redisKey).Result()
windowMillis := windowTTLMillis(window)
// 使用 Lua 脚本原子操作增加计数并设置过期
count, err := rateLimitRun(ctx, r.redis, redisKey, windowMillis)
if err != nil {
if failureMode == RateLimitFailClose {
abortRateLimit(c)
return
}
// Redis 错误时放行,避免影响正常服务
c.Next()
return
}
// 首次访问时设置过期时间
if count == 1 {
r.redis.Expire(ctx, redisKey, window)
}
// 超过限制
if count > int64(limit) {
c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
"error": "rate limit exceeded",
"message": "Too many requests, please try again later",
})
abortRateLimit(c)
return
}
c.Next()
}
}
func windowTTLMillis(window time.Duration) int64 {
ttl := window.Milliseconds()
if ttl < 1 {
return 1
}
return ttl
}
func abortRateLimit(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
"error": "rate limit exceeded",
"message": "Too many requests, please try again later",
})
}

View File

@@ -0,0 +1,114 @@
//go:build integration
package middleware
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
tcredis "github.com/testcontainers/testcontainers-go/modules/redis"
)
const redisImageTag = "redis:8.4-alpine"
func TestRateLimiterSetsTTLAndDoesNotRefresh(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx := context.Background()
rdb := startRedis(t, ctx)
limiter := NewRateLimiter(rdb)
router := gin.New()
router.Use(limiter.Limit("ttl-test", 10, 2*time.Second))
router.GET("/test", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"ok": true})
})
recorder := performRequest(router)
require.Equal(t, http.StatusOK, recorder.Code)
redisKey := limiter.prefix + "ttl-test:127.0.0.1"
ttlBefore, err := rdb.PTTL(ctx, redisKey).Result()
require.NoError(t, err)
require.Greater(t, ttlBefore, time.Duration(0))
require.LessOrEqual(t, ttlBefore, 2*time.Second)
time.Sleep(50 * time.Millisecond)
recorder = performRequest(router)
require.Equal(t, http.StatusOK, recorder.Code)
ttlAfter, err := rdb.PTTL(ctx, redisKey).Result()
require.NoError(t, err)
require.Less(t, ttlAfter, ttlBefore)
}
func TestRateLimiterFixesMissingTTL(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx := context.Background()
rdb := startRedis(t, ctx)
limiter := NewRateLimiter(rdb)
router := gin.New()
router.Use(limiter.Limit("ttl-missing", 10, 2*time.Second))
router.GET("/test", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"ok": true})
})
redisKey := limiter.prefix + "ttl-missing:127.0.0.1"
require.NoError(t, rdb.Set(ctx, redisKey, 5, 0).Err())
ttlBefore, err := rdb.PTTL(ctx, redisKey).Result()
require.NoError(t, err)
require.Less(t, ttlBefore, time.Duration(0))
recorder := performRequest(router)
require.Equal(t, http.StatusOK, recorder.Code)
ttlAfter, err := rdb.PTTL(ctx, redisKey).Result()
require.NoError(t, err)
require.Greater(t, ttlAfter, time.Duration(0))
}
func performRequest(router *gin.Engine) *httptest.ResponseRecorder {
req := httptest.NewRequest(http.MethodGet, "/test", nil)
req.RemoteAddr = "127.0.0.1:1234"
recorder := httptest.NewRecorder()
router.ServeHTTP(recorder, req)
return recorder
}
func startRedis(t *testing.T, ctx context.Context) *redis.Client {
t.Helper()
redisContainer, err := tcredis.Run(ctx, redisImageTag)
require.NoError(t, err)
t.Cleanup(func() {
_ = redisContainer.Terminate(ctx)
})
redisHost, err := redisContainer.Host(ctx)
require.NoError(t, err)
redisPort, err := redisContainer.MappedPort(ctx, "6379/tcp")
require.NoError(t, err)
rdb := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", redisHost, redisPort.Int()),
DB: 0,
})
require.NoError(t, rdb.Ping(ctx).Err())
t.Cleanup(func() {
_ = rdb.Close()
})
return rdb
}

View File

@@ -0,0 +1,100 @@
package middleware
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
)
func TestWindowTTLMillis(t *testing.T) {
require.Equal(t, int64(1), windowTTLMillis(500*time.Microsecond))
require.Equal(t, int64(1), windowTTLMillis(1500*time.Microsecond))
require.Equal(t, int64(2), windowTTLMillis(2500*time.Microsecond))
}
func TestRateLimiterFailureModes(t *testing.T) {
gin.SetMode(gin.TestMode)
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:1",
DialTimeout: 50 * time.Millisecond,
ReadTimeout: 50 * time.Millisecond,
WriteTimeout: 50 * time.Millisecond,
})
t.Cleanup(func() {
_ = rdb.Close()
})
limiter := NewRateLimiter(rdb)
failOpenRouter := gin.New()
failOpenRouter.Use(limiter.Limit("test", 1, time.Second))
failOpenRouter.GET("/test", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"ok": true})
})
req := httptest.NewRequest(http.MethodGet, "/test", nil)
req.RemoteAddr = "127.0.0.1:1234"
recorder := httptest.NewRecorder()
failOpenRouter.ServeHTTP(recorder, req)
require.Equal(t, http.StatusOK, recorder.Code)
failCloseRouter := gin.New()
failCloseRouter.Use(limiter.LimitWithOptions("test", 1, time.Second, RateLimitOptions{
FailureMode: RateLimitFailClose,
}))
failCloseRouter.GET("/test", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"ok": true})
})
req = httptest.NewRequest(http.MethodGet, "/test", nil)
req.RemoteAddr = "127.0.0.1:1234"
recorder = httptest.NewRecorder()
failCloseRouter.ServeHTTP(recorder, req)
require.Equal(t, http.StatusTooManyRequests, recorder.Code)
}
func TestRateLimiterSuccessAndLimit(t *testing.T) {
gin.SetMode(gin.TestMode)
originalRun := rateLimitRun
counts := []int64{1, 2}
callIndex := 0
rateLimitRun = func(ctx context.Context, client *redis.Client, key string, windowMillis int64) (int64, error) {
if callIndex >= len(counts) {
return counts[len(counts)-1], nil
}
value := counts[callIndex]
callIndex++
return value, nil
}
t.Cleanup(func() {
rateLimitRun = originalRun
})
limiter := NewRateLimiter(redis.NewClient(&redis.Options{Addr: "127.0.0.1:1"}))
router := gin.New()
router.Use(limiter.Limit("test", 1, time.Second))
router.GET("/test", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"ok": true})
})
req := httptest.NewRequest(http.MethodGet, "/test", nil)
req.RemoteAddr = "127.0.0.1:1234"
recorder := httptest.NewRecorder()
router.ServeHTTP(recorder, req)
require.Equal(t, http.StatusOK, recorder.Code)
req = httptest.NewRequest(http.MethodGet, "/test", nil)
req.RemoteAddr = "127.0.0.1:1234"
recorder = httptest.NewRecorder()
router.ServeHTTP(recorder, req)
require.Equal(t, http.StatusTooManyRequests, recorder.Code)
}

View File

@@ -27,8 +27,10 @@ func RegisterAuthRoutes(
auth.POST("/register", h.Auth.Register)
auth.POST("/login", h.Auth.Login)
auth.POST("/send-verify-code", h.Auth.SendVerifyCode)
// 优惠码验证接口添加速率限制:每分钟最多 10 次
auth.POST("/validate-promo-code", rateLimiter.Limit("validate-promo", 10, time.Minute), h.Auth.ValidatePromoCode)
// 优惠码验证接口添加速率限制:每分钟最多 10 次Redis 故障时 fail-close
auth.POST("/validate-promo-code", rateLimiter.LimitWithOptions("validate-promo", 10, time.Minute, middleware.RateLimitOptions{
FailureMode: middleware.RateLimitFailClose,
}), h.Auth.ValidatePromoCode)
auth.GET("/oauth/linuxdo/start", h.Auth.LinuxDoOAuthStart)
auth.GET("/oauth/linuxdo/callback", h.Auth.LinuxDoOAuthCallback)
}