feat(清理任务): 引入Ent存储并补充日志与测试

新增 usage_cleanup_task Ent schema 与仓储实现,支持清理任务排序分页
补充清理任务全链路日志、仪表盘重算触发及 UI 过滤调整
完善 repository/service 单测并引入 sqlite 测试依赖
This commit is contained in:
yangjianbo
2026-01-18 14:18:28 +08:00
parent bf7b79f2f0
commit bd18f4b8ef
29 changed files with 5920 additions and 47 deletions

View File

@@ -20,7 +20,7 @@ var (
// ErrDashboardBackfillDisabled 当配置禁用回填时返回。
ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用")
// ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
errDashboardAggregationRunning = errors.New("聚合作业正在运行")
)

View File

@@ -151,20 +151,24 @@ func (s *UsageCleanupService) CreateTask(ctx context.Context, filters UsageClean
}
func (s *UsageCleanupService) runOnce() {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
svc := s
if svc == nil {
return
}
if !atomic.CompareAndSwapInt32(&svc.running, 0, 1) {
log.Printf("[UsageCleanup] run_once skipped: already_running=true")
return
}
defer atomic.StoreInt32(&s.running, 0)
defer atomic.StoreInt32(&svc.running, 0)
parent := context.Background()
if s != nil && s.workerCtx != nil {
parent = s.workerCtx
if svc.workerCtx != nil {
parent = svc.workerCtx
}
ctx, cancel := context.WithTimeout(parent, s.taskTimeout())
ctx, cancel := context.WithTimeout(parent, svc.taskTimeout())
defer cancel()
task, err := s.repo.ClaimNextPendingTask(ctx, int64(s.taskTimeout().Seconds()))
task, err := svc.repo.ClaimNextPendingTask(ctx, int64(svc.taskTimeout().Seconds()))
if err != nil {
log.Printf("[UsageCleanup] claim pending task failed: %v", err)
return
@@ -175,7 +179,7 @@ func (s *UsageCleanupService) runOnce() {
}
log.Printf("[UsageCleanup] task claimed: task=%d status=%s created_by=%d deleted_rows=%d %s", task.ID, task.Status, task.CreatedBy, task.DeletedRows, describeUsageCleanupFilters(task.Filters))
s.executeTask(ctx, task)
svc.executeTask(ctx, task)
}
func (s *UsageCleanupService) executeTask(ctx context.Context, task *UsageCleanupTask) {

View File

@@ -46,8 +46,45 @@ type cleanupRepoStub struct {
markSucceeded []cleanupMarkCall
markFailed []cleanupMarkCall
statusByID map[int64]string
statusErr error
progressCalls []cleanupMarkCall
updateErr error
cancelCalls []int64
cancelErr error
cancelResult *bool
markFailedErr error
}
type dashboardRepoStub struct {
recomputeErr error
}
func (s *dashboardRepoStub) AggregateRange(ctx context.Context, start, end time.Time) error {
return nil
}
func (s *dashboardRepoStub) RecomputeRange(ctx context.Context, start, end time.Time) error {
return s.recomputeErr
}
func (s *dashboardRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
return time.Time{}, nil
}
func (s *dashboardRepoStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
return nil
}
func (s *dashboardRepoStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error {
return nil
}
func (s *dashboardRepoStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
return nil
}
func (s *dashboardRepoStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
return nil
}
func (s *cleanupRepoStub) CreateTask(ctx context.Context, task *UsageCleanupTask) error {
@@ -100,6 +137,9 @@ func (s *cleanupRepoStub) ClaimNextPendingTask(ctx context.Context, staleRunning
func (s *cleanupRepoStub) GetTaskStatus(ctx context.Context, taskID int64) (string, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.statusErr != nil {
return "", s.statusErr
}
if s.statusByID == nil {
return "", sql.ErrNoRows
}
@@ -114,6 +154,9 @@ func (s *cleanupRepoStub) UpdateTaskProgress(ctx context.Context, taskID int64,
s.mu.Lock()
defer s.mu.Unlock()
s.progressCalls = append(s.progressCalls, cleanupMarkCall{taskID: taskID, deletedRows: deletedRows})
if s.updateErr != nil {
return s.updateErr
}
return nil
}
@@ -121,6 +164,19 @@ func (s *cleanupRepoStub) CancelTask(ctx context.Context, taskID int64, canceled
s.mu.Lock()
defer s.mu.Unlock()
s.cancelCalls = append(s.cancelCalls, taskID)
if s.cancelErr != nil {
return false, s.cancelErr
}
if s.cancelResult != nil {
ok := *s.cancelResult
if ok {
if s.statusByID == nil {
s.statusByID = map[int64]string{}
}
s.statusByID[taskID] = UsageCleanupStatusCanceled
}
return ok, nil
}
if s.statusByID == nil {
s.statusByID = map[int64]string{}
}
@@ -151,6 +207,9 @@ func (s *cleanupRepoStub) MarkTaskFailed(ctx context.Context, taskID int64, dele
s.statusByID = map[int64]string{}
}
s.statusByID[taskID] = UsageCleanupStatusFailed
if s.markFailedErr != nil {
return s.markFailedErr
}
return nil
}
@@ -266,9 +325,11 @@ func TestUsageCleanupServiceCreateTaskRepoError(t *testing.T) {
}
func TestUsageCleanupServiceRunOnceSuccess(t *testing.T) {
start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
end := start.Add(2 * time.Hour)
repo := &cleanupRepoStub{
claimQueue: []*UsageCleanupTask{
{ID: 5, Filters: UsageCleanupFilters{StartTime: time.Now(), EndTime: time.Now().Add(2 * time.Hour)}},
{ID: 5, Filters: UsageCleanupFilters{StartTime: start, EndTime: end}},
},
deleteQueue: []cleanupDeleteResponse{
{deleted: 2},
@@ -288,6 +349,9 @@ func TestUsageCleanupServiceRunOnceSuccess(t *testing.T) {
require.Empty(t, repo.markFailed)
require.Equal(t, int64(5), repo.markSucceeded[0].taskID)
require.Equal(t, int64(5), repo.markSucceeded[0].deletedRows)
require.Equal(t, 2, repo.deleteCalls[0].limit)
require.Equal(t, start, repo.deleteCalls[0].filters.StartTime)
require.Equal(t, end, repo.deleteCalls[0].filters.EndTime)
}
func TestUsageCleanupServiceRunOnceClaimError(t *testing.T) {
@@ -336,6 +400,293 @@ func TestUsageCleanupServiceExecuteTaskFailed(t *testing.T) {
require.Equal(t, 500, len(repo.markFailed[0].errMsg))
}
func TestUsageCleanupServiceExecuteTaskProgressError(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{deleted: 2},
{deleted: 0},
},
updateErr: errors.New("update failed"),
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 8,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markSucceeded, 1)
require.Empty(t, repo.markFailed)
require.Len(t, repo.progressCalls, 1)
}
func TestUsageCleanupServiceExecuteTaskDeleteCanceled(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{err: context.Canceled},
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 12,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Empty(t, repo.markSucceeded)
require.Empty(t, repo.markFailed)
}
func TestUsageCleanupServiceExecuteTaskContextCanceled(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 9,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
svc.executeTask(ctx, task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Empty(t, repo.markSucceeded)
require.Empty(t, repo.markFailed)
require.Empty(t, repo.deleteCalls)
}
func TestUsageCleanupServiceExecuteTaskMarkFailedUpdateError(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{err: errors.New("boom")},
},
markFailedErr: errors.New("update failed"),
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 13,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markFailed, 1)
require.Equal(t, int64(13), repo.markFailed[0].taskID)
}
func TestUsageCleanupServiceExecuteTaskDashboardRecomputeError(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{deleted: 0},
},
}
dashboard := NewDashboardAggregationService(&dashboardRepoStub{}, nil, &config.Config{
DashboardAgg: config.DashboardAggregationConfig{Enabled: false},
})
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, dashboard, cfg)
task := &UsageCleanupTask{
ID: 14,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markSucceeded, 1)
}
func TestUsageCleanupServiceExecuteTaskDashboardRecomputeSuccess(t *testing.T) {
repo := &cleanupRepoStub{
deleteQueue: []cleanupDeleteResponse{
{deleted: 0},
},
}
dashboard := NewDashboardAggregationService(&dashboardRepoStub{}, nil, &config.Config{
DashboardAgg: config.DashboardAggregationConfig{Enabled: true},
})
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, dashboard, cfg)
task := &UsageCleanupTask{
ID: 15,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Len(t, repo.markSucceeded, 1)
}
func TestUsageCleanupServiceExecuteTaskCanceled(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
3: UsageCleanupStatusCanceled,
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true, BatchSize: 2}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
task := &UsageCleanupTask{
ID: 3,
Filters: UsageCleanupFilters{
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
},
}
svc.executeTask(context.Background(), task)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Empty(t, repo.deleteCalls)
require.Empty(t, repo.markSucceeded)
require.Empty(t, repo.markFailed)
}
func TestUsageCleanupServiceCancelTaskSuccess(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
5: UsageCleanupStatusPending,
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 5, 9)
require.NoError(t, err)
repo.mu.Lock()
defer repo.mu.Unlock()
require.Equal(t, UsageCleanupStatusCanceled, repo.statusByID[5])
require.Len(t, repo.cancelCalls, 1)
}
func TestUsageCleanupServiceCancelTaskDisabled(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: false}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 1, 2)
require.Error(t, err)
require.Equal(t, http.StatusServiceUnavailable, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_DISABLED", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCancelTaskNotFound(t *testing.T) {
repo := &cleanupRepoStub{}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 999, 1)
require.Error(t, err)
require.Equal(t, http.StatusNotFound, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_TASK_NOT_FOUND", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCancelTaskStatusError(t *testing.T) {
repo := &cleanupRepoStub{statusErr: errors.New("status broken")}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "status broken")
}
func TestUsageCleanupServiceCancelTaskConflict(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
7: UsageCleanupStatusSucceeded,
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 1)
require.Error(t, err)
require.Equal(t, http.StatusConflict, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_CANCEL_CONFLICT", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCancelTaskRepoConflict(t *testing.T) {
shouldCancel := false
repo := &cleanupRepoStub{
statusByID: map[int64]string{
7: UsageCleanupStatusPending,
},
cancelResult: &shouldCancel,
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 1)
require.Error(t, err)
require.Equal(t, http.StatusConflict, infraerrors.Code(err))
require.Equal(t, "USAGE_CLEANUP_CANCEL_CONFLICT", infraerrors.Reason(err))
}
func TestUsageCleanupServiceCancelTaskRepoError(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
7: UsageCleanupStatusPending,
},
cancelErr: errors.New("cancel failed"),
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "cancel failed")
}
func TestUsageCleanupServiceCancelTaskInvalidCanceller(t *testing.T) {
repo := &cleanupRepoStub{
statusByID: map[int64]string{
7: UsageCleanupStatusRunning,
},
}
cfg := &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}}
svc := NewUsageCleanupService(repo, nil, nil, cfg)
err := svc.CancelTask(context.Background(), 7, 0)
require.Error(t, err)
require.Equal(t, "USAGE_CLEANUP_INVALID_CANCELLER", infraerrors.Reason(err))
}
func TestUsageCleanupServiceListTasks(t *testing.T) {
repo := &cleanupRepoStub{
listTasks: []UsageCleanupTask{{ID: 1}, {ID: 2}},
@@ -418,3 +769,47 @@ func TestSanitizeUsageCleanupFiltersModelEmpty(t *testing.T) {
require.Nil(t, filters.GroupID)
require.Nil(t, filters.Model)
}
func TestDescribeUsageCleanupFiltersAllFields(t *testing.T) {
start := time.Date(2024, 2, 1, 10, 0, 0, 0, time.UTC)
end := start.Add(2 * time.Hour)
userID := int64(1)
apiKeyID := int64(2)
accountID := int64(3)
groupID := int64(4)
model := " gpt-4 "
stream := true
billingType := int8(2)
filters := UsageCleanupFilters{
StartTime: start,
EndTime: end,
UserID: &userID,
APIKeyID: &apiKeyID,
AccountID: &accountID,
GroupID: &groupID,
Model: &model,
Stream: &stream,
BillingType: &billingType,
}
desc := describeUsageCleanupFilters(filters)
require.Equal(t, "start=2024-02-01T10:00:00Z end=2024-02-01T12:00:00Z user_id=1 api_key_id=2 account_id=3 group_id=4 model=gpt-4 stream=true billing_type=2", desc)
}
func TestUsageCleanupServiceIsTaskCanceledNotFound(t *testing.T) {
repo := &cleanupRepoStub{}
svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}})
canceled, err := svc.isTaskCanceled(context.Background(), 9)
require.NoError(t, err)
require.False(t, canceled)
}
func TestUsageCleanupServiceIsTaskCanceledError(t *testing.T) {
repo := &cleanupRepoStub{statusErr: errors.New("status err")}
svc := NewUsageCleanupService(repo, nil, nil, &config.Config{UsageCleanup: config.UsageCleanupConfig{Enabled: true}})
_, err := svc.isTaskCanceled(context.Background(), 9)
require.Error(t, err)
require.Contains(t, err.Error(), "status err")
}