Merge pull request #731 from xvhuan/fix/061-bounded-backfill-startup
fix(migrations): 061 迁移改为限时分批回填,避免启动阻塞导致 502
This commit is contained in:
@@ -66,6 +66,12 @@ var migrationChecksumCompatibilityRules = map[string]migrationChecksumCompatibil
|
|||||||
"182c193f3359946cf094090cd9e57d5c3fd9abaffbc1e8fc378646b8a6fa12b4": {},
|
"182c193f3359946cf094090cd9e57d5c3fd9abaffbc1e8fc378646b8a6fa12b4": {},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"061_add_usage_log_request_type.sql": {
|
||||||
|
fileChecksum: "97bdd9a32d921986f74a0231ab90735567a9234fb7062f4d9d1baf108ba59769",
|
||||||
|
acceptedDBChecksum: map[string]struct{}{
|
||||||
|
"08a248652cbab7cfde147fc6ef8cda464f2477674e20b718312faa252e0481c0": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyMigrations 将嵌入的 SQL 迁移文件应用到指定的数据库。
|
// ApplyMigrations 将嵌入的 SQL 迁移文件应用到指定的数据库。
|
||||||
|
|||||||
@@ -25,6 +25,15 @@ func TestIsMigrationChecksumCompatible(t *testing.T) {
|
|||||||
require.False(t, ok)
|
require.False(t, ok)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("061历史checksum可兼容", func(t *testing.T) {
|
||||||
|
ok := isMigrationChecksumCompatible(
|
||||||
|
"061_add_usage_log_request_type.sql",
|
||||||
|
"08a248652cbab7cfde147fc6ef8cda464f2477674e20b718312faa252e0481c0",
|
||||||
|
"97bdd9a32d921986f74a0231ab90735567a9234fb7062f4d9d1baf108ba59769",
|
||||||
|
)
|
||||||
|
require.True(t, ok)
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("非白名单迁移不兼容", func(t *testing.T) {
|
t.Run("非白名单迁移不兼容", func(t *testing.T) {
|
||||||
ok := isMigrationChecksumCompatible(
|
ok := isMigrationChecksumCompatible(
|
||||||
"001_init.sql",
|
"001_init.sql",
|
||||||
|
|||||||
@@ -19,11 +19,47 @@ $$;
|
|||||||
CREATE INDEX IF NOT EXISTS idx_usage_logs_request_type_created_at
|
CREATE INDEX IF NOT EXISTS idx_usage_logs_request_type_created_at
|
||||||
ON usage_logs (request_type, created_at);
|
ON usage_logs (request_type, created_at);
|
||||||
|
|
||||||
-- Backfill from legacy fields. openai_ws_mode has higher priority than stream.
|
-- Backfill from legacy fields in bounded batches.
|
||||||
UPDATE usage_logs
|
-- Why bounded:
|
||||||
|
-- 1) Full-table UPDATE on large usage_logs can block startup for a long time.
|
||||||
|
-- 2) request_type=0 rows remain query-compatible via legacy fallback logic
|
||||||
|
-- (stream/openai_ws_mode) in repository filters.
|
||||||
|
-- 3) Subsequent writes will use explicit request_type and gradually dilute
|
||||||
|
-- historical unknown rows.
|
||||||
|
--
|
||||||
|
-- openai_ws_mode has higher priority than stream.
|
||||||
|
DO $$
|
||||||
|
DECLARE
|
||||||
|
v_rows INTEGER := 0;
|
||||||
|
v_total_rows INTEGER := 0;
|
||||||
|
v_batch_size INTEGER := 5000;
|
||||||
|
v_started_at TIMESTAMPTZ := clock_timestamp();
|
||||||
|
v_max_duration INTERVAL := INTERVAL '8 seconds';
|
||||||
|
BEGIN
|
||||||
|
LOOP
|
||||||
|
WITH batch AS (
|
||||||
|
SELECT id
|
||||||
|
FROM usage_logs
|
||||||
|
WHERE request_type = 0
|
||||||
|
ORDER BY id
|
||||||
|
LIMIT v_batch_size
|
||||||
|
)
|
||||||
|
UPDATE usage_logs ul
|
||||||
SET request_type = CASE
|
SET request_type = CASE
|
||||||
WHEN openai_ws_mode = TRUE THEN 3
|
WHEN ul.openai_ws_mode = TRUE THEN 3
|
||||||
WHEN stream = TRUE THEN 2
|
WHEN ul.stream = TRUE THEN 2
|
||||||
ELSE 1
|
ELSE 1
|
||||||
END
|
END
|
||||||
WHERE request_type = 0;
|
FROM batch
|
||||||
|
WHERE ul.id = batch.id;
|
||||||
|
|
||||||
|
GET DIAGNOSTICS v_rows = ROW_COUNT;
|
||||||
|
EXIT WHEN v_rows = 0;
|
||||||
|
|
||||||
|
v_total_rows := v_total_rows + v_rows;
|
||||||
|
EXIT WHEN clock_timestamp() - v_started_at >= v_max_duration;
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
RAISE NOTICE 'usage_logs.request_type startup backfill rows=%', v_total_rows;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
|||||||
Reference in New Issue
Block a user