package admin

import (
	"context"
	"strconv"
	"strings"

	infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
	"github.com/Wei-Shaw/sub2api/internal/pkg/response"
	middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
	"github.com/Wei-Shaw/sub2api/internal/service"
	"github.com/gin-gonic/gin"
)

// DataManagementHandler serves the data management endpoints: agent health,
// configuration, S3 and source profiles, and backup jobs.
type DataManagementHandler struct {
	dataManagementService dataManagementService
}

// NewDataManagementHandler builds a handler backed by dataManagementService.
//
// A nil service pointer is accepted; in that case the handler keeps a nil
// interface so that requireAgentEnabled and getAgentHealth can report the
// service as unavailable.
func NewDataManagementHandler(dataManagementService *service.DataManagementService) *DataManagementHandler {
	handler := &DataManagementHandler{}
	// Assigning a nil *service.DataManagementService directly to the interface
	// field would produce a non-nil interface wrapping a nil pointer, which
	// would defeat the `== nil` guards used elsewhere in this file.
	if dataManagementService != nil {
		handler.dataManagementService = dataManagementService
	}
	return handler
}

// dataManagementService is the subset of service behavior this handler uses.
type dataManagementService interface {
	GetConfig(ctx context.Context) (service.DataManagementConfig, error)
	UpdateConfig(ctx context.Context, cfg service.DataManagementConfig) (service.DataManagementConfig, error)
	ValidateS3(ctx context.Context, cfg service.DataManagementS3Config) (service.DataManagementTestS3Result, error)
	CreateBackupJob(ctx context.Context, input service.DataManagementCreateBackupJobInput) (service.DataManagementBackupJob, error)
	ListSourceProfiles(ctx context.Context, sourceType string) ([]service.DataManagementSourceProfile, error)
	CreateSourceProfile(ctx context.Context, input service.DataManagementCreateSourceProfileInput) (service.DataManagementSourceProfile, error)
	UpdateSourceProfile(ctx context.Context, input service.DataManagementUpdateSourceProfileInput) (service.DataManagementSourceProfile, error)
	DeleteSourceProfile(ctx context.Context, sourceType, profileID string) error
	SetActiveSourceProfile(ctx context.Context, sourceType, profileID string) (service.DataManagementSourceProfile, error)
	ListS3Profiles(ctx context.Context) ([]service.DataManagementS3Profile, error)
	CreateS3Profile(ctx context.Context, input service.DataManagementCreateS3ProfileInput) (service.DataManagementS3Profile, error)
	UpdateS3Profile(ctx context.Context, input service.DataManagementUpdateS3ProfileInput) (service.DataManagementS3Profile, error)
	DeleteS3Profile(ctx context.Context, profileID string) error
	SetActiveS3Profile(ctx context.Context, profileID string) (service.DataManagementS3Profile, error)
	ListBackupJobs(ctx context.Context, input service.DataManagementListBackupJobsInput) (service.DataManagementListBackupJobsResult, error)
	GetBackupJob(ctx context.Context, jobID string) (service.DataManagementBackupJob, error)
	EnsureAgentEnabled(ctx context.Context) error
	GetAgentHealth(ctx context.Context) service.DataManagementAgentHealth
}

// TestS3ConnectionRequest is the JSON body accepted by TestS3.
// Region and Bucket are required by the binding tags.
type TestS3ConnectionRequest struct {
	Endpoint        string `json:"endpoint"`
	Region          string `json:"region" binding:"required"`
	Bucket          string `json:"bucket" binding:"required"`
	AccessKeyID     string `json:"access_key_id"`
	SecretAccessKey string `json:"secret_access_key"`
	Prefix          string `json:"prefix"`
	ForcePathStyle  bool   `json:"force_path_style"`
	UseSSL          bool   `json:"use_ssl"`
}

// CreateBackupJobRequest is the JSON body accepted by CreateBackupJob.
// BackupType must be one of "postgres", "redis" or "full".
type CreateBackupJobRequest struct {
	BackupType     string `json:"backup_type" binding:"required,oneof=postgres redis full"`
	UploadToS3     bool   `json:"upload_to_s3"`
	S3ProfileID    string `json:"s3_profile_id"`
	PostgresID     string `json:"postgres_profile_id"`
	RedisID        string `json:"redis_profile_id"`
	IdempotencyKey string `json:"idempotency_key"`
}

// CreateSourceProfileRequest is the JSON body accepted by CreateSourceProfile.
type CreateSourceProfileRequest struct {
	ProfileID string                             `json:"profile_id" binding:"required"`
	Name      string                             `json:"name" binding:"required"`
	Config    service.DataManagementSourceConfig `json:"config" binding:"required"`
	SetActive bool                               `json:"set_active"`
}

// UpdateSourceProfileRequest is the JSON body accepted by UpdateSourceProfile.
// The profile ID and source type come from the route parameters.
type UpdateSourceProfileRequest struct {
	Name   string                             `json:"name" binding:"required"`
	Config service.DataManagementSourceConfig `json:"config" binding:"required"`
}

// CreateS3ProfileRequest is the JSON body accepted by CreateS3Profile.
type CreateS3ProfileRequest struct {
	ProfileID       string `json:"profile_id" binding:"required"`
	Name            string `json:"name" binding:"required"`
	Enabled         bool   `json:"enabled"`
	Endpoint        string `json:"endpoint"`
	Region          string `json:"region"`
	Bucket          string `json:"bucket"`
	AccessKeyID     string `json:"access_key_id"`
	SecretAccessKey string `json:"secret_access_key"`
	Prefix          string `json:"prefix"`
	ForcePathStyle  bool   `json:"force_path_style"`
	UseSSL          bool   `json:"use_ssl"`
	SetActive       bool   `json:"set_active"`
}

// UpdateS3ProfileRequest is the JSON body accepted by UpdateS3Profile.
// The profile ID comes from the route parameter.
type UpdateS3ProfileRequest struct {
	Name            string `json:"name" binding:"required"`
	Enabled         bool   `json:"enabled"`
	Endpoint        string `json:"endpoint"`
	Region          string `json:"region"`
	Bucket          string `json:"bucket"`
	AccessKeyID     string `json:"access_key_id"`
	SecretAccessKey string `json:"secret_access_key"`
	Prefix          string `json:"prefix"`
	ForcePathStyle  bool   `json:"force_path_style"`
	UseSSL          bool   `json:"use_ssl"`
}

// GetAgentHealth responds with the agent's enabled flag, reason and socket
// path, plus an "agent" object (status, version, uptime_seconds) when agent
// details are present. It does not require the agent to be enabled.
func (h *DataManagementHandler) GetAgentHealth(c *gin.Context) {
	health := h.getAgentHealth(c)
	payload := gin.H{
		"enabled":     health.Enabled,
		"reason":      health.Reason,
		"socket_path": health.SocketPath,
	}
	if health.Agent != nil {
		payload["agent"] = gin.H{
			"status":         health.Agent.Status,
			"version":        health.Agent.Version,
			"uptime_seconds": health.Agent.UptimeSeconds,
		}
	}
	response.Success(c, payload)
}

// GetConfig responds with the current data management configuration.
func (h *DataManagementHandler) GetConfig(c *gin.Context) {
	if !h.requireAgentEnabled(c) {
		return
	}
	cfg, err := h.dataManagementService.GetConfig(c.Request.Context())
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, cfg)
}

// UpdateConfig binds a service.DataManagementConfig from the JSON body,
// passes it to the service and responds with the returned configuration.
func (h *DataManagementHandler) UpdateConfig(c *gin.Context) {
	var req service.DataManagementConfig
	if err := c.ShouldBindJSON(&req); err != nil {
		response.BadRequest(c, "Invalid request: "+err.Error())
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	cfg, err := h.dataManagementService.UpdateConfig(c.Request.Context(), req)
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, cfg)
}

// TestS3 validates the S3 settings from the JSON body via the service and
// responds with {"ok", "message"}. The config is always sent with Enabled=true.
func (h *DataManagementHandler) TestS3(c *gin.Context) {
	var req TestS3ConnectionRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		response.BadRequest(c, "Invalid request: "+err.Error())
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	result, err := h.dataManagementService.ValidateS3(c.Request.Context(), service.DataManagementS3Config{
		Enabled:         true,
		Endpoint:        req.Endpoint,
		Region:          req.Region,
		Bucket:          req.Bucket,
		AccessKeyID:     req.AccessKeyID,
		SecretAccessKey: req.SecretAccessKey,
		Prefix:          req.Prefix,
		ForcePathStyle:  req.ForcePathStyle,
		UseSSL:          req.UseSSL,
	})
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, gin.H{"ok": result.OK, "message": result.Message})
}

// CreateBackupJob creates a backup job from the JSON body and responds with
// {"job_id", "status"}.
//
// The idempotency key is taken from the X-Idempotency-Key header when it is
// non-blank, otherwise from the body. TriggeredBy is "admin:<user id>" when
// an auth subject is found in the context, and "admin:unknown" otherwise.
func (h *DataManagementHandler) CreateBackupJob(c *gin.Context) {
	var req CreateBackupJobRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		response.BadRequest(c, "Invalid request: "+err.Error())
		return
	}
	req.IdempotencyKey = normalizeBackupIdempotencyKey(c.GetHeader("X-Idempotency-Key"), req.IdempotencyKey)

	if !h.requireAgentEnabled(c) {
		return
	}

	triggeredBy := "admin:unknown"
	if subject, ok := middleware2.GetAuthSubjectFromContext(c); ok {
		triggeredBy = "admin:" + strconv.FormatInt(subject.UserID, 10)
	}

	job, err := h.dataManagementService.CreateBackupJob(c.Request.Context(), service.DataManagementCreateBackupJobInput{
		BackupType:     req.BackupType,
		UploadToS3:     req.UploadToS3,
		S3ProfileID:    req.S3ProfileID,
		PostgresID:     req.PostgresID,
		RedisID:        req.RedisID,
		TriggeredBy:    triggeredBy,
		IdempotencyKey: req.IdempotencyKey,
	})
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, gin.H{"job_id": job.JobID, "status": job.Status})
}

// ListSourceProfiles responds with {"items": [...]} for the source_type route
// parameter, which must be "postgres" or "redis".
func (h *DataManagementHandler) ListSourceProfiles(c *gin.Context) {
	sourceType := strings.TrimSpace(c.Param("source_type"))
	if sourceType == "" {
		response.BadRequest(c, "Invalid source_type")
		return
	}
	if sourceType != "postgres" && sourceType != "redis" {
		response.BadRequest(c, "source_type must be postgres or redis")
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	items, err := h.dataManagementService.ListSourceProfiles(c.Request.Context(), sourceType)
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, gin.H{"items": items})
}

// CreateSourceProfile creates a source profile of the source_type route
// parameter ("postgres" or "redis") from the JSON body.
func (h *DataManagementHandler) CreateSourceProfile(c *gin.Context) {
	sourceType := strings.TrimSpace(c.Param("source_type"))
	if sourceType != "postgres" && sourceType != "redis" {
		response.BadRequest(c, "source_type must be postgres or redis")
		return
	}

	var req CreateSourceProfileRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		response.BadRequest(c, "Invalid request: "+err.Error())
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	profile, err := h.dataManagementService.CreateSourceProfile(c.Request.Context(), service.DataManagementCreateSourceProfileInput{
		SourceType: sourceType,
		ProfileID:  req.ProfileID,
		Name:       req.Name,
		Config:     req.Config,
		SetActive:  req.SetActive,
	})
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, profile)
}

// UpdateSourceProfile updates the source profile identified by the
// source_type and profile_id route parameters with the JSON body.
func (h *DataManagementHandler) UpdateSourceProfile(c *gin.Context) {
	sourceType := strings.TrimSpace(c.Param("source_type"))
	if sourceType != "postgres" && sourceType != "redis" {
		response.BadRequest(c, "source_type must be postgres or redis")
		return
	}
	profileID := strings.TrimSpace(c.Param("profile_id"))
	if profileID == "" {
		response.BadRequest(c, "Invalid profile_id")
		return
	}

	var req UpdateSourceProfileRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		response.BadRequest(c, "Invalid request: "+err.Error())
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	profile, err := h.dataManagementService.UpdateSourceProfile(c.Request.Context(), service.DataManagementUpdateSourceProfileInput{
		SourceType: sourceType,
		ProfileID:  profileID,
		Name:       req.Name,
		Config:     req.Config,
	})
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, profile)
}

// DeleteSourceProfile deletes the source profile identified by the
// source_type and profile_id route parameters and responds {"deleted": true}.
func (h *DataManagementHandler) DeleteSourceProfile(c *gin.Context) {
	sourceType := strings.TrimSpace(c.Param("source_type"))
	if sourceType != "postgres" && sourceType != "redis" {
		response.BadRequest(c, "source_type must be postgres or redis")
		return
	}
	profileID := strings.TrimSpace(c.Param("profile_id"))
	if profileID == "" {
		response.BadRequest(c, "Invalid profile_id")
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	if err := h.dataManagementService.DeleteSourceProfile(c.Request.Context(), sourceType, profileID); err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, gin.H{"deleted": true})
}

// SetActiveSourceProfile marks the source profile identified by the
// source_type and profile_id route parameters as active and responds with
// the profile returned by the service.
func (h *DataManagementHandler) SetActiveSourceProfile(c *gin.Context) {
	sourceType := strings.TrimSpace(c.Param("source_type"))
	if sourceType != "postgres" && sourceType != "redis" {
		response.BadRequest(c, "source_type must be postgres or redis")
		return
	}
	profileID := strings.TrimSpace(c.Param("profile_id"))
	if profileID == "" {
		response.BadRequest(c, "Invalid profile_id")
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	profile, err := h.dataManagementService.SetActiveSourceProfile(c.Request.Context(), sourceType, profileID)
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, profile)
}

// ListS3Profiles responds with {"items": [...]} containing the S3 profiles.
func (h *DataManagementHandler) ListS3Profiles(c *gin.Context) {
	if !h.requireAgentEnabled(c) {
		return
	}

	items, err := h.dataManagementService.ListS3Profiles(c.Request.Context())
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, gin.H{"items": items})
}

// CreateS3Profile creates an S3 profile from the JSON body.
func (h *DataManagementHandler) CreateS3Profile(c *gin.Context) {
	var req CreateS3ProfileRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		response.BadRequest(c, "Invalid request: "+err.Error())
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}

	profile, err := h.dataManagementService.CreateS3Profile(c.Request.Context(), service.DataManagementCreateS3ProfileInput{
		ProfileID: req.ProfileID,
		Name:      req.Name,
		SetActive: req.SetActive,
		S3: service.DataManagementS3Config{
			Enabled:         req.Enabled,
			Endpoint:        req.Endpoint,
			Region:          req.Region,
			Bucket:          req.Bucket,
			AccessKeyID:     req.AccessKeyID,
			SecretAccessKey: req.SecretAccessKey,
			Prefix:          req.Prefix,
			ForcePathStyle:  req.ForcePathStyle,
			UseSSL:          req.UseSSL,
		},
	})
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, profile)
}

// UpdateS3Profile updates the S3 profile identified by the profile_id route
// parameter with the JSON body. The body is bound before the route parameter
// is checked.
func (h *DataManagementHandler) UpdateS3Profile(c *gin.Context) {
	var req UpdateS3ProfileRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		response.BadRequest(c, "Invalid request: "+err.Error())
		return
	}

	profileID := strings.TrimSpace(c.Param("profile_id"))
	if profileID == "" {
		response.BadRequest(c, "Invalid profile_id")
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}

	profile, err := h.dataManagementService.UpdateS3Profile(c.Request.Context(), service.DataManagementUpdateS3ProfileInput{
		ProfileID: profileID,
		Name:      req.Name,
		S3: service.DataManagementS3Config{
			Enabled:         req.Enabled,
			Endpoint:        req.Endpoint,
			Region:          req.Region,
			Bucket:          req.Bucket,
			AccessKeyID:     req.AccessKeyID,
			SecretAccessKey: req.SecretAccessKey,
			Prefix:          req.Prefix,
			ForcePathStyle:  req.ForcePathStyle,
			UseSSL:          req.UseSSL,
		},
	})
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, profile)
}

// DeleteS3Profile deletes the S3 profile identified by the profile_id route
// parameter and responds {"deleted": true}.
func (h *DataManagementHandler) DeleteS3Profile(c *gin.Context) {
	profileID := strings.TrimSpace(c.Param("profile_id"))
	if profileID == "" {
		response.BadRequest(c, "Invalid profile_id")
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	if err := h.dataManagementService.DeleteS3Profile(c.Request.Context(), profileID); err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, gin.H{"deleted": true})
}

// SetActiveS3Profile marks the S3 profile identified by the profile_id route
// parameter as active and responds with the profile returned by the service.
func (h *DataManagementHandler) SetActiveS3Profile(c *gin.Context) {
	profileID := strings.TrimSpace(c.Param("profile_id"))
	if profileID == "" {
		response.BadRequest(c, "Invalid profile_id")
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	profile, err := h.dataManagementService.SetActiveS3Profile(c.Request.Context(), profileID)
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, profile)
}

// ListBackupJobs responds with the service's backup job listing.
//
// Query parameters: page_size (positive integer that fits in int32, default
// 20), page_token, status and backup_type. The last three are forwarded to
// the service unmodified.
func (h *DataManagementHandler) ListBackupJobs(c *gin.Context) {
	if !h.requireAgentEnabled(c) {
		return
	}

	pageSize := int32(20)
	if raw := strings.TrimSpace(c.Query("page_size")); raw != "" {
		// Parse with a 32-bit bound so that values above math.MaxInt32 are
		// rejected here instead of silently wrapping in the int32 conversion.
		v, err := strconv.ParseInt(raw, 10, 32)
		if err != nil || v <= 0 {
			response.BadRequest(c, "Invalid page_size")
			return
		}
		pageSize = int32(v)
	}

	result, err := h.dataManagementService.ListBackupJobs(c.Request.Context(), service.DataManagementListBackupJobsInput{
		PageSize:   pageSize,
		PageToken:  c.Query("page_token"),
		Status:     c.Query("status"),
		BackupType: c.Query("backup_type"),
	})
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, result)
}

// GetBackupJob responds with the backup job identified by the job_id route
// parameter.
func (h *DataManagementHandler) GetBackupJob(c *gin.Context) {
	jobID := strings.TrimSpace(c.Param("job_id"))
	if jobID == "" {
		response.BadRequest(c, "Invalid backup job ID")
		return
	}

	if !h.requireAgentEnabled(c) {
		return
	}
	job, err := h.dataManagementService.GetBackupJob(c.Request.Context(), jobID)
	if err != nil {
		response.ErrorFrom(c, err)
		return
	}
	response.Success(c, job)
}

// requireAgentEnabled reports whether the request may proceed. When the
// service is not configured, or EnsureAgentEnabled returns an error, it
// writes the error response itself and returns false; callers should simply
// return in that case.
func (h *DataManagementHandler) requireAgentEnabled(c *gin.Context) bool {
	if h.dataManagementService == nil {
		err := infraerrors.ServiceUnavailable(
			service.DataManagementAgentUnavailableReason,
			"data management agent service is not configured",
		).WithMetadata(map[string]string{"socket_path": service.DefaultDataManagementAgentSocketPath})
		response.ErrorFrom(c, err)
		return false
	}

	if err := h.dataManagementService.EnsureAgentEnabled(c.Request.Context()); err != nil {
		response.ErrorFrom(c, err)
		return false
	}

	return true
}

// getAgentHealth returns the service's agent health, or a disabled health
// value carrying the default reason and socket path when no service is
// configured.
func (h *DataManagementHandler) getAgentHealth(c *gin.Context) service.DataManagementAgentHealth {
	if h.dataManagementService == nil {
		return service.DataManagementAgentHealth{
			Enabled:    false,
			Reason:     service.DataManagementAgentUnavailableReason,
			SocketPath: service.DefaultDataManagementAgentSocketPath,
		}
	}
	return h.dataManagementService.GetAgentHealth(c.Request.Context())
}

// normalizeBackupIdempotencyKey returns the whitespace-trimmed header value
// when it is non-empty, otherwise the whitespace-trimmed body value.
func normalizeBackupIdempotencyKey(headerValue, bodyValue string) string {
	headerKey := strings.TrimSpace(headerValue)
	if headerKey != "" {
		return headerKey
	}
	return strings.TrimSpace(bodyValue)
}