fix(migrations): defer online ddl follow-ups safely

This commit is contained in:
IanShaw027
2026-04-22 11:17:45 +08:00
parent ca1f30a911
commit 18481a100b
8 changed files with 173 additions and 37 deletions

View File

@@ -0,0 +1,73 @@
package migrate
import (
"testing"
"entgo.io/ent/dialect/entsql"
entschema "entgo.io/ent/dialect/sql/schema"
"github.com/stretchr/testify/require"
)
func TestAuthIdentityFoundationForeignKeyOnDeleteActions(t *testing.T) {
require.Equal(
t,
entschema.Cascade,
findForeignKeyBySymbol(t, AuthIdentitiesTable, "auth_identities_users_auth_identities").OnDelete,
)
require.Equal(
t,
entschema.Cascade,
findForeignKeyBySymbol(t, AuthIdentityChannelsTable, "auth_identity_channels_auth_identities_channels").OnDelete,
)
require.Equal(
t,
entschema.Cascade,
findForeignKeyBySymbol(t, IdentityAdoptionDecisionsTable, "identity_adoption_decisions_pending_auth_sessions_adoption_decision").OnDelete,
)
require.Equal(
t,
entschema.SetNull,
findForeignKeyBySymbol(t, PendingAuthSessionsTable, "pending_auth_sessions_users_pending_auth_sessions").OnDelete,
)
require.Equal(
t,
entschema.SetNull,
findForeignKeyBySymbol(t, IdentityAdoptionDecisionsTable, "identity_adoption_decisions_auth_identities_adoption_decisions").OnDelete,
)
}
// TestPaymentOrdersOutTradeNoPartialUniqueIndex verifies that the declarative
// schema keeps paymentorder_out_trade_no as a single-column partial unique
// index which exempts legacy empty-string order IDs from the uniqueness rule.
func TestPaymentOrdersOutTradeNoPartialUniqueIndex(t *testing.T) {
	index := findIndexByName(t, PaymentOrdersTable, "paymentorder_out_trade_no")
	require.True(t, index.Unique)
	require.Len(t, index.Columns, 1)
	require.Equal(t, "out_trade_no", index.Columns[0].Name)
	require.NotNil(t, index.Annotation)
	wantWhere := (&entsql.IndexAnnotation{Where: "out_trade_no <> ''"}).Where
	require.Equal(t, wantWhere, index.Annotation.Where)
}
// findForeignKeyBySymbol returns the foreign key with the given symbol from
// table, failing the test immediately when no such foreign key exists.
func findForeignKeyBySymbol(t *testing.T, table *entschema.Table, symbol string) *entschema.ForeignKey {
	t.Helper()
	for i := range table.ForeignKeys {
		if candidate := table.ForeignKeys[i]; candidate.Symbol == symbol {
			return candidate
		}
	}
	require.Failf(t, "missing foreign key", "table %s should include foreign key %s", table.Name, symbol)
	return nil // unreachable: require.Failf stops the test
}
// findIndexByName returns the index with the given name from table, failing
// the test immediately when no such index exists.
func findIndexByName(t *testing.T, table *entschema.Table, name string) *entschema.Index {
	t.Helper()
	for i := range table.Indexes {
		if candidate := table.Indexes[i]; candidate.Name == name {
			return candidate
		}
	}
	require.Failf(t, "missing index", "table %s should include index %s", table.Name, name)
	return nil // unreachable: require.Failf stops the test
}

View File

@@ -55,30 +55,17 @@ const nonTransactionalMigrationSuffix = "_notx.sql"
type migrationChecksumCompatibilityRule struct { type migrationChecksumCompatibilityRule struct {
fileChecksum string fileChecksum string
acceptedDBChecksum map[string]struct{} acceptedDBChecksum map[string]struct{}
acceptedChecksums map[string]struct{}
} }
// migrationChecksumCompatibilityRules 仅用于兼容历史上误修改过的迁移文件 checksum。 // migrationChecksumCompatibilityRules 仅用于兼容历史上误修改过的迁移文件 checksum。
// 规则必须同时匹配「迁移名 + 当前文件 checksum + 历史库 checksum」才会放行避免放宽全局校验。 // 规则必须同时匹配「迁移名 + 数据库 checksum + 当前文件 checksum」且两者都落在该迁移的已知版本集合内才会放行,
// 避免放宽全局校验,也允许将误改的历史 migration 回滚为已发布版本而不要求人工修 checksum。
var migrationChecksumCompatibilityRules = map[string]migrationChecksumCompatibilityRule{ var migrationChecksumCompatibilityRules = map[string]migrationChecksumCompatibilityRule{
"054_drop_legacy_cache_columns.sql": { "054_drop_legacy_cache_columns.sql": newMigrationChecksumCompatibilityRule("82de761156e03876653e7a6a4eee883cd927847036f779b0b9f34c42a8af7a7d", "182c193f3359946cf094090cd9e57d5c3fd9abaffbc1e8fc378646b8a6fa12b4"),
fileChecksum: "82de761156e03876653e7a6a4eee883cd927847036f779b0b9f34c42a8af7a7d", "061_add_usage_log_request_type.sql": newMigrationChecksumCompatibilityRule("66207e7aa5dd0429c2e2c0fabdaf79783ff157fa0af2e81adff2ee03790ec65c", "08a248652cbab7cfde147fc6ef8cda464f2477674e20b718312faa252e0481c0", "222b4a09c797c22e5922b6b172327c824f5463aaa8760e4f621bc5c22e2be0f3"),
acceptedDBChecksum: map[string]struct{}{ "109_auth_identity_compat_backfill.sql": newMigrationChecksumCompatibilityRule("2b380305e73ff0c13aa8c811e45897f2b36ca4a438f7b3e8f98e19ecb6bae0b3", "551e498aa5616d2d91096e9d72cf9fb36e418ee22eacc557f8811cadbc9e20ee"),
"182c193f3359946cf094090cd9e57d5c3fd9abaffbc1e8fc378646b8a6fa12b4": {}, "119_enforce_payment_orders_out_trade_no_unique.sql": newMigrationChecksumCompatibilityRule("0bbe809ae48a9d811dabda1ba1c74955bd71c4a9cc610f9128816818dfa6c11e", "ebd2c67cce0116393fb4f1b5d5116a67c6aceb73820dfb5133d1ff6f36d72d34"),
},
},
"061_add_usage_log_request_type.sql": {
fileChecksum: "66207e7aa5dd0429c2e2c0fabdaf79783ff157fa0af2e81adff2ee03790ec65c",
acceptedDBChecksum: map[string]struct{}{
"08a248652cbab7cfde147fc6ef8cda464f2477674e20b718312faa252e0481c0": {},
"222b4a09c797c22e5922b6b172327c824f5463aaa8760e4f621bc5c22e2be0f3": {},
},
},
"109_auth_identity_compat_backfill.sql": {
fileChecksum: "551e498aa5616d2d91096e9d72cf9fb36e418ee22eacc557f8811cadbc9e20ee",
acceptedDBChecksum: map[string]struct{}{
"2b380305e73ff0c13aa8c811e45897f2b36ca4a438f7b3e8f98e19ecb6bae0b3": {},
},
},
} }
// ApplyMigrations 将嵌入的 SQL 迁移文件应用到指定的数据库。 // ApplyMigrations 将嵌入的 SQL 迁移文件应用到指定的数据库。
@@ -328,16 +315,33 @@ func latestMigrationBaseline(fsys fs.FS) (string, string, string, error) {
return version, version, hash, nil return version, version, hash, nil
} }
// checksumSet collects the given checksum strings into a set so membership
// checks run in O(1). Duplicate inputs collapse to a single entry.
func checksumSet(values ...string) map[string]struct{} {
	set := make(map[string]struct{}, len(values))
	for i := range values {
		set[values[i]] = struct{}{}
	}
	return set
}
// newMigrationChecksumCompatibilityRule builds a compatibility rule for one
// migration file. fileChecksum is the checksum of the current on-disk file;
// acceptedDBChecksums are historical checksums that may already be recorded in
// the database. acceptedChecksums unions both so a mistakenly edited migration
// can be rolled back to a published version without manual checksum repair.
func newMigrationChecksumCompatibilityRule(fileChecksum string, acceptedDBChecksums ...string) migrationChecksumCompatibilityRule {
	union := make([]string, 0, len(acceptedDBChecksums)+1)
	union = append(union, fileChecksum)
	union = append(union, acceptedDBChecksums...)
	return migrationChecksumCompatibilityRule{
		fileChecksum:       fileChecksum,
		acceptedDBChecksum: checksumSet(acceptedDBChecksums...),
		acceptedChecksums:  checksumSet(union...),
	}
}
func isMigrationChecksumCompatible(name, dbChecksum, fileChecksum string) bool { func isMigrationChecksumCompatible(name, dbChecksum, fileChecksum string) bool {
rule, ok := migrationChecksumCompatibilityRules[name] rule, ok := migrationChecksumCompatibilityRules[name]
if !ok { if !ok {
return false return false
} }
if rule.fileChecksum != fileChecksum { _, dbOK := rule.acceptedChecksums[dbChecksum]
if !dbOK {
return false return false
} }
_, ok = rule.acceptedDBChecksum[dbChecksum] _, fileOK := rule.acceptedChecksums[fileChecksum]
return ok return fileOK
} }
func validateMigrationExecutionMode(name, content string) (bool, error) { func validateMigrationExecutionMode(name, content string) (bool, error) {

View File

@@ -60,4 +60,31 @@ func TestIsMigrationChecksumCompatible(t *testing.T) {
) )
require.True(t, ok) require.True(t, ok)
}) })
t.Run("109回滚到历史文件后仍兼容已应用的新checksum", func(t *testing.T) {
ok := isMigrationChecksumCompatible(
"109_auth_identity_compat_backfill.sql",
"551e498aa5616d2d91096e9d72cf9fb36e418ee22eacc557f8811cadbc9e20ee",
"2b380305e73ff0c13aa8c811e45897f2b36ca4a438f7b3e8f98e19ecb6bae0b3",
)
require.True(t, ok)
})
t.Run("119历史checksum可兼容占位文件", func(t *testing.T) {
ok := isMigrationChecksumCompatible(
"119_enforce_payment_orders_out_trade_no_unique.sql",
"ebd2c67cce0116393fb4f1b5d5116a67c6aceb73820dfb5133d1ff6f36d72d34",
"0bbe809ae48a9d811dabda1ba1c74955bd71c4a9cc610f9128816818dfa6c11e",
)
require.True(t, ok)
})
t.Run("119未知checksum不兼容", func(t *testing.T) {
ok := isMigrationChecksumCompatible(
"119_enforce_payment_orders_out_trade_no_unique.sql",
"ebd2c67cce0116393fb4f1b5d5116a67c6aceb73820dfb5133d1ff6f36d72d34",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
require.False(t, ok)
})
} }

View File

@@ -1,6 +1,3 @@
ALTER TABLE auth_identity_migration_reports
ALTER COLUMN report_type TYPE VARCHAR(80);
INSERT INTO auth_identities ( INSERT INTO auth_identities (
user_id, user_id,
provider_type, provider_type,

View File

@@ -1,7 +1,6 @@
-- Replace the legacy non-unique index with a partial unique index. -- Intentionally left as a no-op.
-- Keep empty-string legacy rows compatible while enforcing uniqueness for real order IDs. -- The online index rollout lives in 120_enforce_payment_orders_out_trade_no_unique_notx.sql
DROP INDEX IF EXISTS paymentorder_out_trade_no; DO $$
BEGIN
CREATE UNIQUE INDEX IF NOT EXISTS paymentorder_out_trade_no NULL;
ON payment_orders (out_trade_no) END $$;
WHERE out_trade_no <> '';

View File

@@ -0,0 +1,10 @@
-- Build the payment order uniqueness guarantee online. This migration carries
-- the _notx suffix, so the migrator runs it outside a transaction — required
-- because CONCURRENTLY statements cannot run inside a transaction block.
-- Step 1: clear any leftover from a previously failed concurrent build
-- (an interrupted CREATE INDEX CONCURRENTLY leaves an INVALID index behind,
-- which would make the IF NOT EXISTS below a silent no-op).
DROP INDEX CONCURRENTLY IF EXISTS paymentorder_out_trade_no_unique;
-- Step 2: create the new partial unique index concurrently so writes keep
-- flowing; empty-string legacy rows stay exempt from the uniqueness rule.
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS paymentorder_out_trade_no_unique
ON payment_orders (out_trade_no)
WHERE out_trade_no <> '';
-- Step 3: remove the legacy index name once the replacement is ready.
DROP INDEX CONCURRENTLY IF EXISTS paymentorder_out_trade_no;

View File

@@ -0,0 +1,2 @@
-- Widen report_type so longer report type identifiers fit. Split out of
-- migration 109 so the published backfill body there stays byte-identical.
ALTER TABLE auth_identity_migration_reports
ALTER COLUMN report_type TYPE VARCHAR(80);

View File

@@ -26,12 +26,36 @@ func TestMigration118DoesNotForceOverwriteAuthSourceGrantDefaults(t *testing.T)
require.True(t, strings.Contains(sql, "ON CONFLICT (key) DO NOTHING")) require.True(t, strings.Contains(sql, "ON CONFLICT (key) DO NOTHING"))
} }
func TestMigration119EnforcesOutTradeNoPartialUniqueIndex(t *testing.T) { func TestMigration109KeepsPublishedBackfillBodyAndDefersReportTypeWidening(t *testing.T) {
content, err := FS.ReadFile("109_auth_identity_compat_backfill.sql")
require.NoError(t, err)
sql := string(content)
require.NotContains(t, sql, "ALTER TABLE auth_identity_migration_reports")
followupContent, err := FS.ReadFile("121_auth_identity_migration_report_type_widen.sql")
require.NoError(t, err)
followupSQL := string(followupContent)
require.Contains(t, followupSQL, "ALTER TABLE auth_identity_migration_reports")
require.Contains(t, followupSQL, "ALTER COLUMN report_type TYPE VARCHAR(80)")
}
func TestMigration119DefersPaymentIndexRolloutToOnlineFollowup(t *testing.T) {
content, err := FS.ReadFile("119_enforce_payment_orders_out_trade_no_unique.sql") content, err := FS.ReadFile("119_enforce_payment_orders_out_trade_no_unique.sql")
require.NoError(t, err) require.NoError(t, err)
sql := string(content) sql := string(content)
require.Contains(t, sql, "DROP INDEX IF EXISTS paymentorder_out_trade_no") require.Contains(t, sql, "120_enforce_payment_orders_out_trade_no_unique_notx.sql")
require.Contains(t, sql, "CREATE UNIQUE INDEX IF NOT EXISTS paymentorder_out_trade_no") require.Contains(t, sql, "NULL;")
require.Contains(t, sql, "WHERE out_trade_no <> ''") require.NotContains(t, sql, "CREATE UNIQUE INDEX")
require.NotContains(t, sql, "DROP INDEX")
followupContent, err := FS.ReadFile("120_enforce_payment_orders_out_trade_no_unique_notx.sql")
require.NoError(t, err)
followupSQL := string(followupContent)
require.Contains(t, followupSQL, "CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS paymentorder_out_trade_no_unique")
require.Contains(t, followupSQL, "DROP INDEX CONCURRENTLY IF EXISTS paymentorder_out_trade_no")
require.Contains(t, followupSQL, "WHERE out_trade_no <> ''")
} }