fix(数据库): 修复默认分组缺失与迁移锁阻塞
通过迁移补种默认 groups 记录,避免新装空分组 迁移锁改为 try lock + 重试并加入超时 写入 usage_logs 时保留 rate_multiplier=0 语义 测试: go test ./...
This commit is contained in:
@@ -5,6 +5,7 @@ package infrastructure
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Wei-Shaw/sub2api/ent"
|
"github.com/Wei-Shaw/sub2api/ent"
|
||||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||||
@@ -54,7 +55,9 @@ func InitEnt(cfg *config.Config) (*ent.Client, *sql.DB, error) {
|
|||||||
// 确保数据库 schema 已准备就绪。
|
// 确保数据库 schema 已准备就绪。
|
||||||
// SQL 迁移文件是 schema 的权威来源(source of truth)。
|
// SQL 迁移文件是 schema 的权威来源(source of truth)。
|
||||||
// 这种方式比 Ent 的自动迁移更可控,支持复杂的迁移场景。
|
// 这种方式比 Ent 的自动迁移更可控,支持复杂的迁移场景。
|
||||||
if err := applyMigrationsFS(context.Background(), drv.DB(), migrations.FS); err != nil {
|
migrationCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := applyMigrationsFS(migrationCtx, drv.DB(), migrations.FS); err != nil {
|
||||||
_ = drv.Close() // 迁移失败时关闭驱动,避免资源泄露
|
_ = drv.Close() // 迁移失败时关闭驱动,避免资源泄露
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"io/fs"
|
"io/fs"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/Wei-Shaw/sub2api/migrations"
|
"github.com/Wei-Shaw/sub2api/migrations"
|
||||||
)
|
)
|
||||||
@@ -31,6 +32,7 @@ CREATE TABLE IF NOT EXISTS schema_migrations (
|
|||||||
// 在多实例部署场景下,该锁确保同一时间只有一个实例执行迁移。
|
// 在多实例部署场景下,该锁确保同一时间只有一个实例执行迁移。
|
||||||
// 任何稳定的 int64 值都可以,只要不与同一数据库中的其他锁冲突即可。
|
// 任何稳定的 int64 值都可以,只要不与同一数据库中的其他锁冲突即可。
|
||||||
const migrationsAdvisoryLockID int64 = 694208311321144027
|
const migrationsAdvisoryLockID int64 = 694208311321144027
|
||||||
|
const migrationsLockRetryInterval = 500 * time.Millisecond
|
||||||
|
|
||||||
// ApplyMigrations 将嵌入的 SQL 迁移文件应用到指定的数据库。
|
// ApplyMigrations 将嵌入的 SQL 迁移文件应用到指定的数据库。
|
||||||
//
|
//
|
||||||
@@ -166,11 +168,23 @@ func applyMigrationsFS(ctx context.Context, db *sql.DB, fsys fs.FS) error {
|
|||||||
// Advisory Lock 是一种轻量级的锁机制,不与任何特定的数据库对象关联。
|
// Advisory Lock 是一种轻量级的锁机制,不与任何特定的数据库对象关联。
|
||||||
// 它非常适合用于应用层面的分布式锁场景,如迁移序列化。
|
// 它非常适合用于应用层面的分布式锁场景,如迁移序列化。
|
||||||
func pgAdvisoryLock(ctx context.Context, db *sql.DB) error {
|
func pgAdvisoryLock(ctx context.Context, db *sql.DB) error {
|
||||||
_, err := db.ExecContext(ctx, "SELECT pg_advisory_lock($1)", migrationsAdvisoryLockID)
|
ticker := time.NewTicker(migrationsLockRetryInterval)
|
||||||
if err != nil {
|
defer ticker.Stop()
|
||||||
return fmt.Errorf("acquire migrations lock: %w", err)
|
|
||||||
|
for {
|
||||||
|
var locked bool
|
||||||
|
if err := db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", migrationsAdvisoryLockID).Scan(&locked); err != nil {
|
||||||
|
return fmt.Errorf("acquire migrations lock: %w", err)
|
||||||
|
}
|
||||||
|
if locked {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return fmt.Errorf("acquire migrations lock: %w", ctx.Err())
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pgAdvisoryUnlock 释放 PostgreSQL Advisory Lock。
|
// pgAdvisoryUnlock 释放 PostgreSQL Advisory Lock。
|
||||||
|
|||||||
@@ -70,9 +70,6 @@ func (r *usageLogRepository) Create(ctx context.Context, log *service.UsageLog)
|
|||||||
}
|
}
|
||||||
|
|
||||||
rateMultiplier := log.RateMultiplier
|
rateMultiplier := log.RateMultiplier
|
||||||
if rateMultiplier == 0 {
|
|
||||||
rateMultiplier = 1
|
|
||||||
}
|
|
||||||
|
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO usage_logs (
|
INSERT INTO usage_logs (
|
||||||
|
|||||||
@@ -260,7 +260,9 @@ func initializeDatabase(cfg *SetupConfig) error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return infrastructure.ApplyMigrations(context.Background(), db)
|
migrationCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
return infrastructure.ApplyMigrations(migrationCtx, db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createAdminUser(cfg *SetupConfig) error {
|
func createAdminUser(cfg *SetupConfig) error {
|
||||||
|
|||||||
4
backend/migrations/008_seed_default_group.sql
Normal file
4
backend/migrations/008_seed_default_group.sql
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
-- Seed a default group for fresh installs.
|
||||||
|
INSERT INTO groups (name, description)
|
||||||
|
SELECT 'default', 'Default group'
|
||||||
|
WHERE NOT EXISTS (SELECT 1 FROM groups);
|
||||||
Reference in New Issue
Block a user