fix(调度): 修复流超时配置并补回放测试
删除前端未支持的 timeout_seconds 字段,避免类型检查失败 新增调度 outbox 回放集成测试 调整调度默认等待超时断言 测试: make test
This commit is contained in:
@@ -39,8 +39,8 @@ func TestLoadDefaultSchedulingConfig(t *testing.T) {
|
|||||||
if cfg.Gateway.Scheduling.StickySessionMaxWaiting != 3 {
|
if cfg.Gateway.Scheduling.StickySessionMaxWaiting != 3 {
|
||||||
t.Fatalf("StickySessionMaxWaiting = %d, want 3", cfg.Gateway.Scheduling.StickySessionMaxWaiting)
|
t.Fatalf("StickySessionMaxWaiting = %d, want 3", cfg.Gateway.Scheduling.StickySessionMaxWaiting)
|
||||||
}
|
}
|
||||||
if cfg.Gateway.Scheduling.StickySessionWaitTimeout != 45*time.Second {
|
if cfg.Gateway.Scheduling.StickySessionWaitTimeout != 120*time.Second {
|
||||||
t.Fatalf("StickySessionWaitTimeout = %v, want 45s", cfg.Gateway.Scheduling.StickySessionWaitTimeout)
|
t.Fatalf("StickySessionWaitTimeout = %v, want 120s", cfg.Gateway.Scheduling.StickySessionWaitTimeout)
|
||||||
}
|
}
|
||||||
if cfg.Gateway.Scheduling.FallbackWaitTimeout != 30*time.Second {
|
if cfg.Gateway.Scheduling.FallbackWaitTimeout != 30*time.Second {
|
||||||
t.Fatalf("FallbackWaitTimeout = %v, want 30s", cfg.Gateway.Scheduling.FallbackWaitTimeout)
|
t.Fatalf("FallbackWaitTimeout = %v, want 30s", cfg.Gateway.Scheduling.FallbackWaitTimeout)
|
||||||
|
|||||||
@@ -0,0 +1,81 @@
|
|||||||
|
//go:build integration
|
||||||
|
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSchedulerSnapshotOutboxReplay(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
rdb := testRedis(t)
|
||||||
|
client := testEntClient(t)
|
||||||
|
|
||||||
|
_, _ = integrationDB.ExecContext(ctx, "TRUNCATE scheduler_outbox")
|
||||||
|
|
||||||
|
accountRepo := newAccountRepositoryWithSQL(client, integrationDB)
|
||||||
|
outboxRepo := NewSchedulerOutboxRepository(integrationDB)
|
||||||
|
cache := NewSchedulerCache(rdb)
|
||||||
|
|
||||||
|
cfg := &config.Config{
|
||||||
|
RunMode: config.RunModeStandard,
|
||||||
|
Gateway: config.GatewayConfig{
|
||||||
|
Scheduling: config.GatewaySchedulingConfig{
|
||||||
|
OutboxPollIntervalSeconds: 1,
|
||||||
|
FullRebuildIntervalSeconds: 0,
|
||||||
|
DbFallbackEnabled: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
account := &service.Account{
|
||||||
|
Name: "outbox-replay-" + time.Now().Format("150405.000000"),
|
||||||
|
Platform: service.PlatformOpenAI,
|
||||||
|
Type: service.AccountTypeAPIKey,
|
||||||
|
Status: service.StatusActive,
|
||||||
|
Schedulable: true,
|
||||||
|
Concurrency: 3,
|
||||||
|
Priority: 1,
|
||||||
|
Credentials: map[string]any{},
|
||||||
|
Extra: map[string]any{},
|
||||||
|
}
|
||||||
|
require.NoError(t, accountRepo.Create(ctx, account))
|
||||||
|
|
||||||
|
svc := service.NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, nil, cfg)
|
||||||
|
svc.Start()
|
||||||
|
t.Cleanup(svc.Stop)
|
||||||
|
|
||||||
|
bucket := service.SchedulerBucket{GroupID: 0, Platform: service.PlatformOpenAI, Mode: service.SchedulerModeSingle}
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
accounts, hit, err := cache.GetSnapshot(ctx, bucket)
|
||||||
|
if err != nil || !hit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, acc := range accounts {
|
||||||
|
if acc.ID == account.ID {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}, 5*time.Second, 100*time.Millisecond)
|
||||||
|
|
||||||
|
require.NoError(t, accountRepo.UpdateLastUsed(ctx, account.ID))
|
||||||
|
updated, err := accountRepo.GetByID(ctx, account.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, updated.LastUsedAt)
|
||||||
|
expectedUnix := updated.LastUsedAt.Unix()
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
cached, err := cache.GetAccount(ctx, account.ID)
|
||||||
|
if err != nil || cached == nil || cached.LastUsedAt == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return cached.LastUsedAt.Unix() == expectedUnix
|
||||||
|
}, 5*time.Second, 100*time.Millisecond)
|
||||||
|
}
|
||||||
@@ -183,23 +183,6 @@
|
|||||||
v-if="streamTimeoutForm.enabled"
|
v-if="streamTimeoutForm.enabled"
|
||||||
class="space-y-4 border-t border-gray-100 pt-4 dark:border-dark-700"
|
class="space-y-4 border-t border-gray-100 pt-4 dark:border-dark-700"
|
||||||
>
|
>
|
||||||
<!-- Timeout Seconds -->
|
|
||||||
<div>
|
|
||||||
<label class="mb-2 block text-sm font-medium text-gray-700 dark:text-gray-300">
|
|
||||||
{{ t('admin.settings.streamTimeout.timeoutSeconds') }}
|
|
||||||
</label>
|
|
||||||
<input
|
|
||||||
v-model.number="streamTimeoutForm.timeout_seconds"
|
|
||||||
type="number"
|
|
||||||
min="30"
|
|
||||||
max="300"
|
|
||||||
class="input w-32"
|
|
||||||
/>
|
|
||||||
<p class="mt-1.5 text-xs text-gray-500 dark:text-gray-400">
|
|
||||||
{{ t('admin.settings.streamTimeout.timeoutSecondsHint') }}
|
|
||||||
</p>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<!-- Action -->
|
<!-- Action -->
|
||||||
<div>
|
<div>
|
||||||
<label class="mb-2 block text-sm font-medium text-gray-700 dark:text-gray-300">
|
<label class="mb-2 block text-sm font-medium text-gray-700 dark:text-gray-300">
|
||||||
@@ -1000,7 +983,6 @@ const streamTimeoutLoading = ref(true)
|
|||||||
const streamTimeoutSaving = ref(false)
|
const streamTimeoutSaving = ref(false)
|
||||||
const streamTimeoutForm = reactive({
|
const streamTimeoutForm = reactive({
|
||||||
enabled: true,
|
enabled: true,
|
||||||
timeout_seconds: 60,
|
|
||||||
action: 'temp_unsched' as 'temp_unsched' | 'error' | 'none',
|
action: 'temp_unsched' as 'temp_unsched' | 'error' | 'none',
|
||||||
temp_unsched_minutes: 5,
|
temp_unsched_minutes: 5,
|
||||||
threshold_count: 3,
|
threshold_count: 3,
|
||||||
@@ -1314,7 +1296,6 @@ async function saveStreamTimeoutSettings() {
|
|||||||
try {
|
try {
|
||||||
const updated = await adminAPI.settings.updateStreamTimeoutSettings({
|
const updated = await adminAPI.settings.updateStreamTimeoutSettings({
|
||||||
enabled: streamTimeoutForm.enabled,
|
enabled: streamTimeoutForm.enabled,
|
||||||
timeout_seconds: streamTimeoutForm.timeout_seconds,
|
|
||||||
action: streamTimeoutForm.action,
|
action: streamTimeoutForm.action,
|
||||||
temp_unsched_minutes: streamTimeoutForm.temp_unsched_minutes,
|
temp_unsched_minutes: streamTimeoutForm.temp_unsched_minutes,
|
||||||
threshold_count: streamTimeoutForm.threshold_count,
|
threshold_count: streamTimeoutForm.threshold_count,
|
||||||
|
|||||||
Reference in New Issue
Block a user