// Source file: sub2api/backend/internal/service/idempotency.go
//
// Original listing: 472 lines, 17 KiB, Go.

package service
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/Wei-Shaw/sub2api/internal/util/logredact"
)
// Lifecycle states stored in IdempotencyRecord.Status.
const (
// IdempotencyStatusProcessing marks a record that has been claimed by a
// request; Execute creates new records in this state.
IdempotencyStatusProcessing = "processing"
// IdempotencyStatusSucceeded marks a record whose stored response can be
// replayed to later requests with the same key and fingerprint.
IdempotencyStatusSucceeded = "succeeded"
// IdempotencyStatusFailedRetryable marks a record whose execution failed;
// Execute allows it to be reclaimed once its LockedUntil backoff has passed.
IdempotencyStatusFailedRetryable = "failed_retryable"
)
// Sentinel application errors returned by the idempotency helpers in this file.
var (
// ErrIdempotencyKeyRequired is returned by Execute when no key was supplied,
// the caller set RequireKey, and the coordinator is not in ObserveOnly mode.
ErrIdempotencyKeyRequired = infraerrors.BadRequest("IDEMPOTENCY_KEY_REQUIRED", "idempotency key is required")
// ErrIdempotencyKeyInvalid is returned by NormalizeIdempotencyKey for keys that
// are longer than 128 bytes or contain characters outside the range 33..126.
ErrIdempotencyKeyInvalid = infraerrors.BadRequest("IDEMPOTENCY_KEY_INVALID", "idempotency key is invalid")
// ErrIdempotencyKeyConflict is returned by Execute when the stored fingerprint
// differs from the current request, or the stored status is not recognised.
ErrIdempotencyKeyConflict = infraerrors.Conflict("IDEMPOTENCY_KEY_CONFLICT", "idempotency key reused with different payload")
// ErrIdempotencyInProgress is returned by Execute when the existing record is
// still in the processing state, or when a reclaim attempt lost a race.
ErrIdempotencyInProgress = infraerrors.Conflict("IDEMPOTENCY_IN_PROGRESS", "idempotent request is still processing")
// ErrIdempotencyRetryBackoff is returned by Execute when a failed_retryable
// record is still inside its LockedUntil backoff window.
ErrIdempotencyRetryBackoff = infraerrors.Conflict("IDEMPOTENCY_RETRY_BACKOFF", "idempotent request is in retry backoff window")
// ErrIdempotencyStoreUnavail is returned by Execute when the repository is nil,
// a repository call fails, or a stored response cannot be encoded or decoded.
ErrIdempotencyStoreUnavail = infraerrors.ServiceUnavailable("IDEMPOTENCY_STORE_UNAVAILABLE", "idempotency store unavailable")
// ErrIdempotencyInvalidPayload is returned by BuildIdempotencyFingerprint when
// the request payload cannot be JSON-encoded.
ErrIdempotencyInvalidPayload = infraerrors.BadRequest("IDEMPOTENCY_PAYLOAD_INVALID", "failed to normalize request payload")
)
// IdempotencyRecord is the stored state of one idempotency key within a scope.
type IdempotencyRecord struct {
// ID identifies the record in the repository. Execute treats a zero ID
// after claiming as a store failure.
ID int64
// Scope is the namespace the key belongs to (copied from the execute options).
Scope string
// IdempotencyKeyHash is the hex SHA-256 of the normalized key (see HashIdempotencyKey).
IdempotencyKeyHash string
// RequestFingerprint is the hash of method, route, actor scope and payload
// (see BuildIdempotencyFingerprint); used to detect key reuse with a different request.
RequestFingerprint string
// Status is one of the IdempotencyStatus* constants.
Status string
// ResponseStatus is presumably the status code passed to MarkSucceeded — verify against the repository.
ResponseStatus *int
// ResponseBody is the stored JSON response that Execute decodes when replaying.
ResponseBody *string
// ErrorReason is presumably the reason passed to MarkFailedRetryable — verify against the repository.
ErrorReason *string
// LockedUntil is the processing-lock or retry-backoff deadline; nil means no deadline is recorded.
LockedUntil *time.Time
// ExpiresAt is when the record stops being authoritative; Execute tries to
// reclaim records whose ExpiresAt is not after the current time.
ExpiresAt time.Time
// CreatedAt and UpdatedAt are not written in this file; presumably maintained by the repository.
CreatedAt time.Time
UpdatedAt time.Time
}
// IdempotencyRepository is the persistence contract used by IdempotencyCoordinator.
// The notes below describe how Execute uses each method; the exact storage
// semantics live in the implementation and should be confirmed there.
type IdempotencyRepository interface {
// CreateProcessing tries to claim a key by storing record. Execute treats a
// true result as "this call owns the key" and expects record.ID to be set
// afterwards (presumably populated by the implementation — verify).
CreateProcessing(ctx context.Context, record *IdempotencyRecord) (bool, error)
// GetByScopeAndKeyHash loads the record for scope and keyHash. Execute
// handles a nil record with a nil error as a store failure.
GetByScopeAndKeyHash(ctx context.Context, scope, keyHash string) (*IdempotencyRecord, error)
// TryReclaim attempts to take over record id, which is expected to be in
// fromStatus, installing the new lock and expiry deadlines. A false result
// is treated by Execute as having lost a race.
TryReclaim(ctx context.Context, id int64, fromStatus string, now, newLockedUntil, newExpiresAt time.Time) (bool, error)
// ExtendProcessingLock is not called from the code visible in this file;
// by name it extends the lock of an in-flight record — TODO confirm.
ExtendProcessingLock(ctx context.Context, id int64, requestFingerprint string, newLockedUntil, newExpiresAt time.Time) (bool, error)
// MarkSucceeded stores the final response for record id.
MarkSucceeded(ctx context.Context, id int64, responseStatus int, responseBody string, expiresAt time.Time) error
// MarkFailedRetryable records a failed execution; Execute passes the end of
// the retry backoff window as lockedUntil.
MarkFailedRetryable(ctx context.Context, id int64, errorReason string, lockedUntil, expiresAt time.Time) error
// DeleteExpired is not called from the code visible in this file; by
// signature it removes up to limit records expired as of now — TODO confirm.
DeleteExpired(ctx context.Context, now time.Time, limit int) (int64, error)
}
// IdempotencyConfig holds the tunables of an IdempotencyCoordinator.
type IdempotencyConfig struct {
// DefaultTTL is the record lifetime Execute uses when the options carry no positive TTL.
DefaultTTL time.Duration
// SystemOperationTTL is the lifetime reported by DefaultSystemOperationIdempotencyTTL.
SystemOperationTTL time.Duration
// ProcessingTimeout is how long a freshly claimed record stays locked.
ProcessingTimeout time.Duration
// FailedRetryBackoff is how long a failed record blocks retries.
FailedRetryBackoff time.Duration
// MaxStoredResponseLen caps the stored response size in bytes; values <= 0 disable the cap.
MaxStoredResponseLen int
// ObserveOnly, when true, lets requests without a key run even if the caller set RequireKey.
ObserveOnly bool
}
// DefaultIdempotencyConfig returns the built-in idempotency settings:
// 24h record TTL, 1h TTL for system operations, 30s processing lock,
// 5s retry backoff, a 64 KiB cap on stored responses, and observe-only mode.
func DefaultIdempotencyConfig() IdempotencyConfig {
	var cfg IdempotencyConfig
	cfg.DefaultTTL = 24 * time.Hour
	cfg.SystemOperationTTL = time.Hour
	cfg.ProcessingTimeout = 30 * time.Second
	cfg.FailedRetryBackoff = 5 * time.Second
	cfg.MaxStoredResponseLen = 64 << 10 // 64 KiB
	// Observe first and enforce later, so that legacy clients which do not
	// send an idempotency key are not broken immediately.
	cfg.ObserveOnly = true
	return cfg
}
// IdempotencyExecuteOptions describes one request passed to IdempotencyCoordinator.Execute.
type IdempotencyExecuteOptions struct {
// Scope namespaces the key; Execute rejects an empty scope when a key is present.
Scope string
// ActorScope identifies the caller inside the fingerprint ("anonymous" when empty).
ActorScope string
// Method is the request method used in the fingerprint ("POST" when empty).
Method string
// Route is the request route used in the fingerprint ("/" when empty); it is
// also passed to the metric and audit helpers.
Route string
// IdempotencyKey is the raw client-supplied key; it is normalized before use.
IdempotencyKey string
// Payload is JSON-encoded into the fingerprint.
Payload any
// TTL overrides the configured default record lifetime when positive.
TTL time.Duration
// RequireKey makes a missing key an error unless the coordinator is in ObserveOnly mode.
RequireKey bool
}
// IdempotencyExecuteResult is the outcome of IdempotencyCoordinator.Execute.
type IdempotencyExecuteResult struct {
// Data is the executor's return value, or the JSON-decoded stored response on replay.
Data any
// Replayed is true when Data came from a stored response rather than a fresh execution.
Replayed bool
}
// IdempotencyCoordinator combines a repository and a configuration to run
// requests under an idempotency key (see Execute).
type IdempotencyCoordinator struct {
// repo is the backing store; Execute fails keyed requests when it is nil.
repo IdempotencyRepository
// cfg holds the TTL, lock, backoff and size settings.
cfg IdempotencyConfig
}
// Package-level default coordinator, guarded by defaultIdempotencyMu.
var (
// defaultIdempotencyMu protects reads and writes of defaultIdempotencySvc.
defaultIdempotencyMu sync.RWMutex
// defaultIdempotencySvc is the coordinator installed via
// SetDefaultIdempotencyCoordinator; nil until one is set.
defaultIdempotencySvc *IdempotencyCoordinator
)
// SetDefaultIdempotencyCoordinator installs svc as the package-level default
// coordinator. Passing nil clears it.
func SetDefaultIdempotencyCoordinator(svc *IdempotencyCoordinator) {
	defaultIdempotencyMu.Lock()
	defer defaultIdempotencyMu.Unlock()

	defaultIdempotencySvc = svc
}
// DefaultIdempotencyCoordinator returns the package-level default coordinator,
// or nil when none has been installed.
func DefaultIdempotencyCoordinator() *IdempotencyCoordinator {
	defaultIdempotencyMu.RLock()
	current := defaultIdempotencySvc
	defaultIdempotencyMu.RUnlock()

	return current
}
func DefaultWriteIdempotencyTTL() time.Duration {
defaultTTL := DefaultIdempotencyConfig().DefaultTTL
if coordinator := DefaultIdempotencyCoordinator(); coordinator != nil && coordinator.cfg.DefaultTTL > 0 {
return coordinator.cfg.DefaultTTL
}
return defaultTTL
}
func DefaultSystemOperationIdempotencyTTL() time.Duration {
defaultTTL := DefaultIdempotencyConfig().SystemOperationTTL
if coordinator := DefaultIdempotencyCoordinator(); coordinator != nil && coordinator.cfg.SystemOperationTTL > 0 {
return coordinator.cfg.SystemOperationTTL
}
return defaultTTL
}
// NewIdempotencyCoordinator builds a coordinator that stores records in repo
// and uses cfg for its TTL, lock, backoff and size settings. cfg is used as
// given; no defaults are filled in here.
func NewIdempotencyCoordinator(repo IdempotencyRepository, cfg IdempotencyConfig) *IdempotencyCoordinator {
	coordinator := new(IdempotencyCoordinator)
	coordinator.repo = repo
	coordinator.cfg = cfg
	return coordinator
}
// NormalizeIdempotencyKey trims surrounding whitespace from raw and validates
// the result.
//
// It returns ("", nil) when the trimmed key is empty, so callers can tell
// "no key supplied" apart from "invalid key". It returns
// ErrIdempotencyKeyInvalid when the key is longer than 128 bytes or contains
// any character outside the printable, non-space ASCII range '!'..'~'.
func NormalizeIdempotencyKey(raw string) (string, error) {
	trimmed := strings.TrimSpace(raw)
	switch {
	case trimmed == "":
		return "", nil
	case len(trimmed) > 128:
		return "", ErrIdempotencyKeyInvalid
	}

	// '!' is 33 and '~' is 126: everything outside that range is rejected,
	// including spaces, control characters and all non-ASCII runes.
	outsideAllowedRange := func(r rune) bool { return r < '!' || r > '~' }
	if strings.IndexFunc(trimmed, outsideAllowedRange) >= 0 {
		return "", ErrIdempotencyKeyInvalid
	}
	return trimmed, nil
}
// HashIdempotencyKey returns the lowercase hex-encoded SHA-256 digest of key.
func HashIdempotencyKey(key string) string {
	digest := sha256.Sum256([]byte(key))
	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest[:])
	return string(encoded)
}
// BuildIdempotencyFingerprint returns the hex SHA-256 of the upper-cased
// method, the route, the actor scope and the JSON encoding of payload, joined
// by newlines.
//
// Empty inputs fall back to "POST", "/" and "anonymous" respectively. If
// payload cannot be JSON-encoded, ErrIdempotencyInvalidPayload is returned
// with the encoding error attached as its cause.
func BuildIdempotencyFingerprint(method, route, actorScope string, payload any) (string, error) {
	orDefault := func(value, fallback string) string {
		if value == "" {
			return fallback
		}
		return value
	}
	method = orDefault(method, "POST")
	route = orDefault(route, "/")
	actorScope = orDefault(actorScope, "anonymous")

	encodedPayload, err := json.Marshal(payload)
	if err != nil {
		return "", ErrIdempotencyInvalidPayload.WithCause(err)
	}

	canonical := strings.Join([]string{
		strings.ToUpper(method),
		route,
		actorScope,
		string(encodedPayload),
	}, "\n")
	digest := sha256.Sum256([]byte(canonical))
	return hex.EncodeToString(digest[:]), nil
}
// RetryAfterSecondsFromError extracts the "retry_after" metadata value from an
// application error found in err's chain and returns it as a positive number
// of seconds.
//
// It returns 0 when err carries no application error, the error has no
// metadata, or the value is missing, blank, non-numeric or not positive.
func RetryAfterSecondsFromError(err error) int {
	var appErr *infraerrors.ApplicationError
	if !errors.As(err, &appErr) {
		return 0
	}
	if appErr == nil || appErr.Metadata == nil {
		return 0
	}

	text := strings.TrimSpace(appErr.Metadata["retry_after"])
	if text == "" {
		return 0
	}
	if seconds, convErr := strconv.Atoi(text); convErr == nil && seconds > 0 {
		return seconds
	}
	return 0
}
// Execute runs execute under the idempotency key carried in opts, replaying a
// stored response when the same key and request fingerprint were already
// completed successfully.
//
// Flow, as implemented below:
//  1. Normalize the key. Without a key, execute runs directly and nothing is
//     stored (unless RequireKey is set and ObserveOnly is off, which is an error).
//  2. Build the request fingerprint and try to claim the key with
//     repo.CreateProcessing.
//  3. If the key is already held, load the existing record: reject a
//     fingerprint mismatch, try to reclaim a record whose ExpiresAt has
//     passed, and otherwise act on its status (replay, conflict, or reclaim
//     after the retry backoff).
//  4. With the claim held, run execute. On error the record is marked
//     failed_retryable and the executor's error is returned unchanged; on
//     success the response is stored and returned.
//
// Parameters:
//   - ctx is passed to every repository call and to execute.
//   - opts carries scope, key, fingerprint inputs, TTL and RequireKey.
//   - execute is the operation to protect; it must not be nil.
//
// Returns the result (Replayed is true only for stored responses) or one of
// the ErrIdempotency* errors, an IDEMPOTENCY_EXECUTOR_NIL /
// IDEMPOTENCY_SCOPE_REQUIRED error, or the error returned by execute.
func (c *IdempotencyCoordinator) Execute(
ctx context.Context,
opts IdempotencyExecuteOptions,
execute func(context.Context) (any, error),
) (*IdempotencyExecuteResult, error) {
if execute == nil {
return nil, infraerrors.InternalServer("IDEMPOTENCY_EXECUTOR_NIL", "idempotency executor is nil")
}
key, err := NormalizeIdempotencyKey(opts.IdempotencyKey)
if err != nil {
return nil, err
}
// No key supplied: run the operation without any idempotency bookkeeping,
// unless the key is mandatory and enforcement is on.
if key == "" {
if opts.RequireKey && !c.cfg.ObserveOnly {
return nil, ErrIdempotencyKeyRequired
}
data, execErr := execute(ctx)
if execErr != nil {
return nil, execErr
}
return &IdempotencyExecuteResult{Data: data}, nil
}
// From here on a key is present, so a working store and a scope are required.
if c.repo == nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "repo_nil")
return nil, ErrIdempotencyStoreUnavail
}
if opts.Scope == "" {
return nil, infraerrors.BadRequest("IDEMPOTENCY_SCOPE_REQUIRED", "idempotency scope is required")
}
fingerprint, err := BuildIdempotencyFingerprint(opts.Method, opts.Route, opts.ActorScope, opts.Payload)
if err != nil {
return nil, err
}
// NOTE(review): if both opts.TTL and cfg.DefaultTTL are <= 0, expiresAt is
// not after now and the new record is already expired — confirm the config
// is always validated upstream.
ttl := opts.TTL
if ttl <= 0 {
ttl = c.cfg.DefaultTTL
}
now := time.Now()
expiresAt := now.Add(ttl)
lockedUntil := now.Add(c.cfg.ProcessingTimeout)
// Only the hash of the key is stored and logged, never the raw key.
keyHash := HashIdempotencyKey(key)
record := &IdempotencyRecord{
Scope: opts.Scope,
IdempotencyKeyHash: keyHash,
RequestFingerprint: fingerprint,
Status: IdempotencyStatusProcessing,
LockedUntil: &lockedUntil,
ExpiresAt: expiresAt,
}
// Try to claim the key; owner reports whether this call now holds it.
owner, err := c.repo.CreateProcessing(ctx, record)
if err != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "create_processing_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "unknown->store_unavailable", false, map[string]string{
"operation": "create_processing",
})
return nil, ErrIdempotencyStoreUnavail.WithCause(err)
}
if owner {
recordIdempotencyClaim(opts.Route, opts.Scope, map[string]string{"mode": "new_claim"})
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "none->processing", false, map[string]string{
"claim_mode": "new",
})
}
// The key is already held by an earlier request: inspect the existing record.
if !owner {
existing, getErr := c.repo.GetByScopeAndKeyHash(ctx, opts.Scope, keyHash)
if getErr != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "get_existing_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "unknown->store_unavailable", false, map[string]string{
"operation": "get_existing",
})
return nil, ErrIdempotencyStoreUnavail.WithCause(getErr)
}
// The claim failed but no record can be found: treated as a store failure.
if existing == nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "missing_existing")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "unknown->store_unavailable", false, map[string]string{
"operation": "missing_existing",
})
return nil, ErrIdempotencyStoreUnavail
}
// Same key with a different method/route/actor/payload is rejected. This
// check runs before the expiry check, so it also applies to expired records.
if existing.RequestFingerprint != fingerprint {
recordIdempotencyConflict(opts.Route, opts.Scope, map[string]string{"reason": "fingerprint_mismatch"})
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "existing->fingerprint_mismatch", false, nil)
return nil, ErrIdempotencyKeyConflict
}
// An expired record (ExpiresAt not after now) may be taken over whatever its status.
reclaimedByExpired := false
if !existing.ExpiresAt.After(now) {
taken, reclaimErr := c.repo.TryReclaim(ctx, existing.ID, existing.Status, now, lockedUntil, expiresAt)
if reclaimErr != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "try_reclaim_expired_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, existing.Status+"->store_unavailable", false, map[string]string{
"operation": "try_reclaim_expired",
})
return nil, ErrIdempotencyStoreUnavail.WithCause(reclaimErr)
}
if taken {
reclaimedByExpired = true
recordIdempotencyClaim(opts.Route, opts.Scope, map[string]string{"mode": "expired_reclaim"})
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, existing.Status+"->processing", false, map[string]string{
"claim_mode": "expired_reclaim",
})
record.ID = existing.ID
} else {
// The reclaim did not succeed: reload the record and re-validate it, then
// fall through to the status switch using the fresh copy.
latest, latestErr := c.repo.GetByScopeAndKeyHash(ctx, opts.Scope, keyHash)
if latestErr != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "get_existing_after_expired_reclaim_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "unknown->store_unavailable", false, map[string]string{
"operation": "get_existing_after_expired_reclaim",
})
return nil, ErrIdempotencyStoreUnavail.WithCause(latestErr)
}
if latest == nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "missing_existing_after_expired_reclaim")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "unknown->store_unavailable", false, map[string]string{
"operation": "missing_existing_after_expired_reclaim",
})
return nil, ErrIdempotencyStoreUnavail
}
if latest.RequestFingerprint != fingerprint {
recordIdempotencyConflict(opts.Route, opts.Scope, map[string]string{"reason": "fingerprint_mismatch"})
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "existing->fingerprint_mismatch", false, nil)
return nil, ErrIdempotencyKeyConflict
}
existing = latest
}
}
// Not reclaimed through expiry: decide based on the stored status.
if !reclaimedByExpired {
switch existing.Status {
case IdempotencyStatusSucceeded:
// Completed earlier: replay the stored response instead of executing again.
data, parseErr := c.decodeStoredResponse(existing.ResponseBody)
if parseErr != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "decode_stored_response_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "succeeded->store_unavailable", false, map[string]string{
"operation": "decode_stored_response",
})
return nil, ErrIdempotencyStoreUnavail.WithCause(parseErr)
}
recordIdempotencyReplay(opts.Route, opts.Scope, nil)
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "succeeded->replayed", true, nil)
return &IdempotencyExecuteResult{Data: data, Replayed: true}, nil
case IdempotencyStatusProcessing:
// NOTE(review): this returns a conflict without checking whether
// existing.LockedUntil has already passed, so a stale processing record is
// only taken over by the ExpiresAt reclaim above — confirm this is intended.
recordIdempotencyConflict(opts.Route, opts.Scope, map[string]string{"reason": "in_progress"})
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "processing->conflict", false, nil)
return nil, c.conflictWithRetryAfter(ErrIdempotencyInProgress, existing.LockedUntil, now)
case IdempotencyStatusFailedRetryable:
// Still inside the retry backoff window: tell the caller to wait.
if existing.LockedUntil != nil && existing.LockedUntil.After(now) {
recordIdempotencyConflict(opts.Route, opts.Scope, map[string]string{"reason": "retry_backoff"})
recordIdempotencyRetryBackoff(opts.Route, opts.Scope, nil)
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "failed_retryable->retry_backoff_conflict", false, nil)
return nil, c.conflictWithRetryAfter(ErrIdempotencyRetryBackoff, existing.LockedUntil, now)
}
// Backoff is over: try to take the failed record over for a new attempt.
taken, reclaimErr := c.repo.TryReclaim(ctx, existing.ID, IdempotencyStatusFailedRetryable, now, lockedUntil, expiresAt)
if reclaimErr != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "try_reclaim_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "failed_retryable->store_unavailable", false, map[string]string{
"operation": "try_reclaim",
})
return nil, ErrIdempotencyStoreUnavail.WithCause(reclaimErr)
}
// Reclaim not granted: reported to the caller as an in-progress conflict.
if !taken {
recordIdempotencyConflict(opts.Route, opts.Scope, map[string]string{"reason": "reclaim_race"})
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "failed_retryable->conflict", false, map[string]string{
"conflict": "reclaim_race",
})
return nil, c.conflictWithRetryAfter(ErrIdempotencyInProgress, existing.LockedUntil, now)
}
recordIdempotencyClaim(opts.Route, opts.Scope, map[string]string{"mode": "reclaim"})
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "failed_retryable->processing", false, map[string]string{
"claim_mode": "reclaim",
})
record.ID = existing.ID
default:
// Unknown status value: refuse rather than guess.
recordIdempotencyConflict(opts.Route, opts.Scope, map[string]string{"reason": "unexpected_status"})
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "existing->conflict", false, map[string]string{
"status": existing.Status,
})
return nil, ErrIdempotencyKeyConflict
}
}
}
// Every path that reaches this point holds the claim. record.ID comes either
// from a reclaim above or from CreateProcessing (presumably set by the
// repository on the passed record — verify); without it the outcome cannot be stored.
if record.ID == 0 {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "record_id_missing")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "processing->store_unavailable", false, map[string]string{
"operation": "record_id_missing",
})
return nil, ErrIdempotencyStoreUnavail
}
// Measure from here until Execute returns, covering execute and the store write.
execStart := time.Now()
defer func() {
recordIdempotencyProcessingDuration(opts.Route, opts.Scope, time.Since(execStart), nil)
}()
data, execErr := execute(ctx)
if execErr != nil {
// Execution failed: open a backoff window starting now and record the reason.
backoffUntil := time.Now().Add(c.cfg.FailedRetryBackoff)
reason := infraerrors.Reason(execErr)
if reason == "" {
reason = "EXECUTION_FAILED"
}
recordIdempotencyRetryBackoff(opts.Route, opts.Scope, nil)
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "processing->failed_retryable", false, map[string]string{
"reason": reason,
})
// Best effort: a failure to persist the failed state is only recorded here;
// the caller still receives the executor's own error.
if markErr := c.repo.MarkFailedRetryable(ctx, record.ID, reason, backoffUntil, expiresAt); markErr != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "mark_failed_retryable_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "processing->store_unavailable", false, map[string]string{
"operation": "mark_failed_retryable",
})
}
return nil, execErr
}
// Execution succeeded: persist the response so later requests can replay it.
// If encoding or storing fails, the caller gets ErrIdempotencyStoreUnavail
// even though execute itself succeeded.
storedBody, marshalErr := c.marshalStoredResponse(data)
if marshalErr != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "marshal_response_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "processing->store_unavailable", false, map[string]string{
"operation": "marshal_response",
})
return nil, ErrIdempotencyStoreUnavail.WithCause(marshalErr)
}
// The stored response status is always 200 here.
if markErr := c.repo.MarkSucceeded(ctx, record.ID, 200, storedBody, expiresAt); markErr != nil {
RecordIdempotencyStoreUnavailable(opts.Route, opts.Scope, "mark_succeeded_error")
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "processing->store_unavailable", false, map[string]string{
"operation": "mark_succeeded",
})
return nil, ErrIdempotencyStoreUnavail.WithCause(markErr)
}
logIdempotencyAudit(opts.Route, opts.Scope, keyHash, "processing->succeeded", false, nil)
return &IdempotencyExecuteResult{Data: data}, nil
}
// conflictWithRetryAfter returns base, annotated with a "retry_after" metadata
// entry giving the whole number of seconds until lockedUntil.
//
// When lockedUntil is nil, base is returned unchanged. The remaining time is
// rounded UP to the next full second so that a client which waits exactly
// retry_after seconds does not come back while the lock is still held
// (truncating 4.8s to 4 would guarantee one more conflict). Deadlines that
// are already in the past, or less than a second away, yield 1.
func (c *IdempotencyCoordinator) conflictWithRetryAfter(base *infraerrors.ApplicationError, lockedUntil *time.Time, now time.Time) error {
	if lockedUntil == nil {
		return base
	}

	remaining := lockedUntil.Sub(now)
	// Integer arithmetic on the Duration avoids float rounding and cannot
	// overflow even when Sub saturates.
	sec := int(remaining / time.Second)
	if remaining%time.Second > 0 {
		sec++
	}
	if sec <= 0 {
		sec = 1
	}
	return base.WithMetadata(map[string]string{"retry_after": strconv.Itoa(sec)})
}
// marshalStoredResponse JSON-encodes data, passes the text through
// logredact.RedactText, and caps the result at cfg.MaxStoredResponseLen bytes
// (no cap when the limit is <= 0). A capped body gets the suffix
// "...(truncated)".
//
// The cut is moved back to the nearest UTF-8 rune boundary so that a
// multi-byte character is never split: json.Marshal emits non-ASCII
// characters unescaped, and a split rune would store invalid UTF-8.
//
// NOTE(review): a truncated body is no longer valid JSON, so
// decodeStoredResponse fails on it and Execute answers replays of such a
// record with ErrIdempotencyStoreUnavail. Confirm whether oversized responses
// should instead be stored in a replayable form.
//
// Returns the encoding error from json.Marshal unchanged.
func (c *IdempotencyCoordinator) marshalStoredResponse(data any) (string, error) {
	raw, err := json.Marshal(data)
	if err != nil {
		return "", err
	}

	redacted := logredact.RedactText(string(raw))
	limit := c.cfg.MaxStoredResponseLen
	if limit > 0 && len(redacted) > limit {
		cut := limit
		// redacted[cut] exists because len(redacted) > limit. Bytes of the form
		// 10xxxxxx are UTF-8 continuation bytes; step back until the byte at
		// the cut starts a rune, so redacted[:cut] ends on a complete rune.
		for cut > 0 && redacted[cut]&0xC0 == 0x80 {
			cut--
		}
		redacted = redacted[:cut] + "...(truncated)"
	}
	return redacted, nil
}
// decodeStoredResponse turns a stored response body back into a generic JSON
// value for replay.
//
// A nil pointer or a body that is empty after trimming whitespace yields an
// empty map. Any other body must be valid JSON; otherwise the json.Unmarshal
// error is returned wrapped with "decode stored response".
func (c *IdempotencyCoordinator) decodeStoredResponse(stored *string) (any, error) {
	if stored == nil {
		return map[string]any{}, nil
	}

	body := *stored
	if strings.TrimSpace(body) == "" {
		return map[string]any{}, nil
	}

	var decoded any
	err := json.Unmarshal([]byte(body), &decoded)
	if err != nil {
		return nil, fmt.Errorf("decode stored response: %w", err)
	}
	return decoded, nil
}