fix(仪表盘): 修复缓存稳定性并补充测试
This commit is contained in:
@@ -22,6 +22,9 @@ func NewDashboardCache(rdb *redis.Client, cfg *config.Config) service.DashboardS
|
|||||||
if cfg != nil {
|
if cfg != nil {
|
||||||
prefix = strings.TrimSpace(cfg.Dashboard.KeyPrefix)
|
prefix = strings.TrimSpace(cfg.Dashboard.KeyPrefix)
|
||||||
}
|
}
|
||||||
|
if prefix != "" && !strings.HasSuffix(prefix, ":") {
|
||||||
|
prefix += ":"
|
||||||
|
}
|
||||||
return &dashboardCache{
|
return &dashboardCache{
|
||||||
rdb: rdb,
|
rdb: rdb,
|
||||||
keyPrefix: prefix,
|
keyPrefix: prefix,
|
||||||
@@ -49,3 +52,7 @@ func (c *dashboardCache) buildKey() string {
|
|||||||
}
|
}
|
||||||
return c.keyPrefix + dashboardStatsCacheKey
|
return c.keyPrefix + dashboardStatsCacheKey
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *dashboardCache) DeleteDashboardStats(ctx context.Context) error {
|
||||||
|
return c.rdb.Del(ctx, c.buildKey()).Err()
|
||||||
|
}
|
||||||
|
|||||||
28
backend/internal/repository/dashboard_cache_test.go
Normal file
28
backend/internal/repository/dashboard_cache_test.go
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewDashboardCacheKeyPrefix(t *testing.T) {
|
||||||
|
cache := NewDashboardCache(nil, &config.Config{
|
||||||
|
Dashboard: config.DashboardCacheConfig{
|
||||||
|
KeyPrefix: "prod",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
impl, ok := cache.(*dashboardCache)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "prod:", impl.keyPrefix)
|
||||||
|
|
||||||
|
cache = NewDashboardCache(nil, &config.Config{
|
||||||
|
Dashboard: config.DashboardCacheConfig{
|
||||||
|
KeyPrefix: "staging:",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
impl, ok = cache.(*dashboardCache)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "staging:", impl.keyPrefix)
|
||||||
|
}
|
||||||
@@ -26,6 +26,7 @@ var ErrDashboardStatsCacheMiss = errors.New("仪表盘缓存未命中")
|
|||||||
type DashboardStatsCache interface {
|
type DashboardStatsCache interface {
|
||||||
GetDashboardStats(ctx context.Context) (string, error)
|
GetDashboardStats(ctx context.Context) (string, error)
|
||||||
SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error
|
SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error
|
||||||
|
DeleteDashboardStats(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type dashboardStatsCacheEntry struct {
|
type dashboardStatsCacheEntry struct {
|
||||||
@@ -115,10 +116,12 @@ func (s *DashboardService) getCachedDashboardStats(ctx context.Context) (*usages
|
|||||||
|
|
||||||
var entry dashboardStatsCacheEntry
|
var entry dashboardStatsCacheEntry
|
||||||
if err := json.Unmarshal([]byte(data), &entry); err != nil {
|
if err := json.Unmarshal([]byte(data), &entry); err != nil {
|
||||||
return nil, false, err
|
s.evictDashboardStatsCache(err)
|
||||||
|
return nil, false, ErrDashboardStatsCacheMiss
|
||||||
}
|
}
|
||||||
if entry.Stats == nil {
|
if entry.Stats == nil {
|
||||||
return nil, false, errors.New("仪表盘缓存缺少统计数据")
|
s.evictDashboardStatsCache(errors.New("仪表盘缓存缺少统计数据"))
|
||||||
|
return nil, false, ErrDashboardStatsCacheMiss
|
||||||
}
|
}
|
||||||
|
|
||||||
age := time.Since(time.Unix(entry.UpdatedAt, 0))
|
age := time.Since(time.Unix(entry.UpdatedAt, 0))
|
||||||
@@ -130,7 +133,9 @@ func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagesta
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.saveDashboardStatsCache(ctx, stats)
|
cacheCtx, cancel := s.cacheOperationContext()
|
||||||
|
defer cancel()
|
||||||
|
s.saveDashboardStatsCache(cacheCtx, stats)
|
||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,7 +158,9 @@ func (s *DashboardService) refreshDashboardStatsAsync() {
|
|||||||
log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err)
|
log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.saveDashboardStatsCache(ctx, stats)
|
cacheCtx, cancel := s.cacheOperationContext()
|
||||||
|
defer cancel()
|
||||||
|
s.saveDashboardStatsCache(cacheCtx, stats)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -177,6 +184,25 @@ func (s *DashboardService) saveDashboardStatsCache(ctx context.Context, stats *u
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *DashboardService) evictDashboardStatsCache(reason error) {
|
||||||
|
if s.cache == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cacheCtx, cancel := s.cacheOperationContext()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := s.cache.DeleteDashboardStats(cacheCtx); err != nil {
|
||||||
|
log.Printf("[Dashboard] 仪表盘缓存清理失败: %v", err)
|
||||||
|
}
|
||||||
|
if reason != nil {
|
||||||
|
log.Printf("[Dashboard] 仪表盘缓存异常,已清理: %v", reason)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *DashboardService) cacheOperationContext() (context.Context, context.CancelFunc) {
|
||||||
|
return context.WithTimeout(context.Background(), s.refreshTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *DashboardService) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) {
|
func (s *DashboardService) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) {
|
||||||
trend, err := s.usageRepo.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit)
|
trend, err := s.usageRepo.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package service
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -38,8 +39,10 @@ func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.Dash
|
|||||||
type dashboardCacheStub struct {
|
type dashboardCacheStub struct {
|
||||||
get func(ctx context.Context) (string, error)
|
get func(ctx context.Context) (string, error)
|
||||||
set func(ctx context.Context, data string, ttl time.Duration) error
|
set func(ctx context.Context, data string, ttl time.Duration) error
|
||||||
|
del func(ctx context.Context) error
|
||||||
getCalls int32
|
getCalls int32
|
||||||
setCalls int32
|
setCalls int32
|
||||||
|
delCalls int32
|
||||||
lastSetMu sync.Mutex
|
lastSetMu sync.Mutex
|
||||||
lastSet string
|
lastSet string
|
||||||
}
|
}
|
||||||
@@ -63,6 +66,14 @@ func (c *dashboardCacheStub) SetDashboardStats(ctx context.Context, data string,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *dashboardCacheStub) DeleteDashboardStats(ctx context.Context) error {
|
||||||
|
atomic.AddInt32(&c.delCalls, 1)
|
||||||
|
if c.del != nil {
|
||||||
|
return c.del(ctx)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntry {
|
func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntry {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
c.lastSetMu.Lock()
|
c.lastSetMu.Lock()
|
||||||
@@ -187,3 +198,36 @@ func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) {
|
|||||||
return atomic.LoadInt32(&cache.setCalls) >= 1
|
return atomic.LoadInt32(&cache.setCalls) >= 1
|
||||||
}, 1*time.Second, 10*time.Millisecond)
|
}, 1*time.Second, 10*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) {
|
||||||
|
cache := &dashboardCacheStub{
|
||||||
|
get: func(ctx context.Context) (string, error) {
|
||||||
|
return "not-json", nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
stats := &usagestats.DashboardStats{TotalUsers: 9}
|
||||||
|
repo := &usageRepoStub{stats: stats}
|
||||||
|
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
|
||||||
|
svc := NewDashboardService(repo, cache, cfg)
|
||||||
|
|
||||||
|
got, err := svc.GetDashboardStats(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, stats, got)
|
||||||
|
require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls))
|
||||||
|
require.Equal(t, int32(1), atomic.LoadInt32(&repo.calls))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) {
|
||||||
|
cache := &dashboardCacheStub{
|
||||||
|
get: func(ctx context.Context) (string, error) {
|
||||||
|
return "not-json", nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
repo := &usageRepoStub{err: errors.New("db down")}
|
||||||
|
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
|
||||||
|
svc := NewDashboardService(repo, cache, cfg)
|
||||||
|
|
||||||
|
_, err := svc.GetDashboardStats(context.Background())
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls))
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user