feat(ops): 增强ops核心服务功能和重试机制
This commit is contained in:
@@ -108,6 +108,10 @@ func (w *limitedResponseWriter) truncated() bool {
|
||||
return w.totalWritten > int64(w.limit)
|
||||
}
|
||||
|
||||
const (
|
||||
OpsRetryModeUpstreamEvent = "upstream_event"
|
||||
)
|
||||
|
||||
func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, errorID int64, mode string, pinnedAccountID *int64) (*OpsRetryResult, error) {
|
||||
if err := s.RequireMonitoringEnabled(ctx); err != nil {
|
||||
return nil, err
|
||||
@@ -123,6 +127,81 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_MODE", "mode must be client or upstream")
|
||||
}
|
||||
|
||||
errorLog, err := s.GetErrorLogByID(ctx, errorID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if errorLog == nil {
|
||||
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
|
||||
}
|
||||
if strings.TrimSpace(errorLog.RequestBody) == "" {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
|
||||
}
|
||||
|
||||
var pinned *int64
|
||||
if mode == OpsRetryModeUpstream {
|
||||
if pinnedAccountID != nil && *pinnedAccountID > 0 {
|
||||
pinned = pinnedAccountID
|
||||
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
|
||||
pinned = errorLog.AccountID
|
||||
} else {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "pinned_account_id is required for upstream retry")
|
||||
}
|
||||
}
|
||||
|
||||
return s.retryWithErrorLog(ctx, requestedByUserID, errorID, mode, mode, pinned, errorLog)
|
||||
}
|
||||
|
||||
// RetryUpstreamEvent retries a specific upstream attempt captured inside ops_error_logs.upstream_errors.
|
||||
// idx is 0-based. It always pins the original event account_id.
|
||||
func (s *OpsService) RetryUpstreamEvent(ctx context.Context, requestedByUserID int64, errorID int64, idx int) (*OpsRetryResult, error) {
|
||||
if err := s.RequireMonitoringEnabled(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.opsRepo == nil {
|
||||
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
|
||||
}
|
||||
if idx < 0 {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_INVALID_UPSTREAM_IDX", "invalid upstream idx")
|
||||
}
|
||||
|
||||
errorLog, err := s.GetErrorLogByID(ctx, errorID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if errorLog == nil {
|
||||
return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found")
|
||||
}
|
||||
|
||||
events, err := ParseOpsUpstreamErrors(errorLog.UpstreamErrors)
|
||||
if err != nil {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENTS_INVALID", "invalid upstream_errors")
|
||||
}
|
||||
if idx >= len(events) {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_IDX_OOB", "upstream idx out of range")
|
||||
}
|
||||
ev := events[idx]
|
||||
if ev == nil {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_EVENT_MISSING", "upstream event missing")
|
||||
}
|
||||
if ev.AccountID <= 0 {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry")
|
||||
}
|
||||
|
||||
upstreamBody := strings.TrimSpace(ev.UpstreamRequestBody)
|
||||
if upstreamBody == "" {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_UPSTREAM_NO_REQUEST_BODY", "No upstream request body found to retry")
|
||||
}
|
||||
|
||||
override := *errorLog
|
||||
override.RequestBody = upstreamBody
|
||||
pinned := ev.AccountID
|
||||
|
||||
// Persist as upstream_event, execute as upstream pinned retry.
|
||||
return s.retryWithErrorLog(ctx, requestedByUserID, errorID, OpsRetryModeUpstreamEvent, OpsRetryModeUpstream, &pinned, &override)
|
||||
}
|
||||
|
||||
func (s *OpsService) retryWithErrorLog(ctx context.Context, requestedByUserID int64, errorID int64, mode string, execMode string, pinnedAccountID *int64, errorLog *OpsErrorLogDetail) (*OpsRetryResult, error) {
|
||||
latest, err := s.opsRepo.GetLatestRetryAttemptForError(ctx, errorID)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, infraerrors.InternalServer("OPS_RETRY_LOAD_LATEST_FAILED", "Failed to check retry status").WithCause(err)
|
||||
@@ -144,22 +223,18 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er
|
||||
}
|
||||
}
|
||||
|
||||
errorLog, err := s.GetErrorLogByID(ctx, errorID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(errorLog.RequestBody) == "" {
|
||||
if errorLog == nil || strings.TrimSpace(errorLog.RequestBody) == "" {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_NO_REQUEST_BODY", "No request body found to retry")
|
||||
}
|
||||
|
||||
var pinned *int64
|
||||
if mode == OpsRetryModeUpstream {
|
||||
if execMode == OpsRetryModeUpstream {
|
||||
if pinnedAccountID != nil && *pinnedAccountID > 0 {
|
||||
pinned = pinnedAccountID
|
||||
} else if errorLog.AccountID != nil && *errorLog.AccountID > 0 {
|
||||
pinned = errorLog.AccountID
|
||||
} else {
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "pinned_account_id is required for upstream retry")
|
||||
return nil, infraerrors.BadRequest("OPS_RETRY_PINNED_ACCOUNT_REQUIRED", "account_id is required for upstream retry")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +271,7 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er
|
||||
execCtx, cancel := context.WithTimeout(ctx, opsRetryTimeout)
|
||||
defer cancel()
|
||||
|
||||
execRes := s.executeRetry(execCtx, errorLog, mode, pinned)
|
||||
execRes := s.executeRetry(execCtx, errorLog, execMode, pinned)
|
||||
|
||||
finishedAt := time.Now()
|
||||
result.FinishedAt = finishedAt
|
||||
@@ -249,14 +324,10 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er
|
||||
ResultRequestID: resultRequestID,
|
||||
ErrorMessage: updateErrMsg,
|
||||
}); err != nil {
|
||||
// Best-effort: retry itself already executed; do not fail the API response.
|
||||
log.Printf("[Ops] UpdateRetryAttempt failed: %v", err)
|
||||
} else {
|
||||
// Auto-resolve the source error when a manual retry succeeds.
|
||||
if success {
|
||||
if err := s.opsRepo.UpdateErrorResolution(updateCtx, errorID, true, &requestedByUserID, &attemptID, &finishedAt); err != nil {
|
||||
log.Printf("[Ops] UpdateErrorResolution failed: %v", err)
|
||||
}
|
||||
} else if success {
|
||||
if err := s.opsRepo.UpdateErrorResolution(updateCtx, errorID, true, &requestedByUserID, &attemptID, &finishedAt); err != nil {
|
||||
log.Printf("[Ops] UpdateErrorResolution failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user