feat: apikey支持5h/1d/7d速率控制
This commit is contained in:
@@ -36,12 +36,28 @@ type APIKey struct {
|
||||
Quota float64 // Quota limit in USD (0 = unlimited)
|
||||
QuotaUsed float64 // Used quota amount
|
||||
ExpiresAt *time.Time // Expiration time (nil = never expires)
|
||||
|
||||
// Rate limit fields
|
||||
RateLimit5h float64 // Rate limit in USD per 5h (0 = unlimited)
|
||||
RateLimit1d float64 // Rate limit in USD per 1d (0 = unlimited)
|
||||
RateLimit7d float64 // Rate limit in USD per 7d (0 = unlimited)
|
||||
Usage5h float64 // Used amount in current 5h window
|
||||
Usage1d float64 // Used amount in current 1d window
|
||||
Usage7d float64 // Used amount in current 7d window
|
||||
Window5hStart *time.Time // Start of current 5h window
|
||||
Window1dStart *time.Time // Start of current 1d window
|
||||
Window7dStart *time.Time // Start of current 7d window
|
||||
}
|
||||
|
||||
func (k *APIKey) IsActive() bool {
|
||||
return k.Status == StatusActive
|
||||
}
|
||||
|
||||
// HasRateLimits returns true if any rate limit window is configured
|
||||
func (k *APIKey) HasRateLimits() bool {
|
||||
return k.RateLimit5h > 0 || k.RateLimit1d > 0 || k.RateLimit7d > 0
|
||||
}
|
||||
|
||||
// IsExpired checks if the API key has expired
|
||||
func (k *APIKey) IsExpired() bool {
|
||||
if k.ExpiresAt == nil {
|
||||
|
||||
@@ -19,6 +19,11 @@ type APIKeyAuthSnapshot struct {
|
||||
|
||||
// Expiration field for API Key expiration feature
|
||||
ExpiresAt *time.Time `json:"expires_at,omitempty"` // Expiration time (nil = never expires)
|
||||
|
||||
// Rate limit configuration (only limits, not usage - usage read from Redis at check time)
|
||||
RateLimit5h float64 `json:"rate_limit_5h"`
|
||||
RateLimit1d float64 `json:"rate_limit_1d"`
|
||||
RateLimit7d float64 `json:"rate_limit_7d"`
|
||||
}
|
||||
|
||||
// APIKeyAuthUserSnapshot 用户快照
|
||||
|
||||
@@ -209,6 +209,9 @@ func (s *APIKeyService) snapshotFromAPIKey(apiKey *APIKey) *APIKeyAuthSnapshot {
|
||||
Quota: apiKey.Quota,
|
||||
QuotaUsed: apiKey.QuotaUsed,
|
||||
ExpiresAt: apiKey.ExpiresAt,
|
||||
RateLimit5h: apiKey.RateLimit5h,
|
||||
RateLimit1d: apiKey.RateLimit1d,
|
||||
RateLimit7d: apiKey.RateLimit7d,
|
||||
User: APIKeyAuthUserSnapshot{
|
||||
ID: apiKey.User.ID,
|
||||
Status: apiKey.User.Status,
|
||||
@@ -262,6 +265,9 @@ func (s *APIKeyService) snapshotToAPIKey(key string, snapshot *APIKeyAuthSnapsho
|
||||
Quota: snapshot.Quota,
|
||||
QuotaUsed: snapshot.QuotaUsed,
|
||||
ExpiresAt: snapshot.ExpiresAt,
|
||||
RateLimit5h: snapshot.RateLimit5h,
|
||||
RateLimit1d: snapshot.RateLimit1d,
|
||||
RateLimit7d: snapshot.RateLimit7d,
|
||||
User: &User{
|
||||
ID: snapshot.User.ID,
|
||||
Status: snapshot.User.Status,
|
||||
|
||||
@@ -30,6 +30,11 @@ var (
|
||||
ErrAPIKeyExpired = infraerrors.Forbidden("API_KEY_EXPIRED", "api key 已过期")
|
||||
// ErrAPIKeyQuotaExhausted = infraerrors.TooManyRequests("API_KEY_QUOTA_EXHAUSTED", "api key quota exhausted")
|
||||
ErrAPIKeyQuotaExhausted = infraerrors.TooManyRequests("API_KEY_QUOTA_EXHAUSTED", "api key 额度已用完")
|
||||
|
||||
// Rate limit errors
|
||||
ErrAPIKeyRateLimit5hExceeded = infraerrors.TooManyRequests("API_KEY_RATE_5H_EXCEEDED", "api key 5小时限额已用完")
|
||||
ErrAPIKeyRateLimit1dExceeded = infraerrors.TooManyRequests("API_KEY_RATE_1D_EXCEEDED", "api key 日限额已用完")
|
||||
ErrAPIKeyRateLimit7dExceeded = infraerrors.TooManyRequests("API_KEY_RATE_7D_EXCEEDED", "api key 7天限额已用完")
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -64,6 +69,21 @@ type APIKeyRepository interface {
|
||||
// Quota methods
|
||||
IncrementQuotaUsed(ctx context.Context, id int64, amount float64) (float64, error)
|
||||
UpdateLastUsed(ctx context.Context, id int64, usedAt time.Time) error
|
||||
|
||||
// Rate limit methods
|
||||
IncrementRateLimitUsage(ctx context.Context, id int64, cost float64) error
|
||||
ResetRateLimitWindows(ctx context.Context, id int64) error
|
||||
GetRateLimitData(ctx context.Context, id int64) (*APIKeyRateLimitData, error)
|
||||
}
|
||||
|
||||
// APIKeyRateLimitData is the rate limit state of a single API key as returned
// by the repository: the usage accumulated in each window together with the
// time each window started.
type APIKeyRateLimitData struct {
	// Usage accumulated in the current 5h, 1d and 7d windows.
	Usage5h, Usage1d, Usage7d float64

	// Start of the current 5h, 1d and 7d windows. A nil pointer means no
	// start time is recorded for that window.
	Window5hStart, Window1dStart, Window7dStart *time.Time
}
|
||||
|
||||
// APIKeyCache defines cache operations for API key service
|
||||
@@ -102,6 +122,11 @@ type CreateAPIKeyRequest struct {
|
||||
// Quota fields
|
||||
Quota float64 `json:"quota"` // Quota limit in USD (0 = unlimited)
|
||||
ExpiresInDays *int `json:"expires_in_days"` // Days until expiry (nil = never expires)
|
||||
|
||||
// Rate limit fields (0 = unlimited)
|
||||
RateLimit5h float64 `json:"rate_limit_5h"`
|
||||
RateLimit1d float64 `json:"rate_limit_1d"`
|
||||
RateLimit7d float64 `json:"rate_limit_7d"`
|
||||
}
|
||||
|
||||
// UpdateAPIKeyRequest 更新API Key请求
|
||||
@@ -117,22 +142,34 @@ type UpdateAPIKeyRequest struct {
|
||||
ExpiresAt *time.Time `json:"expires_at"` // Expiration time (nil = no change)
|
||||
ClearExpiration bool `json:"-"` // Clear expiration (internal use)
|
||||
ResetQuota *bool `json:"reset_quota"` // Reset quota_used to 0
|
||||
|
||||
// Rate limit fields (nil = no change, 0 = unlimited)
|
||||
RateLimit5h *float64 `json:"rate_limit_5h"`
|
||||
RateLimit1d *float64 `json:"rate_limit_1d"`
|
||||
RateLimit7d *float64 `json:"rate_limit_7d"`
|
||||
ResetRateLimitUsage *bool `json:"reset_rate_limit_usage"` // Reset all usage counters to 0
|
||||
}
|
||||
|
||||
// RateLimitCacheInvalidator invalidates rate limit cache entries on manual reset.
type RateLimitCacheInvalidator interface {
	// InvalidateAPIKeyRateLimit removes the cached rate limit entry of the
	// API key identified by keyID.
	InvalidateAPIKeyRateLimit(ctx context.Context, keyID int64) error
}
|
||||
|
||||
type APIKeyService struct {
|
||||
apiKeyRepo APIKeyRepository
|
||||
userRepo UserRepository
|
||||
groupRepo GroupRepository
|
||||
userSubRepo UserSubscriptionRepository
|
||||
userGroupRateRepo UserGroupRateRepository
|
||||
cache APIKeyCache
|
||||
cfg *config.Config
|
||||
authCacheL1 *ristretto.Cache
|
||||
authCfg apiKeyAuthCacheConfig
|
||||
authGroup singleflight.Group
|
||||
lastUsedTouchL1 sync.Map // keyID -> nextAllowedAt(time.Time)
|
||||
lastUsedTouchSF singleflight.Group
|
||||
apiKeyRepo APIKeyRepository
|
||||
userRepo UserRepository
|
||||
groupRepo GroupRepository
|
||||
userSubRepo UserSubscriptionRepository
|
||||
userGroupRateRepo UserGroupRateRepository
|
||||
cache APIKeyCache
|
||||
rateLimitCacheInvalid RateLimitCacheInvalidator // optional: invalidate Redis rate limit cache
|
||||
cfg *config.Config
|
||||
authCacheL1 *ristretto.Cache
|
||||
authCfg apiKeyAuthCacheConfig
|
||||
authGroup singleflight.Group
|
||||
lastUsedTouchL1 sync.Map // keyID -> nextAllowedAt(time.Time)
|
||||
lastUsedTouchSF singleflight.Group
|
||||
}
|
||||
|
||||
// NewAPIKeyService 创建API Key服务实例
|
||||
@@ -158,6 +195,12 @@ func NewAPIKeyService(
|
||||
return svc
|
||||
}
|
||||
|
||||
// SetRateLimitCacheInvalidator sets the optional rate limit cache invalidator.
|
||||
// Called after construction (e.g. in wire) to avoid circular dependencies.
|
||||
func (s *APIKeyService) SetRateLimitCacheInvalidator(inv RateLimitCacheInvalidator) {
|
||||
s.rateLimitCacheInvalid = inv
|
||||
}
|
||||
|
||||
func (s *APIKeyService) compileAPIKeyIPRules(apiKey *APIKey) {
|
||||
if apiKey == nil {
|
||||
return
|
||||
@@ -327,6 +370,9 @@ func (s *APIKeyService) Create(ctx context.Context, userID int64, req CreateAPIK
|
||||
IPBlacklist: req.IPBlacklist,
|
||||
Quota: req.Quota,
|
||||
QuotaUsed: 0,
|
||||
RateLimit5h: req.RateLimit5h,
|
||||
RateLimit1d: req.RateLimit1d,
|
||||
RateLimit7d: req.RateLimit7d,
|
||||
}
|
||||
|
||||
// Set expiration time if specified
|
||||
@@ -519,6 +565,26 @@ func (s *APIKeyService) Update(ctx context.Context, id int64, userID int64, req
|
||||
apiKey.IPWhitelist = req.IPWhitelist
|
||||
apiKey.IPBlacklist = req.IPBlacklist
|
||||
|
||||
// Update rate limit configuration
|
||||
if req.RateLimit5h != nil {
|
||||
apiKey.RateLimit5h = *req.RateLimit5h
|
||||
}
|
||||
if req.RateLimit1d != nil {
|
||||
apiKey.RateLimit1d = *req.RateLimit1d
|
||||
}
|
||||
if req.RateLimit7d != nil {
|
||||
apiKey.RateLimit7d = *req.RateLimit7d
|
||||
}
|
||||
resetRateLimit := req.ResetRateLimitUsage != nil && *req.ResetRateLimitUsage
|
||||
if resetRateLimit {
|
||||
apiKey.Usage5h = 0
|
||||
apiKey.Usage1d = 0
|
||||
apiKey.Usage7d = 0
|
||||
apiKey.Window5hStart = nil
|
||||
apiKey.Window1dStart = nil
|
||||
apiKey.Window7dStart = nil
|
||||
}
|
||||
|
||||
if err := s.apiKeyRepo.Update(ctx, apiKey); err != nil {
|
||||
return nil, fmt.Errorf("update api key: %w", err)
|
||||
}
|
||||
@@ -526,6 +592,11 @@ func (s *APIKeyService) Update(ctx context.Context, id int64, userID int64, req
|
||||
s.InvalidateAuthCacheByKey(ctx, apiKey.Key)
|
||||
s.compileAPIKeyIPRules(apiKey)
|
||||
|
||||
// Invalidate Redis rate limit cache so reset takes effect immediately
|
||||
if resetRateLimit && s.rateLimitCacheInvalid != nil {
|
||||
_ = s.rateLimitCacheInvalid.InvalidateAPIKeyRateLimit(ctx, apiKey.ID)
|
||||
}
|
||||
|
||||
return apiKey, nil
|
||||
}
|
||||
|
||||
@@ -746,3 +817,11 @@ func (s *APIKeyService) UpdateQuotaUsed(ctx context.Context, apiKeyID int64, cos
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateRateLimitUsage atomically increments rate limit usage counters in the DB.
|
||||
func (s *APIKeyService) UpdateRateLimitUsage(ctx context.Context, apiKeyID int64, cost float64) error {
|
||||
if cost <= 0 {
|
||||
return nil
|
||||
}
|
||||
return s.apiKeyRepo.IncrementRateLimitUsage(ctx, apiKeyID, cost)
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ const (
|
||||
cacheWriteSetSubscription
|
||||
cacheWriteUpdateSubscriptionUsage
|
||||
cacheWriteDeductBalance
|
||||
cacheWriteUpdateRateLimitUsage
|
||||
)
|
||||
|
||||
// 异步缓存写入工作池配置
|
||||
@@ -68,19 +69,26 @@ type cacheWriteTask struct {
|
||||
kind cacheWriteKind
|
||||
userID int64
|
||||
groupID int64
|
||||
apiKeyID int64
|
||||
balance float64
|
||||
amount float64
|
||||
subscriptionData *subscriptionCacheData
|
||||
}
|
||||
|
||||
// apiKeyRateLimitLoader defines the interface for loading rate limit data from DB.
|
||||
type apiKeyRateLimitLoader interface {
|
||||
GetRateLimitData(ctx context.Context, keyID int64) (*APIKeyRateLimitData, error)
|
||||
}
|
||||
|
||||
// BillingCacheService 计费缓存服务
|
||||
// 负责余额和订阅数据的缓存管理,提供高性能的计费资格检查
|
||||
type BillingCacheService struct {
|
||||
cache BillingCache
|
||||
userRepo UserRepository
|
||||
subRepo UserSubscriptionRepository
|
||||
cfg *config.Config
|
||||
circuitBreaker *billingCircuitBreaker
|
||||
cache BillingCache
|
||||
userRepo UserRepository
|
||||
subRepo UserSubscriptionRepository
|
||||
apiKeyRateLimitLoader apiKeyRateLimitLoader
|
||||
cfg *config.Config
|
||||
circuitBreaker *billingCircuitBreaker
|
||||
|
||||
cacheWriteChan chan cacheWriteTask
|
||||
cacheWriteWg sync.WaitGroup
|
||||
@@ -96,12 +104,13 @@ type BillingCacheService struct {
|
||||
}
|
||||
|
||||
// NewBillingCacheService 创建计费缓存服务
|
||||
func NewBillingCacheService(cache BillingCache, userRepo UserRepository, subRepo UserSubscriptionRepository, cfg *config.Config) *BillingCacheService {
|
||||
func NewBillingCacheService(cache BillingCache, userRepo UserRepository, subRepo UserSubscriptionRepository, apiKeyRepo APIKeyRepository, cfg *config.Config) *BillingCacheService {
|
||||
svc := &BillingCacheService{
|
||||
cache: cache,
|
||||
userRepo: userRepo,
|
||||
subRepo: subRepo,
|
||||
cfg: cfg,
|
||||
cache: cache,
|
||||
userRepo: userRepo,
|
||||
subRepo: subRepo,
|
||||
apiKeyRateLimitLoader: apiKeyRepo,
|
||||
cfg: cfg,
|
||||
}
|
||||
svc.circuitBreaker = newBillingCircuitBreaker(cfg.Billing.CircuitBreaker)
|
||||
svc.startCacheWriteWorkers()
|
||||
@@ -188,6 +197,12 @@ func (s *BillingCacheService) cacheWriteWorker(ch <-chan cacheWriteTask) {
|
||||
logger.LegacyPrintf("service.billing_cache", "Warning: deduct balance cache failed for user %d: %v", task.userID, err)
|
||||
}
|
||||
}
|
||||
case cacheWriteUpdateRateLimitUsage:
|
||||
if s.cache != nil {
|
||||
if err := s.cache.UpdateAPIKeyRateLimitUsage(ctx, task.apiKeyID, task.amount); err != nil {
|
||||
logger.LegacyPrintf("service.billing_cache", "Warning: update rate limit usage cache failed for api key %d: %v", task.apiKeyID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
@@ -204,6 +219,8 @@ func cacheWriteKindName(kind cacheWriteKind) string {
|
||||
return "update_subscription_usage"
|
||||
case cacheWriteDeductBalance:
|
||||
return "deduct_balance"
|
||||
case cacheWriteUpdateRateLimitUsage:
|
||||
return "update_rate_limit_usage"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
@@ -476,6 +493,137 @@ func (s *BillingCacheService) InvalidateSubscription(ctx context.Context, userID
|
||||
return nil
|
||||
}
|
||||
|
||||
// ============================================
|
||||
// API Key 限速缓存方法
|
||||
// ============================================
|
||||
|
||||
// checkAPIKeyRateLimits checks rate limit windows for an API key.
|
||||
// It loads usage from Redis cache (falling back to DB on cache miss),
|
||||
// resets expired windows in-memory and triggers async DB reset,
|
||||
// and returns an error if any window limit is exceeded.
|
||||
func (s *BillingCacheService) checkAPIKeyRateLimits(ctx context.Context, apiKey *APIKey) error {
|
||||
if s.cache == nil {
|
||||
// No cache: fall back to reading from DB directly
|
||||
if s.apiKeyRateLimitLoader == nil {
|
||||
return nil
|
||||
}
|
||||
data, err := s.apiKeyRateLimitLoader.GetRateLimitData(ctx, apiKey.ID)
|
||||
if err != nil {
|
||||
return nil // Don't block requests on DB errors
|
||||
}
|
||||
return s.evaluateRateLimits(ctx, apiKey, data.Usage5h, data.Usage1d, data.Usage7d,
|
||||
data.Window5hStart, data.Window1dStart, data.Window7dStart)
|
||||
}
|
||||
|
||||
cacheData, err := s.cache.GetAPIKeyRateLimit(ctx, apiKey.ID)
|
||||
if err != nil {
|
||||
// Cache miss: load from DB and populate cache
|
||||
if s.apiKeyRateLimitLoader == nil {
|
||||
return nil
|
||||
}
|
||||
dbData, dbErr := s.apiKeyRateLimitLoader.GetRateLimitData(ctx, apiKey.ID)
|
||||
if dbErr != nil {
|
||||
return nil // Don't block requests on DB errors
|
||||
}
|
||||
// Build cache entry from DB data
|
||||
cacheEntry := &APIKeyRateLimitCacheData{
|
||||
Usage5h: dbData.Usage5h,
|
||||
Usage1d: dbData.Usage1d,
|
||||
Usage7d: dbData.Usage7d,
|
||||
}
|
||||
if dbData.Window5hStart != nil {
|
||||
cacheEntry.Window5h = dbData.Window5hStart.Unix()
|
||||
}
|
||||
if dbData.Window1dStart != nil {
|
||||
cacheEntry.Window1d = dbData.Window1dStart.Unix()
|
||||
}
|
||||
if dbData.Window7dStart != nil {
|
||||
cacheEntry.Window7d = dbData.Window7dStart.Unix()
|
||||
}
|
||||
_ = s.cache.SetAPIKeyRateLimit(ctx, apiKey.ID, cacheEntry)
|
||||
cacheData = cacheEntry
|
||||
}
|
||||
|
||||
var w5h, w1d, w7d *time.Time
|
||||
if cacheData.Window5h > 0 {
|
||||
t := time.Unix(cacheData.Window5h, 0)
|
||||
w5h = &t
|
||||
}
|
||||
if cacheData.Window1d > 0 {
|
||||
t := time.Unix(cacheData.Window1d, 0)
|
||||
w1d = &t
|
||||
}
|
||||
if cacheData.Window7d > 0 {
|
||||
t := time.Unix(cacheData.Window7d, 0)
|
||||
w7d = &t
|
||||
}
|
||||
return s.evaluateRateLimits(ctx, apiKey, cacheData.Usage5h, cacheData.Usage1d, cacheData.Usage7d, w5h, w1d, w7d)
|
||||
}
|
||||
|
||||
// evaluateRateLimits checks usage against limits, triggering async resets for expired windows.
|
||||
func (s *BillingCacheService) evaluateRateLimits(ctx context.Context, apiKey *APIKey, usage5h, usage1d, usage7d float64, w5h, w1d, w7d *time.Time) error {
|
||||
needsReset := false
|
||||
|
||||
// Reset expired windows in-memory for check purposes
|
||||
if w5h != nil && time.Since(*w5h) >= 5*time.Hour {
|
||||
usage5h = 0
|
||||
needsReset = true
|
||||
}
|
||||
if w1d != nil && time.Since(*w1d) >= 24*time.Hour {
|
||||
usage1d = 0
|
||||
needsReset = true
|
||||
}
|
||||
if w7d != nil && time.Since(*w7d) >= 7*24*time.Hour {
|
||||
usage7d = 0
|
||||
needsReset = true
|
||||
}
|
||||
|
||||
// Trigger async DB reset if any window expired
|
||||
if needsReset {
|
||||
keyID := apiKey.ID
|
||||
go func() {
|
||||
resetCtx, cancel := context.WithTimeout(context.Background(), cacheWriteTimeout)
|
||||
defer cancel()
|
||||
if s.apiKeyRateLimitLoader != nil {
|
||||
// Use the repo directly - reset then reload cache
|
||||
if loader, ok := s.apiKeyRateLimitLoader.(interface {
|
||||
ResetRateLimitWindows(ctx context.Context, id int64) error
|
||||
}); ok {
|
||||
_ = loader.ResetRateLimitWindows(resetCtx, keyID)
|
||||
}
|
||||
}
|
||||
// Invalidate cache so next request loads fresh data
|
||||
if s.cache != nil {
|
||||
_ = s.cache.InvalidateAPIKeyRateLimit(resetCtx, keyID)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Check limits
|
||||
if apiKey.RateLimit5h > 0 && usage5h >= apiKey.RateLimit5h {
|
||||
return ErrAPIKeyRateLimit5hExceeded
|
||||
}
|
||||
if apiKey.RateLimit1d > 0 && usage1d >= apiKey.RateLimit1d {
|
||||
return ErrAPIKeyRateLimit1dExceeded
|
||||
}
|
||||
if apiKey.RateLimit7d > 0 && usage7d >= apiKey.RateLimit7d {
|
||||
return ErrAPIKeyRateLimit7dExceeded
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueueUpdateAPIKeyRateLimitUsage asynchronously updates rate limit usage in the cache.
|
||||
func (s *BillingCacheService) QueueUpdateAPIKeyRateLimitUsage(apiKeyID int64, cost float64) {
|
||||
if s.cache == nil {
|
||||
return
|
||||
}
|
||||
s.enqueueCacheWrite(cacheWriteTask{
|
||||
kind: cacheWriteUpdateRateLimitUsage,
|
||||
apiKeyID: apiKeyID,
|
||||
amount: cost,
|
||||
})
|
||||
}
|
||||
|
||||
// ============================================
|
||||
// 统一检查方法
|
||||
// ============================================
|
||||
@@ -496,10 +644,23 @@ func (s *BillingCacheService) CheckBillingEligibility(ctx context.Context, user
|
||||
isSubscriptionMode := group != nil && group.IsSubscriptionType() && subscription != nil
|
||||
|
||||
if isSubscriptionMode {
|
||||
return s.checkSubscriptionEligibility(ctx, user.ID, group, subscription)
|
||||
if err := s.checkSubscriptionEligibility(ctx, user.ID, group, subscription); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := s.checkBalanceEligibility(ctx, user.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return s.checkBalanceEligibility(ctx, user.ID)
|
||||
// Check API Key rate limits (applies to both billing modes)
|
||||
if apiKey != nil && apiKey.HasRateLimits() {
|
||||
if err := s.checkAPIKeyRateLimits(ctx, apiKey); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkBalanceEligibility 检查余额模式资格
|
||||
|
||||
@@ -52,9 +52,25 @@ func (b *billingCacheWorkerStub) InvalidateSubscriptionCache(ctx context.Context
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *billingCacheWorkerStub) GetAPIKeyRateLimit(ctx context.Context, keyID int64) (*APIKeyRateLimitCacheData, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (b *billingCacheWorkerStub) SetAPIKeyRateLimit(ctx context.Context, keyID int64, data *APIKeyRateLimitCacheData) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *billingCacheWorkerStub) UpdateAPIKeyRateLimitUsage(ctx context.Context, keyID int64, cost float64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *billingCacheWorkerStub) InvalidateAPIKeyRateLimit(ctx context.Context, keyID int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestBillingCacheServiceQueueHighLoad(t *testing.T) {
|
||||
cache := &billingCacheWorkerStub{}
|
||||
svc := NewBillingCacheService(cache, nil, nil, &config.Config{})
|
||||
svc := NewBillingCacheService(cache, nil, nil, nil, &config.Config{})
|
||||
t.Cleanup(svc.Stop)
|
||||
|
||||
start := time.Now()
|
||||
@@ -76,7 +92,7 @@ func TestBillingCacheServiceQueueHighLoad(t *testing.T) {
|
||||
|
||||
func TestBillingCacheServiceEnqueueAfterStopReturnsFalse(t *testing.T) {
|
||||
cache := &billingCacheWorkerStub{}
|
||||
svc := NewBillingCacheService(cache, nil, nil, &config.Config{})
|
||||
svc := NewBillingCacheService(cache, nil, nil, nil, &config.Config{})
|
||||
svc.Stop()
|
||||
|
||||
enqueued := svc.enqueueCacheWrite(cacheWriteTask{
|
||||
|
||||
@@ -10,6 +10,16 @@ import (
|
||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||
)
|
||||
|
||||
// APIKeyRateLimitCacheData is the rate limit state of one API key in the form
// kept in the Redis cache: per-window usage plus the start of each window as
// a unix timestamp.
type APIKeyRateLimitCacheData struct {
	// Usage accumulated in the current 5h window.
	Usage5h float64 `json:"usage_5h"`
	// Usage accumulated in the current 1d window.
	Usage1d float64 `json:"usage_1d"`
	// Usage accumulated in the current 7d window.
	Usage7d float64 `json:"usage_7d"`

	// Window start times as unix timestamps; 0 means the window has not
	// started.
	Window5h int64 `json:"window_5h"`
	Window1d int64 `json:"window_1d"`
	Window7d int64 `json:"window_7d"`
}
|
||||
|
||||
// BillingCache defines cache operations for billing service
|
||||
type BillingCache interface {
|
||||
// Balance operations
|
||||
@@ -23,6 +33,12 @@ type BillingCache interface {
|
||||
SetSubscriptionCache(ctx context.Context, userID, groupID int64, data *SubscriptionCacheData) error
|
||||
UpdateSubscriptionUsage(ctx context.Context, userID, groupID int64, cost float64) error
|
||||
InvalidateSubscriptionCache(ctx context.Context, userID, groupID int64) error
|
||||
|
||||
// API Key rate limit operations
|
||||
GetAPIKeyRateLimit(ctx context.Context, keyID int64) (*APIKeyRateLimitCacheData, error)
|
||||
SetAPIKeyRateLimit(ctx context.Context, keyID int64, data *APIKeyRateLimitCacheData) error
|
||||
UpdateAPIKeyRateLimitUsage(ctx context.Context, keyID int64, cost float64) error
|
||||
InvalidateAPIKeyRateLimit(ctx context.Context, keyID int64) error
|
||||
}
|
||||
|
||||
// ModelPricing 模型价格配置(per-token价格,与LiteLLM格式一致)
|
||||
|
||||
@@ -6361,9 +6361,10 @@ type RecordUsageInput struct {
|
||||
APIKeyService APIKeyQuotaUpdater // 可选:用于更新API Key配额
|
||||
}
|
||||
|
||||
// APIKeyQuotaUpdater defines the interface for updating API Key quota and rate limit usage
type APIKeyQuotaUpdater interface {
	// UpdateQuotaUsed records cost against the quota of the API key
	// identified by apiKeyID.
	UpdateQuotaUsed(ctx context.Context, apiKeyID int64, cost float64) error
	// UpdateRateLimitUsage records cost against the rate limit usage of the
	// API key identified by apiKeyID.
	UpdateRateLimitUsage(ctx context.Context, apiKeyID int64, cost float64) error
}
|
||||
|
||||
// RecordUsage 记录使用量并扣费(或更新订阅用量)
|
||||
@@ -6557,6 +6558,14 @@ func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInpu
|
||||
}
|
||||
}
|
||||
|
||||
// Update API Key rate limit usage
|
||||
if shouldBill && cost.ActualCost > 0 && apiKey.HasRateLimits() && input.APIKeyService != nil {
|
||||
if err := input.APIKeyService.UpdateRateLimitUsage(ctx, apiKey.ID, cost.ActualCost); err != nil {
|
||||
logger.LegacyPrintf("service.gateway", "Update API key rate limit usage failed: %v", err)
|
||||
}
|
||||
s.billingCacheService.QueueUpdateAPIKeyRateLimitUsage(apiKey.ID, cost.ActualCost)
|
||||
}
|
||||
|
||||
// Schedule batch update for account last_used_at
|
||||
s.deferredService.ScheduleLastUsedUpdate(account.ID)
|
||||
|
||||
@@ -6746,6 +6755,14 @@ func (s *GatewayService) RecordUsageWithLongContext(ctx context.Context, input *
|
||||
}
|
||||
}
|
||||
|
||||
// Update API Key rate limit usage
|
||||
if shouldBill && cost.ActualCost > 0 && apiKey.HasRateLimits() && input.APIKeyService != nil {
|
||||
if err := input.APIKeyService.UpdateRateLimitUsage(ctx, apiKey.ID, cost.ActualCost); err != nil {
|
||||
logger.LegacyPrintf("service.gateway", "Update API key rate limit usage failed: %v", err)
|
||||
}
|
||||
s.billingCacheService.QueueUpdateAPIKeyRateLimitUsage(apiKey.ID, cost.ActualCost)
|
||||
}
|
||||
|
||||
// Schedule batch update for account last_used_at
|
||||
s.deferredService.ScheduleLastUsedUpdate(account.ID)
|
||||
|
||||
|
||||
@@ -3492,6 +3492,14 @@ func (s *OpenAIGatewayService) RecordUsage(ctx context.Context, input *OpenAIRec
|
||||
}
|
||||
}
|
||||
|
||||
// Update API Key rate limit usage
|
||||
if shouldBill && cost.ActualCost > 0 && apiKey.HasRateLimits() && input.APIKeyService != nil {
|
||||
if err := input.APIKeyService.UpdateRateLimitUsage(ctx, apiKey.ID, cost.ActualCost); err != nil {
|
||||
logger.LegacyPrintf("service.openai_gateway", "Update API key rate limit usage failed: %v", err)
|
||||
}
|
||||
s.billingCacheService.QueueUpdateAPIKeyRateLimitUsage(apiKey.ID, cost.ActualCost)
|
||||
}
|
||||
|
||||
// Schedule batch update for account last_used_at
|
||||
s.deferredService.ScheduleLastUsedUpdate(account.ID)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user