feat(subscription): 有界队列执行维护并改进鉴权解析
This commit is contained in:
75
backend/internal/service/subscription_maintenance_queue.go
Normal file
75
backend/internal/service/subscription_maintenance_queue.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// SubscriptionMaintenanceQueue 提供“有界队列 + 固定 worker”的后台执行器。
|
||||
// 用于从请求热路径触发维护动作时,避免无限 goroutine 膨胀。
|
||||
type SubscriptionMaintenanceQueue struct {
|
||||
queue chan func()
|
||||
wg sync.WaitGroup
|
||||
stop sync.Once
|
||||
}
|
||||
|
||||
func NewSubscriptionMaintenanceQueue(workerCount, queueSize int) *SubscriptionMaintenanceQueue {
|
||||
if workerCount <= 0 {
|
||||
workerCount = 1
|
||||
}
|
||||
if queueSize <= 0 {
|
||||
queueSize = 1
|
||||
}
|
||||
|
||||
q := &SubscriptionMaintenanceQueue{
|
||||
queue: make(chan func(), queueSize),
|
||||
}
|
||||
|
||||
q.wg.Add(workerCount)
|
||||
for i := 0; i < workerCount; i++ {
|
||||
go func(workerID int) {
|
||||
defer q.wg.Done()
|
||||
for fn := range q.queue {
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("SubscriptionMaintenance worker panic: %v", r)
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
// TryEnqueue 尝试将任务入队。
|
||||
// 当队列已满时返回 error(调用方应该选择跳过并记录告警/限频日志)。
|
||||
func (q *SubscriptionMaintenanceQueue) TryEnqueue(task func()) error {
|
||||
if q == nil {
|
||||
return fmt.Errorf("maintenance queue is nil")
|
||||
}
|
||||
if task == nil {
|
||||
return fmt.Errorf("maintenance task is nil")
|
||||
}
|
||||
|
||||
select {
|
||||
case q.queue <- task:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("maintenance queue full")
|
||||
}
|
||||
}
|
||||
|
||||
func (q *SubscriptionMaintenanceQueue) Stop() {
|
||||
if q == nil {
|
||||
return
|
||||
}
|
||||
q.stop.Do(func() {
|
||||
close(q.queue)
|
||||
q.wg.Wait()
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
//go:build unit
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSubscriptionMaintenanceQueue_TryEnqueue_QueueFull(t *testing.T) {
|
||||
q := NewSubscriptionMaintenanceQueue(1, 1)
|
||||
t.Cleanup(q.Stop)
|
||||
|
||||
block := make(chan struct{})
|
||||
var started atomic.Int32
|
||||
|
||||
require.NoError(t, q.TryEnqueue(func() {
|
||||
started.Store(1)
|
||||
<-block
|
||||
}))
|
||||
|
||||
// Wait until worker started consuming the first task.
|
||||
require.Eventually(t, func() bool { return started.Load() == 1 }, time.Second, 10*time.Millisecond)
|
||||
|
||||
// Queue size is 1; with the worker blocked, enqueueing one more should fill it.
|
||||
require.NoError(t, q.TryEnqueue(func() {}))
|
||||
|
||||
// Now the queue is full; next enqueue must fail.
|
||||
err := q.TryEnqueue(func() {})
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "full")
|
||||
|
||||
close(block)
|
||||
}
|
||||
|
||||
func TestSubscriptionMaintenanceQueue_TryEnqueue_PanicDoesNotKillWorker(t *testing.T) {
|
||||
q := NewSubscriptionMaintenanceQueue(1, 8)
|
||||
t.Cleanup(q.Stop)
|
||||
|
||||
require.NoError(t, q.TryEnqueue(func() { panic("boom") }))
|
||||
|
||||
done := make(chan struct{})
|
||||
require.NoError(t, q.TryEnqueue(func() { close(done) }))
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// ok
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("worker did not continue after panic")
|
||||
}
|
||||
}
|
||||
@@ -48,6 +48,8 @@ type SubscriptionService struct {
|
||||
subCacheGroup singleflight.Group
|
||||
subCacheTTL time.Duration
|
||||
subCacheJitter int // 抖动百分比
|
||||
|
||||
maintenanceQueue *SubscriptionMaintenanceQueue
|
||||
}
|
||||
|
||||
// NewSubscriptionService 创建订阅服务
|
||||
@@ -59,9 +61,31 @@ func NewSubscriptionService(groupRepo GroupRepository, userSubRepo UserSubscript
|
||||
entClient: entClient,
|
||||
}
|
||||
svc.initSubCache(cfg)
|
||||
svc.initMaintenanceQueue(cfg)
|
||||
return svc
|
||||
}
|
||||
|
||||
func (s *SubscriptionService) initMaintenanceQueue(cfg *config.Config) {
|
||||
if cfg == nil {
|
||||
return
|
||||
}
|
||||
mc := cfg.SubscriptionMaintenance
|
||||
if mc.WorkerCount <= 0 || mc.QueueSize <= 0 {
|
||||
return
|
||||
}
|
||||
s.maintenanceQueue = NewSubscriptionMaintenanceQueue(mc.WorkerCount, mc.QueueSize)
|
||||
}
|
||||
|
||||
// Stop stops the maintenance worker pool.
|
||||
func (s *SubscriptionService) Stop() {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
if s.maintenanceQueue != nil {
|
||||
s.maintenanceQueue.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// initSubCache 初始化订阅 L1 缓存
|
||||
func (s *SubscriptionService) initSubCache(cfg *config.Config) {
|
||||
if cfg == nil {
|
||||
@@ -720,6 +744,23 @@ func (s *SubscriptionService) ValidateAndCheckLimits(sub *UserSubscription, grou
|
||||
// 而 IsExpired()=true 的订阅在 ValidateAndCheckLimits 中已被拦截返回错误,
|
||||
// 因此进入此方法的订阅一定未过期,无需处理过期状态同步。
|
||||
func (s *SubscriptionService) DoWindowMaintenance(sub *UserSubscription) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
if s.maintenanceQueue != nil {
|
||||
err := s.maintenanceQueue.TryEnqueue(func() {
|
||||
s.doWindowMaintenance(sub)
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("Subscription maintenance enqueue failed: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
s.doWindowMaintenance(sub)
|
||||
}
|
||||
|
||||
func (s *SubscriptionService) doWindowMaintenance(sub *UserSubscription) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user