Merge branch 'feat/deferred-batch-update'

This commit is contained in:
shaw
2025-12-28 11:28:06 +08:00
10 changed files with 243 additions and 38 deletions

View File

@@ -171,6 +171,27 @@ func (r *accountRepository) UpdateLastUsed(ctx context.Context, id int64) error
return r.db.WithContext(ctx).Model(&accountModel{}).Where("id = ?", id).Update("last_used_at", now).Error
}
func (r *accountRepository) BatchUpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error {
if len(updates) == 0 {
return nil
}
var caseSql = "UPDATE accounts SET last_used_at = CASE id"
var args []any
var ids []int64
for id, ts := range updates {
caseSql += " WHEN ? THEN CAST(? AS TIMESTAMP)"
args = append(args, id, ts)
ids = append(ids, id)
}
caseSql += " END WHERE id IN ?"
args = append(args, ids)
return r.db.WithContext(ctx).Exec(caseSql, args...).Error
}
func (r *accountRepository) SetError(ctx context.Context, id int64, errorMsg string) error {
return r.db.WithContext(ctx).Model(&accountModel{}).Where("id = ?", id).
Updates(map[string]any{

View File

@@ -29,6 +29,7 @@ type AccountRepository interface {
ListByPlatform(ctx context.Context, platform string) ([]Account, error)
UpdateLastUsed(ctx context.Context, id int64) error
BatchUpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error
SetError(ctx context.Context, id int64, errorMsg string) error
SetSchedulable(ctx context.Context, id int64, schedulable bool) error
BindGroups(ctx context.Context, accountID int64, groupIDs []int64) error

View File

@@ -0,0 +1,76 @@
package service
import (
"context"
"log"
"sync"
"time"
)
// DeferredService provides deferred batch update functionality
type DeferredService struct {
accountRepo AccountRepository
timingWheel *TimingWheelService
interval time.Duration
lastUsedUpdates sync.Map
}
// NewDeferredService creates a new DeferredService instance
func NewDeferredService(accountRepo AccountRepository, timingWheel *TimingWheelService, interval time.Duration) *DeferredService {
return &DeferredService{
accountRepo: accountRepo,
timingWheel: timingWheel,
interval: interval,
}
}
// Start starts the deferred service
func (s *DeferredService) Start() {
s.timingWheel.ScheduleRecurring("deferred:last_used", s.interval, s.flushLastUsed)
log.Printf("[DeferredService] Started (interval: %v)", s.interval)
}
// Stop stops the deferred service
func (s *DeferredService) Stop() {
s.timingWheel.Cancel("deferred:last_used")
s.flushLastUsed()
log.Printf("[DeferredService] Service stopped")
}
func (s *DeferredService) ScheduleLastUsedUpdate(accountID int64) {
s.lastUsedUpdates.Store(accountID, time.Now())
}
func (s *DeferredService) flushLastUsed() {
updates := make(map[int64]time.Time)
s.lastUsedUpdates.Range(func(key, value any) bool {
id, ok := key.(int64)
if !ok {
return true
}
ts, ok := value.(time.Time)
if !ok {
return true
}
updates[id] = ts
s.lastUsedUpdates.Delete(key)
return true
})
if len(updates) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := s.accountRepo.BatchUpdateLastUsed(ctx, updates); err != nil {
log.Printf("[DeferredService] BatchUpdateLastUsed failed (%d accounts): %v", len(updates), err)
for id, ts := range updates {
s.lastUsedUpdates.Store(id, ts)
}
} else {
log.Printf("[DeferredService] BatchUpdateLastUsed flushed %d accounts", len(updates))
}
}

View File

@@ -103,6 +103,7 @@ type GatewayService struct {
billingCacheService *BillingCacheService
identityService *IdentityService
httpUpstream HTTPUpstream
deferredService *DeferredService
}
// NewGatewayService creates a new GatewayService
@@ -118,6 +119,7 @@ func NewGatewayService(
billingCacheService *BillingCacheService,
identityService *IdentityService,
httpUpstream HTTPUpstream,
deferredService *DeferredService,
) *GatewayService {
return &GatewayService{
accountRepo: accountRepo,
@@ -131,6 +133,7 @@ func NewGatewayService(
billingCacheService: billingCacheService,
identityService: identityService,
httpUpstream: httpUpstream,
deferredService: deferredService,
}
}
@@ -1095,10 +1098,8 @@ func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInpu
}
}
// 更新账号最后使用时间
if err := s.accountRepo.UpdateLastUsed(ctx, account.ID); err != nil {
log.Printf("Update last used failed: %v", err)
}
// Schedule batch update for account last_used_at
s.deferredService.ScheduleLastUsedUpdate(account.ID)
return nil
}

View File

@@ -83,6 +83,7 @@ type OpenAIGatewayService struct {
rateLimitService *RateLimitService
billingCacheService *BillingCacheService
httpUpstream HTTPUpstream
deferredService *DeferredService
}
// NewOpenAIGatewayService creates a new OpenAIGatewayService
@@ -97,6 +98,7 @@ func NewOpenAIGatewayService(
rateLimitService *RateLimitService,
billingCacheService *BillingCacheService,
httpUpstream HTTPUpstream,
deferredService *DeferredService,
) *OpenAIGatewayService {
return &OpenAIGatewayService{
accountRepo: accountRepo,
@@ -109,6 +111,7 @@ func NewOpenAIGatewayService(
rateLimitService: rateLimitService,
billingCacheService: billingCacheService,
httpUpstream: httpUpstream,
deferredService: deferredService,
}
}
@@ -772,8 +775,8 @@ func (s *OpenAIGatewayService) RecordUsage(ctx context.Context, input *OpenAIRec
}
}
// Update account last used
_ = s.accountRepo.UpdateLastUsed(ctx, account.ID)
// Schedule batch update for account last_used_at
s.deferredService.ScheduleLastUsedUpdate(account.ID)
return nil
}

View File

@@ -0,0 +1,63 @@
package service
import (
"log"
"sync"
"time"
"github.com/zeromicro/go-zero/core/collection"
)
// TimingWheelService wraps go-zero's TimingWheel for task scheduling
type TimingWheelService struct {
tw *collection.TimingWheel
stopOnce sync.Once
}
// NewTimingWheelService creates a new TimingWheelService instance
func NewTimingWheelService() *TimingWheelService {
// 1 second tick, 3600 slots = supports up to 1 hour delay
// execute function: runs func() type tasks
tw, err := collection.NewTimingWheel(1*time.Second, 3600, func(key, value any) {
if fn, ok := value.(func()); ok {
fn()
}
})
if err != nil {
panic(err)
}
return &TimingWheelService{tw: tw}
}
// Start starts the timing wheel
func (s *TimingWheelService) Start() {
log.Println("[TimingWheel] Started (auto-start by go-zero)")
}
// Stop stops the timing wheel
func (s *TimingWheelService) Stop() {
s.stopOnce.Do(func() {
s.tw.Stop()
log.Println("[TimingWheel] Stopped")
})
}
// Schedule schedules a one-time task
func (s *TimingWheelService) Schedule(name string, delay time.Duration, fn func()) {
_ = s.tw.SetTimer(name, fn, delay)
}
// ScheduleRecurring schedules a recurring task
func (s *TimingWheelService) ScheduleRecurring(name string, interval time.Duration, fn func()) {
var schedule func()
schedule = func() {
fn()
_ = s.tw.SetTimer(name, schedule, interval)
}
_ = s.tw.SetTimer(name, schedule, interval)
}
// Cancel cancels a scheduled task
func (s *TimingWheelService) Cancel(name string) {
_ = s.tw.RemoveTimer(name)
}

View File

@@ -1,6 +1,8 @@
package service
import (
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/wire"
)
@@ -44,6 +46,20 @@ func ProvideTokenRefreshService(
return svc
}
// ProvideTimingWheelService creates and starts TimingWheelService
func ProvideTimingWheelService() *TimingWheelService {
svc := NewTimingWheelService()
svc.Start()
return svc
}
// ProvideDeferredService creates and starts DeferredService
func ProvideDeferredService(accountRepo AccountRepository, timingWheel *TimingWheelService) *DeferredService {
svc := NewDeferredService(accountRepo, timingWheel, 10*time.Second)
svc.Start()
return svc
}
// ProviderSet is the Wire provider set for all services
var ProviderSet = wire.NewSet(
// Core services
@@ -80,4 +96,6 @@ var ProviderSet = wire.NewSet(
NewCRSSyncService,
ProvideUpdateService,
ProvideTokenRefreshService,
ProvideTimingWheelService,
ProvideDeferredService,
)