Files
sub2api/backend/internal/service/system_operation_lock_service_test.go

306 lines
10 KiB
Go

package service
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
"github.com/stretchr/testify/require"
)
func TestSystemOperationLockService_AcquireBusyAndRelease(t *testing.T) {
repo := newInMemoryIdempotencyRepo()
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
SystemOperationTTL: 10 * time.Second,
ProcessingTimeout: 2 * time.Second,
})
lock1, err := svc.Acquire(context.Background(), "op-1")
require.NoError(t, err)
require.NotNil(t, lock1)
_, err = svc.Acquire(context.Background(), "op-2")
require.Error(t, err)
require.Equal(t, infraerrors.Code(ErrSystemOperationBusy), infraerrors.Code(err))
appErr := infraerrors.FromError(err)
require.Equal(t, "op-1", appErr.Metadata["operation_id"])
require.NotEmpty(t, appErr.Metadata["retry_after"])
require.NoError(t, svc.Release(context.Background(), lock1, true, ""))
lock2, err := svc.Acquire(context.Background(), "op-2")
require.NoError(t, err)
require.NotNil(t, lock2)
require.NoError(t, svc.Release(context.Background(), lock2, true, ""))
}
// TestSystemOperationLockService_RenewLease verifies that the service renews a
// held lock. While the holder keeps the lock, the record's LockedUntil must
// move past the value observed right after Acquire.
//
// The test polls with require.Eventually instead of sleeping a fixed interval.
// It therefore passes as soon as a renewal is observed and does not hard-code
// the service's internal renewal cadence. Repository read errors are checked
// rather than discarded.
func TestSystemOperationLockService_RenewLease(t *testing.T) {
	ctx := context.Background()
	repo := newInMemoryIdempotencyRepo()
	svc := NewSystemOperationLockService(repo, IdempotencyConfig{
		SystemOperationTTL: 5 * time.Second,
		ProcessingTimeout:  1200 * time.Millisecond,
	})
	lock, err := svc.Acquire(ctx, "op-renew")
	require.NoError(t, err)
	require.NotNil(t, lock)
	defer func() {
		// Best-effort cleanup: this test asserts renewal, not release.
		_ = svc.Release(ctx, lock, true, "")
	}()

	keyHash := HashIdempotencyKey(systemOperationLockKey)
	initial, err := repo.GetByScopeAndKeyHash(ctx, systemOperationLockScope, keyHash)
	require.NoError(t, err)
	require.NotNil(t, initial)
	require.NotNil(t, initial.LockedUntil)
	initialLockedUntil := *initial.LockedUntil

	require.Eventually(t, func() bool {
		updated, getErr := repo.GetByScopeAndKeyHash(ctx, systemOperationLockScope, keyHash)
		if getErr != nil || updated == nil || updated.LockedUntil == nil {
			return false
		}
		return updated.LockedUntil.After(initialLockedUntil)
	}, 3*time.Second, 50*time.Millisecond, "locked_until should be renewed while lock is held")
}
// flakySystemLockRenewRepo wraps the in-memory idempotency repo. Its first
// ExtendProcessingLock call fails, so tests can check that lease renewal keeps
// going after a transient error.
type flakySystemLockRenewRepo struct {
	*inMemoryIdempotencyRepo
	// extendCalls counts ExtendProcessingLock invocations; access it only via
	// sync/atomic because the renewal loop and the test read it concurrently.
	extendCalls int32
}

// ExtendProcessingLock returns an error on its first invocation. Every other
// invocation is delegated to the embedded in-memory repo.
func (r *flakySystemLockRenewRepo) ExtendProcessingLock(ctx context.Context, id int64, requestFingerprint string, newLockedUntil, newExpiresAt time.Time) (bool, error) {
	if n := atomic.AddInt32(&r.extendCalls, 1); n != 1 {
		return r.inMemoryIdempotencyRepo.ExtendProcessingLock(ctx, id, requestFingerprint, newLockedUntil, newExpiresAt)
	}
	return false, errors.New("transient extend failure")
}
// TestSystemOperationLockService_RenewLeaseContinuesAfterTransientFailure
// verifies that one failed ExtendProcessingLock call does not stop lease
// renewal. The flaky repo fails the first extend. A later attempt must still
// push LockedUntil forward.
func TestSystemOperationLockService_RenewLeaseContinuesAfterTransientFailure(t *testing.T) {
	ctx := context.Background()
	repo := &flakySystemLockRenewRepo{inMemoryIdempotencyRepo: newInMemoryIdempotencyRepo()}
	svc := NewSystemOperationLockService(repo, IdempotencyConfig{
		SystemOperationTTL: 5 * time.Second,
		ProcessingTimeout:  2400 * time.Millisecond,
	})
	lock, err := svc.Acquire(ctx, "op-renew-transient")
	require.NoError(t, err)
	require.NotNil(t, lock)
	defer func() {
		// Best-effort cleanup: this test asserts renewal, not release.
		_ = svc.Release(ctx, lock, true, "")
	}()

	keyHash := HashIdempotencyKey(systemOperationLockKey)
	initial, err := repo.GetByScopeAndKeyHash(ctx, systemOperationLockScope, keyHash)
	require.NoError(t, err)
	require.NotNil(t, initial)
	require.NotNil(t, initial.LockedUntil)
	initialLockedUntil := *initial.LockedUntil

	// After the first renewal fails, the next round must retry and succeed in
	// extending the lock's expiry.
	require.Eventually(t, func() bool {
		updated, getErr := repo.GetByScopeAndKeyHash(ctx, systemOperationLockScope, keyHash)
		if getErr != nil || updated == nil || updated.LockedUntil == nil {
			return false
		}
		return atomic.LoadInt32(&repo.extendCalls) >= 2 && updated.LockedUntil.After(initialLockedUntil)
	}, 4*time.Second, 100*time.Millisecond, "renew loop should continue after transient error")
}
func TestSystemOperationLockService_SameOperationIDRetryWhileRunning(t *testing.T) {
repo := newInMemoryIdempotencyRepo()
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
SystemOperationTTL: 10 * time.Second,
ProcessingTimeout: 2 * time.Second,
})
lock1, err := svc.Acquire(context.Background(), "op-same")
require.NoError(t, err)
require.NotNil(t, lock1)
_, err = svc.Acquire(context.Background(), "op-same")
require.Error(t, err)
require.Equal(t, infraerrors.Code(ErrSystemOperationBusy), infraerrors.Code(err))
appErr := infraerrors.FromError(err)
require.Equal(t, "op-same", appErr.Metadata["operation_id"])
require.NoError(t, svc.Release(context.Background(), lock1, true, ""))
lock2, err := svc.Acquire(context.Background(), "op-same")
require.NoError(t, err)
require.NotNil(t, lock2)
require.NoError(t, svc.Release(context.Background(), lock2, true, ""))
}
func TestSystemOperationLockService_RecoverAfterLeaseExpired(t *testing.T) {
repo := newInMemoryIdempotencyRepo()
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
SystemOperationTTL: 5 * time.Second,
ProcessingTimeout: 300 * time.Millisecond,
})
lock1, err := svc.Acquire(context.Background(), "op-crashed")
require.NoError(t, err)
require.NotNil(t, lock1)
// 模拟实例异常:停止续租,不调用 Release。
lock1.stopOnce.Do(func() {
close(lock1.stopCh)
})
time.Sleep(450 * time.Millisecond)
lock2, err := svc.Acquire(context.Background(), "op-recovered")
require.NoError(t, err, "expired lease should allow a new operation to reclaim lock")
require.NotNil(t, lock2)
require.NoError(t, svc.Release(context.Background(), lock2, true, ""))
}
// systemLockRepoStub is a scripted repository double for
// NewSystemOperationLockService. Each field fixes the outcome of one repository
// call, so tests can steer the service into specific branches.
type systemLockRepoStub struct {
	createOwner bool               // CreateProcessing result when createErr is nil
	createErr   error              // error returned by CreateProcessing
	existing    *IdempotencyRecord // record (cloned) returned by GetByScopeAndKeyHash
	getErr      error              // error returned by GetByScopeAndKeyHash
	reclaimOK   bool               // TryReclaim result when reclaimErr is nil
	reclaimErr  error              // error returned by TryReclaim
	markSuccErr error              // error returned by MarkSucceeded
	markFailErr error              // error returned by MarkFailedRetryable
}

// CreateProcessing reports createOwner, or (false, createErr) when an error is scripted.
func (s *systemLockRepoStub) CreateProcessing(context.Context, *IdempotencyRecord) (bool, error) {
	owned := s.createOwner && s.createErr == nil
	return owned, s.createErr
}

// GetByScopeAndKeyHash returns a copy of the scripted record, or (nil, getErr).
func (s *systemLockRepoStub) GetByScopeAndKeyHash(context.Context, string, string) (*IdempotencyRecord, error) {
	if s.getErr == nil {
		return cloneRecord(s.existing), nil
	}
	return nil, s.getErr
}

// TryReclaim reports reclaimOK, or (false, reclaimErr) when an error is scripted.
func (s *systemLockRepoStub) TryReclaim(context.Context, int64, string, time.Time, time.Time, time.Time) (bool, error) {
	reclaimed := s.reclaimOK && s.reclaimErr == nil
	return reclaimed, s.reclaimErr
}

// ExtendProcessingLock always succeeds.
func (s *systemLockRepoStub) ExtendProcessingLock(context.Context, int64, string, time.Time, time.Time) (bool, error) {
	return true, nil
}

// MarkSucceeded returns the scripted markSuccErr.
func (s *systemLockRepoStub) MarkSucceeded(context.Context, int64, int, string, time.Time) error {
	return s.markSuccErr
}

// MarkFailedRetryable returns the scripted markFailErr.
func (s *systemLockRepoStub) MarkFailedRetryable(context.Context, int64, string, time.Time, time.Time) error {
	return s.markFailErr
}

// DeleteExpired is a no-op that reports zero deleted rows.
func (s *systemLockRepoStub) DeleteExpired(context.Context, time.Time, int) (int64, error) {
	return 0, nil
}
func TestSystemOperationLockService_InputAndStoreErrorBranches(t *testing.T) {
var nilSvc *SystemOperationLockService
_, err := nilSvc.Acquire(context.Background(), "x")
require.Error(t, err)
require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))
svc := &SystemOperationLockService{repo: nil}
_, err = svc.Acquire(context.Background(), "x")
require.Error(t, err)
require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))
svc = NewSystemOperationLockService(newInMemoryIdempotencyRepo(), IdempotencyConfig{
SystemOperationTTL: 10 * time.Second,
ProcessingTimeout: 2 * time.Second,
})
_, err = svc.Acquire(context.Background(), "")
require.Error(t, err)
require.Equal(t, "SYSTEM_OPERATION_ID_REQUIRED", infraerrors.Reason(err))
badStore := &systemLockRepoStub{createErr: errors.New("db down")}
svc = NewSystemOperationLockService(badStore, IdempotencyConfig{
SystemOperationTTL: 10 * time.Second,
ProcessingTimeout: 2 * time.Second,
})
_, err = svc.Acquire(context.Background(), "x")
require.Error(t, err)
require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))
}
// TestSystemOperationLockService_ExistingNilAndReclaimBranches drives Acquire
// through the paths taken when CreateProcessing does not grant ownership:
//   - no existing record can be read back: store unavailable;
//   - an expired, retryable record owned by another operation whose reclaim
//     errors: store unavailable;
//   - the same record whose reclaim simply fails: busy.
func TestSystemOperationLockService_ExistingNilAndReclaimBranches(t *testing.T) {
	ctx := context.Background()
	now := time.Now()
	stub := &systemLockRepoStub{createOwner: false}
	svc := NewSystemOperationLockService(stub, IdempotencyConfig{
		SystemOperationTTL: 10 * time.Second,
		ProcessingTimeout:  2 * time.Second,
	})

	// Not the owner, and GetByScopeAndKeyHash yields no record.
	_, err := svc.Acquire(ctx, "op")
	require.Error(t, err)
	require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))

	// A stale lock (LockedUntil in the past) held by a different operation,
	// which should send Acquire down the reclaim path.
	stub.existing = &IdempotencyRecord{
		ID:                 1,
		Scope:              systemOperationLockScope,
		IdempotencyKeyHash: HashIdempotencyKey(systemOperationLockKey),
		RequestFingerprint: "other-op",
		Status:             IdempotencyStatusFailedRetryable,
		LockedUntil:        ptrTime(now.Add(-time.Second)),
		ExpiresAt:          now.Add(time.Hour),
	}

	stub.reclaimErr = errors.New("reclaim failed")
	_, err = svc.Acquire(ctx, "op")
	require.Error(t, err)
	require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))

	// Reclaim returns no error but does not take the lock.
	stub.reclaimErr, stub.reclaimOK = nil, false
	_, err = svc.Acquire(ctx, "op")
	require.Error(t, err)
	require.Equal(t, infraerrors.Code(ErrSystemOperationBusy), infraerrors.Code(err))
}
func TestSystemOperationLockService_ReleaseBranchesAndOperationID(t *testing.T) {
require.Equal(t, "", (*SystemOperationLock)(nil).OperationID())
svc := NewSystemOperationLockService(newInMemoryIdempotencyRepo(), IdempotencyConfig{
SystemOperationTTL: 10 * time.Second,
ProcessingTimeout: 2 * time.Second,
})
lock, err := svc.Acquire(context.Background(), "op")
require.NoError(t, err)
require.NotNil(t, lock)
require.NoError(t, svc.Release(context.Background(), lock, false, ""))
require.NoError(t, svc.Release(context.Background(), lock, true, ""))
repo := &systemLockRepoStub{
createOwner: true,
markSuccErr: errors.New("mark succeeded failed"),
markFailErr: errors.New("mark failed failed"),
}
svc = NewSystemOperationLockService(repo, IdempotencyConfig{
SystemOperationTTL: 10 * time.Second,
ProcessingTimeout: 2 * time.Second,
})
lock = &SystemOperationLock{recordID: 1, operationID: "op2", stopCh: make(chan struct{})}
require.Error(t, svc.Release(context.Background(), lock, true, ""))
lock = &SystemOperationLock{recordID: 1, operationID: "op3", stopCh: make(chan struct{})}
require.Error(t, svc.Release(context.Background(), lock, false, "BAD"))
var nilLockSvc *SystemOperationLockService
require.NoError(t, nilLockSvc.Release(context.Background(), nil, true, ""))
err = svc.busyError("", nil, time.Now())
require.Equal(t, infraerrors.Code(ErrSystemOperationBusy), infraerrors.Code(err))
}