Files
xinghuoapi/backend/internal/service/scheduler_cache.go
yangjianbo 3141aa5144 feat(scheduler): 引入调度快照缓存与 outbox 回放
- 调度热路径优先读 Redis 快照,保留分组排序语义
- outbox 回放 + 全量重建纠偏,失败重试不推进水位
- 自动 Atlas 基线对齐并同步调度配置示例
2026-01-12 14:19:06 +08:00

69 lines
2.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"fmt"
"strconv"
"strings"
"time"
)
// Scheduler mode identifiers.
// NOTE(review): presumably these are the values carried in
// SchedulerBucket.Mode; their exact scheduling semantics are defined by the
// callers, not by this file — confirm there.
const (
// SchedulerModeSingle is the "single" mode identifier.
SchedulerModeSingle = "single"
// SchedulerModeMixed is the "mixed" mode identifier.
SchedulerModeMixed = "mixed"
// SchedulerModeForced is the "forced" mode identifier.
SchedulerModeForced = "forced"
)
// SchedulerBucket identifies a scheduler bucket by group, platform and mode.
// Its textual form, produced by String and read back by
// ParseSchedulerBucket, is "GroupID:Platform:Mode".
type SchedulerBucket struct {
// GroupID is the numeric group identifier; it is rendered in base 10.
GroupID int64
// Platform is the platform name. ParseSchedulerBucket rejects an empty
// value, and a value containing ':' would not survive a String/Parse
// round trip because ':' is the field separator.
Platform string
// Mode is the scheduler mode; the same non-empty / no-':' constraints as
// Platform apply when parsing.
// NOTE(review): presumably one of the SchedulerMode* constants — confirm.
Mode string
}
// String renders the bucket as "GroupID:Platform:Mode", with GroupID written
// in base 10. ParseSchedulerBucket accepts this form as long as Platform and
// Mode are non-empty and contain no ':'.
func (b SchedulerBucket) String() string {
	// Colon-separated layout: numeric group first, then the two text fields.
	const layout = "%d:%s:%s"

	id, platform, mode := b.GroupID, b.Platform, b.Mode
	return fmt.Sprintf(layout, id, platform, mode)
}
// ParseSchedulerBucket decodes the "GroupID:Platform:Mode" form produced by
// SchedulerBucket.String.
//
// It returns the decoded bucket and true on success. It returns the zero
// SchedulerBucket and false when raw does not consist of exactly three
// ':'-separated fields, when the platform or mode field is empty, or when the
// first field is not a base-10 integer that fits in int64.
func ParseSchedulerBucket(raw string) (SchedulerBucket, bool) {
	var none SchedulerBucket

	fields := strings.Split(raw, ":")
	if len(fields) != 3 {
		return none, false
	}
	idText, platform, mode := fields[0], fields[1], fields[2]

	// Both text fields are mandatory.
	if platform == "" || mode == "" {
		return none, false
	}

	id, err := strconv.ParseInt(idText, 10, 64)
	if err != nil {
		return none, false
	}

	return SchedulerBucket{GroupID: id, Platform: platform, Mode: mode}, true
}
// SchedulerCache is responsible for cache reads and writes of scheduling
// snapshots and per-account snapshots.
//
// NOTE(review): GetSnapshot returns []*Account while SetSnapshot accepts
// []Account; the asymmetry is part of the existing contract and is left as is.
type SchedulerCache interface {
// GetSnapshot reads the snapshot for bucket and reports whether it was a
// cache hit (ready + active + data complete) via the bool result.
GetSnapshot(ctx context.Context, bucket SchedulerBucket) ([]*Account, bool, error)
// SetSnapshot writes the snapshot for bucket and switches the active version.
SetSnapshot(ctx context.Context, bucket SchedulerBucket, accounts []Account) error
// GetAccount fetches the snapshot of a single account.
// NOTE(review): the result for a missing account (nil vs. error) is up to
// the implementation — confirm there.
GetAccount(ctx context.Context, accountID int64) (*Account, error)
// SetAccount writes a single-account snapshot (including unschedulable state).
SetAccount(ctx context.Context, account *Account) error
// DeleteAccount deletes a single-account snapshot.
DeleteAccount(ctx context.Context, accountID int64) error
// UpdateLastUsed batch-updates the last-used time of accounts.
// NOTE(review): the map keys are presumably account IDs — confirm.
UpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error
// TryLockBucket attempts to acquire the rebuild lock for bucket.
// NOTE(review): presumably the bool reports whether the lock was acquired
// and ttl bounds how long it is held — confirm against the implementation.
TryLockBucket(ctx context.Context, bucket SchedulerBucket, ttl time.Duration) (bool, error)
// ListBuckets returns the set of registered buckets.
ListBuckets(ctx context.Context) ([]SchedulerBucket, error)
// GetOutboxWatermark reads the outbox watermark.
GetOutboxWatermark(ctx context.Context) (int64, error)
// SetOutboxWatermark saves the outbox watermark.
SetOutboxWatermark(ctx context.Context, id int64) error
}