package service import ( "context" "database/sql" "errors" "net/http" "strings" "sync" "testing" "time" "github.com/Wei-Shaw/sub2api/internal/config" infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" "github.com/Wei-Shaw/sub2api/internal/pkg/pagination" "github.com/stretchr/testify/require" ) type cleanupDeleteResponse struct { deleted int64 err error } type cleanupDeleteCall struct { filters UsageCleanupFilters limit int } type cleanupMarkCall struct { taskID int64 deletedRows int64 errMsg string } type cleanupRepoStub struct { mu sync.Mutex created []*UsageCleanupTask createErr error listTasks []UsageCleanupTask listResult *pagination.PaginationResult listErr error claimQueue []*UsageCleanupTask claimErr error deleteQueue []cleanupDeleteResponse deleteCalls []cleanupDeleteCall markSucceeded []cleanupMarkCall markFailed []cleanupMarkCall statusByID map[int64]string statusErr error progressCalls []cleanupMarkCall updateErr error cancelCalls []int64 cancelErr error cancelResult *bool markFailedErr error } type dashboardRepoStub struct { recomputeErr error } func (s *dashboardRepoStub) AggregateRange(ctx context.Context, start, end time.Time) error { return nil } func (s *dashboardRepoStub) RecomputeRange(ctx context.Context, start, end time.Time) error { return s.recomputeErr } func (s *dashboardRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) { return time.Time{}, nil } func (s *dashboardRepoStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error { return nil } func (s *dashboardRepoStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error { return nil } func (s *dashboardRepoStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error { return nil } func (s *dashboardRepoStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error { return nil } func (s *cleanupRepoStub) CreateTask(ctx context.Context, task *UsageCleanupTask) error { if task == nil { return nil } s.mu.Lock() defer s.mu.Unlock() if s.createErr != nil { return s.createErr } if task.ID == 0 { task.ID = int64(len(s.created) + 1) } if task.CreatedAt.IsZero() { task.CreatedAt = time.Now().UTC() } if task.UpdatedAt.IsZero() { task.UpdatedAt = task.CreatedAt } clone := *task s.created = append(s.created, &clone) return nil } func (s *cleanupRepoStub) ListTasks(ctx context.Context, params pagination.PaginationParams) ([]UsageCleanupTask, *pagination.PaginationResult, error) { s.mu.Lock() defer s.mu.Unlock() return s.listTasks, s.listResult, s.listErr } func (s *cleanupRepoStub) ClaimNextPendingTask(ctx context.Context, staleRunningAfterSeconds int64) (*UsageCleanupTask, error) { s.mu.Lock() defer s.mu.Unlock() if s.claimErr != nil { return nil, s.claimErr } if len(s.claimQueue) == 0 { return nil, nil } task := s.claimQueue[0] s.claimQueue = s.claimQueue[1:] if s.statusByID == nil { s.statusByID = map[int64]string{} } s.statusByID[task.ID] = UsageCleanupStatusRunning return task, nil } func (s *cleanupRepoStub) GetTaskStatus(ctx context.Context, taskID int64) (string, error) { s.mu.Lock() defer s.mu.Unlock() if s.statusErr != nil { return "", s.statusErr } if s.statusByID == nil { return "", sql.ErrNoRows } status, ok := s.statusByID[taskID] if !ok { return "", sql.ErrNoRows } return status, nil } func (s *cleanupRepoStub) UpdateTaskProgress(ctx context.Context, taskID int64, deletedRows int64) error { s.mu.Lock() defer s.mu.Unlock() s.progressCalls = append(s.progressCalls, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows}) if s.updateErr != nil { return s.updateErr } return nil } func (s *cleanupRepoStub) CancelTask(ctx context.Context, taskID int64, canceledBy int64) (bool, error) { s.mu.Lock() defer s.mu.Unlock() s.cancelCalls = append(s.cancelCalls, taskID) if s.cancelErr != nil { return false, s.cancelErr } if s.cancelResult != nil { ok := *s.cancelResult if ok { if s.statusByID == nil { s.statusByID = map[int64]string{} } s.statusByID[taskID] = UsageCleanupStatusCanceled } return ok, nil } if s.statusByID == nil { s.statusByID = map[int64]string{} } status := s.statusByID[taskID] if status != UsageCleanupStatusPending && status != UsageCleanupStatusRunning { return false, nil } s.statusByID[taskID] = UsageCleanupStatusCanceled return true, nil } func (s *cleanupRepoStub) MarkTaskSucceeded(ctx context.Context, taskID int64, deletedRows int64) error { s.mu.Lock() defer s.mu.Unlock() s.markSucceeded = append(s.markSucceeded, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows}) if s.statusByID == nil { s.statusByID = map[int64]string{} } s.statusByID[taskID] = UsageCleanupStatusSucceeded return nil } func (s *cleanupRepoStub) MarkTaskFailed(ctx context.Context, taskID int64, deletedRows int64, errorMsg string) error { s.mu.Lock() defer s.mu.Unlock() s.markFailed = append(s.markFailed, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows, errMsg: errorMsg}) if s.statusByID == nil { s.statusByID = map[int64]string{} } s.statusByID[taskID] = UsageCleanupStatusFailed if s.markFailedErr != nil { return s.markFailedErr } return nil } func (s *cleanupRepoStub) DeleteUsageLogsBatch(ctx context.Context, filters UsageCleanupFilters, limit int) (int64, error) { s.mu.Lock() defer s.mu.Unlock() s.deleteCalls = append(s.deleteCalls, cleanupDeleteCall{filters: filters, limit: limit}) if len(s.deleteQueue) == 0 { return 0, nil } resp := s.deleteQueue[0] s.deleteQueue = s.deleteQueue[1:] return resp.deleted, resp.err } func TestUsageCleanupServiceCreateTaskSanitizeFilters(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, MaxRangeDays: 31}} svc := NewUsageCleanupService(repo, nil, nil, cfg) start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) end := start.Add(24 * time.Hour) userID := int64(-1) apiKeyID := int64(10) model := " gpt-4 " billingType := int8(-2) filters := UsageCleanupFilters{ StartTime: start, EndTime: end, UserID: &userID, APIKeyID: &apiKeyID, Model: &model, BillingType: &billingType, } task, err := svc.CreateTask(context.Background(), filters, 9) require.NoError(t, err) require.Equal(t, UsageCleanupStatusPending, task.Status) require.Nil(t, task.Filters.UserID) require.NotNil(t, task.Filters.APIKeyID) require.Equal(t, apiKeyID, *task.Filters.APIKeyID) require.NotNil(t, task.Filters.Model) require.Equal(t, "gpt-4", *task.Filters.Model) require.Nil(t, task.Filters.BillingType) require.Equal(t, int64(9), task.CreatedBy) } func TestUsageCleanupServiceCreateTaskInvalidCreator(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) filters := UsageCleanupFilters{ StartTime: time.Now(), EndTime: time.Now().Add(24 * time.Hour), } _, err := svc.CreateTask(context.Background(), filters, 0) require.Error(t, err) require.Equal(t, "USAGE_CLEANUP_INVALID_CREATOR", infraerrors.Reason(err)) } func TestUsageCleanupServiceCreateTaskDisabled(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}} svc := NewUsageCleanupService(repo, nil, nil, cfg) filters := UsageCleanupFilters{ StartTime: time.Now(), EndTime: time.Now().Add(24 * time.Hour), } _, err := svc.CreateTask(context.Background(), filters, 1) require.Error(t, err) require.Equal(t, http.StatusServiceUnavailable, infraerrors.Code(err)) require.Equal(t, "USAGE_CLEANUP_DISABLED", infraerrors.Reason(err)) } func TestUsageCleanupServiceCreateTaskRangeTooLarge(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, MaxRangeDays: 1}} svc := NewUsageCleanupService(repo, nil, nil, cfg) start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) end := start.Add(48 * time.Hour) filters := UsageCleanupFilters{StartTime: start, EndTime: end} _, err := svc.CreateTask(context.Background(), filters, 1) require.Error(t, err) require.Equal(t, "USAGE_CLEANUP_RANGE_TOO_LARGE", infraerrors.Reason(err)) } func TestUsageCleanupServiceCreateTaskMissingRange(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) _, err := svc.CreateTask(context.Background(), UsageCleanupFilters{}, 1) require.Error(t, err) require.Equal(t, "USAGE_CLEANUP_MISSING_RANGE", infraerrors.Reason(err)) } func TestUsageCleanupServiceCreateTaskRepoError(t *testing.T) { repo := &cleanupRepoStub{createErr: errors.New("db down")} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) filters := UsageCleanupFilters{ StartTime: time.Now(), EndTime: time.Now().Add(24 * time.Hour), } _, err := svc.CreateTask(context.Background(), filters, 1) require.Error(t, err) require.Contains(t, err.Error(), "create cleanup task") } func TestUsageCleanupServiceRunOnceSuccess(t *testing.T) { start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) end := start.Add(2 * time.Hour) repo := &cleanupRepoStub{ claimQueue: []*UsageCleanupTask{ {ID: 5, Filters: UsageCleanupFilters{StartTime: start, EndTime: end}}, }, deleteQueue: []cleanupDeleteResponse{ {deleted: 2}, {deleted: 2}, {deleted: 1}, }, } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2, TaskTimeoutSeconds: 30}} svc := NewUsageCleanupService(repo, nil, nil, cfg) svc.runOnce() repo.mu.Lock() defer repo.mu.Unlock() require.Len(t, repo.deleteCalls, 3) require.Equal(t, 2, repo.deleteCalls[0].limit) require.True(t, repo.deleteCalls[0].filters.StartTime.Equal(start)) require.True(t, repo.deleteCalls[0].filters.EndTime.Equal(end)) require.Len(t, repo.markSucceeded, 1) require.Empty(t, repo.markFailed) require.Equal(t, int64(5), repo.markSucceeded[0].taskID) require.Equal(t, int64(5), repo.markSucceeded[0].deletedRows) require.Equal(t, 2, repo.deleteCalls[0].limit) require.Equal(t, start, repo.deleteCalls[0].filters.StartTime) require.Equal(t, end, repo.deleteCalls[0].filters.EndTime) } func TestUsageCleanupServiceRunOnceClaimError(t *testing.T) { repo := &cleanupRepoStub{claimErr: errors.New("claim failed")} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) svc.runOnce() repo.mu.Lock() defer repo.mu.Unlock() require.Empty(t, repo.markSucceeded) require.Empty(t, repo.markFailed) } func TestUsageCleanupServiceRunOnceAlreadyRunning(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) svc.running = 1 svc.runOnce() } func TestUsageCleanupServiceExecuteTaskFailed(t *testing.T) { longMsg := strings.Repeat("x", 600) repo := &cleanupRepoStub{ deleteQueue: []cleanupDeleteResponse{ {err: errors.New(longMsg)}, }, } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 3}} svc := NewUsageCleanupService(repo, nil, nil, cfg) task := &UsageCleanupTask{ ID: 11, Filters: UsageCleanupFilters{ StartTime: time.Now(), EndTime: time.Now().Add(24 * time.Hour), }, } svc.executeTask(context.Background(), task) repo.mu.Lock() defer repo.mu.Unlock() require.Len(t, repo.markFailed, 1) require.Equal(t, int64(11), repo.markFailed[0].taskID) require.Equal(t, 500, len(repo.markFailed[0].errMsg)) } func TestUsageCleanupServiceExecuteTaskProgressError(t *testing.T) { repo := &cleanupRepoStub{ deleteQueue: []cleanupDeleteResponse{ {deleted: 2}, {deleted: 0}, }, updateErr: errors.New("update failed"), } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}} svc := NewUsageCleanupService(repo, nil, nil, cfg) task := &UsageCleanupTask{ ID: 8, Filters: UsageCleanupFilters{ StartTime: time.Now().UTC(), EndTime: time.Now().UTC().Add(time.Hour), }, } svc.executeTask(context.Background(), task) repo.mu.Lock() defer repo.mu.Unlock() require.Len(t, repo.markSucceeded, 1) require.Empty(t, repo.markFailed) require.Len(t, repo.progressCalls, 1) } func TestUsageCleanupServiceExecuteTaskDeleteCanceled(t *testing.T) { repo := &cleanupRepoStub{ deleteQueue: []cleanupDeleteResponse{ {err: context.Canceled}, }, } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}} svc := NewUsageCleanupService(repo, nil, nil, cfg) task := &UsageCleanupTask{ ID: 12, Filters: UsageCleanupFilters{ StartTime: time.Now().UTC(), EndTime: time.Now().UTC().Add(time.Hour), }, } svc.executeTask(context.Background(), task) repo.mu.Lock() defer repo.mu.Unlock() require.Empty(t, repo.markSucceeded) require.Empty(t, repo.markFailed) } func TestUsageCleanupServiceExecuteTaskContextCanceled(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}} svc := NewUsageCleanupService(repo, nil, nil, cfg) task := &UsageCleanupTask{ ID: 9, Filters: UsageCleanupFilters{ StartTime: time.Now().UTC(), EndTime: time.Now().UTC().Add(time.Hour), }, } ctx, cancel := context.WithCancel(context.Background()) cancel() svc.executeTask(ctx, task) repo.mu.Lock() defer repo.mu.Unlock() require.Empty(t, repo.markSucceeded) require.Empty(t, repo.markFailed) require.Empty(t, repo.deleteCalls) } func TestUsageCleanupServiceExecuteTaskMarkFailedUpdateError(t *testing.T) { repo := &cleanupRepoStub{ deleteQueue: []cleanupDeleteResponse{ {err: errors.New("boom")}, }, markFailedErr: errors.New("update failed"), } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}} svc := NewUsageCleanupService(repo, nil, nil, cfg) task := &UsageCleanupTask{ ID: 13, Filters: UsageCleanupFilters{ StartTime: time.Now().UTC(), EndTime: time.Now().UTC().Add(time.Hour), }, } svc.executeTask(context.Background(), task) repo.mu.Lock() defer repo.mu.Unlock() require.Len(t, repo.markFailed, 1) require.Equal(t, int64(13), repo.markFailed[0].taskID) } func TestUsageCleanupServiceExecuteTaskDashboardRecomputeError(t *testing.T) { repo := &cleanupRepoStub{ deleteQueue: []cleanupDeleteResponse{ {deleted: 0}, }, } dashboard := NewDashboardAggregationService(&dashboardRepoStub{}, nil, &config.Config{ DashboardAgg: config.DashboardAggregationConfig{Enabled: false}, }) cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}} svc := NewUsageCleanupService(repo, nil, dashboard, cfg) task := &UsageCleanupTask{ ID: 14, Filters: UsageCleanupFilters{ StartTime: time.Now().UTC(), EndTime: time.Now().UTC().Add(time.Hour), }, } svc.executeTask(context.Background(), task) repo.mu.Lock() defer repo.mu.Unlock() require.Len(t, repo.markSucceeded, 1) } func TestUsageCleanupServiceExecuteTaskDashboardRecomputeSuccess(t *testing.T) { repo := &cleanupRepoStub{ deleteQueue: []cleanupDeleteResponse{ {deleted: 0}, }, } dashboard := NewDashboardAggregationService(&dashboardRepoStub{}, nil, &config.Config{ DashboardAgg: config.DashboardAggregationConfig{Enabled: true}, }) cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}} svc := NewUsageCleanupService(repo, nil, dashboard, cfg) task := &UsageCleanupTask{ ID: 15, Filters: UsageCleanupFilters{ StartTime: time.Now().UTC(), EndTime: time.Now().UTC().Add(time.Hour), }, } svc.executeTask(context.Background(), task) repo.mu.Lock() defer repo.mu.Unlock() require.Len(t, repo.markSucceeded, 1) } func TestUsageCleanupServiceExecuteTaskCanceled(t *testing.T) { repo := &cleanupRepoStub{ statusByID: map[int64]string{ 3: UsageCleanupStatusCanceled, }, } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}} svc := NewUsageCleanupService(repo, nil, nil, cfg) task := &UsageCleanupTask{ ID: 3, Filters: UsageCleanupFilters{ StartTime: time.Now().UTC(), EndTime: time.Now().UTC().Add(time.Hour), }, } svc.executeTask(context.Background(), task) repo.mu.Lock() defer repo.mu.Unlock() require.Empty(t, repo.deleteCalls) require.Empty(t, repo.markSucceeded) require.Empty(t, repo.markFailed) } func TestUsageCleanupServiceCancelTaskSuccess(t *testing.T) { repo := &cleanupRepoStub{ statusByID: map[int64]string{ 5: UsageCleanupStatusPending, }, } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) err := svc.CancelTask(context.Background(), 5, 9) require.NoError(t, err) repo.mu.Lock() defer repo.mu.Unlock() require.Equal(t, UsageCleanupStatusCanceled, repo.statusByID[5]) require.Len(t, repo.cancelCalls, 1) } func TestUsageCleanupServiceCancelTaskDisabled(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}} svc := NewUsageCleanupService(repo, nil, nil, cfg) err := svc.CancelTask(context.Background(), 1, 2) require.Error(t, err) require.Equal(t, http.StatusServiceUnavailable, infraerrors.Code(err)) require.Equal(t, "USAGE_CLEANUP_DISABLED", infraerrors.Reason(err)) } func TestUsageCleanupServiceCancelTaskNotFound(t *testing.T) { repo := &cleanupRepoStub{} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) err := svc.CancelTask(context.Background(), 999, 1) require.Error(t, err) require.Equal(t, http.StatusNotFound, infraerrors.Code(err)) require.Equal(t, "USAGE_CLEANUP_TASK_NOT_FOUND", infraerrors.Reason(err)) } func TestUsageCleanupServiceCancelTaskStatusError(t *testing.T) { repo := &cleanupRepoStub{statusErr: errors.New("status broken")} cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) err := svc.CancelTask(context.Background(), 7, 1) require.Error(t, err) require.Contains(t, err.Error(), "status broken") } func TestUsageCleanupServiceCancelTaskConflict(t *testing.T) { repo := &cleanupRepoStub{ statusByID: map[int64]string{ 7: UsageCleanupStatusSucceeded, }, } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) err := svc.CancelTask(context.Background(), 7, 1) require.Error(t, err) require.Equal(t, http.StatusConflict, infraerrors.Code(err)) require.Equal(t, "USAGE_CLEANUP_CANCEL_CONFLICT", infraerrors.Reason(err)) } func TestUsageCleanupServiceCancelTaskRepoConflict(t *testing.T) { shouldCancel := false repo := &cleanupRepoStub{ statusByID: map[int64]string{ 7: UsageCleanupStatusPending, }, cancelResult: &shouldCancel, } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) err := svc.CancelTask(context.Background(), 7, 1) require.Error(t, err) require.Equal(t, http.StatusConflict, infraerrors.Code(err)) require.Equal(t, "USAGE_CLEANUP_CANCEL_CONFLICT", infraerrors.Reason(err)) } func TestUsageCleanupServiceCancelTaskRepoError(t *testing.T) { repo := &cleanupRepoStub{ statusByID: map[int64]string{ 7: UsageCleanupStatusPending, }, cancelErr: errors.New("cancel failed"), } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) err := svc.CancelTask(context.Background(), 7, 1) require.Error(t, err) require.Contains(t, err.Error(), "cancel failed") } func TestUsageCleanupServiceCancelTaskInvalidCanceller(t *testing.T) { repo := &cleanupRepoStub{ statusByID: map[int64]string{ 7: UsageCleanupStatusRunning, }, } cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svc := NewUsageCleanupService(repo, nil, nil, cfg) err := svc.CancelTask(context.Background(), 7, 0) require.Error(t, err) require.Equal(t, "USAGE_CLEANUP_INVALID_CANCELLER", infraerrors.Reason(err)) } func TestUsageCleanupServiceListTasks(t *testing.T) { repo := &cleanupRepoStub{ listTasks: []UsageCleanupTask{{ID: 1}, {ID: 2}}, listResult: &pagination.PaginationResult{ Total: 2, Page: 1, PageSize: 20, Pages: 1, }, } svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}) tasks, result, err := svc.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20}) require.NoError(t, err) require.Len(t, tasks, 2) require.Equal(t, int64(2), result.Total) } func TestUsageCleanupServiceListTasksNotReady(t *testing.T) { var nilSvc *UsageCleanupService _, _, err := nilSvc.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20}) require.Error(t, err) svc := NewUsageCleanupService(nil, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}) _, _, err = svc.ListTasks(context.Background(), pagination.PaginationParams{Page: 1, PageSize: 20}) require.Error(t, err) } func TestUsageCleanupServiceDefaultsAndLifecycle(t *testing.T) { var nilSvc *UsageCleanupService require.Equal(t, 31, nilSvc.maxRangeDays()) require.Equal(t, 5000, nilSvc.batchSize()) require.Equal(t, 10*time.Second, nilSvc.workerInterval()) require.Equal(t, 30*time.Minute, nilSvc.taskTimeout()) nilSvc.Start() nilSvc.Stop() repo := &cleanupRepoStub{} cfgDisabled := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}} svcDisabled := NewUsageCleanupService(repo, nil, nil, cfgDisabled) svcDisabled.Start() svcDisabled.Stop() timingWheel, err := NewTimingWheelService() require.NoError(t, err) cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, WorkerIntervalSeconds: 5}} svc := NewUsageCleanupService(repo, timingWheel, nil, cfg) require.Equal(t, 5*time.Second, svc.workerInterval()) svc.Start() svc.Stop() cfgFallback := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}} svcFallback := NewUsageCleanupService(repo, timingWheel, nil, cfgFallback) require.Equal(t, 31, svcFallback.maxRangeDays()) require.Equal(t, 5000, svcFallback.batchSize()) require.Equal(t, 10*time.Second, svcFallback.workerInterval()) svcMissingDeps := NewUsageCleanupService(nil, nil, nil, cfgFallback) svcMissingDeps.Start() } func TestSanitizeUsageCleanupFiltersModelEmpty(t *testing.T) { model := " " apiKeyID := int64(-5) accountID := int64(-1) groupID := int64(-2) filters := UsageCleanupFilters{ UserID: &apiKeyID, APIKeyID: &apiKeyID, AccountID: &accountID, GroupID: &groupID, Model: &model, } sanitizeUsageCleanupFilters(&filters) require.Nil(t, filters.UserID) require.Nil(t, filters.APIKeyID) require.Nil(t, filters.AccountID) require.Nil(t, filters.GroupID) require.Nil(t, filters.Model) } func TestDescribeUsageCleanupFiltersAllFields(t *testing.T) { start := time.Date(2024, 2, 1, 10, 0, 0, 0, time.UTC) end := start.Add(2 * time.Hour) userID := int64(1) apiKeyID := int64(2) accountID := int64(3) groupID := int64(4) model := " gpt-4 " stream := true billingType := int8(2) filters := UsageCleanupFilters{ StartTime: start, EndTime: end, UserID: &userID, APIKeyID: &apiKeyID, AccountID: &accountID, GroupID: &groupID, Model: &model, Stream: &stream, BillingType: &billingType, } desc := describeUsageCleanupFilters(filters) require.Equal(t, "start=2024-02-01T10:00:00Z end=2024-02-01T12:00:00Z user_id=1 api_key_id=2 account_id=3 group_id=4 model=gpt-4 stream=true billing_type=2", desc) } func TestUsageCleanupServiceIsTaskCanceledNotFound(t *testing.T) { repo := &cleanupRepoStub{} svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}) canceled, err := svc.isTaskCanceled(context.Background(), 9) require.NoError(t, err) require.False(t, canceled) } func TestUsageCleanupServiceIsTaskCanceledError(t *testing.T) { repo := &cleanupRepoStub{statusErr: errors.New("status err")} svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}) _, err := svc.isTaskCanceled(context.Background(), 9) require.Error(t, err) require.Contains(t, err.Error(), "status err") }