feat(scheduler): 引入调度快照缓存与 outbox 回放
- 调度热路径优先读 Redis 快照,保留分组排序语义 - outbox 回放 + 全量重建纠偏,失败重试不推进水位 - 自动 Atlas 基线对齐并同步调度配置示例
This commit is contained in:
@@ -28,6 +28,23 @@ CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
);
|
||||
`
|
||||
|
||||
const atlasSchemaRevisionsTableDDL = `
|
||||
CREATE TABLE IF NOT EXISTS atlas_schema_revisions (
|
||||
version TEXT PRIMARY KEY,
|
||||
description TEXT NOT NULL,
|
||||
type INTEGER NOT NULL,
|
||||
applied INTEGER NOT NULL DEFAULT 0,
|
||||
total INTEGER NOT NULL DEFAULT 0,
|
||||
executed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
execution_time BIGINT NOT NULL DEFAULT 0,
|
||||
error TEXT NULL,
|
||||
error_stmt TEXT NULL,
|
||||
hash TEXT NOT NULL DEFAULT '',
|
||||
partial_hashes TEXT[] NULL,
|
||||
operator_version TEXT NULL
|
||||
);
|
||||
`
|
||||
|
||||
// migrationsAdvisoryLockID 是用于序列化迁移操作的 PostgreSQL Advisory Lock ID。
|
||||
// 在多实例部署场景下,该锁确保同一时间只有一个实例执行迁移。
|
||||
// 任何稳定的 int64 值都可以,只要不与同一数据库中的其他锁冲突即可。
|
||||
@@ -94,6 +111,11 @@ func applyMigrationsFS(ctx context.Context, db *sql.DB, fsys fs.FS) error {
|
||||
return fmt.Errorf("create schema_migrations: %w", err)
|
||||
}
|
||||
|
||||
// 自动对齐 Atlas 基线(如果检测到 legacy schema_migrations 且缺失 atlas_schema_revisions)。
|
||||
if err := ensureAtlasBaselineAligned(ctx, db, fsys); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 获取所有 .sql 迁移文件并按文件名排序。
|
||||
// 命名规范:使用零填充数字前缀(如 001_init.sql, 002_add_users.sql)。
|
||||
files, err := fs.Glob(fsys, "*.sql")
|
||||
@@ -172,6 +194,80 @@ func applyMigrationsFS(ctx context.Context, db *sql.DB, fsys fs.FS) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func ensureAtlasBaselineAligned(ctx context.Context, db *sql.DB, fsys fs.FS) error {
|
||||
hasLegacy, err := tableExists(ctx, db, "schema_migrations")
|
||||
if err != nil {
|
||||
return fmt.Errorf("check schema_migrations: %w", err)
|
||||
}
|
||||
if !hasLegacy {
|
||||
return nil
|
||||
}
|
||||
|
||||
hasAtlas, err := tableExists(ctx, db, "atlas_schema_revisions")
|
||||
if err != nil {
|
||||
return fmt.Errorf("check atlas_schema_revisions: %w", err)
|
||||
}
|
||||
if !hasAtlas {
|
||||
if _, err := db.ExecContext(ctx, atlasSchemaRevisionsTableDDL); err != nil {
|
||||
return fmt.Errorf("create atlas_schema_revisions: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var count int
|
||||
if err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM atlas_schema_revisions").Scan(&count); err != nil {
|
||||
return fmt.Errorf("count atlas_schema_revisions: %w", err)
|
||||
}
|
||||
if count > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
version, description, hash, err := latestMigrationBaseline(fsys)
|
||||
if err != nil {
|
||||
return fmt.Errorf("atlas baseline version: %w", err)
|
||||
}
|
||||
|
||||
if _, err := db.ExecContext(ctx, `
|
||||
INSERT INTO atlas_schema_revisions (version, description, type, applied, total, executed_at, execution_time, hash)
|
||||
VALUES ($1, $2, $3, 0, 0, NOW(), 0, $4)
|
||||
`, version, description, 1, hash); err != nil {
|
||||
return fmt.Errorf("insert atlas baseline: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func tableExists(ctx context.Context, db *sql.DB, tableName string) (bool, error) {
|
||||
var exists bool
|
||||
err := db.QueryRowContext(ctx, `
|
||||
SELECT EXISTS (
|
||||
SELECT 1
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema = 'public' AND table_name = $1
|
||||
)
|
||||
`, tableName).Scan(&exists)
|
||||
return exists, err
|
||||
}
|
||||
|
||||
func latestMigrationBaseline(fsys fs.FS) (string, string, string, error) {
|
||||
files, err := fs.Glob(fsys, "*.sql")
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
if len(files) == 0 {
|
||||
return "baseline", "baseline", "", nil
|
||||
}
|
||||
sort.Strings(files)
|
||||
name := files[len(files)-1]
|
||||
contentBytes, err := fs.ReadFile(fsys, name)
|
||||
if err != nil {
|
||||
return "", "", "", err
|
||||
}
|
||||
content := strings.TrimSpace(string(contentBytes))
|
||||
sum := sha256.Sum256([]byte(content))
|
||||
hash := hex.EncodeToString(sum[:])
|
||||
version := strings.TrimSuffix(name, ".sql")
|
||||
return version, version, hash, nil
|
||||
}
|
||||
|
||||
// pgAdvisoryLock 获取 PostgreSQL Advisory Lock。
|
||||
// Advisory Lock 是一种轻量级的锁机制,不与任何特定的数据库对象关联。
|
||||
// 它非常适合用于应用层面的分布式锁场景,如迁移序列化。
|
||||
|
||||
Reference in New Issue
Block a user