Merge branch 'dev'
This commit is contained in:
@@ -78,7 +78,10 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
|
|||||||
dashboardAggregationRepository := repository.NewDashboardAggregationRepository(db)
|
dashboardAggregationRepository := repository.NewDashboardAggregationRepository(db)
|
||||||
dashboardStatsCache := repository.NewDashboardCache(redisClient, configConfig)
|
dashboardStatsCache := repository.NewDashboardCache(redisClient, configConfig)
|
||||||
dashboardService := service.NewDashboardService(usageLogRepository, dashboardAggregationRepository, dashboardStatsCache, configConfig)
|
dashboardService := service.NewDashboardService(usageLogRepository, dashboardAggregationRepository, dashboardStatsCache, configConfig)
|
||||||
timingWheelService := service.ProvideTimingWheelService()
|
timingWheelService, err := service.ProvideTimingWheelService()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
dashboardAggregationService := service.ProvideDashboardAggregationService(dashboardAggregationRepository, timingWheelService, configConfig)
|
dashboardAggregationService := service.ProvideDashboardAggregationService(dashboardAggregationRepository, timingWheelService, configConfig)
|
||||||
dashboardHandler := admin.NewDashboardHandler(dashboardService, dashboardAggregationService)
|
dashboardHandler := admin.NewDashboardHandler(dashboardService, dashboardAggregationService)
|
||||||
accountRepository := repository.NewAccountRepository(client, db)
|
accountRepository := repository.NewAccountRepository(client, db)
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -8,6 +9,8 @@ import (
|
|||||||
"github.com/zeromicro/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var newTimingWheel = collection.NewTimingWheel
|
||||||
|
|
||||||
// TimingWheelService wraps go-zero's TimingWheel for task scheduling
|
// TimingWheelService wraps go-zero's TimingWheel for task scheduling
|
||||||
type TimingWheelService struct {
|
type TimingWheelService struct {
|
||||||
tw *collection.TimingWheel
|
tw *collection.TimingWheel
|
||||||
@@ -15,18 +18,18 @@ type TimingWheelService struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewTimingWheelService creates a new TimingWheelService instance
|
// NewTimingWheelService creates a new TimingWheelService instance
|
||||||
func NewTimingWheelService() *TimingWheelService {
|
func NewTimingWheelService() (*TimingWheelService, error) {
|
||||||
// 1 second tick, 3600 slots = supports up to 1 hour delay
|
// 1 second tick, 3600 slots = supports up to 1 hour delay
|
||||||
// execute function: runs func() type tasks
|
// execute function: runs func() type tasks
|
||||||
tw, err := collection.NewTimingWheel(1*time.Second, 3600, func(key, value any) {
|
tw, err := newTimingWheel(1*time.Second, 3600, func(key, value any) {
|
||||||
if fn, ok := value.(func()); ok {
|
if fn, ok := value.(func()); ok {
|
||||||
fn()
|
fn()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
return nil, fmt.Errorf("创建 timing wheel 失败: %w", err)
|
||||||
}
|
}
|
||||||
return &TimingWheelService{tw: tw}
|
return &TimingWheelService{tw: tw}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the timing wheel
|
// Start starts the timing wheel
|
||||||
|
|||||||
146
backend/internal/service/timing_wheel_service_test.go
Normal file
146
backend/internal/service/timing_wheel_service_test.go
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewTimingWheelService_InitFail_NoPanicAndReturnError(t *testing.T) {
|
||||||
|
original := newTimingWheel
|
||||||
|
t.Cleanup(func() { newTimingWheel = original })
|
||||||
|
|
||||||
|
newTimingWheel = func(_ time.Duration, _ int, _ collection.Execute) (*collection.TimingWheel, error) {
|
||||||
|
return nil, errors.New("boom")
|
||||||
|
}
|
||||||
|
|
||||||
|
svc, err := NewTimingWheelService()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("期望返回 error,但得到 nil")
|
||||||
|
}
|
||||||
|
if svc != nil {
|
||||||
|
t.Fatalf("期望返回 nil svc,但得到非空")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewTimingWheelService_Success(t *testing.T) {
|
||||||
|
svc, err := NewTimingWheelService()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("期望 err 为 nil,但得到: %v", err)
|
||||||
|
}
|
||||||
|
if svc == nil {
|
||||||
|
t.Fatalf("期望 svc 非空,但得到 nil")
|
||||||
|
}
|
||||||
|
svc.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewTimingWheelService_ExecuteCallbackRunsFunc(t *testing.T) {
|
||||||
|
original := newTimingWheel
|
||||||
|
t.Cleanup(func() { newTimingWheel = original })
|
||||||
|
|
||||||
|
var captured collection.Execute
|
||||||
|
newTimingWheel = func(interval time.Duration, numSlots int, execute collection.Execute) (*collection.TimingWheel, error) {
|
||||||
|
captured = execute
|
||||||
|
return original(interval, numSlots, execute)
|
||||||
|
}
|
||||||
|
|
||||||
|
svc, err := NewTimingWheelService()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("期望 err 为 nil,但得到: %v", err)
|
||||||
|
}
|
||||||
|
if captured == nil {
|
||||||
|
t.Fatalf("期望 captured 非空,但得到 nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
called := false
|
||||||
|
captured("k", func() { called = true })
|
||||||
|
if !called {
|
||||||
|
t.Fatalf("期望 execute 回调触发传入函数执行")
|
||||||
|
}
|
||||||
|
|
||||||
|
svc.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTimingWheelService_Schedule_ExecutesOnce(t *testing.T) {
|
||||||
|
original := newTimingWheel
|
||||||
|
t.Cleanup(func() { newTimingWheel = original })
|
||||||
|
|
||||||
|
newTimingWheel = func(_ time.Duration, _ int, execute collection.Execute) (*collection.TimingWheel, error) {
|
||||||
|
return original(10*time.Millisecond, 128, execute)
|
||||||
|
}
|
||||||
|
|
||||||
|
svc, err := NewTimingWheelService()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("期望 err 为 nil,但得到: %v", err)
|
||||||
|
}
|
||||||
|
defer svc.Stop()
|
||||||
|
|
||||||
|
ch := make(chan struct{}, 1)
|
||||||
|
svc.Schedule("once", 30*time.Millisecond, func() { ch <- struct{}{} })
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
t.Fatalf("等待任务执行超时")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatalf("任务不应重复执行")
|
||||||
|
case <-time.After(80 * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTimingWheelService_Cancel_PreventsExecution(t *testing.T) {
|
||||||
|
original := newTimingWheel
|
||||||
|
t.Cleanup(func() { newTimingWheel = original })
|
||||||
|
|
||||||
|
newTimingWheel = func(_ time.Duration, _ int, execute collection.Execute) (*collection.TimingWheel, error) {
|
||||||
|
return original(10*time.Millisecond, 128, execute)
|
||||||
|
}
|
||||||
|
|
||||||
|
svc, err := NewTimingWheelService()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("期望 err 为 nil,但得到: %v", err)
|
||||||
|
}
|
||||||
|
defer svc.Stop()
|
||||||
|
|
||||||
|
ch := make(chan struct{}, 1)
|
||||||
|
svc.Schedule("cancel", 80*time.Millisecond, func() { ch <- struct{}{} })
|
||||||
|
svc.Cancel("cancel")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatalf("任务已取消,不应执行")
|
||||||
|
case <-time.After(200 * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTimingWheelService_ScheduleRecurring_ExecutesMultipleTimes(t *testing.T) {
|
||||||
|
original := newTimingWheel
|
||||||
|
t.Cleanup(func() { newTimingWheel = original })
|
||||||
|
|
||||||
|
newTimingWheel = func(_ time.Duration, _ int, execute collection.Execute) (*collection.TimingWheel, error) {
|
||||||
|
return original(10*time.Millisecond, 128, execute)
|
||||||
|
}
|
||||||
|
|
||||||
|
svc, err := NewTimingWheelService()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("期望 err 为 nil,但得到: %v", err)
|
||||||
|
}
|
||||||
|
defer svc.Stop()
|
||||||
|
|
||||||
|
var count int32
|
||||||
|
svc.ScheduleRecurring("rec", 30*time.Millisecond, func() { atomic.AddInt32(&count, 1) })
|
||||||
|
|
||||||
|
deadline := time.Now().Add(500 * time.Millisecond)
|
||||||
|
for atomic.LoadInt32(&count) < 2 && time.Now().Before(deadline) {
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if atomic.LoadInt32(&count) < 2 {
|
||||||
|
t.Fatalf("期望周期任务至少执行 2 次,但只执行了 %d 次", atomic.LoadInt32(&count))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -65,10 +65,13 @@ func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpirySe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ProvideTimingWheelService creates and starts TimingWheelService
|
// ProvideTimingWheelService creates and starts TimingWheelService
|
||||||
func ProvideTimingWheelService() *TimingWheelService {
|
func ProvideTimingWheelService() (*TimingWheelService, error) {
|
||||||
svc := NewTimingWheelService()
|
svc, err := NewTimingWheelService()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
svc.Start()
|
svc.Start()
|
||||||
return svc
|
return svc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProvideDeferredService creates and starts DeferredService
|
// ProvideDeferredService creates and starts DeferredService
|
||||||
|
|||||||
37
backend/internal/service/wire_test.go
Normal file
37
backend/internal/service/wire_test.go
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProvideTimingWheelService_ReturnsError(t *testing.T) {
|
||||||
|
original := newTimingWheel
|
||||||
|
t.Cleanup(func() { newTimingWheel = original })
|
||||||
|
|
||||||
|
newTimingWheel = func(_ time.Duration, _ int, _ collection.Execute) (*collection.TimingWheel, error) {
|
||||||
|
return nil, errors.New("boom")
|
||||||
|
}
|
||||||
|
|
||||||
|
svc, err := ProvideTimingWheelService()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("期望返回 error,但得到 nil")
|
||||||
|
}
|
||||||
|
if svc != nil {
|
||||||
|
t.Fatalf("期望返回 nil svc,但得到非空")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProvideTimingWheelService_Success(t *testing.T) {
|
||||||
|
svc, err := ProvideTimingWheelService()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("期望 err 为 nil,但得到: %v", err)
|
||||||
|
}
|
||||||
|
if svc == nil {
|
||||||
|
t.Fatalf("期望 svc 非空,但得到 nil")
|
||||||
|
}
|
||||||
|
svc.Stop()
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user