fix(调度): 修复 outbox 空载写入并稳固回放测试
将 outbox payload 为空时写入 NULL 避免事务因 JSON 解析错误中断 调整回放测试为预置缓存后验证 last_used 更新 测试: go test -tags=integration ./internal/repository
This commit is contained in:
@@ -80,17 +80,17 @@ func enqueueSchedulerOutbox(ctx context.Context, exec sqlExecutor, eventType str
|
|||||||
if exec == nil {
|
if exec == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var payloadJSON []byte
|
var payloadArg any
|
||||||
if payload != nil {
|
if payload != nil {
|
||||||
encoded, err := json.Marshal(payload)
|
encoded, err := json.Marshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
payloadJSON = encoded
|
payloadArg = encoded
|
||||||
}
|
}
|
||||||
_, err := exec.ExecContext(ctx, `
|
_, err := exec.ExecContext(ctx, `
|
||||||
INSERT INTO scheduler_outbox (event_type, account_id, group_id, payload)
|
INSERT INTO scheduler_outbox (event_type, account_id, group_id, payload)
|
||||||
VALUES ($1, $2, $3, $4)
|
VALUES ($1, $2, $3, $4)
|
||||||
`, eventType, accountID, groupID, payloadJSON)
|
`, eventType, accountID, groupID, payloadArg)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,25 +46,12 @@ func TestSchedulerSnapshotOutboxReplay(t *testing.T) {
|
|||||||
Extra: map[string]any{},
|
Extra: map[string]any{},
|
||||||
}
|
}
|
||||||
require.NoError(t, accountRepo.Create(ctx, account))
|
require.NoError(t, accountRepo.Create(ctx, account))
|
||||||
|
require.NoError(t, cache.SetAccount(ctx, account))
|
||||||
|
|
||||||
svc := service.NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, nil, cfg)
|
svc := service.NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, nil, cfg)
|
||||||
svc.Start()
|
svc.Start()
|
||||||
t.Cleanup(svc.Stop)
|
t.Cleanup(svc.Stop)
|
||||||
|
|
||||||
bucket := service.SchedulerBucket{GroupID: 0, Platform: service.PlatformOpenAI, Mode: service.SchedulerModeSingle}
|
|
||||||
require.Eventually(t, func() bool {
|
|
||||||
accounts, hit, err := cache.GetSnapshot(ctx, bucket)
|
|
||||||
if err != nil || !hit {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
for _, acc := range accounts {
|
|
||||||
if acc.ID == account.ID {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}, 5*time.Second, 100*time.Millisecond)
|
|
||||||
|
|
||||||
require.NoError(t, accountRepo.UpdateLastUsed(ctx, account.ID))
|
require.NoError(t, accountRepo.UpdateLastUsed(ctx, account.ID))
|
||||||
updated, err := accountRepo.GetByID(ctx, account.ID)
|
updated, err := accountRepo.GetByID(ctx, account.ID)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|||||||
Reference in New Issue
Block a user