feat(admin): 添加临时不可调度功能
当账号触发特定错误码和关键词匹配时,自动临时禁用调度: 后端: - 新增 TempUnschedCache Redis 缓存层 - RateLimitService 支持规则匹配和状态管理 - 添加 GET/DELETE /accounts/:id/temp-unschedulable API - 数据库迁移添加 temp_unschedulable_until/reason 字段 前端: - 账号状态指示器显示临时不可调度状态 - 新增 TempUnschedStatusModal 详情弹窗 - 创建/编辑账号时支持配置规则和预设模板 - 完整的中英文国际化支持
This commit is contained in:
@@ -43,6 +43,11 @@ type accountRepository struct {
|
||||
sql sqlExecutor // 原生 SQL 执行接口
|
||||
}
|
||||
|
||||
type tempUnschedSnapshot struct {
|
||||
until *time.Time
|
||||
reason string
|
||||
}
|
||||
|
||||
// NewAccountRepository 创建账户仓储实例。
|
||||
// 这是对外暴露的构造函数,返回接口类型以便于依赖注入。
|
||||
func NewAccountRepository(client *dbent.Client, sqlDB *sql.DB) service.AccountRepository {
|
||||
@@ -165,6 +170,11 @@ func (r *accountRepository) GetByIDs(ctx context.Context, ids []int64) ([]*servi
|
||||
accountIDs = append(accountIDs, acc.ID)
|
||||
}
|
||||
|
||||
tempUnschedMap, err := r.loadTempUnschedStates(ctx, accountIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
groupsByAccount, groupIDsByAccount, accountGroupsByAccount, err := r.loadAccountGroups(ctx, accountIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -191,6 +201,10 @@ func (r *accountRepository) GetByIDs(ctx context.Context, ids []int64) ([]*servi
|
||||
if ags, ok := accountGroupsByAccount[entAcc.ID]; ok {
|
||||
out.AccountGroups = ags
|
||||
}
|
||||
if snap, ok := tempUnschedMap[entAcc.ID]; ok {
|
||||
out.TempUnschedulableUntil = snap.until
|
||||
out.TempUnschedulableReason = snap.reason
|
||||
}
|
||||
outByID[entAcc.ID] = out
|
||||
}
|
||||
|
||||
@@ -550,6 +564,7 @@ func (r *accountRepository) ListSchedulable(ctx context.Context) ([]service.Acco
|
||||
Where(
|
||||
dbaccount.StatusEQ(service.StatusActive),
|
||||
dbaccount.SchedulableEQ(true),
|
||||
tempUnschedulablePredicate(),
|
||||
dbaccount.Or(dbaccount.OverloadUntilIsNil(), dbaccount.OverloadUntilLTE(now)),
|
||||
dbaccount.Or(dbaccount.RateLimitResetAtIsNil(), dbaccount.RateLimitResetAtLTE(now)),
|
||||
).
|
||||
@@ -575,6 +590,7 @@ func (r *accountRepository) ListSchedulableByPlatform(ctx context.Context, platf
|
||||
dbaccount.PlatformEQ(platform),
|
||||
dbaccount.StatusEQ(service.StatusActive),
|
||||
dbaccount.SchedulableEQ(true),
|
||||
tempUnschedulablePredicate(),
|
||||
dbaccount.Or(dbaccount.OverloadUntilIsNil(), dbaccount.OverloadUntilLTE(now)),
|
||||
dbaccount.Or(dbaccount.RateLimitResetAtIsNil(), dbaccount.RateLimitResetAtLTE(now)),
|
||||
).
|
||||
@@ -607,6 +623,7 @@ func (r *accountRepository) ListSchedulableByPlatforms(ctx context.Context, plat
|
||||
dbaccount.PlatformIn(platforms...),
|
||||
dbaccount.StatusEQ(service.StatusActive),
|
||||
dbaccount.SchedulableEQ(true),
|
||||
tempUnschedulablePredicate(),
|
||||
dbaccount.Or(dbaccount.OverloadUntilIsNil(), dbaccount.OverloadUntilLTE(now)),
|
||||
dbaccount.Or(dbaccount.RateLimitResetAtIsNil(), dbaccount.RateLimitResetAtLTE(now)),
|
||||
).
|
||||
@@ -648,6 +665,31 @@ func (r *accountRepository) SetOverloaded(ctx context.Context, id int64, until t
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *accountRepository) SetTempUnschedulable(ctx context.Context, id int64, until time.Time, reason string) error {
|
||||
_, err := r.sql.ExecContext(ctx, `
|
||||
UPDATE accounts
|
||||
SET temp_unschedulable_until = $1,
|
||||
temp_unschedulable_reason = $2,
|
||||
updated_at = NOW()
|
||||
WHERE id = $3
|
||||
AND deleted_at IS NULL
|
||||
AND (temp_unschedulable_until IS NULL OR temp_unschedulable_until < $1)
|
||||
`, until, reason, id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *accountRepository) ClearTempUnschedulable(ctx context.Context, id int64) error {
|
||||
_, err := r.sql.ExecContext(ctx, `
|
||||
UPDATE accounts
|
||||
SET temp_unschedulable_until = NULL,
|
||||
temp_unschedulable_reason = NULL,
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
AND deleted_at IS NULL
|
||||
`, id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *accountRepository) ClearRateLimit(ctx context.Context, id int64) error {
|
||||
_, err := r.client.Account.Update().
|
||||
Where(dbaccount.IDEQ(id)).
|
||||
@@ -808,6 +850,7 @@ func (r *accountRepository) queryAccountsByGroup(ctx context.Context, groupID in
|
||||
now := time.Now()
|
||||
preds = append(preds,
|
||||
dbaccount.SchedulableEQ(true),
|
||||
tempUnschedulablePredicate(),
|
||||
dbaccount.Or(dbaccount.OverloadUntilIsNil(), dbaccount.OverloadUntilLTE(now)),
|
||||
dbaccount.Or(dbaccount.RateLimitResetAtIsNil(), dbaccount.RateLimitResetAtLTE(now)),
|
||||
)
|
||||
@@ -869,6 +912,10 @@ func (r *accountRepository) accountsToService(ctx context.Context, accounts []*d
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tempUnschedMap, err := r.loadTempUnschedStates(ctx, accountIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
groupsByAccount, groupIDsByAccount, accountGroupsByAccount, err := r.loadAccountGroups(ctx, accountIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -894,12 +941,68 @@ func (r *accountRepository) accountsToService(ctx context.Context, accounts []*d
|
||||
if ags, ok := accountGroupsByAccount[acc.ID]; ok {
|
||||
out.AccountGroups = ags
|
||||
}
|
||||
if snap, ok := tempUnschedMap[acc.ID]; ok {
|
||||
out.TempUnschedulableUntil = snap.until
|
||||
out.TempUnschedulableReason = snap.reason
|
||||
}
|
||||
outAccounts = append(outAccounts, *out)
|
||||
}
|
||||
|
||||
return outAccounts, nil
|
||||
}
|
||||
|
||||
func tempUnschedulablePredicate() dbpredicate.Account {
|
||||
return dbpredicate.Account(func(s *entsql.Selector) {
|
||||
col := s.C("temp_unschedulable_until")
|
||||
s.Where(entsql.Or(
|
||||
entsql.IsNull(col),
|
||||
entsql.LTE(col, entsql.Expr("NOW()")),
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
func (r *accountRepository) loadTempUnschedStates(ctx context.Context, accountIDs []int64) (map[int64]tempUnschedSnapshot, error) {
|
||||
out := make(map[int64]tempUnschedSnapshot)
|
||||
if len(accountIDs) == 0 {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
rows, err := r.sql.QueryContext(ctx, `
|
||||
SELECT id, temp_unschedulable_until, temp_unschedulable_reason
|
||||
FROM accounts
|
||||
WHERE id = ANY($1)
|
||||
`, pq.Array(accountIDs))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var until sql.NullTime
|
||||
var reason sql.NullString
|
||||
if err := rows.Scan(&id, &until, &reason); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var untilPtr *time.Time
|
||||
if until.Valid {
|
||||
tmp := until.Time
|
||||
untilPtr = &tmp
|
||||
}
|
||||
if reason.Valid {
|
||||
out[id] = tempUnschedSnapshot{until: untilPtr, reason: reason.String}
|
||||
} else {
|
||||
out[id] = tempUnschedSnapshot{until: untilPtr, reason: ""}
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (r *accountRepository) loadProxies(ctx context.Context, proxyIDs []int64) (map[int64]*service.Proxy, error) {
|
||||
proxyMap := make(map[int64]*service.Proxy)
|
||||
if len(proxyIDs) == 0 {
|
||||
|
||||
91
backend/internal/repository/temp_unsched_cache.go
Normal file
91
backend/internal/repository/temp_unsched_cache.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
const tempUnschedPrefix = "temp_unsched:account:"
|
||||
|
||||
var tempUnschedSetScript = redis.NewScript(`
|
||||
local key = KEYS[1]
|
||||
local new_until = tonumber(ARGV[1])
|
||||
local new_value = ARGV[2]
|
||||
local new_ttl = tonumber(ARGV[3])
|
||||
|
||||
local existing = redis.call('GET', key)
|
||||
if existing then
|
||||
local ok, existing_data = pcall(cjson.decode, existing)
|
||||
if ok and existing_data and existing_data.until_unix then
|
||||
local existing_until = tonumber(existing_data.until_unix)
|
||||
if existing_until and new_until <= existing_until then
|
||||
return 0
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
redis.call('SET', key, new_value, 'EX', new_ttl)
|
||||
return 1
|
||||
`)
|
||||
|
||||
type tempUnschedCache struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
func NewTempUnschedCache(rdb *redis.Client) service.TempUnschedCache {
|
||||
return &tempUnschedCache{rdb: rdb}
|
||||
}
|
||||
|
||||
// SetTempUnsched 设置临时不可调度状态(只延长不缩短)
|
||||
func (c *tempUnschedCache) SetTempUnsched(ctx context.Context, accountID int64, state *service.TempUnschedState) error {
|
||||
key := fmt.Sprintf("%s%d", tempUnschedPrefix, accountID)
|
||||
|
||||
stateJSON, err := json.Marshal(state)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal state: %w", err)
|
||||
}
|
||||
|
||||
ttl := time.Until(time.Unix(state.UntilUnix, 0))
|
||||
if ttl <= 0 {
|
||||
return nil // 已过期,不设置
|
||||
}
|
||||
|
||||
ttlSeconds := int(ttl.Seconds())
|
||||
if ttlSeconds < 1 {
|
||||
ttlSeconds = 1
|
||||
}
|
||||
|
||||
_, err = tempUnschedSetScript.Run(ctx, c.rdb, []string{key}, state.UntilUnix, string(stateJSON), ttlSeconds).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
// GetTempUnsched 获取临时不可调度状态
|
||||
func (c *tempUnschedCache) GetTempUnsched(ctx context.Context, accountID int64) (*service.TempUnschedState, error) {
|
||||
key := fmt.Sprintf("%s%d", tempUnschedPrefix, accountID)
|
||||
|
||||
val, err := c.rdb.Get(ctx, key).Result()
|
||||
if err == redis.Nil {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var state service.TempUnschedState
|
||||
if err := json.Unmarshal([]byte(val), &state); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal state: %w", err)
|
||||
}
|
||||
|
||||
return &state, nil
|
||||
}
|
||||
|
||||
// DeleteTempUnsched 删除临时不可调度状态
|
||||
func (c *tempUnschedCache) DeleteTempUnsched(ctx context.Context, accountID int64) error {
|
||||
key := fmt.Sprintf("%s%d", tempUnschedPrefix, accountID)
|
||||
return c.rdb.Del(ctx, key).Err()
|
||||
}
|
||||
Reference in New Issue
Block a user