diff --git a/backend/cmd/server/wire.go b/backend/cmd/server/wire.go index cbf89ba3..80364bf2 100644 --- a/backend/cmd/server/wire.go +++ b/backend/cmd/server/wire.go @@ -86,6 +86,7 @@ func provideCleanup( geminiOAuth *service.GeminiOAuthService, antigravityOAuth *service.AntigravityOAuthService, openAIGateway *service.OpenAIGatewayService, + scheduledTestRunner *service.ScheduledTestRunnerService, ) func() { return func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -216,6 +217,12 @@ func provideCleanup( } return nil }}, + {"ScheduledTestRunnerService", func() error { + if scheduledTestRunner != nil { + scheduledTestRunner.Stop() + } + return nil + }}, } infraSteps := []cleanupStep{ diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 8e7aefe1..9db1d0fc 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -195,7 +195,11 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { errorPassthroughService := service.NewErrorPassthroughService(errorPassthroughRepository, errorPassthroughCache) errorPassthroughHandler := admin.NewErrorPassthroughHandler(errorPassthroughService) adminAPIKeyHandler := admin.NewAdminAPIKeyHandler(adminService) - adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, adminAnnouncementHandler, dataManagementHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, opsHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler, errorPassthroughHandler, adminAPIKeyHandler) + scheduledTestPlanRepository := repository.NewScheduledTestPlanRepository(db) + scheduledTestResultRepository := repository.NewScheduledTestResultRepository(db) + scheduledTestService := service.ProvideScheduledTestService(scheduledTestPlanRepository, scheduledTestResultRepository) + scheduledTestHandler := admin.NewScheduledTestHandler(scheduledTestService) + adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, adminAnnouncementHandler, dataManagementHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, opsHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler, errorPassthroughHandler, adminAPIKeyHandler, scheduledTestHandler) usageRecordWorkerPool := service.NewUsageRecordWorkerPool(configConfig) userMsgQueueCache := repository.NewUserMsgQueueCache(redisClient) userMessageQueueService := service.ProvideUserMessageQueueService(userMsgQueueCache, rpmCache, configConfig) @@ -225,7 +229,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, soraAccountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache) accountExpiryService := service.ProvideAccountExpiryService(accountRepository) subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository) - v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, opsSystemLogSink, soraMediaCleanupService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, idempotencyCleanupService, pricingService, emailQueueService, billingCacheService, usageRecordWorkerPool, subscriptionService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, openAIGatewayService) + scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, db, redisClient, configConfig) + v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, opsSystemLogSink, soraMediaCleanupService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, subscriptionExpiryService, usageCleanupService, idempotencyCleanupService, pricingService, emailQueueService, billingCacheService, usageRecordWorkerPool, subscriptionService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, openAIGatewayService, scheduledTestRunnerService) application := &Application{ Server: httpServer, Cleanup: v, @@ -273,6 +278,7 @@ func provideCleanup( geminiOAuth *service.GeminiOAuthService, antigravityOAuth *service.AntigravityOAuthService, openAIGateway *service.OpenAIGatewayService, + scheduledTestRunner *service.ScheduledTestRunnerService, ) func() { return func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -402,6 +408,12 @@ func provideCleanup( } return nil }}, + {"ScheduledTestRunnerService", func() error { + if scheduledTestRunner != nil { + scheduledTestRunner.Stop() + } + return nil + }}, } infraSteps := []cleanupStep{ diff --git a/backend/internal/handler/admin/scheduled_test_handler.go b/backend/internal/handler/admin/scheduled_test_handler.go new file mode 100644 index 00000000..f400f1fc --- /dev/null +++ b/backend/internal/handler/admin/scheduled_test_handler.go @@ -0,0 +1,161 @@ +package admin + +import ( + "net/http" + "strconv" + + "github.com/Wei-Shaw/sub2api/internal/pkg/response" + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/gin-gonic/gin" +) + +// ScheduledTestHandler handles admin scheduled-test-plan management. +type ScheduledTestHandler struct { + scheduledTestSvc *service.ScheduledTestService +} + +// NewScheduledTestHandler creates a new ScheduledTestHandler. +func NewScheduledTestHandler(scheduledTestSvc *service.ScheduledTestService) *ScheduledTestHandler { + return &ScheduledTestHandler{scheduledTestSvc: scheduledTestSvc} +} + +type createScheduledTestPlanRequest struct { + AccountID int64 `json:"account_id" binding:"required"` + ModelID string `json:"model_id"` + CronExpression string `json:"cron_expression" binding:"required"` + Enabled *bool `json:"enabled"` + MaxResults int `json:"max_results"` +} + +type updateScheduledTestPlanRequest struct { + ModelID string `json:"model_id"` + CronExpression string `json:"cron_expression"` + Enabled *bool `json:"enabled"` + MaxResults int `json:"max_results"` +} + +// ListByAccount GET /admin/accounts/:id/scheduled-test-plans +func (h *ScheduledTestHandler) ListByAccount(c *gin.Context) { + accountID, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + response.BadRequest(c, "invalid account id") + return + } + + plans, err := h.scheduledTestSvc.ListPlansByAccount(c.Request.Context(), accountID) + if err != nil { + response.InternalError(c, err.Error()) + return + } + if plans == nil { + plans = []*service.ScheduledTestPlan{} + } + c.JSON(http.StatusOK, plans) +} + +// Create POST /admin/scheduled-test-plans +func (h *ScheduledTestHandler) Create(c *gin.Context) { + var req createScheduledTestPlanRequest + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, err.Error()) + return + } + + plan := &service.ScheduledTestPlan{ + AccountID: req.AccountID, + ModelID: req.ModelID, + CronExpression: req.CronExpression, + Enabled: true, + MaxResults: req.MaxResults, + } + if req.Enabled != nil { + plan.Enabled = *req.Enabled + } + + created, err := h.scheduledTestSvc.CreatePlan(c.Request.Context(), plan) + if err != nil { + response.BadRequest(c, err.Error()) + return + } + c.JSON(http.StatusOK, created) +} + +// Update PUT /admin/scheduled-test-plans/:id +func (h *ScheduledTestHandler) Update(c *gin.Context) { + planID, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + response.BadRequest(c, "invalid plan id") + return + } + + existing, err := h.scheduledTestSvc.GetPlan(c.Request.Context(), planID) + if err != nil { + response.NotFound(c, "plan not found") + return + } + + var req updateScheduledTestPlanRequest + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, err.Error()) + return + } + + if req.ModelID != "" { + existing.ModelID = req.ModelID + } + if req.CronExpression != "" { + existing.CronExpression = req.CronExpression + } + if req.Enabled != nil { + existing.Enabled = *req.Enabled + } + if req.MaxResults > 0 { + existing.MaxResults = req.MaxResults + } + + updated, err := h.scheduledTestSvc.UpdatePlan(c.Request.Context(), existing) + if err != nil { + response.BadRequest(c, err.Error()) + return + } + c.JSON(http.StatusOK, updated) +} + +// Delete DELETE /admin/scheduled-test-plans/:id +func (h *ScheduledTestHandler) Delete(c *gin.Context) { + planID, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + response.BadRequest(c, "invalid plan id") + return + } + + if err := h.scheduledTestSvc.DeletePlan(c.Request.Context(), planID); err != nil { + response.InternalError(c, err.Error()) + return + } + c.JSON(http.StatusOK, gin.H{"message": "deleted"}) +} + +// ListResults GET /admin/scheduled-test-plans/:id/results +func (h *ScheduledTestHandler) ListResults(c *gin.Context) { + planID, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + response.BadRequest(c, "invalid plan id") + return + } + + limit := 50 + if l, err := strconv.Atoi(c.Query("limit")); err == nil && l > 0 { + limit = l + } + + results, err := h.scheduledTestSvc.ListResults(c.Request.Context(), planID, limit) + if err != nil { + response.InternalError(c, err.Error()) + return + } + if results == nil { + results = []*service.ScheduledTestResult{} + } + c.JSON(http.StatusOK, results) +} diff --git a/backend/internal/handler/handler.go b/backend/internal/handler/handler.go index 1e1247fc..3f1d73ca 100644 --- a/backend/internal/handler/handler.go +++ b/backend/internal/handler/handler.go @@ -27,6 +27,7 @@ type AdminHandlers struct { UserAttribute *admin.UserAttributeHandler ErrorPassthrough *admin.ErrorPassthroughHandler APIKey *admin.AdminAPIKeyHandler + ScheduledTest *admin.ScheduledTestHandler } // Handlers contains all HTTP handlers diff --git a/backend/internal/handler/wire.go b/backend/internal/handler/wire.go index 76f5a979..d1e12e03 100644 --- a/backend/internal/handler/wire.go +++ b/backend/internal/handler/wire.go @@ -30,6 +30,7 @@ func ProvideAdminHandlers( userAttributeHandler *admin.UserAttributeHandler, errorPassthroughHandler *admin.ErrorPassthroughHandler, apiKeyHandler *admin.AdminAPIKeyHandler, + scheduledTestHandler *admin.ScheduledTestHandler, ) *AdminHandlers { return &AdminHandlers{ Dashboard: dashboardHandler, @@ -53,6 +54,7 @@ func ProvideAdminHandlers( UserAttribute: userAttributeHandler, ErrorPassthrough: errorPassthroughHandler, APIKey: apiKeyHandler, + ScheduledTest: scheduledTestHandler, } } @@ -141,6 +143,7 @@ var ProviderSet = wire.NewSet( admin.NewUserAttributeHandler, admin.NewErrorPassthroughHandler, admin.NewAdminAPIKeyHandler, + admin.NewScheduledTestHandler, // AdminHandlers and Handlers constructors ProvideAdminHandlers, diff --git a/backend/internal/repository/scheduled_test_repo.go b/backend/internal/repository/scheduled_test_repo.go new file mode 100644 index 00000000..60627702 --- /dev/null +++ b/backend/internal/repository/scheduled_test_repo.go @@ -0,0 +1,183 @@ +package repository + +import ( + "context" + "database/sql" + "time" + + "github.com/Wei-Shaw/sub2api/internal/service" +) + +// --- Plan Repository --- + +type scheduledTestPlanRepository struct { + db *sql.DB +} + +func NewScheduledTestPlanRepository(db *sql.DB) service.ScheduledTestPlanRepository { + return &scheduledTestPlanRepository{db: db} +} + +func (r *scheduledTestPlanRepository) Create(ctx context.Context, plan *service.ScheduledTestPlan) (*service.ScheduledTestPlan, error) { + row := r.db.QueryRowContext(ctx, ` + INSERT INTO scheduled_test_plans (account_id, model_id, cron_expression, enabled, max_results, next_run_at, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW()) + RETURNING id, account_id, model_id, cron_expression, enabled, max_results, last_run_at, next_run_at, created_at, updated_at + `, plan.AccountID, plan.ModelID, plan.CronExpression, plan.Enabled, plan.MaxResults, plan.NextRunAt) + return scanPlan(row) +} + +func (r *scheduledTestPlanRepository) GetByID(ctx context.Context, id int64) (*service.ScheduledTestPlan, error) { + row := r.db.QueryRowContext(ctx, ` + SELECT id, account_id, model_id, cron_expression, enabled, max_results, last_run_at, next_run_at, created_at, updated_at + FROM scheduled_test_plans WHERE id = $1 + `, id) + return scanPlan(row) +} + +func (r *scheduledTestPlanRepository) ListByAccountID(ctx context.Context, accountID int64) ([]*service.ScheduledTestPlan, error) { + rows, err := r.db.QueryContext(ctx, ` + SELECT id, account_id, model_id, cron_expression, enabled, max_results, last_run_at, next_run_at, created_at, updated_at + FROM scheduled_test_plans WHERE account_id = $1 + ORDER BY created_at DESC + `, accountID) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + return scanPlans(rows) +} + +func (r *scheduledTestPlanRepository) ListDue(ctx context.Context, now time.Time) ([]*service.ScheduledTestPlan, error) { + rows, err := r.db.QueryContext(ctx, ` + SELECT id, account_id, model_id, cron_expression, enabled, max_results, last_run_at, next_run_at, created_at, updated_at + FROM scheduled_test_plans + WHERE enabled = true AND next_run_at <= $1 + ORDER BY next_run_at ASC + `, now) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + return scanPlans(rows) +} + +func (r *scheduledTestPlanRepository) Update(ctx context.Context, plan *service.ScheduledTestPlan) (*service.ScheduledTestPlan, error) { + row := r.db.QueryRowContext(ctx, ` + UPDATE scheduled_test_plans + SET model_id = $2, cron_expression = $3, enabled = $4, max_results = $5, next_run_at = $6, updated_at = NOW() + WHERE id = $1 + RETURNING id, account_id, model_id, cron_expression, enabled, max_results, last_run_at, next_run_at, created_at, updated_at + `, plan.ID, plan.ModelID, plan.CronExpression, plan.Enabled, plan.MaxResults, plan.NextRunAt) + return scanPlan(row) +} + +func (r *scheduledTestPlanRepository) Delete(ctx context.Context, id int64) error { + _, err := r.db.ExecContext(ctx, `DELETE FROM scheduled_test_plans WHERE id = $1`, id) + return err +} + +func (r *scheduledTestPlanRepository) UpdateAfterRun(ctx context.Context, id int64, lastRunAt time.Time, nextRunAt time.Time) error { + _, err := r.db.ExecContext(ctx, ` + UPDATE scheduled_test_plans SET last_run_at = $2, next_run_at = $3, updated_at = NOW() WHERE id = $1 + `, id, lastRunAt, nextRunAt) + return err +} + +// --- Result Repository --- + +type scheduledTestResultRepository struct { + db *sql.DB +} + +func NewScheduledTestResultRepository(db *sql.DB) service.ScheduledTestResultRepository { + return &scheduledTestResultRepository{db: db} +} + +func (r *scheduledTestResultRepository) Create(ctx context.Context, result *service.ScheduledTestResult) (*service.ScheduledTestResult, error) { + row := r.db.QueryRowContext(ctx, ` + INSERT INTO scheduled_test_results (plan_id, status, response_text, error_message, latency_ms, started_at, finished_at, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + RETURNING id, plan_id, status, response_text, error_message, latency_ms, started_at, finished_at, created_at + `, result.PlanID, result.Status, result.ResponseText, result.ErrorMessage, result.LatencyMs, result.StartedAt, result.FinishedAt) + + out := &service.ScheduledTestResult{} + if err := row.Scan( + &out.ID, &out.PlanID, &out.Status, &out.ResponseText, &out.ErrorMessage, + &out.LatencyMs, &out.StartedAt, &out.FinishedAt, &out.CreatedAt, + ); err != nil { + return nil, err + } + return out, nil +} + +func (r *scheduledTestResultRepository) ListByPlanID(ctx context.Context, planID int64, limit int) ([]*service.ScheduledTestResult, error) { + rows, err := r.db.QueryContext(ctx, ` + SELECT id, plan_id, status, response_text, error_message, latency_ms, started_at, finished_at, created_at + FROM scheduled_test_results + WHERE plan_id = $1 + ORDER BY created_at DESC + LIMIT $2 + `, planID, limit) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + var results []*service.ScheduledTestResult + for rows.Next() { + r := &service.ScheduledTestResult{} + if err := rows.Scan( + &r.ID, &r.PlanID, &r.Status, &r.ResponseText, &r.ErrorMessage, + &r.LatencyMs, &r.StartedAt, &r.FinishedAt, &r.CreatedAt, + ); err != nil { + return nil, err + } + results = append(results, r) + } + return results, rows.Err() +} + +func (r *scheduledTestResultRepository) PruneOldResults(ctx context.Context, planID int64, keepCount int) error { + _, err := r.db.ExecContext(ctx, ` + DELETE FROM scheduled_test_results + WHERE id IN ( + SELECT id FROM ( + SELECT id, ROW_NUMBER() OVER (PARTITION BY plan_id ORDER BY created_at DESC) AS rn + FROM scheduled_test_results + WHERE plan_id = $1 + ) ranked + WHERE rn > $2 + ) + `, planID, keepCount) + return err +} + +// --- scan helpers --- + +type scannable interface { + Scan(dest ...any) error +} + +func scanPlan(row scannable) (*service.ScheduledTestPlan, error) { + p := &service.ScheduledTestPlan{} + if err := row.Scan( + &p.ID, &p.AccountID, &p.ModelID, &p.CronExpression, &p.Enabled, &p.MaxResults, + &p.LastRunAt, &p.NextRunAt, &p.CreatedAt, &p.UpdatedAt, + ); err != nil { + return nil, err + } + return p, nil +} + +func scanPlans(rows *sql.Rows) ([]*service.ScheduledTestPlan, error) { + var plans []*service.ScheduledTestPlan + for rows.Next() { + p, err := scanPlan(rows) + if err != nil { + return nil, err + } + plans = append(plans, p) + } + return plans, rows.Err() +} diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index 2e35e0a0..2ae1d916 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -53,7 +53,9 @@ var ProviderSet = wire.NewSet( NewAPIKeyRepository, NewGroupRepository, NewAccountRepository, - NewSoraAccountRepository, // Sora 账号扩展表仓储 + NewSoraAccountRepository, // Sora 账号扩展表仓储 + NewScheduledTestPlanRepository, // 定时测试计划仓储 + NewScheduledTestResultRepository, // 定时测试结果仓储 NewProxyRepository, NewRedeemCodeRepository, NewPromoCodeRepository, diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 2b6077c1..e9f9bf62 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -78,6 +78,9 @@ func RegisterAdminRoutes( // API Key 管理 registerAdminAPIKeyRoutes(admin, h) + + // 定时测试计划 + registerScheduledTestRoutes(admin, h) } } @@ -478,6 +481,18 @@ func registerUserAttributeRoutes(admin *gin.RouterGroup, h *handler.Handlers) { } } +func registerScheduledTestRoutes(admin *gin.RouterGroup, h *handler.Handlers) { + plans := admin.Group("/scheduled-test-plans") + { + plans.POST("", h.Admin.ScheduledTest.Create) + plans.PUT("/:id", h.Admin.ScheduledTest.Update) + plans.DELETE("/:id", h.Admin.ScheduledTest.Delete) + plans.GET("/:id/results", h.Admin.ScheduledTest.ListResults) + } + // Nested under accounts + admin.GET("/accounts/:id/scheduled-test-plans", h.Admin.ScheduledTest.ListByAccount) +} + func registerErrorPassthroughRoutes(admin *gin.RouterGroup, h *handler.Handlers) { rules := admin.Group("/error-passthrough-rules") { diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index c55e418d..5891770d 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -12,6 +12,7 @@ import ( "io" "log" "net/http" + "net/http/httptest" "net/url" "regexp" "strings" @@ -1560,3 +1561,65 @@ func (s *AccountTestService) sendErrorAndEnd(c *gin.Context, errorMsg string) er s.sendEvent(c, TestEvent{Type: "error", Error: errorMsg}) return fmt.Errorf("%s", errorMsg) } + +// RunTestBackground executes an account test in-memory (no real HTTP client), +// capturing SSE output via httptest.NewRecorder, then parses the result. +func (s *AccountTestService) RunTestBackground(ctx context.Context, accountID int64, modelID string) (*ScheduledTestOutcome, error) { + startedAt := time.Now() + + w := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(w) + ginCtx.Request = (&http.Request{}).WithContext(ctx) + + testErr := s.TestAccountConnection(ginCtx, accountID, modelID) + + finishedAt := time.Now() + latencyMs := finishedAt.Sub(startedAt).Milliseconds() + + body := w.Body.String() + responseText, errMsg := parseTestSSEOutput(body) + + outcome := &ScheduledTestOutcome{ + Status: "success", + ResponseText: responseText, + ErrorMessage: errMsg, + LatencyMs: latencyMs, + StartedAt: startedAt, + FinishedAt: finishedAt, + } + + if testErr != nil || errMsg != "" { + outcome.Status = "failed" + if errMsg == "" && testErr != nil { + outcome.ErrorMessage = testErr.Error() + } + } + + return outcome, nil +} + +// parseTestSSEOutput extracts response text and error message from captured SSE output. +func parseTestSSEOutput(body string) (responseText, errMsg string) { + var texts []string + for _, line := range strings.Split(body, "\n") { + line = strings.TrimSpace(line) + if !strings.HasPrefix(line, "data: ") { + continue + } + jsonStr := strings.TrimPrefix(line, "data: ") + var event TestEvent + if err := json.Unmarshal([]byte(jsonStr), &event); err != nil { + continue + } + switch event.Type { + case "content": + if event.Text != "" { + texts = append(texts, event.Text) + } + case "error": + errMsg = event.Error + } + } + responseText = strings.Join(texts, "") + return +} diff --git a/backend/internal/service/scheduled_test_port.go b/backend/internal/service/scheduled_test_port.go new file mode 100644 index 00000000..f795ba00 --- /dev/null +++ b/backend/internal/service/scheduled_test_port.go @@ -0,0 +1,61 @@ +package service + +import ( + "context" + "time" +) + +// ScheduledTestPlan represents a scheduled test plan domain model. +type ScheduledTestPlan struct { + ID int64 `json:"id"` + AccountID int64 `json:"account_id"` + ModelID string `json:"model_id"` + CronExpression string `json:"cron_expression"` + Enabled bool `json:"enabled"` + MaxResults int `json:"max_results"` + LastRunAt *time.Time `json:"last_run_at"` + NextRunAt *time.Time `json:"next_run_at"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// ScheduledTestResult represents a single test execution result. +type ScheduledTestResult struct { + ID int64 `json:"id"` + PlanID int64 `json:"plan_id"` + Status string `json:"status"` + ResponseText string `json:"response_text"` + ErrorMessage string `json:"error_message"` + LatencyMs int64 `json:"latency_ms"` + StartedAt time.Time `json:"started_at"` + FinishedAt time.Time `json:"finished_at"` + CreatedAt time.Time `json:"created_at"` +} + +// ScheduledTestOutcome is returned by RunTestBackground. +type ScheduledTestOutcome struct { + Status string + ResponseText string + ErrorMessage string + LatencyMs int64 + StartedAt time.Time + FinishedAt time.Time +} + +// ScheduledTestPlanRepository defines the data access interface for test plans. +type ScheduledTestPlanRepository interface { + Create(ctx context.Context, plan *ScheduledTestPlan) (*ScheduledTestPlan, error) + GetByID(ctx context.Context, id int64) (*ScheduledTestPlan, error) + ListByAccountID(ctx context.Context, accountID int64) ([]*ScheduledTestPlan, error) + ListDue(ctx context.Context, now time.Time) ([]*ScheduledTestPlan, error) + Update(ctx context.Context, plan *ScheduledTestPlan) (*ScheduledTestPlan, error) + Delete(ctx context.Context, id int64) error + UpdateAfterRun(ctx context.Context, id int64, lastRunAt time.Time, nextRunAt time.Time) error +} + +// ScheduledTestResultRepository defines the data access interface for test results. +type ScheduledTestResultRepository interface { + Create(ctx context.Context, result *ScheduledTestResult) (*ScheduledTestResult, error) + ListByPlanID(ctx context.Context, planID int64, limit int) ([]*ScheduledTestResult, error) + PruneOldResults(ctx context.Context, planID int64, keepCount int) error +} diff --git a/backend/internal/service/scheduled_test_runner_service.go b/backend/internal/service/scheduled_test_runner_service.go new file mode 100644 index 00000000..151f5c17 --- /dev/null +++ b/backend/internal/service/scheduled_test_runner_service.go @@ -0,0 +1,207 @@ +package service + +import ( + "context" + "database/sql" + "sync" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" +) + +const ( + scheduledTestLeaderLockKey = "scheduled_test:runner:leader" + scheduledTestLeaderLockTTL = 2 * time.Minute + scheduledTestDefaultMaxWorkers = 10 +) + +var scheduledTestReleaseScript = redis.NewScript(` +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +end +return 0 +`) + +// ScheduledTestRunnerService periodically scans due test plans and executes them. +type ScheduledTestRunnerService struct { + planRepo ScheduledTestPlanRepository + scheduledSvc *ScheduledTestService + accountTestSvc *AccountTestService + db *sql.DB + redisClient *redis.Client + cfg *config.Config + + instanceID string + cron *cron.Cron + startOnce sync.Once + stopOnce sync.Once + + warnNoRedisOnce sync.Once +} + +// NewScheduledTestRunnerService creates a new runner. +func NewScheduledTestRunnerService( + planRepo ScheduledTestPlanRepository, + scheduledSvc *ScheduledTestService, + accountTestSvc *AccountTestService, + db *sql.DB, + redisClient *redis.Client, + cfg *config.Config, +) *ScheduledTestRunnerService { + return &ScheduledTestRunnerService{ + planRepo: planRepo, + scheduledSvc: scheduledSvc, + accountTestSvc: accountTestSvc, + db: db, + redisClient: redisClient, + cfg: cfg, + instanceID: uuid.NewString(), + } +} + +// Start begins the cron ticker (every minute). +func (s *ScheduledTestRunnerService) Start() { + if s == nil { + return + } + s.startOnce.Do(func() { + loc := time.Local + if s.cfg != nil { + if parsed, err := time.LoadLocation(s.cfg.Timezone); err == nil && parsed != nil { + loc = parsed + } + } + + c := cron.New(cron.WithParser(scheduledTestCronParser), cron.WithLocation(loc)) + _, err := c.AddFunc("* * * * *", func() { s.runScheduled() }) + if err != nil { + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] not started (invalid schedule): %v", err) + return + } + s.cron = c + s.cron.Start() + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] started (tick=every minute)") + }) +} + +// Stop gracefully shuts down the cron scheduler. +func (s *ScheduledTestRunnerService) Stop() { + if s == nil { + return + } + s.stopOnce.Do(func() { + if s.cron != nil { + ctx := s.cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(3 * time.Second): + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] cron stop timed out") + } + } + }) +} + +func (s *ScheduledTestRunnerService) runScheduled() { + // Delay 10s so execution lands at ~:10 of each minute instead of :00. + time.Sleep(10 * time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + release, ok := s.tryAcquireLeaderLock(ctx) + if !ok { + return + } + if release != nil { + defer release() + } + + now := time.Now() + plans, err := s.planRepo.ListDue(ctx, now) + if err != nil { + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] ListDue error: %v", err) + return + } + if len(plans) == 0 { + return + } + + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] found %d due plans", len(plans)) + + maxWorkers := scheduledTestDefaultMaxWorkers + sem := make(chan struct{}, maxWorkers) + var wg sync.WaitGroup + + for _, plan := range plans { + sem <- struct{}{} + wg.Add(1) + go func(p *ScheduledTestPlan) { + defer wg.Done() + defer func() { <-sem }() + s.runOnePlan(ctx, p) + }(plan) + } + + wg.Wait() +} + +func (s *ScheduledTestRunnerService) runOnePlan(ctx context.Context, plan *ScheduledTestPlan) { + outcome, err := s.accountTestSvc.RunTestBackground(ctx, plan.AccountID, plan.ModelID) + if err != nil { + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d RunTestBackground error: %v", plan.ID, err) + return + } + + if err := s.scheduledSvc.SaveResult(ctx, plan.ID, plan.MaxResults, outcome); err != nil { + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d SaveResult error: %v", plan.ID, err) + } + + // Compute next run + nextRun, err := computeNextRun(plan.CronExpression, time.Now()) + if err != nil { + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d computeNextRun error: %v", plan.ID, err) + return + } + + if err := s.planRepo.UpdateAfterRun(ctx, plan.ID, time.Now(), nextRun); err != nil { + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d UpdateAfterRun error: %v", plan.ID, err) + } +} + +func (s *ScheduledTestRunnerService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) { + if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple { + return nil, true + } + + key := scheduledTestLeaderLockKey + ttl := scheduledTestLeaderLockTTL + + if s.redisClient != nil { + ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result() + if err == nil { + if !ok { + return nil, false + } + return func() { + _, _ = scheduledTestReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result() + }, true + } + s.warnNoRedisOnce.Do(func() { + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] Redis SetNX failed; falling back to DB advisory lock: %v", err) + }) + } else { + s.warnNoRedisOnce.Do(func() { + logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] Redis not configured; using DB advisory lock") + }) + } + + release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key)) + if !ok { + return nil, false + } + return release, true +} diff --git a/backend/internal/service/scheduled_test_service.go b/backend/internal/service/scheduled_test_service.go new file mode 100644 index 00000000..8f850c54 --- /dev/null +++ b/backend/internal/service/scheduled_test_service.go @@ -0,0 +1,102 @@ +package service + +import ( + "context" + "fmt" + "time" + + "github.com/robfig/cron/v3" +) + +var scheduledTestCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + +// ScheduledTestService provides CRUD operations for scheduled test plans and results. +type ScheduledTestService struct { + planRepo ScheduledTestPlanRepository + resultRepo ScheduledTestResultRepository +} + +// NewScheduledTestService creates a new ScheduledTestService. +func NewScheduledTestService( + planRepo ScheduledTestPlanRepository, + resultRepo ScheduledTestResultRepository, +) *ScheduledTestService { + return &ScheduledTestService{ + planRepo: planRepo, + resultRepo: resultRepo, + } +} + +// CreatePlan validates the cron expression, computes next_run_at, and persists the plan. +func (s *ScheduledTestService) CreatePlan(ctx context.Context, plan *ScheduledTestPlan) (*ScheduledTestPlan, error) { + nextRun, err := computeNextRun(plan.CronExpression, time.Now()) + if err != nil { + return nil, fmt.Errorf("invalid cron expression: %w", err) + } + plan.NextRunAt = &nextRun + + if plan.MaxResults <= 0 { + plan.MaxResults = 50 + } + + return s.planRepo.Create(ctx, plan) +} + +// GetPlan retrieves a plan by ID. +func (s *ScheduledTestService) GetPlan(ctx context.Context, id int64) (*ScheduledTestPlan, error) { + return s.planRepo.GetByID(ctx, id) +} + +// ListPlansByAccount returns all plans for a given account. +func (s *ScheduledTestService) ListPlansByAccount(ctx context.Context, accountID int64) ([]*ScheduledTestPlan, error) { + return s.planRepo.ListByAccountID(ctx, accountID) +} + +// UpdatePlan validates cron and updates the plan. +func (s *ScheduledTestService) UpdatePlan(ctx context.Context, plan *ScheduledTestPlan) (*ScheduledTestPlan, error) { + nextRun, err := computeNextRun(plan.CronExpression, time.Now()) + if err != nil { + return nil, fmt.Errorf("invalid cron expression: %w", err) + } + plan.NextRunAt = &nextRun + + return s.planRepo.Update(ctx, plan) +} + +// DeletePlan removes a plan and its results (via CASCADE). +func (s *ScheduledTestService) DeletePlan(ctx context.Context, id int64) error { + return s.planRepo.Delete(ctx, id) +} + +// ListResults returns the most recent results for a plan. +func (s *ScheduledTestService) ListResults(ctx context.Context, planID int64, limit int) ([]*ScheduledTestResult, error) { + if limit <= 0 { + limit = 50 + } + return s.resultRepo.ListByPlanID(ctx, planID, limit) +} + +// SaveResult inserts a result and prunes old entries beyond maxResults. +func (s *ScheduledTestService) SaveResult(ctx context.Context, planID int64, maxResults int, outcome *ScheduledTestOutcome) error { + result := &ScheduledTestResult{ + PlanID: planID, + Status: outcome.Status, + ResponseText: outcome.ResponseText, + ErrorMessage: outcome.ErrorMessage, + LatencyMs: outcome.LatencyMs, + StartedAt: outcome.StartedAt, + FinishedAt: outcome.FinishedAt, + } + if _, err := s.resultRepo.Create(ctx, result); err != nil { + return err + } + return s.resultRepo.PruneOldResults(ctx, planID, maxResults) +} + +func computeNextRun(cronExpr string, from time.Time) (time.Time, error) { + sched, err := scheduledTestCronParser.Parse(cronExpr) + if err != nil { + return time.Time{}, err + } + return sched.Next(from), nil +} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 920ab1cc..0b6091b2 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -274,6 +274,28 @@ func ProvideIdempotencyCleanupService(repo IdempotencyRepository, cfg *config.Co return svc } +// ProvideScheduledTestService creates ScheduledTestService. +func ProvideScheduledTestService( + planRepo ScheduledTestPlanRepository, + resultRepo ScheduledTestResultRepository, +) *ScheduledTestService { + return NewScheduledTestService(planRepo, resultRepo) +} + +// ProvideScheduledTestRunnerService creates and starts ScheduledTestRunnerService. +func ProvideScheduledTestRunnerService( + planRepo ScheduledTestPlanRepository, + scheduledSvc *ScheduledTestService, + accountTestSvc *AccountTestService, + db *sql.DB, + redisClient *redis.Client, + cfg *config.Config, +) *ScheduledTestRunnerService { + svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, db, redisClient, cfg) + svc.Start() + return svc +} + // ProvideOpsScheduledReportService creates and starts OpsScheduledReportService. func ProvideOpsScheduledReportService( opsService *OpsService, @@ -380,4 +402,6 @@ var ProviderSet = wire.NewSet( ProvideIdempotencyCoordinator, ProvideSystemOperationLockService, ProvideIdempotencyCleanupService, + ProvideScheduledTestService, + ProvideScheduledTestRunnerService, ) diff --git a/backend/migrations/066_add_scheduled_test_tables.sql b/backend/migrations/066_add_scheduled_test_tables.sql new file mode 100644 index 00000000..a9f839c0 --- /dev/null +++ b/backend/migrations/066_add_scheduled_test_tables.sql @@ -0,0 +1,30 @@ +-- 066_add_scheduled_test_tables.sql +-- Scheduled account test plans and results + +CREATE TABLE IF NOT EXISTS scheduled_test_plans ( + id BIGSERIAL PRIMARY KEY, + account_id BIGINT NOT NULL REFERENCES accounts(id) ON DELETE CASCADE, + model_id VARCHAR(100) NOT NULL DEFAULT '', + cron_expression VARCHAR(100) NOT NULL DEFAULT '*/30 * * * *', + enabled BOOLEAN NOT NULL DEFAULT true, + max_results INT NOT NULL DEFAULT 50, + last_run_at TIMESTAMPTZ, + next_run_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_stp_account_id ON scheduled_test_plans(account_id); +CREATE INDEX IF NOT EXISTS idx_stp_enabled_next_run ON scheduled_test_plans(enabled, next_run_at) WHERE enabled = true; + +CREATE TABLE IF NOT EXISTS scheduled_test_results ( + id BIGSERIAL PRIMARY KEY, + plan_id BIGINT NOT NULL REFERENCES scheduled_test_plans(id) ON DELETE CASCADE, + status VARCHAR(20) NOT NULL DEFAULT 'success', + response_text TEXT NOT NULL DEFAULT '', + error_message TEXT NOT NULL DEFAULT '', + latency_ms BIGINT NOT NULL DEFAULT 0, + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + finished_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_str_plan_created ON scheduled_test_results(plan_id, created_at DESC); diff --git a/deploy/docker-compose.local.yml b/deploy/docker-compose.local.yml index 0ef397df..2f969ed2 100644 --- a/deploy/docker-compose.local.yml +++ b/deploy/docker-compose.local.yml @@ -24,7 +24,8 @@ services: # Sub2API Application # =========================================================================== sub2api: - image: weishaw/sub2api:latest + image: sub2api:local + # image: weishaw/sub2api:latest # 远程镜像(注释掉) container_name: sub2api restart: unless-stopped ulimits: diff --git a/frontend/src/api/admin/index.ts b/frontend/src/api/admin/index.ts index 5db998e5..135ca50b 100644 --- a/frontend/src/api/admin/index.ts +++ b/frontend/src/api/admin/index.ts @@ -22,6 +22,7 @@ import opsAPI from './ops' import errorPassthroughAPI from './errorPassthrough' import dataManagementAPI from './dataManagement' import apiKeysAPI from './apiKeys' +import scheduledTestsAPI from './scheduledTests' /** * Unified admin API object for convenient access @@ -45,7 +46,8 @@ export const adminAPI = { ops: opsAPI, errorPassthrough: errorPassthroughAPI, dataManagement: dataManagementAPI, - apiKeys: apiKeysAPI + apiKeys: apiKeysAPI, + scheduledTests: scheduledTestsAPI } export { @@ -67,7 +69,8 @@ export { opsAPI, errorPassthroughAPI, dataManagementAPI, - apiKeysAPI + apiKeysAPI, + scheduledTestsAPI } export default adminAPI diff --git a/frontend/src/api/admin/scheduledTests.ts b/frontend/src/api/admin/scheduledTests.ts new file mode 100644 index 00000000..18e33220 --- /dev/null +++ b/frontend/src/api/admin/scheduledTests.ts @@ -0,0 +1,85 @@ +/** + * Admin Scheduled Tests API endpoints + * Handles scheduled test plan management for account connectivity monitoring + */ + +import { apiClient } from '../client' +import type { + ScheduledTestPlan, + ScheduledTestResult, + CreateScheduledTestPlanRequest, + UpdateScheduledTestPlanRequest +} from '@/types' + +/** + * List all scheduled test plans for an account + * @param accountId - Account ID + * @returns List of scheduled test plans + */ +export async function listByAccount(accountId: number): Promise { + const { data } = await apiClient.get( + `/admin/accounts/${accountId}/scheduled-test-plans` + ) + return data +} + +/** + * Create a new scheduled test plan + * @param req - Plan creation request + * @returns Created plan + */ +export async function create(req: CreateScheduledTestPlanRequest): Promise { + const { data } = await apiClient.post( + '/admin/scheduled-test-plans', + req + ) + return data +} + +/** + * Update an existing scheduled test plan + * @param id - Plan ID + * @param req - Fields to update + * @returns Updated plan + */ +export async function update(id: number, req: UpdateScheduledTestPlanRequest): Promise { + const { data } = await apiClient.put( + `/admin/scheduled-test-plans/${id}`, + req + ) + return data +} + +/** + * Delete a scheduled test plan + * @param id - Plan ID + */ +export async function deletePlan(id: number): Promise { + await apiClient.delete(`/admin/scheduled-test-plans/${id}`) +} + +/** + * List test results for a plan + * @param planId - Plan ID + * @param limit - Optional max number of results to return + * @returns List of test results + */ +export async function listResults(planId: number, limit?: number): Promise { + const { data } = await apiClient.get( + `/admin/scheduled-test-plans/${planId}/results`, + { + params: limit ? { limit } : undefined + } + ) + return data +} + +export const scheduledTestsAPI = { + listByAccount, + create, + update, + delete: deletePlan, + listResults +} + +export default scheduledTestsAPI diff --git a/frontend/src/components/admin/account/AccountActionMenu.vue b/frontend/src/components/admin/account/AccountActionMenu.vue index 2325f4b4..fbff0bed 100644 --- a/frontend/src/components/admin/account/AccountActionMenu.vue +++ b/frontend/src/components/admin/account/AccountActionMenu.vue @@ -18,6 +18,10 @@ {{ t('admin.accounts.viewStats') }} +