From c53a48cde51a144eab74469d7699e51345c8cafe Mon Sep 17 00:00:00 2001 From: CaIon <1808837298@qq.com> Date: Fri, 23 May 2025 01:26:52 +0800 Subject: [PATCH] feat: add panic recovery and retry mechanism for InitChannelCache; improve batch deletion of abilities in FixAbility --- main.go | 19 ++++++++++++++++--- model/ability.go | 47 ++++++++++++++++++++++++++++++++++++----------- model/cache.go | 7 +++++-- model/channel.go | 11 +++++++++++ 4 files changed, 68 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index 95c6820d..c286650f 100644 --- a/main.go +++ b/main.go @@ -89,9 +89,22 @@ func main() { if common.MemoryCacheEnabled { common.SysLog("memory cache enabled") common.SysError(fmt.Sprintf("sync frequency: %d seconds", common.SyncFrequency)) - model.InitChannelCache() - } - if common.MemoryCacheEnabled { + + // Add panic recovery and retry for InitChannelCache + func() { + defer func() { + if r := recover(); r != nil { + common.SysError(fmt.Sprintf("InitChannelCache panic: %v, retrying once", r)) + // Retry once + _, fixErr := model.FixAbility() + if fixErr != nil { + common.SysError(fmt.Sprintf("InitChannelCache failed: %s", fixErr.Error())) + } + } + }() + model.InitChannelCache() + }() + go model.SyncOptions(common.SyncFrequency) go model.SyncChannelCache(common.SyncFrequency) } diff --git a/model/ability.go b/model/ability.go index 52720307..38b0bd73 100644 --- a/model/ability.go +++ b/model/ability.go @@ -50,7 +50,7 @@ func getPriority(group string, model string, retry int) (int, error) { err := DB.Model(&Ability{}). Select("DISTINCT(priority)"). Where(groupCol+" = ? and model = ? and enabled = "+trueVal, group, model). - Order("priority DESC"). // 按优先级降序排序 + Order("priority DESC"). // 按优先级降序排序 Pluck("priority", &priorities).Error // Pluck用于将查询的结果直接扫描到一个切片中 if err != nil { @@ -261,12 +261,28 @@ func FixAbility() (int, error) { common.SysError(fmt.Sprintf("Get channel ids from channel table failed: %s", err.Error())) return 0, err } - // Delete abilities of channels that are not in channel table - err = DB.Where("channel_id NOT IN (?)", channelIds).Delete(&Ability{}).Error - if err != nil { - common.SysError(fmt.Sprintf("Delete abilities of channels that are not in channel table failed: %s", err.Error())) - return 0, err + + // Delete abilities of channels that are not in channel table - in batches to avoid too many placeholders + if len(channelIds) > 0 { + // Process deletion in chunks to avoid "too many placeholders" error + for _, chunk := range lo.Chunk(channelIds, 100) { + err = DB.Where("channel_id NOT IN (?)", chunk).Delete(&Ability{}).Error + if err != nil { + common.SysError(fmt.Sprintf("Delete abilities of channels (batch) that are not in channel table failed: %s", err.Error())) + return 0, err + } + } + } else { + // If no channels exist, delete all abilities + err = DB.Delete(&Ability{}).Error + if err != nil { + common.SysError(fmt.Sprintf("Delete all abilities failed: %s", err.Error())) + return 0, err + } + common.SysLog("Delete all abilities successfully") + return 0, nil } + common.SysLog(fmt.Sprintf("Delete abilities of channels that are not in channel table successfully, ids: %v", channelIds)) count += len(channelIds) @@ -275,17 +291,26 @@ func FixAbility() (int, error) { err = DB.Table("abilities").Distinct("channel_id").Pluck("channel_id", &abilityChannelIds).Error if err != nil { common.SysError(fmt.Sprintf("Get channel ids from abilities table failed: %s", err.Error())) - return 0, err + return count, err } + var channels []Channel if len(abilityChannelIds) == 0 { err = DB.Find(&channels).Error } else { - err = DB.Where("id NOT IN (?)", abilityChannelIds).Find(&channels).Error - } - if err != nil { - return 0, err + // Process query in chunks to avoid "too many placeholders" error + err = nil + for _, chunk := range lo.Chunk(abilityChannelIds, 100) { + var channelsChunk []Channel + err = DB.Where("id NOT IN (?)", chunk).Find(&channelsChunk).Error + if err != nil { + common.SysError(fmt.Sprintf("Find channels not in abilities table failed: %s", err.Error())) + return count, err + } + channels = append(channels, channelsChunk...) + } } + for _, channel := range channels { err := channel.UpdateAbilities(nil) if err != nil { diff --git a/model/cache.go b/model/cache.go index 2d1c36bf..e2f83e22 100644 --- a/model/cache.go +++ b/model/cache.go @@ -16,6 +16,9 @@ var channelsIDM map[int]*Channel var channelSyncLock sync.RWMutex func InitChannelCache() { + if !common.MemoryCacheEnabled { + return + } newChannelId2channel := make(map[int]*Channel) var channels []*Channel DB.Where("status = ?", common.ChannelStatusEnabled).Find(&channels) @@ -84,11 +87,11 @@ func CacheGetRandomSatisfiedChannel(group string, model string, retry int) (*Cha if !common.MemoryCacheEnabled { return GetRandomSatisfiedChannel(group, model, retry) } - + channelSyncLock.RLock() channels := group2model2channels[group][model] channelSyncLock.RUnlock() - + if len(channels) == 0 { return nil, errors.New("channel not found") } diff --git a/model/channel.go b/model/channel.go index 41e5e371..ed7a0a7e 100644 --- a/model/channel.go +++ b/model/channel.go @@ -46,6 +46,17 @@ func (channel *Channel) GetModels() []string { return strings.Split(strings.Trim(channel.Models, ","), ",") } +func (channel *Channel) GetGroups() []string { + if channel.Group == "" { + return []string{} + } + groups := strings.Split(strings.Trim(channel.Group, ","), ",") + for i, group := range groups { + groups[i] = strings.TrimSpace(group) + } + return groups +} + func (channel *Channel) GetOtherInfo() map[string]interface{} { otherInfo := make(map[string]interface{}) if channel.OtherInfo != "" {