306 lines
10 KiB
Go
306 lines
10 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestSystemOperationLockService_AcquireBusyAndRelease(t *testing.T) {
|
|
repo := newInMemoryIdempotencyRepo()
|
|
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
|
|
SystemOperationTTL: 10 * time.Second,
|
|
ProcessingTimeout: 2 * time.Second,
|
|
})
|
|
|
|
lock1, err := svc.Acquire(context.Background(), "op-1")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, lock1)
|
|
|
|
_, err = svc.Acquire(context.Background(), "op-2")
|
|
require.Error(t, err)
|
|
require.Equal(t, infraerrors.Code(ErrSystemOperationBusy), infraerrors.Code(err))
|
|
appErr := infraerrors.FromError(err)
|
|
require.Equal(t, "op-1", appErr.Metadata["operation_id"])
|
|
require.NotEmpty(t, appErr.Metadata["retry_after"])
|
|
|
|
require.NoError(t, svc.Release(context.Background(), lock1, true, ""))
|
|
|
|
lock2, err := svc.Acquire(context.Background(), "op-2")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, lock2)
|
|
require.NoError(t, svc.Release(context.Background(), lock2, true, ""))
|
|
}
|
|
|
|
func TestSystemOperationLockService_RenewLease(t *testing.T) {
|
|
repo := newInMemoryIdempotencyRepo()
|
|
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
|
|
SystemOperationTTL: 5 * time.Second,
|
|
ProcessingTimeout: 1200 * time.Millisecond,
|
|
})
|
|
|
|
lock, err := svc.Acquire(context.Background(), "op-renew")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, lock)
|
|
defer func() {
|
|
_ = svc.Release(context.Background(), lock, true, "")
|
|
}()
|
|
|
|
keyHash := HashIdempotencyKey(systemOperationLockKey)
|
|
initial, _ := repo.GetByScopeAndKeyHash(context.Background(), systemOperationLockScope, keyHash)
|
|
require.NotNil(t, initial)
|
|
require.NotNil(t, initial.LockedUntil)
|
|
initialLockedUntil := *initial.LockedUntil
|
|
|
|
time.Sleep(1500 * time.Millisecond)
|
|
|
|
updated, _ := repo.GetByScopeAndKeyHash(context.Background(), systemOperationLockScope, keyHash)
|
|
require.NotNil(t, updated)
|
|
require.NotNil(t, updated.LockedUntil)
|
|
require.True(t, updated.LockedUntil.After(initialLockedUntil), "locked_until should be renewed while lock is held")
|
|
}
|
|
|
|
type flakySystemLockRenewRepo struct {
|
|
*inMemoryIdempotencyRepo
|
|
extendCalls int32
|
|
}
|
|
|
|
func (r *flakySystemLockRenewRepo) ExtendProcessingLock(ctx context.Context, id int64, requestFingerprint string, newLockedUntil, newExpiresAt time.Time) (bool, error) {
|
|
call := atomic.AddInt32(&r.extendCalls, 1)
|
|
if call == 1 {
|
|
return false, errors.New("transient extend failure")
|
|
}
|
|
return r.inMemoryIdempotencyRepo.ExtendProcessingLock(ctx, id, requestFingerprint, newLockedUntil, newExpiresAt)
|
|
}
|
|
|
|
func TestSystemOperationLockService_RenewLeaseContinuesAfterTransientFailure(t *testing.T) {
|
|
repo := &flakySystemLockRenewRepo{inMemoryIdempotencyRepo: newInMemoryIdempotencyRepo()}
|
|
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
|
|
SystemOperationTTL: 5 * time.Second,
|
|
ProcessingTimeout: 2400 * time.Millisecond,
|
|
})
|
|
|
|
lock, err := svc.Acquire(context.Background(), "op-renew-transient")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, lock)
|
|
defer func() {
|
|
_ = svc.Release(context.Background(), lock, true, "")
|
|
}()
|
|
|
|
keyHash := HashIdempotencyKey(systemOperationLockKey)
|
|
initial, _ := repo.GetByScopeAndKeyHash(context.Background(), systemOperationLockScope, keyHash)
|
|
require.NotNil(t, initial)
|
|
require.NotNil(t, initial.LockedUntil)
|
|
initialLockedUntil := *initial.LockedUntil
|
|
|
|
// 首次续租失败后,下一轮应继续尝试并成功更新锁过期时间。
|
|
require.Eventually(t, func() bool {
|
|
updated, _ := repo.GetByScopeAndKeyHash(context.Background(), systemOperationLockScope, keyHash)
|
|
if updated == nil || updated.LockedUntil == nil {
|
|
return false
|
|
}
|
|
return atomic.LoadInt32(&repo.extendCalls) >= 2 && updated.LockedUntil.After(initialLockedUntil)
|
|
}, 4*time.Second, 100*time.Millisecond, "renew loop should continue after transient error")
|
|
}
|
|
|
|
func TestSystemOperationLockService_SameOperationIDRetryWhileRunning(t *testing.T) {
|
|
repo := newInMemoryIdempotencyRepo()
|
|
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
|
|
SystemOperationTTL: 10 * time.Second,
|
|
ProcessingTimeout: 2 * time.Second,
|
|
})
|
|
|
|
lock1, err := svc.Acquire(context.Background(), "op-same")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, lock1)
|
|
|
|
_, err = svc.Acquire(context.Background(), "op-same")
|
|
require.Error(t, err)
|
|
require.Equal(t, infraerrors.Code(ErrSystemOperationBusy), infraerrors.Code(err))
|
|
appErr := infraerrors.FromError(err)
|
|
require.Equal(t, "op-same", appErr.Metadata["operation_id"])
|
|
|
|
require.NoError(t, svc.Release(context.Background(), lock1, true, ""))
|
|
|
|
lock2, err := svc.Acquire(context.Background(), "op-same")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, lock2)
|
|
require.NoError(t, svc.Release(context.Background(), lock2, true, ""))
|
|
}
|
|
|
|
func TestSystemOperationLockService_RecoverAfterLeaseExpired(t *testing.T) {
|
|
repo := newInMemoryIdempotencyRepo()
|
|
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
|
|
SystemOperationTTL: 5 * time.Second,
|
|
ProcessingTimeout: 300 * time.Millisecond,
|
|
})
|
|
|
|
lock1, err := svc.Acquire(context.Background(), "op-crashed")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, lock1)
|
|
|
|
// 模拟实例异常:停止续租,不调用 Release。
|
|
lock1.stopOnce.Do(func() {
|
|
close(lock1.stopCh)
|
|
})
|
|
|
|
time.Sleep(450 * time.Millisecond)
|
|
|
|
lock2, err := svc.Acquire(context.Background(), "op-recovered")
|
|
require.NoError(t, err, "expired lease should allow a new operation to reclaim lock")
|
|
require.NotNil(t, lock2)
|
|
require.NoError(t, svc.Release(context.Background(), lock2, true, ""))
|
|
}
|
|
|
|
type systemLockRepoStub struct {
|
|
createOwner bool
|
|
createErr error
|
|
existing *IdempotencyRecord
|
|
getErr error
|
|
reclaimOK bool
|
|
reclaimErr error
|
|
markSuccErr error
|
|
markFailErr error
|
|
}
|
|
|
|
func (s *systemLockRepoStub) CreateProcessing(context.Context, *IdempotencyRecord) (bool, error) {
|
|
if s.createErr != nil {
|
|
return false, s.createErr
|
|
}
|
|
return s.createOwner, nil
|
|
}
|
|
|
|
func (s *systemLockRepoStub) GetByScopeAndKeyHash(context.Context, string, string) (*IdempotencyRecord, error) {
|
|
if s.getErr != nil {
|
|
return nil, s.getErr
|
|
}
|
|
return cloneRecord(s.existing), nil
|
|
}
|
|
|
|
func (s *systemLockRepoStub) TryReclaim(context.Context, int64, string, time.Time, time.Time, time.Time) (bool, error) {
|
|
if s.reclaimErr != nil {
|
|
return false, s.reclaimErr
|
|
}
|
|
return s.reclaimOK, nil
|
|
}
|
|
|
|
func (s *systemLockRepoStub) ExtendProcessingLock(context.Context, int64, string, time.Time, time.Time) (bool, error) {
|
|
return true, nil
|
|
}
|
|
|
|
func (s *systemLockRepoStub) MarkSucceeded(context.Context, int64, int, string, time.Time) error {
|
|
return s.markSuccErr
|
|
}
|
|
|
|
func (s *systemLockRepoStub) MarkFailedRetryable(context.Context, int64, string, time.Time, time.Time) error {
|
|
return s.markFailErr
|
|
}
|
|
|
|
func (s *systemLockRepoStub) DeleteExpired(context.Context, time.Time, int) (int64, error) {
|
|
return 0, nil
|
|
}
|
|
|
|
func TestSystemOperationLockService_InputAndStoreErrorBranches(t *testing.T) {
|
|
var nilSvc *SystemOperationLockService
|
|
_, err := nilSvc.Acquire(context.Background(), "x")
|
|
require.Error(t, err)
|
|
require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))
|
|
|
|
svc := &SystemOperationLockService{repo: nil}
|
|
_, err = svc.Acquire(context.Background(), "x")
|
|
require.Error(t, err)
|
|
require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))
|
|
|
|
svc = NewSystemOperationLockService(newInMemoryIdempotencyRepo(), IdempotencyConfig{
|
|
SystemOperationTTL: 10 * time.Second,
|
|
ProcessingTimeout: 2 * time.Second,
|
|
})
|
|
_, err = svc.Acquire(context.Background(), "")
|
|
require.Error(t, err)
|
|
require.Equal(t, "SYSTEM_OPERATION_ID_REQUIRED", infraerrors.Reason(err))
|
|
|
|
badStore := &systemLockRepoStub{createErr: errors.New("db down")}
|
|
svc = NewSystemOperationLockService(badStore, IdempotencyConfig{
|
|
SystemOperationTTL: 10 * time.Second,
|
|
ProcessingTimeout: 2 * time.Second,
|
|
})
|
|
_, err = svc.Acquire(context.Background(), "x")
|
|
require.Error(t, err)
|
|
require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))
|
|
}
|
|
|
|
func TestSystemOperationLockService_ExistingNilAndReclaimBranches(t *testing.T) {
|
|
now := time.Now()
|
|
repo := &systemLockRepoStub{
|
|
createOwner: false,
|
|
}
|
|
svc := NewSystemOperationLockService(repo, IdempotencyConfig{
|
|
SystemOperationTTL: 10 * time.Second,
|
|
ProcessingTimeout: 2 * time.Second,
|
|
})
|
|
|
|
_, err := svc.Acquire(context.Background(), "op")
|
|
require.Error(t, err)
|
|
require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))
|
|
|
|
repo.existing = &IdempotencyRecord{
|
|
ID: 1,
|
|
Scope: systemOperationLockScope,
|
|
IdempotencyKeyHash: HashIdempotencyKey(systemOperationLockKey),
|
|
RequestFingerprint: "other-op",
|
|
Status: IdempotencyStatusFailedRetryable,
|
|
LockedUntil: ptrTime(now.Add(-time.Second)),
|
|
ExpiresAt: now.Add(time.Hour),
|
|
}
|
|
repo.reclaimErr = errors.New("reclaim failed")
|
|
_, err = svc.Acquire(context.Background(), "op")
|
|
require.Error(t, err)
|
|
require.Equal(t, infraerrors.Code(ErrIdempotencyStoreUnavail), infraerrors.Code(err))
|
|
|
|
repo.reclaimErr = nil
|
|
repo.reclaimOK = false
|
|
_, err = svc.Acquire(context.Background(), "op")
|
|
require.Error(t, err)
|
|
require.Equal(t, infraerrors.Code(ErrSystemOperationBusy), infraerrors.Code(err))
|
|
}
|
|
|
|
func TestSystemOperationLockService_ReleaseBranchesAndOperationID(t *testing.T) {
|
|
require.Equal(t, "", (*SystemOperationLock)(nil).OperationID())
|
|
|
|
svc := NewSystemOperationLockService(newInMemoryIdempotencyRepo(), IdempotencyConfig{
|
|
SystemOperationTTL: 10 * time.Second,
|
|
ProcessingTimeout: 2 * time.Second,
|
|
})
|
|
lock, err := svc.Acquire(context.Background(), "op")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, lock)
|
|
|
|
require.NoError(t, svc.Release(context.Background(), lock, false, ""))
|
|
require.NoError(t, svc.Release(context.Background(), lock, true, ""))
|
|
|
|
repo := &systemLockRepoStub{
|
|
createOwner: true,
|
|
markSuccErr: errors.New("mark succeeded failed"),
|
|
markFailErr: errors.New("mark failed failed"),
|
|
}
|
|
svc = NewSystemOperationLockService(repo, IdempotencyConfig{
|
|
SystemOperationTTL: 10 * time.Second,
|
|
ProcessingTimeout: 2 * time.Second,
|
|
})
|
|
lock = &SystemOperationLock{recordID: 1, operationID: "op2", stopCh: make(chan struct{})}
|
|
require.Error(t, svc.Release(context.Background(), lock, true, ""))
|
|
lock = &SystemOperationLock{recordID: 1, operationID: "op3", stopCh: make(chan struct{})}
|
|
require.Error(t, svc.Release(context.Background(), lock, false, "BAD"))
|
|
|
|
var nilLockSvc *SystemOperationLockService
|
|
require.NoError(t, nilLockSvc.Release(context.Background(), nil, true, ""))
|
|
|
|
err = svc.busyError("", nil, time.Now())
|
|
require.Equal(t, infraerrors.Code(ErrSystemOperationBusy), infraerrors.Code(err))
|
|
}
|