feat(usage): 添加清理任务与统计过滤

This commit is contained in:
yangjianbo
2026-01-18 10:52:18 +08:00
parent 74a3c74514
commit ef5a41057f
44 changed files with 4478 additions and 46 deletions

View File

@@ -32,8 +32,8 @@ type UsageLogRepository interface {
// Admin dashboard stats
GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error)
GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool) ([]usagestats.TrendDataPoint, error)
GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool) ([]usagestats.ModelStat, error)
GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool, billingType *int8) ([]usagestats.TrendDataPoint, error)
GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool, billingType *int8) ([]usagestats.ModelStat, error)
GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error)
GetUserUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.UserUsageTrendPoint, error)
GetBatchUserUsageStats(ctx context.Context, userIDs []int64) (map[int64]*usagestats.BatchUserUsageStats, error)
@@ -272,7 +272,7 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou
}
dayStart := geminiDailyWindowStart(now)
stats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, dayStart, now, 0, 0, account.ID, 0, nil)
stats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, dayStart, now, 0, 0, account.ID, 0, nil, nil)
if err != nil {
return nil, fmt.Errorf("get gemini usage stats failed: %w", err)
}
@@ -294,7 +294,7 @@ func (s *AccountUsageService) getGeminiUsage(ctx context.Context, account *Accou
// Minute window (RPM) - fixed-window approximation: current minute [truncate(now), truncate(now)+1m)
minuteStart := now.Truncate(time.Minute)
minuteResetAt := minuteStart.Add(time.Minute)
minuteStats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, minuteStart, now, 0, 0, account.ID, 0, nil)
minuteStats, err := s.usageLogRepo.GetModelStatsWithFilters(ctx, minuteStart, now, 0, 0, account.ID, 0, nil, nil)
if err != nil {
return nil, fmt.Errorf("get gemini minute usage stats failed: %w", err)
}

View File

@@ -21,11 +21,15 @@ var (
ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用")
// ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
errDashboardAggregationRunning = errors.New("聚合作业正在运行")
)
// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。
type DashboardAggregationRepository interface {
AggregateRange(ctx context.Context, start, end time.Time) error
// RecomputeRange 重新计算指定时间范围内的聚合数据(包含活跃用户等派生表)。
// 设计目的:当 usage_logs 被批量删除/回滚后,确保聚合表可恢复一致性。
RecomputeRange(ctx context.Context, start, end time.Time) error
GetAggregationWatermark(ctx context.Context) (time.Time, error)
UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error
@@ -112,6 +116,41 @@ func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) erro
return nil
}
// TriggerRecomputeRange 触发指定范围的重新计算(异步)。
// 与 TriggerBackfill 不同:
// - 不依赖 backfill_enabled这是内部一致性修复
// - 不更新 watermark避免影响正常增量聚合游标
func (s *DashboardAggregationService) TriggerRecomputeRange(start, end time.Time) error {
if s == nil || s.repo == nil {
return errors.New("聚合服务未初始化")
}
if !s.cfg.Enabled {
return errors.New("聚合服务已禁用")
}
if !end.After(start) {
return errors.New("重新计算时间范围无效")
}
go func() {
const maxRetries = 3
for i := 0; i < maxRetries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
err := s.recomputeRange(ctx, start, end)
cancel()
if err == nil {
return
}
if !errors.Is(err, errDashboardAggregationRunning) {
log.Printf("[DashboardAggregation] 重新计算失败: %v", err)
return
}
time.Sleep(5 * time.Second)
}
log.Printf("[DashboardAggregation] 重新计算放弃: 聚合作业持续占用")
}()
return nil
}
func (s *DashboardAggregationService) recomputeRecentDays() {
days := s.cfg.RecomputeDays
if days <= 0 {
@@ -128,6 +167,24 @@ func (s *DashboardAggregationService) recomputeRecentDays() {
}
}
// recomputeRange runs a single recompute job, guarded by the shared
// running flag so it never overlaps another aggregation job.
func (s *DashboardAggregationService) recomputeRange(ctx context.Context, start, end time.Time) error {
	if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
		return errDashboardAggregationRunning
	}
	defer atomic.StoreInt32(&s.running, 0)

	begunAt := time.Now().UTC()
	err := s.repo.RecomputeRange(ctx, start, end)
	if err != nil {
		return err
	}
	log.Printf("[DashboardAggregation] 重新计算完成 (start=%s end=%s duration=%s)",
		start.UTC().Format(time.RFC3339),
		end.UTC().Format(time.RFC3339),
		time.Since(begunAt).String(),
	)
	return nil
}
func (s *DashboardAggregationService) runScheduledAggregation() {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return
@@ -179,7 +236,7 @@ func (s *DashboardAggregationService) runScheduledAggregation() {
func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return errors.New("聚合作业正在运行")
return errDashboardAggregationRunning
}
defer atomic.StoreInt32(&s.running, 0)

View File

@@ -27,6 +27,10 @@ func (s *dashboardAggregationRepoTestStub) AggregateRange(ctx context.Context, s
return s.aggregateErr
}
// RecomputeRange delegates to AggregateRange so tests can reuse the same
// configurable error behavior for both aggregation entry points.
func (s *dashboardAggregationRepoTestStub) RecomputeRange(ctx context.Context, start, end time.Time) error {
	return s.AggregateRange(ctx, start, end)
}
// GetAggregationWatermark returns the stub's preset watermark and never errors.
func (s *dashboardAggregationRepoTestStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
	return s.watermark, nil
}

View File

@@ -124,16 +124,16 @@ func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.D
return stats, nil
}
func (s *DashboardService) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool) ([]usagestats.TrendDataPoint, error) {
trend, err := s.usageRepo.GetUsageTrendWithFilters(ctx, startTime, endTime, granularity, userID, apiKeyID, accountID, groupID, model, stream)
func (s *DashboardService) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool, billingType *int8) ([]usagestats.TrendDataPoint, error) {
trend, err := s.usageRepo.GetUsageTrendWithFilters(ctx, startTime, endTime, granularity, userID, apiKeyID, accountID, groupID, model, stream, billingType)
if err != nil {
return nil, fmt.Errorf("get usage trend with filters: %w", err)
}
return trend, nil
}
func (s *DashboardService) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool) ([]usagestats.ModelStat, error) {
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, startTime, endTime, userID, apiKeyID, accountID, groupID, stream)
func (s *DashboardService) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool, billingType *int8) ([]usagestats.ModelStat, error) {
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, startTime, endTime, userID, apiKeyID, accountID, groupID, stream, billingType)
if err != nil {
return nil, fmt.Errorf("get model stats with filters: %w", err)
}

View File

@@ -101,6 +101,10 @@ func (s *dashboardAggregationRepoStub) AggregateRange(ctx context.Context, start
return nil
}
// RecomputeRange is a no-op stub satisfying the repository interface.
func (s *dashboardAggregationRepoStub) RecomputeRange(ctx context.Context, start, end time.Time) error {
	return nil
}
func (s *dashboardAggregationRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
if s.err != nil {
return time.Time{}, s.err

View File

@@ -190,7 +190,7 @@ func (s *RateLimitService) PreCheckUsage(ctx context.Context, account *Account,
start := geminiDailyWindowStart(now)
totals, ok := s.getGeminiUsageTotals(account.ID, start, now)
if !ok {
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, start, now, 0, 0, account.ID, 0, nil)
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, start, now, 0, 0, account.ID, 0, nil, nil)
if err != nil {
return true, err
}
@@ -237,7 +237,7 @@ func (s *RateLimitService) PreCheckUsage(ctx context.Context, account *Account,
if limit > 0 {
start := now.Truncate(time.Minute)
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, start, now, 0, 0, account.ID, 0, nil)
stats, err := s.usageRepo.GetModelStatsWithFilters(ctx, start, now, 0, 0, account.ID, 0, nil, nil)
if err != nil {
return true, err
}

View File

@@ -0,0 +1,74 @@
package service
import (
"context"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
// Lifecycle states of a usage cleanup task.
const (
	UsageCleanupStatusPending   = "pending"   // created, waiting to be claimed by the worker
	UsageCleanupStatusRunning   = "running"   // claimed and currently deleting rows
	UsageCleanupStatusSucceeded = "succeeded" // finished; all matching rows removed
	UsageCleanupStatusFailed    = "failed"    // aborted with a persistent error
	UsageCleanupStatusCanceled  = "canceled"  // canceled by an operator
)
// UsageCleanupFilters defines the filter conditions for a cleanup task.
// The time range is required; every other field is optional.
// The struct is JSON-serialized to persist task parameters.
//
// start_time/end_time use RFC3339 format; the stored values are the
// parsed times (UTC or the user's timezone).
//
// Notes:
//   - nil means the filter is not set
//   - all filters are exact matches
type UsageCleanupFilters struct {
	StartTime   time.Time  `json:"start_time"`
	EndTime     time.Time  `json:"end_time"`
	UserID      *int64     `json:"user_id,omitempty"`
	APIKeyID    *int64     `json:"api_key_id,omitempty"`
	AccountID   *int64     `json:"account_id,omitempty"`
	GroupID     *int64     `json:"group_id,omitempty"`
	Model       *string    `json:"model,omitempty"`
	Stream      *bool      `json:"stream,omitempty"`
	BillingType *int8      `json:"billing_type,omitempty"`
}
// UsageCleanupTask represents one usage-log cleanup job.
// Status is one of pending/running/succeeded/failed/canceled.
type UsageCleanupTask struct {
	ID          int64
	Status      string
	Filters     UsageCleanupFilters
	CreatedBy   int64
	DeletedRows int64      // rows deleted so far; checkpoint used for resumption and display
	ErrorMsg    *string    // terminal failure message, if the task failed
	CanceledBy  *int64     // operator who canceled the task, if canceled
	CanceledAt  *time.Time
	StartedAt   *time.Time
	FinishedAt  *time.Time
	CreatedAt   time.Time
	UpdatedAt   time.Time
}
// UsageCleanupRepository defines the persistence layer for cleanup tasks.
type UsageCleanupRepository interface {
	CreateTask(ctx context.Context, task *UsageCleanupTask) error
	ListTasks(ctx context.Context, params pagination.PaginationParams) ([]UsageCleanupTask, *pagination.PaginationResult, error)
	// ClaimNextPendingTask grabs the next runnable task:
	//   - pending tasks take priority
	//   - a task stuck in running for longer than staleRunningAfterSeconds
	//     (likely a crashed/exited/timed-out worker) may be re-claimed and resumed
	ClaimNextPendingTask(ctx context.Context, staleRunningAfterSeconds int64) (*UsageCleanupTask, error)
	// GetTaskStatus returns the task status; sql.ErrNoRows if the task does not exist.
	GetTaskStatus(ctx context.Context, taskID int64) (string, error)
	// UpdateTaskProgress persists deleted_rows, used for resumption and display.
	UpdateTaskProgress(ctx context.Context, taskID int64, deletedRows int64) error
	// CancelTask marks the task canceled; only pending/running tasks may be canceled.
	CancelTask(ctx context.Context, taskID int64, canceledBy int64) (bool, error)
	MarkTaskSucceeded(ctx context.Context, taskID int64, deletedRows int64) error
	MarkTaskFailed(ctx context.Context, taskID int64, deletedRows int64, errorMsg string) error
	DeleteUsageLogsBatch(ctx context.Context, filters UsageCleanupFilters, limit int) (int64, error)
}

View File

@@ -0,0 +1,400 @@
package service
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
)
const (
usageCleanupWorkerName = "usage_cleanup_worker"
)
// UsageCleanupService creates and executes usage-log cleanup tasks.
// A recurring timing-wheel job (plus an ad hoc kick right after task
// creation) drives runOnce; the running flag keeps execution single-flight.
type UsageCleanupService struct {
	repo        UsageCleanupRepository       // task persistence and batched deletion
	timingWheel *TimingWheelService          // schedules the recurring worker
	dashboard   *DashboardAggregationService // recompute hook after deletions; may be nil
	cfg         *config.Config               // feature flag and tuning knobs; may be nil

	running int32 // 1 while runOnce executes (atomic CAS guard)

	startOnce sync.Once
	stopOnce  sync.Once

	workerCtx    context.Context // parent context for worker runs; canceled by Stop
	workerCancel context.CancelFunc
}
// NewUsageCleanupService wires up a cleanup service with its repository,
// scheduler, dashboard hook and configuration. The returned service owns
// a cancelable worker context; Stop cancels it.
func NewUsageCleanupService(repo UsageCleanupRepository, timingWheel *TimingWheelService, dashboard *DashboardAggregationService, cfg *config.Config) *UsageCleanupService {
	ctx, cancel := context.WithCancel(context.Background())
	svc := &UsageCleanupService{
		repo:         repo,
		timingWheel:  timingWheel,
		dashboard:    dashboard,
		cfg:          cfg,
		workerCtx:    ctx,
		workerCancel: cancel,
	}
	return svc
}
// describeUsageCleanupFilters renders the filters as a compact
// space-separated "key=value" string for log lines.
func describeUsageCleanupFilters(filters UsageCleanupFilters) string {
	// Two mandatory time bounds plus up to seven optional filters;
	// preallocate so appends never reallocate.
	parts := make([]string, 0, 9)
	parts = append(parts, "start="+filters.StartTime.UTC().Format(time.RFC3339))
	parts = append(parts, "end="+filters.EndTime.UTC().Format(time.RFC3339))
	if filters.UserID != nil {
		parts = append(parts, fmt.Sprintf("user_id=%d", *filters.UserID))
	}
	if filters.APIKeyID != nil {
		parts = append(parts, fmt.Sprintf("api_key_id=%d", *filters.APIKeyID))
	}
	if filters.AccountID != nil {
		parts = append(parts, fmt.Sprintf("account_id=%d", *filters.AccountID))
	}
	if filters.GroupID != nil {
		parts = append(parts, fmt.Sprintf("group_id=%d", *filters.GroupID))
	}
	if filters.Model != nil {
		parts = append(parts, "model="+strings.TrimSpace(*filters.Model))
	}
	if filters.Stream != nil {
		parts = append(parts, fmt.Sprintf("stream=%t", *filters.Stream))
	}
	if filters.BillingType != nil {
		parts = append(parts, fmt.Sprintf("billing_type=%d", *filters.BillingType))
	}
	return strings.Join(parts, " ")
}
// Start registers the recurring cleanup worker on the timing wheel.
// It is a no-op when the feature is disabled or dependencies are missing,
// and at most one registration ever happens regardless of call count.
func (s *UsageCleanupService) Start() {
	if s == nil {
		return
	}
	if s.cfg != nil && !s.cfg.UsageCleanup.Enabled {
		log.Printf("[UsageCleanup] not started (disabled)")
		return
	}
	if s.repo == nil || s.timingWheel == nil {
		log.Printf("[UsageCleanup] not started (missing deps)")
		return
	}
	s.startOnce.Do(func() {
		interval := s.workerInterval()
		s.timingWheel.ScheduleRecurring(usageCleanupWorkerName, interval, s.runOnce)
		log.Printf("[UsageCleanup] started (interval=%s max_range_days=%d batch_size=%d task_timeout=%s)", interval, s.maxRangeDays(), s.batchSize(), s.taskTimeout())
	})
}
// Stop cancels the worker context and unregisters the recurring job.
// Safe to call multiple times; only the first call has any effect.
func (s *UsageCleanupService) Stop() {
	if s == nil {
		return
	}
	s.stopOnce.Do(func() {
		if cancel := s.workerCancel; cancel != nil {
			cancel()
		}
		if tw := s.timingWheel; tw != nil {
			tw.Cancel(usageCleanupWorkerName)
		}
		log.Printf("[UsageCleanup] stopped")
	})
}
// ListTasks returns one page of cleanup tasks straight from the repository.
func (s *UsageCleanupService) ListTasks(ctx context.Context, params pagination.PaginationParams) ([]UsageCleanupTask, *pagination.PaginationResult, error) {
	if s == nil || s.repo == nil {
		return nil, nil, errors.New("cleanup service not ready")
	}
	return s.repo.ListTasks(ctx, params)
}
// CreateTask validates, sanitizes and persists a new cleanup task, then
// kicks the worker asynchronously so the task is picked up promptly.
//
// Preconditions enforced here:
//   - the feature must be enabled in config
//   - createdBy must be a positive operator ID
//   - filters must pass validateFilters (time range present and bounded)
func (s *UsageCleanupService) CreateTask(ctx context.Context, filters UsageCleanupFilters, createdBy int64) (*UsageCleanupTask, error) {
	if s == nil || s.repo == nil {
		return nil, fmt.Errorf("cleanup service not ready")
	}
	if s.cfg != nil && !s.cfg.UsageCleanup.Enabled {
		return nil, infraerrors.New(http.StatusServiceUnavailable, "USAGE_CLEANUP_DISABLED", "usage cleanup is disabled")
	}
	if createdBy <= 0 {
		return nil, infraerrors.BadRequest("USAGE_CLEANUP_INVALID_CREATOR", "invalid creator")
	}
	// Audit log before sanitation so the operator's raw request stays traceable.
	log.Printf("[UsageCleanup] create_task requested: operator=%d %s", createdBy, describeUsageCleanupFilters(filters))
	sanitizeUsageCleanupFilters(&filters)
	if err := s.validateFilters(filters); err != nil {
		log.Printf("[UsageCleanup] create_task rejected: operator=%d err=%v %s", createdBy, err, describeUsageCleanupFilters(filters))
		return nil, err
	}
	task := &UsageCleanupTask{
		Status:    UsageCleanupStatusPending,
		Filters:   filters,
		CreatedBy: createdBy,
	}
	if err := s.repo.CreateTask(ctx, task); err != nil {
		log.Printf("[UsageCleanup] create_task persist failed: operator=%d err=%v %s", createdBy, err, describeUsageCleanupFilters(filters))
		return nil, fmt.Errorf("create cleanup task: %w", err)
	}
	log.Printf("[UsageCleanup] create_task persisted: task=%d operator=%d status=%s deleted_rows=%d %s", task.ID, createdBy, task.Status, task.DeletedRows, describeUsageCleanupFilters(filters))
	// Fire-and-forget: runOnce is single-flight guarded, so racing with a
	// scheduled run is harmless.
	go s.runOnce()
	return task, nil
}
// runOnce claims and executes at most one cleanup task. It is invoked
// both by the recurring timing-wheel job and ad hoc after task creation;
// the running flag guarantees single-flight execution.
func (s *UsageCleanupService) runOnce() {
	// Guard the receiver before touching s.running: the previous nil check
	// sat after the CAS on &s.running, where it could never take effect.
	if s == nil || s.repo == nil {
		return
	}
	if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
		log.Printf("[UsageCleanup] run_once skipped: already_running=true")
		return
	}
	defer atomic.StoreInt32(&s.running, 0)
	// Derive the run context from the worker context so Stop interrupts us;
	// fall back to Background when the service was built without one.
	parent := context.Background()
	if s.workerCtx != nil {
		parent = s.workerCtx
	}
	ctx, cancel := context.WithTimeout(parent, s.taskTimeout())
	defer cancel()
	// The task timeout doubles as the stale-running threshold for reclaiming
	// tasks abandoned by a crashed or timed-out worker.
	task, err := s.repo.ClaimNextPendingTask(ctx, int64(s.taskTimeout().Seconds()))
	if err != nil {
		log.Printf("[UsageCleanup] claim pending task failed: %v", err)
		return
	}
	if task == nil {
		log.Printf("[UsageCleanup] run_once done: no_task=true")
		return
	}
	log.Printf("[UsageCleanup] task claimed: task=%d status=%s created_by=%d deleted_rows=%d %s", task.ID, task.Status, task.CreatedBy, task.DeletedRows, describeUsageCleanupFilters(task.Filters))
	s.executeTask(ctx, task)
}
// executeTask drains matching usage_logs rows in batches until the task
// completes, is canceled, fails, or the context ends. On success it asks
// the dashboard aggregation service to recompute the affected range so
// pre-aggregated stats stay consistent with the deleted rows.
func (s *UsageCleanupService) executeTask(ctx context.Context, task *UsageCleanupTask) {
	if task == nil {
		return
	}
	batchSize := s.batchSize()
	// Resume from any progress a previous (interrupted) run checkpointed.
	deletedTotal := task.DeletedRows
	start := time.Now()
	log.Printf("[UsageCleanup] task started: task=%d batch_size=%d deleted_rows=%d %s", task.ID, batchSize, deletedTotal, describeUsageCleanupFilters(task.Filters))
	var batchNum int
	for {
		if ctx != nil && ctx.Err() != nil {
			log.Printf("[UsageCleanup] task interrupted: task=%d err=%v", task.ID, ctx.Err())
			return
		}
		// Honor operator cancellation between batches.
		canceled, err := s.isTaskCanceled(ctx, task.ID)
		if err != nil {
			s.markTaskFailed(task.ID, deletedTotal, err)
			return
		}
		if canceled {
			log.Printf("[UsageCleanup] task canceled: task=%d deleted_rows=%d duration=%s", task.ID, deletedTotal, time.Since(start))
			return
		}
		batchNum++
		deleted, err := s.repo.DeleteUsageLogsBatch(ctx, task.Filters, batchSize)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				// Interrupted (e.g. service stop/timeout): keep the task in
				// running state; a later stale reclaim resumes it.
				log.Printf("[UsageCleanup] task interrupted: task=%d err=%v", task.ID, err)
				return
			}
			s.markTaskFailed(task.ID, deletedTotal, err)
			return
		}
		deletedTotal += deleted
		if deleted > 0 {
			// Checkpoint progress with a fresh short-lived context so a dying
			// parent ctx cannot block the update; failure is non-fatal.
			updateCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			if err := s.repo.UpdateTaskProgress(updateCtx, task.ID, deletedTotal); err != nil {
				log.Printf("[UsageCleanup] task progress update failed: task=%d deleted_rows=%d err=%v", task.ID, deletedTotal, err)
			}
			cancel()
		}
		// Log the first few batches plus every 20th, to keep log volume bounded.
		if batchNum <= 3 || batchNum%20 == 0 || deleted < int64(batchSize) {
			log.Printf("[UsageCleanup] task batch done: task=%d batch=%d deleted=%d deleted_total=%d", task.ID, batchNum, deleted, deletedTotal)
		}
		// A short (or empty) batch means no more matching rows remain.
		if deleted == 0 || deleted < int64(batchSize) {
			break
		}
	}
	updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := s.repo.MarkTaskSucceeded(updateCtx, task.ID, deletedTotal); err != nil {
		log.Printf("[UsageCleanup] update task succeeded failed: task=%d err=%v", task.ID, err)
	} else {
		log.Printf("[UsageCleanup] task succeeded: task=%d deleted_rows=%d duration=%s", task.ID, deletedTotal, time.Since(start))
	}
	if s.dashboard != nil {
		if err := s.dashboard.TriggerRecomputeRange(task.Filters.StartTime, task.Filters.EndTime); err != nil {
			log.Printf("[UsageCleanup] trigger dashboard recompute failed: task=%d err=%v", task.ID, err)
		} else {
			log.Printf("[UsageCleanup] trigger dashboard recompute: task=%d start=%s end=%s", task.ID, task.Filters.StartTime.UTC().Format(time.RFC3339), task.Filters.EndTime.UTC().Format(time.RFC3339))
		}
	}
}
// markTaskFailed records a terminal failure for the task, trimming the
// error message to at most 500 bytes so it fits the storage column.
func (s *UsageCleanupService) markTaskFailed(taskID int64, deletedRows int64, err error) {
	msg := strings.TrimSpace(err.Error())
	if len(msg) > 500 {
		// Cut at the byte budget, then strip any partial multi-byte rune left
		// at the end so the stored message stays valid UTF-8 (error messages
		// may contain non-ASCII text).
		msg = strings.ToValidUTF8(msg[:500], "")
	}
	log.Printf("[UsageCleanup] task failed: task=%d deleted_rows=%d err=%s", taskID, deletedRows, msg)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if updateErr := s.repo.MarkTaskFailed(ctx, taskID, deletedRows, msg); updateErr != nil {
		log.Printf("[UsageCleanup] update task failed failed: task=%d err=%v", taskID, updateErr)
	}
}
// isTaskCanceled reports whether the task has been canceled. The status
// lookup runs under its own short deadline so a slow database cannot
// stall the deletion loop; a missing row counts as "not canceled".
func (s *UsageCleanupService) isTaskCanceled(ctx context.Context, taskID int64) (bool, error) {
	if s == nil || s.repo == nil {
		return false, fmt.Errorf("cleanup service not ready")
	}
	checkCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	status, err := s.repo.GetTaskStatus(checkCtx, taskID)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return false, nil
	case err != nil:
		return false, err
	}
	canceled := status == UsageCleanupStatusCanceled
	if canceled {
		log.Printf("[UsageCleanup] task cancel detected: task=%d", taskID)
	}
	return canceled, nil
}
// validateFilters rejects missing or oversized time ranges before a task
// is persisted.
func (s *UsageCleanupService) validateFilters(filters UsageCleanupFilters) error {
	start, end := filters.StartTime, filters.EndTime
	if start.IsZero() || end.IsZero() {
		return infraerrors.BadRequest("USAGE_CLEANUP_MISSING_RANGE", "start_date and end_date are required")
	}
	if end.Before(start) {
		return infraerrors.BadRequest("USAGE_CLEANUP_INVALID_RANGE", "end_date must be after start_date")
	}
	if maxDays := s.maxRangeDays(); maxDays > 0 && end.Sub(start) > time.Duration(maxDays)*24*time.Hour {
		return infraerrors.BadRequest("USAGE_CLEANUP_RANGE_TOO_LARGE", fmt.Sprintf("date range exceeds %d days", maxDays))
	}
	return nil
}
// CancelTask marks a pending or running task as canceled on behalf of
// canceledBy. It returns 404 when the task does not exist and 409 when
// the status no longer permits cancellation — including when the status
// flips concurrently between the read and the update.
func (s *UsageCleanupService) CancelTask(ctx context.Context, taskID int64, canceledBy int64) error {
	if s == nil || s.repo == nil {
		return fmt.Errorf("cleanup service not ready")
	}
	if s.cfg != nil && !s.cfg.UsageCleanup.Enabled {
		return infraerrors.New(http.StatusServiceUnavailable, "USAGE_CLEANUP_DISABLED", "usage cleanup is disabled")
	}
	if canceledBy <= 0 {
		return infraerrors.BadRequest("USAGE_CLEANUP_INVALID_CANCELLER", "invalid canceller")
	}
	status, err := s.repo.GetTaskStatus(ctx, taskID)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return infraerrors.New(http.StatusNotFound, "USAGE_CLEANUP_TASK_NOT_FOUND", "cleanup task not found")
		}
		return err
	}
	log.Printf("[UsageCleanup] cancel_task requested: task=%d operator=%d status=%s", taskID, canceledBy, status)
	if status != UsageCleanupStatusPending && status != UsageCleanupStatusRunning {
		return infraerrors.New(http.StatusConflict, "USAGE_CLEANUP_CANCEL_CONFLICT", "cleanup task cannot be canceled in current status")
	}
	ok, err := s.repo.CancelTask(ctx, taskID, canceledBy)
	if err != nil {
		return err
	}
	if !ok {
		// Status may have changed concurrently since the check above.
		return infraerrors.New(http.StatusConflict, "USAGE_CLEANUP_CANCEL_CONFLICT", "cleanup task cannot be canceled in current status")
	}
	log.Printf("[UsageCleanup] cancel_task done: task=%d operator=%d", taskID, canceledBy)
	return nil
}
// sanitizeUsageCleanupFilters normalizes user-supplied filters in place:
// non-positive IDs and negative billing types are dropped, and the model
// name is whitespace-trimmed (an all-whitespace model is dropped too).
func sanitizeUsageCleanupFilters(filters *UsageCleanupFilters) {
	if filters == nil {
		return
	}
	dropNonPositive := func(p **int64) {
		if *p != nil && **p <= 0 {
			*p = nil
		}
	}
	dropNonPositive(&filters.UserID)
	dropNonPositive(&filters.APIKeyID)
	dropNonPositive(&filters.AccountID)
	dropNonPositive(&filters.GroupID)
	if filters.Model != nil {
		if trimmed := strings.TrimSpace(*filters.Model); trimmed == "" {
			filters.Model = nil
		} else {
			filters.Model = &trimmed
		}
	}
	if filters.BillingType != nil && *filters.BillingType < 0 {
		filters.BillingType = nil
	}
}
// maxRangeDays returns the configured maximum cleanup range in days,
// falling back to 31 when unset or non-positive.
func (s *UsageCleanupService) maxRangeDays() int {
	const fallback = 31
	if s == nil || s.cfg == nil {
		return fallback
	}
	if v := s.cfg.UsageCleanup.MaxRangeDays; v > 0 {
		return v
	}
	return fallback
}
// batchSize returns the configured per-batch deletion limit, falling
// back to 5000 when unset or non-positive.
func (s *UsageCleanupService) batchSize() int {
	const fallback = 5000
	if s == nil || s.cfg == nil {
		return fallback
	}
	if v := s.cfg.UsageCleanup.BatchSize; v > 0 {
		return v
	}
	return fallback
}
// workerInterval returns the recurring worker period, falling back to
// 10 seconds when unset or non-positive.
func (s *UsageCleanupService) workerInterval() time.Duration {
	const fallback = 10 * time.Second
	if s == nil || s.cfg == nil {
		return fallback
	}
	if secs := s.cfg.UsageCleanup.WorkerIntervalSeconds; secs > 0 {
		return time.Duration(secs) * time.Second
	}
	return fallback
}
// taskTimeout returns the per-run execution budget (also used as the
// stale-running reclaim threshold), falling back to 30 minutes.
func (s *UsageCleanupService) taskTimeout() time.Duration {
	const fallback = 30 * time.Minute
	if s == nil || s.cfg == nil {
		return fallback
	}
	if secs := s.cfg.UsageCleanup.TaskTimeoutSeconds; secs > 0 {
		return time.Duration(secs) * time.Second
	}
	return fallback
}

View File

@@ -0,0 +1,420 @@
package service
import (
"context"
"database/sql"
"errors"
"net/http"
"strings"
"sync"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
"github.com/stretchr/testify/require"
)
// cleanupDeleteResponse is one scripted reply for DeleteUsageLogsBatch.
type cleanupDeleteResponse struct {
	deleted int64
	err     error
}

// cleanupDeleteCall records the arguments of one DeleteUsageLogsBatch call.
type cleanupDeleteCall struct {
	filters UsageCleanupFilters
	limit   int
}

// cleanupMarkCall records a progress/succeeded/failed repository call.
type cleanupMarkCall struct {
	taskID      int64
	deletedRows int64
	errMsg      string
}

// cleanupRepoStub is an in-memory, mutex-guarded UsageCleanupRepository
// for the service tests: canned responses go in, recorded calls come out.
type cleanupRepoStub struct {
	mu sync.Mutex

	created   []*UsageCleanupTask // tasks persisted via CreateTask
	createErr error               // forced CreateTask failure

	listTasks  []UsageCleanupTask
	listResult *pagination.PaginationResult
	listErr    error

	claimQueue []*UsageCleanupTask // tasks handed out by ClaimNextPendingTask
	claimErr   error

	deleteQueue []cleanupDeleteResponse // scripted DeleteUsageLogsBatch replies
	deleteCalls []cleanupDeleteCall     // observed DeleteUsageLogsBatch calls

	markSucceeded []cleanupMarkCall
	markFailed    []cleanupMarkCall

	statusByID    map[int64]string // task status as seen by GetTaskStatus
	progressCalls []cleanupMarkCall
	cancelCalls   []int64
}
// CreateTask records a deep copy of the task, assigning an ID and
// timestamps when absent. createErr, if set, is returned instead.
func (s *cleanupRepoStub) CreateTask(ctx context.Context, task *UsageCleanupTask) error {
	if task == nil {
		return nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.createErr != nil {
		return s.createErr
	}
	if task.ID == 0 {
		task.ID = int64(len(s.created) + 1)
	}
	if task.CreatedAt.IsZero() {
		task.CreatedAt = time.Now().UTC()
	}
	if task.UpdatedAt.IsZero() {
		task.UpdatedAt = task.CreatedAt
	}
	copied := *task
	s.created = append(s.created, &copied)
	return nil
}
// ListTasks returns the canned task page and error under the lock.
func (s *cleanupRepoStub) ListTasks(ctx context.Context, params pagination.PaginationParams) ([]UsageCleanupTask, *pagination.PaginationResult, error) {
	s.mu.Lock()
	tasks, result, err := s.listTasks, s.listResult, s.listErr
	s.mu.Unlock()
	return tasks, result, err
}
// ClaimNextPendingTask pops the next queued task and marks it running.
func (s *cleanupRepoStub) ClaimNextPendingTask(ctx context.Context, staleRunningAfterSeconds int64) (*UsageCleanupTask, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch {
	case s.claimErr != nil:
		return nil, s.claimErr
	case len(s.claimQueue) == 0:
		return nil, nil
	}
	next := s.claimQueue[0]
	s.claimQueue = s.claimQueue[1:]
	if s.statusByID == nil {
		s.statusByID = make(map[int64]string)
	}
	s.statusByID[next.ID] = UsageCleanupStatusRunning
	return next, nil
}
// GetTaskStatus reports the recorded status, or sql.ErrNoRows when the
// task is unknown (reads on a nil map safely yield the zero value).
func (s *cleanupRepoStub) GetTaskStatus(ctx context.Context, taskID int64) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	status, ok := s.statusByID[taskID]
	if !ok {
		return "", sql.ErrNoRows
	}
	return status, nil
}
// UpdateTaskProgress records the progress call for later inspection.
func (s *cleanupRepoStub) UpdateTaskProgress(ctx context.Context, taskID int64, deletedRows int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	call := cleanupMarkCall{taskID: taskID, deletedRows: deletedRows}
	s.progressCalls = append(s.progressCalls, call)
	return nil
}
// CancelTask cancels a pending/running task and records the attempt;
// any other status yields (false, nil).
func (s *cleanupRepoStub) CancelTask(ctx context.Context, taskID int64, canceledBy int64) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cancelCalls = append(s.cancelCalls, taskID)
	if s.statusByID == nil {
		s.statusByID = make(map[int64]string)
	}
	switch s.statusByID[taskID] {
	case UsageCleanupStatusPending, UsageCleanupStatusRunning:
		s.statusByID[taskID] = UsageCleanupStatusCanceled
		return true, nil
	default:
		return false, nil
	}
}
// MarkTaskSucceeded records the success call and flips the stored status.
func (s *cleanupRepoStub) MarkTaskSucceeded(ctx context.Context, taskID int64, deletedRows int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.markSucceeded = append(s.markSucceeded, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows})
	if s.statusByID == nil {
		s.statusByID = make(map[int64]string)
	}
	s.statusByID[taskID] = UsageCleanupStatusSucceeded
	return nil
}
// MarkTaskFailed records the failure call and flips the stored status.
func (s *cleanupRepoStub) MarkTaskFailed(ctx context.Context, taskID int64, deletedRows int64, errorMsg string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.markFailed = append(s.markFailed, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows, errMsg: errorMsg})
	if s.statusByID == nil {
		s.statusByID = make(map[int64]string)
	}
	s.statusByID[taskID] = UsageCleanupStatusFailed
	return nil
}
// DeleteUsageLogsBatch replays queued scripted responses; once the queue
// is exhausted it reports zero deletions.
func (s *cleanupRepoStub) DeleteUsageLogsBatch(ctx context.Context, filters UsageCleanupFilters, limit int) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.deleteCalls = append(s.deleteCalls, cleanupDeleteCall{filters: filters, limit: limit})
	if len(s.deleteQueue) == 0 {
		return 0, nil
	}
	next := s.deleteQueue[0]
	s.deleteQueue = s.deleteQueue[1:]
	return next.deleted, next.err
}
// CreateTask must drop invalid filter values and trim the model name.
func TestUsageCleanupServiceCreateTaskSanitizeFilters(t *testing.T) {
	repo := &cleanupRepoStub{}
	cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, MaxRangeDays: 31}}
	svc := NewUsageCleanupService(repo, nil, nil, cfg)

	windowStart := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	windowEnd := windowStart.Add(24 * time.Hour)
	badUserID := int64(-1)
	keyID := int64(10)
	rawModel := " gpt-4 "
	badBilling := int8(-2)

	task, err := svc.CreateTask(context.Background(), UsageCleanupFilters{
		StartTime:   windowStart,
		EndTime:     windowEnd,
		UserID:      &badUserID,
		APIKeyID:    &keyID,
		Model:       &rawModel,
		BillingType: &badBilling,
	}, 9)
	require.NoError(t, err)
	require.Equal(t, UsageCleanupStatusPending, task.Status)
	require.Nil(t, task.Filters.UserID)
	require.NotNil(t, task.Filters.APIKeyID)
	require.Equal(t, keyID, *task.Filters.APIKeyID)
	require.NotNil(t, task.Filters.Model)
	require.Equal(t, "gpt-4", *task.Filters.Model)
	require.Nil(t, task.Filters.BillingType)
	require.Equal(t, int64(9), task.CreatedBy)
}
func TestUsageCleanupServiceCreateTaskInvalidCreator(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
filters := UsageCleanupFilters{
StartTime: time.Now(),
EndTime: time.Now().Add(24 * time.Hour),
}
_, err := svc.CreateTask(context.Background(), filters, 0)
require.Error(t, err)
require.Equal(t, "USAGE_CLEANUP_INVALID_CREATOR", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCreateTaskDisabled(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
filters := UsageCleanupFilters{
StartTime: time.Now(),
EndTime: time.Now().Add(24 * time.Hour),
}
_, err := svc.CreateTask(context.Background(), filters, 1)
require.Error(t, err)
require.Equal(t, http.StatusServiceUnavailable, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_DISABLED", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCreateTaskRangeTooLarge(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, MaxRangeDays: 1}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
end := start.Add(48 * time.Hour)
filters := UsageCleanupFilters{StartTime: start, EndTime: end}
_, err := svc.CreateTask(context.Background(), filters, 1)
require.Error(t, err)
require.Equal(t, "USAGE_CLEANUP_RANGE_TOO_LARGE", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCreateTaskMissingRange(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
_, err := svc.CreateTask(context.Background(), UsageCleanupFilters{}, 1)
require.Error(t, err)
require.Equal(t, "USAGE_CLEANUP_MISSING_RANGE", infraerrors.Reason(err))
}
// A repository failure surfaces as a wrapped "create cleanup task" error.
func TestUsageCleanupServiceCreateTaskRepoError(t *testing.T) {
	repo := &cleanupRepoStub{createErr: errors.New("db down")}
	svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}})
	window := UsageCleanupFilters{
		StartTime: time.Now(),
		EndTime:   time.Now().Add(24 * time.Hour),
	}
	_, err := svc.CreateTask(context.Background(), window, 1)
	require.Error(t, err)
	require.Contains(t, err.Error(), "create cleanup task")
}
// A claimed task is drained batch by batch until a short batch signals
// completion, then marked succeeded with the accumulated row count.
func TestUsageCleanupServiceRunOnceSuccess(t *testing.T) {
	repo := &cleanupRepoStub{
		claimQueue: []*UsageCleanupTask{
			{ID: 5, Filters: UsageCleanupFilters{StartTime: time.Now(), EndTime: time.Now().Add(2 * time.Hour)}},
		},
		deleteQueue: []cleanupDeleteResponse{{deleted: 2}, {deleted: 2}, {deleted: 1}},
	}
	cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2, TaskTimeoutSeconds: 30}}
	NewUsageCleanupService(repo, nil, nil, cfg).runOnce()

	repo.mu.Lock()
	defer repo.mu.Unlock()
	require.Len(t, repo.deleteCalls, 3)
	require.Len(t, repo.markSucceeded, 1)
	require.Empty(t, repo.markFailed)
	require.Equal(t, int64(5), repo.markSucceeded[0].taskID)
	require.Equal(t, int64(5), repo.markSucceeded[0].deletedRows)
}
// TestUsageCleanupServiceRunOnceClaimError verifies that when claiming a task
// fails, the pass completes without recording any terminal state on the
// repository.
func TestUsageCleanupServiceRunOnceClaimError(t *testing.T) {
	stub := &cleanupRepoStub{claimErr: errors.New("claim failed")}
	service := NewUsageCleanupService(stub, nil, nil, &config.Config{
		UsageCleanup: config.UsageCleanupConfig{Enabled: true},
	})

	service.runOnce()

	stub.mu.Lock()
	defer stub.mu.Unlock()
	require.Empty(t, stub.markSucceeded)
	require.Empty(t, stub.markFailed)
}
// TestUsageCleanupServiceRunOnceAlreadyRunning verifies that runOnce is a
// no-op when a previous pass is still in flight (running flag already set).
// The original test called runOnce but asserted nothing; it now checks that
// the guard actually skipped all repository work.
func TestUsageCleanupServiceRunOnceAlreadyRunning(t *testing.T) {
	repo := &cleanupRepoStub{}
	cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
	svc := NewUsageCleanupService(repo, nil, nil, cfg)
	svc.running = 1
	svc.runOnce()
	repo.mu.Lock()
	defer repo.mu.Unlock()
	require.Empty(t, repo.deleteCalls)
	require.Empty(t, repo.markSucceeded)
	require.Empty(t, repo.markFailed)
}
// TestUsageCleanupServiceExecuteTaskFailed verifies that a delete error marks
// the task as failed and that the recorded error message is truncated to 500
// characters.
func TestUsageCleanupServiceExecuteTaskFailed(t *testing.T) {
	oversized := strings.Repeat("x", 600)
	stub := &cleanupRepoStub{
		deleteQueue: []cleanupDeleteResponse{{err: errors.New(oversized)}},
	}
	service := NewUsageCleanupService(stub, nil, nil, &config.Config{
		UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 3},
	})

	now := time.Now()
	service.executeTask(context.Background(), &UsageCleanupTask{
		ID:      11,
		Filters: UsageCleanupFilters{StartTime: now, EndTime: now.Add(24 * time.Hour)},
	})

	stub.mu.Lock()
	defer stub.mu.Unlock()
	require.Len(t, stub.markFailed, 1)
	require.Equal(t, int64(11), stub.markFailed[0].taskID)
	require.Len(t, stub.markFailed[0].errMsg, 500)
}
// TestUsageCleanupServiceListTasks verifies that ListTasks returns the stub's
// tasks and pagination metadata unchanged.
func TestUsageCleanupServiceListTasks(t *testing.T) {
	stub := &cleanupRepoStub{
		listTasks: []UsageCleanupTask{{ID: 1}, {ID: 2}},
		listResult: &pagination.PaginationResult{
			Total:    2,
			Page:     1,
			PageSize: 20,
			Pages:    1,
		},
	}
	service := NewUsageCleanupService(stub, nil, nil, &config.Config{
		UsageCleanup: config.UsageCleanupConfig{Enabled: true},
	})

	tasks, page, err := service.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20})

	require.NoError(t, err)
	require.Len(t, tasks, 2)
	require.Equal(t, int64(2), page.Total)
}
func TestUsageCleanupServiceListTasksNotReady(t *testing.T) {
var nilSvc *UsageCleanupService
_, _, err := nilSvc.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20})
require.Error(t, err)
svc := NewUsageCleanupService(nil, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}})
_, _, err = svc.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20})
require.Error(t, err)
}
// TestUsageCleanupServiceDefaultsAndLifecycle covers the config-default
// accessors on nil/fallback services and exercises Start/Stop across several
// configurations (nil receiver, disabled, enabled with timing wheel, missing
// dependencies).
func TestUsageCleanupServiceDefaultsAndLifecycle(t *testing.T) {
	// A nil receiver must fall back to the built-in defaults.
	var nilSvc *UsageCleanupService
	require.Equal(t, 31, nilSvc.maxRangeDays())
	require.Equal(t, 5000, nilSvc.batchSize())
	require.Equal(t, 10*time.Second, nilSvc.workerInterval())
	require.Equal(t, 30*time.Minute, nilSvc.taskTimeout())
	// Start/Stop on a nil receiver must be safe no-ops.
	nilSvc.Start()
	nilSvc.Stop()
	// A disabled service must also tolerate the full lifecycle.
	repo := &cleanupRepoStub{}
	cfgDisabled := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}}
	svcDisabled := NewUsageCleanupService(repo, nil, nil, cfgDisabled)
	svcDisabled.Start()
	svcDisabled.Stop()
	// With a timing wheel, an explicit WorkerIntervalSeconds is honored.
	timingWheel, err := NewTimingWheelService()
	require.NoError(t, err)
	cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, WorkerIntervalSeconds: 5}}
	svc := NewUsageCleanupService(repo, timingWheel, nil, cfg)
	require.Equal(t, 5*time.Second, svc.workerInterval())
	svc.Start()
	svc.Stop()
	// Zero-valued config fields fall back to the same defaults as a nil service.
	cfgFallback := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
	svcFallback := NewUsageCleanupService(repo, timingWheel, nil, cfgFallback)
	require.Equal(t, 31, svcFallback.maxRangeDays())
	require.Equal(t, 5000, svcFallback.batchSize())
	require.Equal(t, 10*time.Second, svcFallback.workerInterval())
	// Start with missing dependencies (nil repo/timing wheel) must not panic.
	svcMissingDeps := NewUsageCleanupService(nil, nil, nil, cfgFallback)
	svcMissingDeps.Start()
}
// TestSanitizeUsageCleanupFiltersModelEmpty verifies that non-positive ID
// filters and a whitespace-only model string are all normalized to nil by
// sanitizeUsageCleanupFilters.
func TestSanitizeUsageCleanupFiltersModelEmpty(t *testing.T) {
	model := " "
	// Each field gets its own distinct negative value; the original test
	// reused &apiKeyID for UserID, which shared one pointer across two
	// fields and obscured which field was being exercised.
	userID := int64(-3)
	apiKeyID := int64(-5)
	accountID := int64(-1)
	groupID := int64(-2)
	filters := UsageCleanupFilters{
		UserID:    &userID,
		APIKeyID:  &apiKeyID,
		AccountID: &accountID,
		GroupID:   &groupID,
		Model:     &model,
	}
	sanitizeUsageCleanupFilters(&filters)
	require.Nil(t, filters.UserID)
	require.Nil(t, filters.APIKeyID)
	require.Nil(t, filters.AccountID)
	require.Nil(t, filters.GroupID)
	require.Nil(t, filters.Model)
}

View File

@@ -57,6 +57,13 @@ func ProvideDashboardAggregationService(repo DashboardAggregationRepository, tim
return svc
}
// ProvideUsageCleanupService creates the usage-log cleanup task service and
// starts its background worker before handing it to the injector.
func ProvideUsageCleanupService(repo UsageCleanupRepository, timingWheel *TimingWheelService, dashboardAgg *DashboardAggregationService, cfg *config.Config) *UsageCleanupService {
	svc := NewUsageCleanupService(repo, timingWheel, dashboardAgg, cfg)
	svc.Start()
	return svc
}
// ProvideAccountExpiryService creates and starts AccountExpiryService.
func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService {
svc := NewAccountExpiryService(accountRepo, time.Minute)
@@ -248,6 +255,7 @@ var ProviderSet = wire.NewSet(
ProvideAccountExpiryService,
ProvideTimingWheelService,
ProvideDashboardAggregationService,
ProvideUsageCleanupService,
ProvideDeferredService,
NewAntigravityQuotaFetcher,
NewUserAttributeService,