fix: resolve CI lint errors and test compilation failures for rate limit feature
- Fix errcheck: properly handle rows.Close() error via named return + defer closure - Fix gofmt: auto-format billing_cache.go, api_key_service.go, billing_cache_service.go - Add missing rate limit interface methods to 4 test stubs (GetRateLimitData, IncrementRateLimitUsage, ResetRateLimitWindows) - Fix NewBillingCacheService calls missing the new apiKeyRepo parameter
This commit is contained in:
@@ -159,7 +159,7 @@ func newTestGatewayHandler(t *testing.T, group *service.Group, accounts []*servi
|
|||||||
|
|
||||||
// RunModeSimple:跳过计费检查,避免引入 repo/cache 依赖。
|
// RunModeSimple:跳过计费检查,避免引入 repo/cache 依赖。
|
||||||
cfg := &config.Config{RunMode: config.RunModeSimple}
|
cfg := &config.Config{RunMode: config.RunModeSimple}
|
||||||
billingCacheSvc := service.NewBillingCacheService(nil, nil, nil, cfg)
|
billingCacheSvc := service.NewBillingCacheService(nil, nil, nil, nil, cfg)
|
||||||
|
|
||||||
concurrencySvc := service.NewConcurrencyService(&fakeConcurrencyCache{})
|
concurrencySvc := service.NewConcurrencyService(&fakeConcurrencyCache{})
|
||||||
concurrencyHelper := NewConcurrencyHelper(concurrencySvc, SSEPingFormatClaude, 0)
|
concurrencyHelper := NewConcurrencyHelper(concurrencySvc, SSEPingFormatClaude, 0)
|
||||||
|
|||||||
@@ -1032,6 +1032,15 @@ func (r *stubAPIKeyRepoForHandler) IncrementQuotaUsed(_ context.Context, _ int64
|
|||||||
func (r *stubAPIKeyRepoForHandler) UpdateLastUsed(context.Context, int64, time.Time) error {
|
func (r *stubAPIKeyRepoForHandler) UpdateLastUsed(context.Context, int64, time.Time) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (r *stubAPIKeyRepoForHandler) IncrementRateLimitUsage(context.Context, int64, float64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (r *stubAPIKeyRepoForHandler) ResetRateLimitWindows(context.Context, int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (r *stubAPIKeyRepoForHandler) GetRateLimitData(context.Context, int64) (*service.APIKeyRateLimitData, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// newTestAPIKeyService 创建测试用的 APIKeyService
|
// newTestAPIKeyService 创建测试用的 APIKeyService
|
||||||
func newTestAPIKeyService(repo *stubAPIKeyRepoForHandler) *service.APIKeyService {
|
func newTestAPIKeyService(repo *stubAPIKeyRepoForHandler) *service.APIKeyService {
|
||||||
|
|||||||
@@ -411,7 +411,7 @@ func TestSoraGatewayHandler_ChatCompletions(t *testing.T) {
|
|||||||
deferredService := service.NewDeferredService(accountRepo, nil, 0)
|
deferredService := service.NewDeferredService(accountRepo, nil, 0)
|
||||||
billingService := service.NewBillingService(cfg, nil)
|
billingService := service.NewBillingService(cfg, nil)
|
||||||
concurrencyService := service.NewConcurrencyService(testutil.StubConcurrencyCache{})
|
concurrencyService := service.NewConcurrencyService(testutil.StubConcurrencyCache{})
|
||||||
billingCacheService := service.NewBillingCacheService(nil, nil, nil, cfg)
|
billingCacheService := service.NewBillingCacheService(nil, nil, nil, nil, cfg)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
billingCacheService.Stop()
|
billingCacheService.Stop()
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -477,7 +477,7 @@ func (r *apiKeyRepository) ResetRateLimitWindows(ctx context.Context, id int64)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetRateLimitData returns the current rate limit usage and window start times for an API key.
|
// GetRateLimitData returns the current rate limit usage and window start times for an API key.
|
||||||
func (r *apiKeyRepository) GetRateLimitData(ctx context.Context, id int64) (*service.APIKeyRateLimitData, error) {
|
func (r *apiKeyRepository) GetRateLimitData(ctx context.Context, id int64) (result *service.APIKeyRateLimitData, err error) {
|
||||||
rows, err := r.sql.QueryContext(ctx, `
|
rows, err := r.sql.QueryContext(ctx, `
|
||||||
SELECT usage_5h, usage_1d, usage_7d, window_5h_start, window_1d_start, window_7d_start
|
SELECT usage_5h, usage_1d, usage_7d, window_5h_start, window_1d_start, window_7d_start
|
||||||
FROM api_keys
|
FROM api_keys
|
||||||
@@ -486,7 +486,11 @@ func (r *apiKeyRepository) GetRateLimitData(ctx context.Context, id int64) (*ser
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer func() {
|
||||||
|
if closeErr := rows.Close(); closeErr != nil && err == nil {
|
||||||
|
err = closeErr
|
||||||
|
}
|
||||||
|
}()
|
||||||
if !rows.Next() {
|
if !rows.Next() {
|
||||||
return nil, service.ErrAPIKeyNotFound
|
return nil, service.ErrAPIKeyNotFound
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,12 +14,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
billingBalanceKeyPrefix = "billing:balance:"
|
billingBalanceKeyPrefix = "billing:balance:"
|
||||||
billingSubKeyPrefix = "billing:sub:"
|
billingSubKeyPrefix = "billing:sub:"
|
||||||
billingRateLimitKeyPrefix = "apikey:rate:"
|
billingRateLimitKeyPrefix = "apikey:rate:"
|
||||||
billingCacheTTL = 5 * time.Minute
|
billingCacheTTL = 5 * time.Minute
|
||||||
billingCacheJitter = 30 * time.Second
|
billingCacheJitter = 30 * time.Second
|
||||||
rateLimitCacheTTL = 7 * 24 * time.Hour // 7 days matches the longest window
|
rateLimitCacheTTL = 7 * 24 * time.Hour // 7 days matches the longest window
|
||||||
)
|
)
|
||||||
|
|
||||||
// jitteredTTL 返回带随机抖动的 TTL,防止缓存雪崩
|
// jitteredTTL 返回带随机抖动的 TTL,防止缓存雪崩
|
||||||
|
|||||||
@@ -86,6 +86,15 @@ func TestAPIContracts(t *testing.T) {
|
|||||||
"last_used_at": null,
|
"last_used_at": null,
|
||||||
"quota": 0,
|
"quota": 0,
|
||||||
"quota_used": 0,
|
"quota_used": 0,
|
||||||
|
"rate_limit_5h": 0,
|
||||||
|
"rate_limit_1d": 0,
|
||||||
|
"rate_limit_7d": 0,
|
||||||
|
"usage_5h": 0,
|
||||||
|
"usage_1d": 0,
|
||||||
|
"usage_7d": 0,
|
||||||
|
"window_5h_start": null,
|
||||||
|
"window_1d_start": null,
|
||||||
|
"window_7d_start": null,
|
||||||
"expires_at": null,
|
"expires_at": null,
|
||||||
"created_at": "2025-01-02T03:04:05Z",
|
"created_at": "2025-01-02T03:04:05Z",
|
||||||
"updated_at": "2025-01-02T03:04:05Z"
|
"updated_at": "2025-01-02T03:04:05Z"
|
||||||
@@ -126,6 +135,15 @@ func TestAPIContracts(t *testing.T) {
|
|||||||
"last_used_at": null,
|
"last_used_at": null,
|
||||||
"quota": 0,
|
"quota": 0,
|
||||||
"quota_used": 0,
|
"quota_used": 0,
|
||||||
|
"rate_limit_5h": 0,
|
||||||
|
"rate_limit_1d": 0,
|
||||||
|
"rate_limit_7d": 0,
|
||||||
|
"usage_5h": 0,
|
||||||
|
"usage_1d": 0,
|
||||||
|
"usage_7d": 0,
|
||||||
|
"window_5h_start": null,
|
||||||
|
"window_1d_start": null,
|
||||||
|
"window_7d_start": null,
|
||||||
"expires_at": null,
|
"expires_at": null,
|
||||||
"created_at": "2025-01-02T03:04:05Z",
|
"created_at": "2025-01-02T03:04:05Z",
|
||||||
"updated_at": "2025-01-02T03:04:05Z"
|
"updated_at": "2025-01-02T03:04:05Z"
|
||||||
@@ -1506,6 +1524,16 @@ func (r *stubApiKeyRepo) UpdateLastUsed(ctx context.Context, id int64, usedAt ti
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *stubApiKeyRepo) IncrementRateLimitUsage(ctx context.Context, id int64, cost float64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (r *stubApiKeyRepo) ResetRateLimitWindows(ctx context.Context, id int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (r *stubApiKeyRepo) GetRateLimitData(ctx context.Context, id int64) (*service.APIKeyRateLimitData, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
type stubUsageLogRepo struct {
|
type stubUsageLogRepo struct {
|
||||||
userLogs map[int64][]service.UsageLog
|
userLogs map[int64][]service.UsageLog
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -588,6 +588,16 @@ func (r *stubApiKeyRepo) UpdateLastUsed(ctx context.Context, id int64, usedAt ti
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *stubApiKeyRepo) IncrementRateLimitUsage(ctx context.Context, id int64, cost float64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (r *stubApiKeyRepo) ResetRateLimitWindows(ctx context.Context, id int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (r *stubApiKeyRepo) GetRateLimitData(ctx context.Context, id int64) (*service.APIKeyRateLimitData, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
type stubUserSubscriptionRepo struct {
|
type stubUserSubscriptionRepo struct {
|
||||||
getActive func(ctx context.Context, userID, groupID int64) (*service.UserSubscription, error)
|
getActive func(ctx context.Context, userID, groupID int64) (*service.UserSubscription, error)
|
||||||
updateStatus func(ctx context.Context, subscriptionID int64, status string) error
|
updateStatus func(ctx context.Context, subscriptionID int64, status string) error
|
||||||
|
|||||||
@@ -127,6 +127,15 @@ func (s *apiKeyRepoStubForGroupUpdate) IncrementQuotaUsed(context.Context, int64
|
|||||||
func (s *apiKeyRepoStubForGroupUpdate) UpdateLastUsed(context.Context, int64, time.Time) error {
|
func (s *apiKeyRepoStubForGroupUpdate) UpdateLastUsed(context.Context, int64, time.Time) error {
|
||||||
panic("unexpected")
|
panic("unexpected")
|
||||||
}
|
}
|
||||||
|
func (s *apiKeyRepoStubForGroupUpdate) IncrementRateLimitUsage(context.Context, int64, float64) error {
|
||||||
|
panic("unexpected")
|
||||||
|
}
|
||||||
|
func (s *apiKeyRepoStubForGroupUpdate) ResetRateLimitWindows(context.Context, int64) error {
|
||||||
|
panic("unexpected")
|
||||||
|
}
|
||||||
|
func (s *apiKeyRepoStubForGroupUpdate) GetRateLimitData(context.Context, int64) (*APIKeyRateLimitData, error) {
|
||||||
|
panic("unexpected")
|
||||||
|
}
|
||||||
|
|
||||||
// groupRepoStubForGroupUpdate implements GroupRepository for AdminUpdateAPIKeyGroupID tests.
|
// groupRepoStubForGroupUpdate implements GroupRepository for AdminUpdateAPIKeyGroupID tests.
|
||||||
type groupRepoStubForGroupUpdate struct {
|
type groupRepoStubForGroupUpdate struct {
|
||||||
|
|||||||
@@ -348,6 +348,19 @@ func (s *billingCacheStub) InvalidateSubscriptionCache(ctx context.Context, user
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *billingCacheStub) GetAPIKeyRateLimit(ctx context.Context, keyID int64) (*APIKeyRateLimitCacheData, error) {
|
||||||
|
panic("unexpected GetAPIKeyRateLimit call")
|
||||||
|
}
|
||||||
|
func (s *billingCacheStub) SetAPIKeyRateLimit(ctx context.Context, keyID int64, data *APIKeyRateLimitCacheData) error {
|
||||||
|
panic("unexpected SetAPIKeyRateLimit call")
|
||||||
|
}
|
||||||
|
func (s *billingCacheStub) UpdateAPIKeyRateLimitUsage(ctx context.Context, keyID int64, cost float64) error {
|
||||||
|
panic("unexpected UpdateAPIKeyRateLimitUsage call")
|
||||||
|
}
|
||||||
|
func (s *billingCacheStub) InvalidateAPIKeyRateLimit(ctx context.Context, keyID int64) error {
|
||||||
|
panic("unexpected InvalidateAPIKeyRateLimit call")
|
||||||
|
}
|
||||||
|
|
||||||
func waitForInvalidations(t *testing.T, ch <-chan subscriptionInvalidateCall, expected int) []subscriptionInvalidateCall {
|
func waitForInvalidations(t *testing.T, ch <-chan subscriptionInvalidateCall, expected int) []subscriptionInvalidateCall {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
calls := make([]subscriptionInvalidateCall, 0, expected)
|
calls := make([]subscriptionInvalidateCall, 0, expected)
|
||||||
|
|||||||
@@ -144,10 +144,10 @@ type UpdateAPIKeyRequest struct {
|
|||||||
ResetQuota *bool `json:"reset_quota"` // Reset quota_used to 0
|
ResetQuota *bool `json:"reset_quota"` // Reset quota_used to 0
|
||||||
|
|
||||||
// Rate limit fields (nil = no change, 0 = unlimited)
|
// Rate limit fields (nil = no change, 0 = unlimited)
|
||||||
RateLimit5h *float64 `json:"rate_limit_5h"`
|
RateLimit5h *float64 `json:"rate_limit_5h"`
|
||||||
RateLimit1d *float64 `json:"rate_limit_1d"`
|
RateLimit1d *float64 `json:"rate_limit_1d"`
|
||||||
RateLimit7d *float64 `json:"rate_limit_7d"`
|
RateLimit7d *float64 `json:"rate_limit_7d"`
|
||||||
ResetRateLimitUsage *bool `json:"reset_rate_limit_usage"` // Reset all usage counters to 0
|
ResetRateLimitUsage *bool `json:"reset_rate_limit_usage"` // Reset all usage counters to 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// APIKeyService API Key服务
|
// APIKeyService API Key服务
|
||||||
@@ -157,19 +157,19 @@ type RateLimitCacheInvalidator interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type APIKeyService struct {
|
type APIKeyService struct {
|
||||||
apiKeyRepo APIKeyRepository
|
apiKeyRepo APIKeyRepository
|
||||||
userRepo UserRepository
|
userRepo UserRepository
|
||||||
groupRepo GroupRepository
|
groupRepo GroupRepository
|
||||||
userSubRepo UserSubscriptionRepository
|
userSubRepo UserSubscriptionRepository
|
||||||
userGroupRateRepo UserGroupRateRepository
|
userGroupRateRepo UserGroupRateRepository
|
||||||
cache APIKeyCache
|
cache APIKeyCache
|
||||||
rateLimitCacheInvalid RateLimitCacheInvalidator // optional: invalidate Redis rate limit cache
|
rateLimitCacheInvalid RateLimitCacheInvalidator // optional: invalidate Redis rate limit cache
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
authCacheL1 *ristretto.Cache
|
authCacheL1 *ristretto.Cache
|
||||||
authCfg apiKeyAuthCacheConfig
|
authCfg apiKeyAuthCacheConfig
|
||||||
authGroup singleflight.Group
|
authGroup singleflight.Group
|
||||||
lastUsedTouchL1 sync.Map // keyID -> nextAllowedAt(time.Time)
|
lastUsedTouchL1 sync.Map // keyID -> nextAllowedAt(time.Time)
|
||||||
lastUsedTouchSF singleflight.Group
|
lastUsedTouchSF singleflight.Group
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAPIKeyService 创建API Key服务实例
|
// NewAPIKeyService 创建API Key服务实例
|
||||||
|
|||||||
@@ -106,6 +106,15 @@ func (s *authRepoStub) IncrementQuotaUsed(ctx context.Context, id int64, amount
|
|||||||
func (s *authRepoStub) UpdateLastUsed(ctx context.Context, id int64, usedAt time.Time) error {
|
func (s *authRepoStub) UpdateLastUsed(ctx context.Context, id int64, usedAt time.Time) error {
|
||||||
panic("unexpected UpdateLastUsed call")
|
panic("unexpected UpdateLastUsed call")
|
||||||
}
|
}
|
||||||
|
func (s *authRepoStub) IncrementRateLimitUsage(ctx context.Context, id int64, cost float64) error {
|
||||||
|
panic("unexpected IncrementRateLimitUsage call")
|
||||||
|
}
|
||||||
|
func (s *authRepoStub) ResetRateLimitWindows(ctx context.Context, id int64) error {
|
||||||
|
panic("unexpected ResetRateLimitWindows call")
|
||||||
|
}
|
||||||
|
func (s *authRepoStub) GetRateLimitData(ctx context.Context, id int64) (*APIKeyRateLimitData, error) {
|
||||||
|
panic("unexpected GetRateLimitData call")
|
||||||
|
}
|
||||||
|
|
||||||
type authCacheStub struct {
|
type authCacheStub struct {
|
||||||
getAuthCache func(ctx context.Context, key string) (*APIKeyAuthCacheEntry, error)
|
getAuthCache func(ctx context.Context, key string) (*APIKeyAuthCacheEntry, error)
|
||||||
|
|||||||
@@ -134,6 +134,18 @@ func (s *apiKeyRepoStub) UpdateLastUsed(ctx context.Context, id int64, usedAt ti
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *apiKeyRepoStub) IncrementRateLimitUsage(ctx context.Context, id int64, cost float64) error {
|
||||||
|
panic("unexpected IncrementRateLimitUsage call")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *apiKeyRepoStub) ResetRateLimitWindows(ctx context.Context, id int64) error {
|
||||||
|
panic("unexpected ResetRateLimitWindows call")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *apiKeyRepoStub) GetRateLimitData(ctx context.Context, id int64) (*APIKeyRateLimitData, error) {
|
||||||
|
panic("unexpected GetRateLimitData call")
|
||||||
|
}
|
||||||
|
|
||||||
// apiKeyCacheStub 是 APIKeyCache 接口的测试桩实现。
|
// apiKeyCacheStub 是 APIKeyCache 接口的测试桩实现。
|
||||||
// 用于验证删除操作时缓存清理逻辑是否被正确调用。
|
// 用于验证删除操作时缓存清理逻辑是否被正确调用。
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -83,12 +83,12 @@ type apiKeyRateLimitLoader interface {
|
|||||||
// BillingCacheService 计费缓存服务
|
// BillingCacheService 计费缓存服务
|
||||||
// 负责余额和订阅数据的缓存管理,提供高性能的计费资格检查
|
// 负责余额和订阅数据的缓存管理,提供高性能的计费资格检查
|
||||||
type BillingCacheService struct {
|
type BillingCacheService struct {
|
||||||
cache BillingCache
|
cache BillingCache
|
||||||
userRepo UserRepository
|
userRepo UserRepository
|
||||||
subRepo UserSubscriptionRepository
|
subRepo UserSubscriptionRepository
|
||||||
apiKeyRateLimitLoader apiKeyRateLimitLoader
|
apiKeyRateLimitLoader apiKeyRateLimitLoader
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
circuitBreaker *billingCircuitBreaker
|
circuitBreaker *billingCircuitBreaker
|
||||||
|
|
||||||
cacheWriteChan chan cacheWriteTask
|
cacheWriteChan chan cacheWriteTask
|
||||||
cacheWriteWg sync.WaitGroup
|
cacheWriteWg sync.WaitGroup
|
||||||
@@ -106,11 +106,11 @@ type BillingCacheService struct {
|
|||||||
// NewBillingCacheService 创建计费缓存服务
|
// NewBillingCacheService 创建计费缓存服务
|
||||||
func NewBillingCacheService(cache BillingCache, userRepo UserRepository, subRepo UserSubscriptionRepository, apiKeyRepo APIKeyRepository, cfg *config.Config) *BillingCacheService {
|
func NewBillingCacheService(cache BillingCache, userRepo UserRepository, subRepo UserSubscriptionRepository, apiKeyRepo APIKeyRepository, cfg *config.Config) *BillingCacheService {
|
||||||
svc := &BillingCacheService{
|
svc := &BillingCacheService{
|
||||||
cache: cache,
|
cache: cache,
|
||||||
userRepo: userRepo,
|
userRepo: userRepo,
|
||||||
subRepo: subRepo,
|
subRepo: subRepo,
|
||||||
apiKeyRateLimitLoader: apiKeyRepo,
|
apiKeyRateLimitLoader: apiKeyRepo,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
}
|
}
|
||||||
svc.circuitBreaker = newBillingCircuitBreaker(cfg.Billing.CircuitBreaker)
|
svc.circuitBreaker = newBillingCircuitBreaker(cfg.Billing.CircuitBreaker)
|
||||||
svc.startCacheWriteWorkers()
|
svc.startCacheWriteWorkers()
|
||||||
|
|||||||
Reference in New Issue
Block a user