refactor(task): add CAS-guarded updates to prevent concurrent billing conflicts

Replace all bare task.Update() (DB.Save) calls with UpdateWithStatus(),
which adds a WHERE status = ? guard to prevent concurrent processes from
overwriting each other's state transitions.

Key changes:

model/task.go:
- Add taskSnapshot struct with Equal() method for change detection
- Add Snapshot() method to capture pre-update state
- Add UpdateWithStatus(fromStatus) using DB.Where().Save() for CAS
  semantics with full-struct save (no explicit field listing needed)

model/midjourney.go:
- Add UpdateWithStatus(fromStatus string) with same CAS pattern

service/task_polling.go (updateVideoSingleTask):
- Snapshot before processing upstream response; skip DB write if unchanged
- Terminal transitions (SUCCESS/FAILURE) use UpdateWithStatus CAS:
  billing/refund only executes if this process wins the transition
- Non-terminal updates also use UpdateWithStatus to prevent overwriting
  a concurrent terminal transition back to IN_PROGRESS
- Defer settleTaskBillingOnComplete to after CAS check (shouldSettle flag)

relay/relay_task.go (tryRealtimeFetch):
- Add snapshot + change detection; use UpdateWithStatus for CAS safety

controller/midjourney.go (UpdateMidjourneyTaskBulk):
- Capture preStatus before mutations; use UpdateWithStatus CAS
- Gate refund (IncreaseUserQuota) on CAS success (won && shouldReturnQuota)

This prevents the multi-instance race condition where:
1. Instance A reads task (IN_PROGRESS), fetches upstream (still IN_PROGRESS)
2. Instance B reads same task, fetches upstream (now SUCCESS), writes SUCCESS
3. Instance A's bare Save() overwrites SUCCESS back to IN_PROGRESS
This commit is contained in:
CaIon
2026-02-22 00:52:35 +08:00
parent 6f39c02857
commit b386490d5e
5 changed files with 95 additions and 74 deletions

View File

@@ -319,6 +319,8 @@ func updateVideoSingleTask(ctx context.Context, adaptor TaskPollingAdaptor, ch *
logger.LogDebug(ctx, fmt.Sprintf("updateVideoSingleTask response: %s", string(responseBody)))
snap := task.Snapshot()
taskResult := &relaycommon.TaskInfo{}
// try parse as New API response format
var responseItems dto.TaskResponse[model.Task]
@@ -344,10 +346,9 @@ func updateVideoSingleTask(ctx context.Context, adaptor TaskPollingAdaptor, ch *
taskResult = relaycommon.FailTaskInfo("upstream returned empty status")
}
// 记录原本的状态,防止重复退款
shouldRefund := false
shouldSettle := false
quota := task.Quota
preStatus := task.Status
task.Status = model.TaskStatus(taskResult.Status)
switch taskResult.Status {
@@ -374,9 +375,7 @@ func updateVideoSingleTask(ctx context.Context, adaptor TaskPollingAdaptor, ch *
// No URL from adaptor — construct proxy URL using public task ID
task.PrivateData.ResultURL = taskcommon.BuildProxyURL(task.TaskID)
}
// 完成时计费调整:优先由 adaptor 计算,回退到 token 重算
settleTaskBillingOnComplete(ctx, adaptor, task, taskResult)
shouldSettle = true
case model.TaskStatusFailure:
logger.LogJson(ctx, fmt.Sprintf("Task %s failed", taskId), task)
task.Status = model.TaskStatusFailure
@@ -388,23 +387,39 @@ func updateVideoSingleTask(ctx context.Context, adaptor TaskPollingAdaptor, ch *
logger.LogInfo(ctx, fmt.Sprintf("Task %s failed: %s", task.TaskID, task.FailReason))
taskResult.Progress = taskcommon.ProgressComplete
if quota != 0 {
if preStatus != model.TaskStatusFailure {
shouldRefund = true
} else {
logger.LogWarn(ctx, fmt.Sprintf("Task %s already in failure status, skip refund", task.TaskID))
}
shouldRefund = true
}
default:
return fmt.Errorf("unknown task status %s for task %s", taskResult.Status, taskId)
return fmt.Errorf("unknown task status %s for task %s", taskResult.Status, task.TaskID)
}
if taskResult.Progress != "" {
task.Progress = taskResult.Progress
}
if err := task.Update(); err != nil {
common.SysLog("UpdateVideoTask task error: " + err.Error())
shouldRefund = false
isDone := task.Status == model.TaskStatusSuccess || task.Status == model.TaskStatusFailure
if isDone && snap.Status != task.Status {
won, err := task.UpdateWithStatus(snap.Status)
if err != nil {
logger.LogError(ctx, fmt.Sprintf("UpdateWithStatus failed for task %s: %s", task.TaskID, err.Error()))
shouldRefund = false
shouldSettle = false
} else if !won {
logger.LogWarn(ctx, fmt.Sprintf("Task %s already transitioned by another process, skip billing", task.TaskID))
shouldRefund = false
shouldSettle = false
}
} else if !snap.Equal(task.Snapshot()) {
if _, err := task.UpdateWithStatus(snap.Status); err != nil {
logger.LogError(ctx, fmt.Sprintf("Failed to update task %s: %s", task.TaskID, err.Error()))
}
} else {
// No changes, skip update
logger.LogDebug(ctx, fmt.Sprintf("No update needed for task %s", task.TaskID))
}
if shouldSettle {
settleTaskBillingOnComplete(ctx, adaptor, task, taskResult)
}
if shouldRefund {
RefundTaskQuota(ctx, task, task.FailReason)
}