feat(subscription): 订阅过期状态自动更新与服务端排序
- 新增 SubscriptionExpiryService 定时任务,每分钟更新过期订阅状态 - 订阅列表支持服务端排序(按过期时间、状态、创建时间) - 实时显示正确的过期状态,无需等待定时任务 - 允许对已过期订阅进行续期操作 - DataTable 组件支持 serverSideSort 模式
This commit is contained in:
@@ -70,6 +70,7 @@ func provideCleanup(
|
||||
schedulerSnapshot *service.SchedulerSnapshotService,
|
||||
tokenRefresh *service.TokenRefreshService,
|
||||
accountExpiry *service.AccountExpiryService,
|
||||
subscriptionExpiry *service.SubscriptionExpiryService,
|
||||
usageCleanup *service.UsageCleanupService,
|
||||
pricing *service.PricingService,
|
||||
emailQueue *service.EmailQueueService,
|
||||
@@ -138,6 +139,10 @@ func provideCleanup(
|
||||
accountExpiry.Stop()
|
||||
return nil
|
||||
}},
|
||||
{"SubscriptionExpiryService", func() error {
|
||||
subscriptionExpiry.Stop()
|
||||
return nil
|
||||
}},
|
||||
{"PricingService", func() error {
|
||||
pricing.Stop()
|
||||
return nil
|
||||
|
||||
@@ -178,7 +178,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
|
||||
opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig)
|
||||
tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, configConfig)
|
||||
accountExpiryService := service.ProvideAccountExpiryService(accountRepository)
|
||||
v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, usageCleanupService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService)
|
||||
subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository)
|
||||
v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService)
|
||||
application := &Application{
|
||||
Server: httpServer,
|
||||
Cleanup: v,
|
||||
@@ -211,6 +212,7 @@ func provideCleanup(
|
||||
schedulerSnapshot *service.SchedulerSnapshotService,
|
||||
tokenRefresh *service.TokenRefreshService,
|
||||
accountExpiry *service.AccountExpiryService,
|
||||
subscriptionExpiry *service.SubscriptionExpiryService,
|
||||
usageCleanup *service.UsageCleanupService,
|
||||
pricing *service.PricingService,
|
||||
emailQueue *service.EmailQueueService,
|
||||
@@ -278,6 +280,10 @@ func provideCleanup(
|
||||
accountExpiry.Stop()
|
||||
return nil
|
||||
}},
|
||||
{"SubscriptionExpiryService", func() error {
|
||||
subscriptionExpiry.Stop()
|
||||
return nil
|
||||
}},
|
||||
{"PricingService", func() error {
|
||||
pricing.Stop()
|
||||
return nil
|
||||
|
||||
@@ -77,7 +77,11 @@ func (h *SubscriptionHandler) List(c *gin.Context) {
|
||||
}
|
||||
status := c.Query("status")
|
||||
|
||||
subscriptions, pagination, err := h.subscriptionService.List(c.Request.Context(), page, pageSize, userID, groupID, status)
|
||||
// Parse sorting parameters
|
||||
sortBy := c.DefaultQuery("sort_by", "created_at")
|
||||
sortOrder := c.DefaultQuery("sort_order", "desc")
|
||||
|
||||
subscriptions, pagination, err := h.subscriptionService.List(c.Request.Context(), page, pageSize, userID, groupID, status, sortBy, sortOrder)
|
||||
if err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
|
||||
@@ -190,7 +190,7 @@ func (r *userSubscriptionRepository) ListByGroupID(ctx context.Context, groupID
|
||||
return userSubscriptionEntitiesToService(subs), paginationResultFromTotal(int64(total), params), nil
|
||||
}
|
||||
|
||||
func (r *userSubscriptionRepository) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status string) ([]service.UserSubscription, *pagination.PaginationResult, error) {
|
||||
func (r *userSubscriptionRepository) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status, sortBy, sortOrder string) ([]service.UserSubscription, *pagination.PaginationResult, error) {
|
||||
client := clientFromContext(ctx, r.client)
|
||||
q := client.UserSubscription.Query()
|
||||
if userID != nil {
|
||||
@@ -199,7 +199,31 @@ func (r *userSubscriptionRepository) List(ctx context.Context, params pagination
|
||||
if groupID != nil {
|
||||
q = q.Where(usersubscription.GroupIDEQ(*groupID))
|
||||
}
|
||||
if status != "" {
|
||||
|
||||
// Status filtering with real-time expiration check
|
||||
now := time.Now()
|
||||
switch status {
|
||||
case service.SubscriptionStatusActive:
|
||||
// Active: status is active AND not yet expired
|
||||
q = q.Where(
|
||||
usersubscription.StatusEQ(service.SubscriptionStatusActive),
|
||||
usersubscription.ExpiresAtGT(now),
|
||||
)
|
||||
case service.SubscriptionStatusExpired:
|
||||
// Expired: status is expired OR (status is active but already expired)
|
||||
q = q.Where(
|
||||
usersubscription.Or(
|
||||
usersubscription.StatusEQ(service.SubscriptionStatusExpired),
|
||||
usersubscription.And(
|
||||
usersubscription.StatusEQ(service.SubscriptionStatusActive),
|
||||
usersubscription.ExpiresAtLTE(now),
|
||||
),
|
||||
),
|
||||
)
|
||||
case "":
|
||||
// No filter
|
||||
default:
|
||||
// Other status (e.g., revoked)
|
||||
q = q.Where(usersubscription.StatusEQ(status))
|
||||
}
|
||||
|
||||
@@ -208,11 +232,28 @@ func (r *userSubscriptionRepository) List(ctx context.Context, params pagination
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Apply sorting
|
||||
q = q.WithUser().WithGroup().WithAssignedByUser()
|
||||
|
||||
// Determine sort field
|
||||
var field string
|
||||
switch sortBy {
|
||||
case "expires_at":
|
||||
field = usersubscription.FieldExpiresAt
|
||||
case "status":
|
||||
field = usersubscription.FieldStatus
|
||||
default:
|
||||
field = usersubscription.FieldCreatedAt
|
||||
}
|
||||
|
||||
// Determine sort order (default: desc)
|
||||
if sortOrder == "asc" && sortBy != "" {
|
||||
q = q.Order(dbent.Asc(field))
|
||||
} else {
|
||||
q = q.Order(dbent.Desc(field))
|
||||
}
|
||||
|
||||
subs, err := q.
|
||||
WithUser().
|
||||
WithGroup().
|
||||
WithAssignedByUser().
|
||||
Order(dbent.Desc(usersubscription.FieldCreatedAt)).
|
||||
Offset(params.Offset()).
|
||||
Limit(params.Limit()).
|
||||
All(ctx)
|
||||
|
||||
@@ -1176,7 +1176,7 @@ func (r *stubUserSubscriptionRepo) ListActiveByUserID(ctx context.Context, userI
|
||||
func (stubUserSubscriptionRepo) ListByGroupID(ctx context.Context, groupID int64, params pagination.PaginationParams) ([]service.UserSubscription, *pagination.PaginationResult, error) {
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
func (stubUserSubscriptionRepo) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status string) ([]service.UserSubscription, *pagination.PaginationResult, error) {
|
||||
func (stubUserSubscriptionRepo) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status, sortBy, sortOrder string) ([]service.UserSubscription, *pagination.PaginationResult, error) {
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
func (stubUserSubscriptionRepo) ExistsByUserIDAndGroupID(ctx context.Context, userID, groupID int64) (bool, error) {
|
||||
|
||||
@@ -367,7 +367,7 @@ func (r *stubUserSubscriptionRepo) ListByGroupID(ctx context.Context, groupID in
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (r *stubUserSubscriptionRepo) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status string) ([]service.UserSubscription, *pagination.PaginationResult, error) {
|
||||
func (r *stubUserSubscriptionRepo) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status, sortBy, sortOrder string) ([]service.UserSubscription, *pagination.PaginationResult, error) {
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
|
||||
71
backend/internal/service/subscription_expiry_service.go
Normal file
71
backend/internal/service/subscription_expiry_service.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SubscriptionExpiryService periodically updates expired subscription status.
|
||||
type SubscriptionExpiryService struct {
|
||||
userSubRepo UserSubscriptionRepository
|
||||
interval time.Duration
|
||||
stopCh chan struct{}
|
||||
stopOnce sync.Once
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewSubscriptionExpiryService(userSubRepo UserSubscriptionRepository, interval time.Duration) *SubscriptionExpiryService {
|
||||
return &SubscriptionExpiryService{
|
||||
userSubRepo: userSubRepo,
|
||||
interval: interval,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SubscriptionExpiryService) Start() {
|
||||
if s == nil || s.userSubRepo == nil || s.interval <= 0 {
|
||||
return
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
ticker := time.NewTicker(s.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
s.runOnce()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.runOnce()
|
||||
case <-s.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *SubscriptionExpiryService) Stop() {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s.stopOnce.Do(func() {
|
||||
close(s.stopCh)
|
||||
})
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func (s *SubscriptionExpiryService) runOnce() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
updated, err := s.userSubRepo.BatchUpdateExpiredStatus(ctx)
|
||||
if err != nil {
|
||||
log.Printf("[SubscriptionExpiry] Update expired subscriptions failed: %v", err)
|
||||
return
|
||||
}
|
||||
if updated > 0 {
|
||||
log.Printf("[SubscriptionExpiry] Updated %d expired subscriptions", updated)
|
||||
}
|
||||
}
|
||||
@@ -330,12 +330,10 @@ func (s *SubscriptionService) ExtendSubscription(ctx context.Context, subscripti
|
||||
newExpiresAt = MaxExpiresAt
|
||||
}
|
||||
|
||||
// 如果是缩短(负数),检查新的过期时间必须大于当前时间
|
||||
if days < 0 {
|
||||
now := time.Now()
|
||||
if !newExpiresAt.After(now) {
|
||||
return nil, ErrAdjustWouldExpire
|
||||
}
|
||||
// 检查新的过期时间必须大于当前时间
|
||||
now := time.Now()
|
||||
if !newExpiresAt.After(now) {
|
||||
return nil, ErrAdjustWouldExpire
|
||||
}
|
||||
|
||||
if err := s.userSubRepo.ExtendExpiry(ctx, subscriptionID, newExpiresAt); err != nil {
|
||||
@@ -383,6 +381,7 @@ func (s *SubscriptionService) ListUserSubscriptions(ctx context.Context, userID
|
||||
return nil, err
|
||||
}
|
||||
normalizeExpiredWindows(subs)
|
||||
normalizeSubscriptionStatus(subs)
|
||||
return subs, nil
|
||||
}
|
||||
|
||||
@@ -404,17 +403,19 @@ func (s *SubscriptionService) ListGroupSubscriptions(ctx context.Context, groupI
|
||||
return nil, nil, err
|
||||
}
|
||||
normalizeExpiredWindows(subs)
|
||||
normalizeSubscriptionStatus(subs)
|
||||
return subs, pag, nil
|
||||
}
|
||||
|
||||
// List 获取所有订阅(分页,支持筛选)
|
||||
func (s *SubscriptionService) List(ctx context.Context, page, pageSize int, userID, groupID *int64, status string) ([]UserSubscription, *pagination.PaginationResult, error) {
|
||||
// List 获取所有订阅(分页,支持筛选和排序)
|
||||
func (s *SubscriptionService) List(ctx context.Context, page, pageSize int, userID, groupID *int64, status, sortBy, sortOrder string) ([]UserSubscription, *pagination.PaginationResult, error) {
|
||||
params := pagination.PaginationParams{Page: page, PageSize: pageSize}
|
||||
subs, pag, err := s.userSubRepo.List(ctx, params, userID, groupID, status)
|
||||
subs, pag, err := s.userSubRepo.List(ctx, params, userID, groupID, status, sortBy, sortOrder)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
normalizeExpiredWindows(subs)
|
||||
normalizeSubscriptionStatus(subs)
|
||||
return subs, pag, nil
|
||||
}
|
||||
|
||||
@@ -441,6 +442,18 @@ func normalizeExpiredWindows(subs []UserSubscription) {
|
||||
}
|
||||
}
|
||||
|
||||
// normalizeSubscriptionStatus 根据实际过期时间修正状态(仅影响返回数据,不影响数据库)
|
||||
// 这确保前端显示正确的状态,即使定时任务尚未更新数据库
|
||||
func normalizeSubscriptionStatus(subs []UserSubscription) {
|
||||
now := time.Now()
|
||||
for i := range subs {
|
||||
sub := &subs[i]
|
||||
if sub.Status == SubscriptionStatusActive && !sub.ExpiresAt.After(now) {
|
||||
sub.Status = SubscriptionStatusExpired
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// startOfDay 返回给定时间所在日期的零点(保持原时区)
|
||||
func startOfDay(t time.Time) time.Time {
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
|
||||
@@ -659,11 +672,6 @@ func (s *SubscriptionService) GetUserSubscriptionsWithProgress(ctx context.Conte
|
||||
return progresses, nil
|
||||
}
|
||||
|
||||
// UpdateExpiredSubscriptions 更新过期订阅状态(定时任务调用)
|
||||
func (s *SubscriptionService) UpdateExpiredSubscriptions(ctx context.Context) (int64, error) {
|
||||
return s.userSubRepo.BatchUpdateExpiredStatus(ctx)
|
||||
}
|
||||
|
||||
// ValidateSubscription 验证订阅是否有效
|
||||
func (s *SubscriptionService) ValidateSubscription(ctx context.Context, sub *UserSubscription) error {
|
||||
if sub.Status == SubscriptionStatusExpired {
|
||||
|
||||
@@ -18,7 +18,7 @@ type UserSubscriptionRepository interface {
|
||||
ListByUserID(ctx context.Context, userID int64) ([]UserSubscription, error)
|
||||
ListActiveByUserID(ctx context.Context, userID int64) ([]UserSubscription, error)
|
||||
ListByGroupID(ctx context.Context, groupID int64, params pagination.PaginationParams) ([]UserSubscription, *pagination.PaginationResult, error)
|
||||
List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status string) ([]UserSubscription, *pagination.PaginationResult, error)
|
||||
List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status, sortBy, sortOrder string) ([]UserSubscription, *pagination.PaginationResult, error)
|
||||
|
||||
ExistsByUserIDAndGroupID(ctx context.Context, userID, groupID int64) (bool, error)
|
||||
ExtendExpiry(ctx context.Context, subscriptionID int64, newExpiresAt time.Time) error
|
||||
|
||||
@@ -72,6 +72,13 @@ func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpirySe
|
||||
return svc
|
||||
}
|
||||
|
||||
// ProvideSubscriptionExpiryService creates and starts SubscriptionExpiryService.
|
||||
func ProvideSubscriptionExpiryService(userSubRepo UserSubscriptionRepository) *SubscriptionExpiryService {
|
||||
svc := NewSubscriptionExpiryService(userSubRepo, time.Minute)
|
||||
svc.Start()
|
||||
return svc
|
||||
}
|
||||
|
||||
// ProvideTimingWheelService creates and starts TimingWheelService
|
||||
func ProvideTimingWheelService() (*TimingWheelService, error) {
|
||||
svc, err := NewTimingWheelService()
|
||||
@@ -256,6 +263,7 @@ var ProviderSet = wire.NewSet(
|
||||
ProvideUpdateService,
|
||||
ProvideTokenRefreshService,
|
||||
ProvideAccountExpiryService,
|
||||
ProvideSubscriptionExpiryService,
|
||||
ProvideTimingWheelService,
|
||||
ProvideDashboardAggregationService,
|
||||
ProvideUsageCleanupService,
|
||||
|
||||
Reference in New Issue
Block a user