From b0aa23540ba4ca6363e61a721ffa45e5ae4f3203 Mon Sep 17 00:00:00 2001 From: shaw Date: Sat, 24 Jan 2026 20:20:48 +0800 Subject: [PATCH] =?UTF-8?q?feat(subscription):=20=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E8=BF=87=E6=9C=9F=E7=8A=B6=E6=80=81=E8=87=AA=E5=8A=A8=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E4=B8=8E=E6=9C=8D=E5=8A=A1=E7=AB=AF=E6=8E=92=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 SubscriptionExpiryService 定时任务,每分钟更新过期订阅状态 - 订阅列表支持服务端排序(按过期时间、状态、创建时间) - 实时显示正确的过期状态,无需等待定时任务 - 允许对已过期订阅进行续期操作 - DataTable 组件支持 serverSideSort 模式 --- backend/cmd/server/wire.go | 5 ++ backend/cmd/server/wire_gen.go | 8 ++- .../handler/admin/subscription_handler.go | 6 +- .../repository/user_subscription_repo.go | 53 ++++++++++++-- backend/internal/server/api_contract_test.go | 2 +- .../server/middleware/api_key_auth_test.go | 2 +- .../service/subscription_expiry_service.go | 71 +++++++++++++++++++ .../internal/service/subscription_service.go | 36 ++++++---- .../service/user_subscription_port.go | 2 +- backend/internal/service/wire.go | 8 +++ frontend/src/api/admin/subscriptions.ts | 4 +- frontend/src/components/common/DataTable.vue | 30 ++++++-- .../src/views/admin/SubscriptionsView.vue | 42 ++++++++--- 13 files changed, 228 insertions(+), 41 deletions(-) create mode 100644 backend/internal/service/subscription_expiry_service.go diff --git a/backend/cmd/server/wire.go b/backend/cmd/server/wire.go index 5ef04a66..d9ff788e 100644 --- a/backend/cmd/server/wire.go +++ b/backend/cmd/server/wire.go @@ -70,6 +70,7 @@ func provideCleanup( schedulerSnapshot *service.SchedulerSnapshotService, tokenRefresh *service.TokenRefreshService, accountExpiry *service.AccountExpiryService, + subscriptionExpiry *service.SubscriptionExpiryService, usageCleanup *service.UsageCleanupService, pricing *service.PricingService, emailQueue *service.EmailQueueService, @@ -138,6 +139,10 @@ func provideCleanup( accountExpiry.Stop() return nil }}, + {"SubscriptionExpiryService", func() error { + subscriptionExpiry.Stop() + return nil + }}, {"PricingService", func() error { pricing.Stop() return nil diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 7b22a31e..6a1c79f0 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -178,7 +178,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig) tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, configConfig) accountExpiryService := service.ProvideAccountExpiryService(accountRepository) - v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, usageCleanupService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService) + subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository) + v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService) application := &Application{ Server: httpServer, Cleanup: v, @@ -211,6 +212,7 @@ func provideCleanup( schedulerSnapshot *service.SchedulerSnapshotService, tokenRefresh *service.TokenRefreshService, accountExpiry *service.AccountExpiryService, + subscriptionExpiry *service.SubscriptionExpiryService, usageCleanup *service.UsageCleanupService, pricing *service.PricingService, emailQueue *service.EmailQueueService, @@ -278,6 +280,10 @@ func provideCleanup( accountExpiry.Stop() return nil }}, + {"SubscriptionExpiryService", func() error { + subscriptionExpiry.Stop() + return nil + }}, {"PricingService", func() error { pricing.Stop() return nil diff --git a/backend/internal/handler/admin/subscription_handler.go b/backend/internal/handler/admin/subscription_handler.go index a0d1456f..51995ab1 100644 --- a/backend/internal/handler/admin/subscription_handler.go +++ b/backend/internal/handler/admin/subscription_handler.go @@ -77,7 +77,11 @@ func (h *SubscriptionHandler) List(c *gin.Context) { } status := c.Query("status") - subscriptions, pagination, err := h.subscriptionService.List(c.Request.Context(), page, pageSize, userID, groupID, status) + // Parse sorting parameters + sortBy := c.DefaultQuery("sort_by", "created_at") + sortOrder := c.DefaultQuery("sort_order", "desc") + + subscriptions, pagination, err := h.subscriptionService.List(c.Request.Context(), page, pageSize, userID, groupID, status, sortBy, sortOrder) if err != nil { response.ErrorFrom(c, err) return diff --git a/backend/internal/repository/user_subscription_repo.go b/backend/internal/repository/user_subscription_repo.go index cd3b9db6..5a649846 100644 --- a/backend/internal/repository/user_subscription_repo.go +++ b/backend/internal/repository/user_subscription_repo.go @@ -190,7 +190,7 @@ func (r *userSubscriptionRepository) ListByGroupID(ctx context.Context, groupID return userSubscriptionEntitiesToService(subs), paginationResultFromTotal(int64(total), params), nil } -func (r *userSubscriptionRepository) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status string) ([]service.UserSubscription, *pagination.PaginationResult, error) { +func (r *userSubscriptionRepository) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status, sortBy, sortOrder string) ([]service.UserSubscription, *pagination.PaginationResult, error) { client := clientFromContext(ctx, r.client) q := client.UserSubscription.Query() if userID != nil { @@ -199,7 +199,31 @@ func (r *userSubscriptionRepository) List(ctx context.Context, params pagination if groupID != nil { q = q.Where(usersubscription.GroupIDEQ(*groupID)) } - if status != "" { + + // Status filtering with real-time expiration check + now := time.Now() + switch status { + case service.SubscriptionStatusActive: + // Active: status is active AND not yet expired + q = q.Where( + usersubscription.StatusEQ(service.SubscriptionStatusActive), + usersubscription.ExpiresAtGT(now), + ) + case service.SubscriptionStatusExpired: + // Expired: status is expired OR (status is active but already expired) + q = q.Where( + usersubscription.Or( + usersubscription.StatusEQ(service.SubscriptionStatusExpired), + usersubscription.And( + usersubscription.StatusEQ(service.SubscriptionStatusActive), + usersubscription.ExpiresAtLTE(now), + ), + ), + ) + case "": + // No filter + default: + // Other status (e.g., revoked) q = q.Where(usersubscription.StatusEQ(status)) } @@ -208,11 +232,28 @@ func (r *userSubscriptionRepository) List(ctx context.Context, params pagination return nil, nil, err } + // Apply sorting + q = q.WithUser().WithGroup().WithAssignedByUser() + + // Determine sort field + var field string + switch sortBy { + case "expires_at": + field = usersubscription.FieldExpiresAt + case "status": + field = usersubscription.FieldStatus + default: + field = usersubscription.FieldCreatedAt + } + + // Determine sort order (default: desc) + if sortOrder == "asc" && sortBy != "" { + q = q.Order(dbent.Asc(field)) + } else { + q = q.Order(dbent.Desc(field)) + } + subs, err := q. - WithUser(). - WithGroup(). - WithAssignedByUser(). - Order(dbent.Desc(usersubscription.FieldCreatedAt)). Offset(params.Offset()). Limit(params.Limit()). All(ctx) diff --git a/backend/internal/server/api_contract_test.go b/backend/internal/server/api_contract_test.go index 4d1b4be2..698c39b6 100644 --- a/backend/internal/server/api_contract_test.go +++ b/backend/internal/server/api_contract_test.go @@ -1176,7 +1176,7 @@ func (r *stubUserSubscriptionRepo) ListActiveByUserID(ctx context.Context, userI func (stubUserSubscriptionRepo) ListByGroupID(ctx context.Context, groupID int64, params pagination.PaginationParams) ([]service.UserSubscription, *pagination.PaginationResult, error) { return nil, nil, errors.New("not implemented") } -func (stubUserSubscriptionRepo) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status string) ([]service.UserSubscription, *pagination.PaginationResult, error) { +func (stubUserSubscriptionRepo) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status, sortBy, sortOrder string) ([]service.UserSubscription, *pagination.PaginationResult, error) { return nil, nil, errors.New("not implemented") } func (stubUserSubscriptionRepo) ExistsByUserIDAndGroupID(ctx context.Context, userID, groupID int64) (bool, error) { diff --git a/backend/internal/server/middleware/api_key_auth_test.go b/backend/internal/server/middleware/api_key_auth_test.go index 84398093..920ff93f 100644 --- a/backend/internal/server/middleware/api_key_auth_test.go +++ b/backend/internal/server/middleware/api_key_auth_test.go @@ -367,7 +367,7 @@ func (r *stubUserSubscriptionRepo) ListByGroupID(ctx context.Context, groupID in return nil, nil, errors.New("not implemented") } -func (r *stubUserSubscriptionRepo) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status string) ([]service.UserSubscription, *pagination.PaginationResult, error) { +func (r *stubUserSubscriptionRepo) List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status, sortBy, sortOrder string) ([]service.UserSubscription, *pagination.PaginationResult, error) { return nil, nil, errors.New("not implemented") } diff --git a/backend/internal/service/subscription_expiry_service.go b/backend/internal/service/subscription_expiry_service.go new file mode 100644 index 00000000..ce6b32b8 --- /dev/null +++ b/backend/internal/service/subscription_expiry_service.go @@ -0,0 +1,71 @@ +package service + +import ( + "context" + "log" + "sync" + "time" +) + +// SubscriptionExpiryService periodically updates expired subscription status. +type SubscriptionExpiryService struct { + userSubRepo UserSubscriptionRepository + interval time.Duration + stopCh chan struct{} + stopOnce sync.Once + wg sync.WaitGroup +} + +func NewSubscriptionExpiryService(userSubRepo UserSubscriptionRepository, interval time.Duration) *SubscriptionExpiryService { + return &SubscriptionExpiryService{ + userSubRepo: userSubRepo, + interval: interval, + stopCh: make(chan struct{}), + } +} + +func (s *SubscriptionExpiryService) Start() { + if s == nil || s.userSubRepo == nil || s.interval <= 0 { + return + } + s.wg.Add(1) + go func() { + defer s.wg.Done() + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + + s.runOnce() + for { + select { + case <-ticker.C: + s.runOnce() + case <-s.stopCh: + return + } + } + }() +} + +func (s *SubscriptionExpiryService) Stop() { + if s == nil { + return + } + s.stopOnce.Do(func() { + close(s.stopCh) + }) + s.wg.Wait() +} + +func (s *SubscriptionExpiryService) runOnce() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + updated, err := s.userSubRepo.BatchUpdateExpiredStatus(ctx) + if err != nil { + log.Printf("[SubscriptionExpiry] Update expired subscriptions failed: %v", err) + return + } + if updated > 0 { + log.Printf("[SubscriptionExpiry] Updated %d expired subscriptions", updated) + } +} diff --git a/backend/internal/service/subscription_service.go b/backend/internal/service/subscription_service.go index c25c58a2..6b3ac8ab 100644 --- a/backend/internal/service/subscription_service.go +++ b/backend/internal/service/subscription_service.go @@ -330,12 +330,10 @@ func (s *SubscriptionService) ExtendSubscription(ctx context.Context, subscripti newExpiresAt = MaxExpiresAt } - // 如果是缩短(负数),检查新的过期时间必须大于当前时间 - if days < 0 { - now := time.Now() - if !newExpiresAt.After(now) { - return nil, ErrAdjustWouldExpire - } + // 检查新的过期时间必须大于当前时间 + now := time.Now() + if !newExpiresAt.After(now) { + return nil, ErrAdjustWouldExpire } if err := s.userSubRepo.ExtendExpiry(ctx, subscriptionID, newExpiresAt); err != nil { @@ -383,6 +381,7 @@ func (s *SubscriptionService) ListUserSubscriptions(ctx context.Context, userID return nil, err } normalizeExpiredWindows(subs) + normalizeSubscriptionStatus(subs) return subs, nil } @@ -404,17 +403,19 @@ func (s *SubscriptionService) ListGroupSubscriptions(ctx context.Context, groupI return nil, nil, err } normalizeExpiredWindows(subs) + normalizeSubscriptionStatus(subs) return subs, pag, nil } -// List 获取所有订阅(分页,支持筛选) -func (s *SubscriptionService) List(ctx context.Context, page, pageSize int, userID, groupID *int64, status string) ([]UserSubscription, *pagination.PaginationResult, error) { +// List 获取所有订阅(分页,支持筛选和排序) +func (s *SubscriptionService) List(ctx context.Context, page, pageSize int, userID, groupID *int64, status, sortBy, sortOrder string) ([]UserSubscription, *pagination.PaginationResult, error) { params := pagination.PaginationParams{Page: page, PageSize: pageSize} - subs, pag, err := s.userSubRepo.List(ctx, params, userID, groupID, status) + subs, pag, err := s.userSubRepo.List(ctx, params, userID, groupID, status, sortBy, sortOrder) if err != nil { return nil, nil, err } normalizeExpiredWindows(subs) + normalizeSubscriptionStatus(subs) return subs, pag, nil } @@ -441,6 +442,18 @@ func normalizeExpiredWindows(subs []UserSubscription) { } } +// normalizeSubscriptionStatus 根据实际过期时间修正状态(仅影响返回数据,不影响数据库) +// 这确保前端显示正确的状态,即使定时任务尚未更新数据库 +func normalizeSubscriptionStatus(subs []UserSubscription) { + now := time.Now() + for i := range subs { + sub := &subs[i] + if sub.Status == SubscriptionStatusActive && !sub.ExpiresAt.After(now) { + sub.Status = SubscriptionStatusExpired + } + } +} + // startOfDay 返回给定时间所在日期的零点(保持原时区) func startOfDay(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) @@ -659,11 +672,6 @@ func (s *SubscriptionService) GetUserSubscriptionsWithProgress(ctx context.Conte return progresses, nil } -// UpdateExpiredSubscriptions 更新过期订阅状态(定时任务调用) -func (s *SubscriptionService) UpdateExpiredSubscriptions(ctx context.Context) (int64, error) { - return s.userSubRepo.BatchUpdateExpiredStatus(ctx) -} - // ValidateSubscription 验证订阅是否有效 func (s *SubscriptionService) ValidateSubscription(ctx context.Context, sub *UserSubscription) error { if sub.Status == SubscriptionStatusExpired { diff --git a/backend/internal/service/user_subscription_port.go b/backend/internal/service/user_subscription_port.go index abf4dffd..2dfc8d02 100644 --- a/backend/internal/service/user_subscription_port.go +++ b/backend/internal/service/user_subscription_port.go @@ -18,7 +18,7 @@ type UserSubscriptionRepository interface { ListByUserID(ctx context.Context, userID int64) ([]UserSubscription, error) ListActiveByUserID(ctx context.Context, userID int64) ([]UserSubscription, error) ListByGroupID(ctx context.Context, groupID int64, params pagination.PaginationParams) ([]UserSubscription, *pagination.PaginationResult, error) - List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status string) ([]UserSubscription, *pagination.PaginationResult, error) + List(ctx context.Context, params pagination.PaginationParams, userID, groupID *int64, status, sortBy, sortOrder string) ([]UserSubscription, *pagination.PaginationResult, error) ExistsByUserIDAndGroupID(ctx context.Context, userID, groupID int64) (bool, error) ExtendExpiry(ctx context.Context, subscriptionID int64, newExpiresAt time.Time) error diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index b210286d..68a6e5c8 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -72,6 +72,13 @@ func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpirySe return svc } +// ProvideSubscriptionExpiryService creates and starts SubscriptionExpiryService. +func ProvideSubscriptionExpiryService(userSubRepo UserSubscriptionRepository) *SubscriptionExpiryService { + svc := NewSubscriptionExpiryService(userSubRepo, time.Minute) + svc.Start() + return svc +} + // ProvideTimingWheelService creates and starts TimingWheelService func ProvideTimingWheelService() (*TimingWheelService, error) { svc, err := NewTimingWheelService() @@ -256,6 +263,7 @@ var ProviderSet = wire.NewSet( ProvideUpdateService, ProvideTokenRefreshService, ProvideAccountExpiryService, + ProvideSubscriptionExpiryService, ProvideTimingWheelService, ProvideDashboardAggregationService, ProvideUsageCleanupService, diff --git a/frontend/src/api/admin/subscriptions.ts b/frontend/src/api/admin/subscriptions.ts index 54b448e2..9f21056f 100644 --- a/frontend/src/api/admin/subscriptions.ts +++ b/frontend/src/api/admin/subscriptions.ts @@ -17,7 +17,7 @@ import type { * List all subscriptions with pagination * @param page - Page number (default: 1) * @param pageSize - Items per page (default: 20) - * @param filters - Optional filters (status, user_id, group_id) + * @param filters - Optional filters (status, user_id, group_id, sort_by, sort_order) * @returns Paginated list of subscriptions */ export async function list( @@ -27,6 +27,8 @@ export async function list( status?: 'active' | 'expired' | 'revoked' user_id?: number group_id?: number + sort_by?: string + sort_order?: 'asc' | 'desc' }, options?: { signal?: AbortSignal diff --git a/frontend/src/components/common/DataTable.vue b/frontend/src/components/common/DataTable.vue index b74f52ee..c1e4333d 100644 --- a/frontend/src/components/common/DataTable.vue +++ b/frontend/src/components/common/DataTable.vue @@ -181,6 +181,10 @@ import Icon from '@/components/icons/Icon.vue' const { t } = useI18n() +const emit = defineEmits<{ + sort: [key: string, order: 'asc' | 'desc'] +}>() + // 表格容器引用 const tableWrapperRef = ref(null) const isScrollable = ref(false) @@ -289,6 +293,11 @@ interface Props { * If provided, DataTable will load the stored sort state on mount. */ sortStorageKey?: string + /** + * Enable server-side sorting mode. When true, clicking sort headers + * will emit 'sort' events instead of performing client-side sorting. + */ + serverSideSort?: boolean } const props = withDefaults(defineProps(), { @@ -296,7 +305,8 @@ const props = withDefaults(defineProps(), { stickyFirstColumn: true, stickyActionsColumn: true, expandableActions: true, - defaultSortOrder: 'asc' + defaultSortOrder: 'asc', + serverSideSort: false }) const sortKey = ref('') @@ -448,16 +458,26 @@ watch(actionsExpanded, async () => { }) const handleSort = (key: string) => { + let newOrder: 'asc' | 'desc' = 'asc' if (sortKey.value === key) { - sortOrder.value = sortOrder.value === 'asc' ? 'desc' : 'asc' - } else { + newOrder = sortOrder.value === 'asc' ? 'desc' : 'asc' + } + + if (props.serverSideSort) { + // Server-side sort mode: emit event and update internal state for UI feedback sortKey.value = key - sortOrder.value = 'asc' + sortOrder.value = newOrder + emit('sort', key, newOrder) + } else { + // Client-side sort mode: just update internal state + sortKey.value = key + sortOrder.value = newOrder } } const sortedData = computed(() => { - if (!sortKey.value || !props.data) return props.data + // Server-side sort mode: return data as-is (server handles sorting) + if (props.serverSideSort || !sortKey.value || !props.data) return props.data const key = sortKey.value const order = sortOrder.value diff --git a/frontend/src/views/admin/SubscriptionsView.vue b/frontend/src/views/admin/SubscriptionsView.vue index 9b0e5ecb..eb2b40d5 100644 --- a/frontend/src/views/admin/SubscriptionsView.vue +++ b/frontend/src/views/admin/SubscriptionsView.vue @@ -154,7 +154,13 @@