feat(ops): enhance upstream error tracking and add a scheduled report service
- Update the error-log middleware to record upstream retry/failover events even when the request ultimately succeeds
- Add OpsScheduledReportService for scheduled report delivery
- Use a Redis distributed lock so scheduled jobs run on a single instance only
- Complete the dependency injection wiring
- Improve the error-trend chart on the frontend
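For readers skimming the diff, here is a minimal, self-contained sketch of the single-instance pattern the new service relies on: a Redis SETNX leader lock with a TTL, released through a compare-and-delete Lua script so an instance can only delete a lock it still owns. Key name, TTL, and function names below are illustrative only; the actual values live in ops_scheduled_report_service.go further down.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// Compare-and-delete: only remove the lock if it still holds our instance ID.
var releaseLock = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`)

// tryLead returns a release func and true when this instance won the lock.
func tryLead(ctx context.Context, rdb *redis.Client, key, instanceID string, ttl time.Duration) (func(), bool) {
	ok, err := rdb.SetNX(ctx, key, instanceID, ttl).Result()
	if err != nil || !ok {
		// Fail closed: if Redis is unreachable, skip this cycle rather than risk duplicate sends.
		return nil, false
	}
	return func() {
		_, _ = releaseLock.Run(ctx, rdb, []string{key}, instanceID).Result()
	}, true
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	ctx := context.Background()
	if release, ok := tryLead(ctx, rdb, "example:leader", "instance-a", 5*time.Minute); ok {
		defer release()
		fmt.Println("acquired leadership; run the scheduled work here")
	}
}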
@@ -529,9 +529,9 @@ func (c *OpsMetricsCollector) queryErrorCounts(ctx context.Context, start, end t
) {
	q := `
		SELECT
-			COALESCE(COUNT(*), 0) AS error_total,
-			COALESCE(COUNT(*) FILTER (WHERE is_business_limited), 0) AS business_limited,
-			COALESCE(COUNT(*) FILTER (WHERE NOT is_business_limited), 0) AS error_sla,
+			COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400), 0) AS error_total,
+			COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND is_business_limited), 0) AS business_limited,
+			COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND NOT is_business_limited), 0) AS error_sla,
			COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl,
			COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 429), 0) AS upstream_429,
			COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 529), 0) AS upstream_529
708
backend/internal/service/ops_scheduled_report_service.go
Normal file
@@ -0,0 +1,708 @@
package service

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"github.com/robfig/cron/v3"
)

const (
	opsScheduledReportJobName = "ops_scheduled_reports"

	opsScheduledReportLeaderLockKeyDefault = "ops:scheduled_reports:leader"
	opsScheduledReportLeaderLockTTLDefault = 5 * time.Minute

	opsScheduledReportLastRunKeyPrefix = "ops:scheduled_reports:last_run:"

	opsScheduledReportTickInterval = 1 * time.Minute
)

var opsScheduledReportCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)

var opsScheduledReportReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`)

type OpsScheduledReportService struct {
	opsService   *OpsService
	userService  *UserService
	emailService *EmailService
	redisClient  *redis.Client
	cfg          *config.Config

	instanceID string
	loc        *time.Location

	distributedLockOn bool
	warnNoRedisOnce   sync.Once

	startOnce sync.Once
	stopOnce  sync.Once
	stopCtx   context.Context
	stop      context.CancelFunc
	wg        sync.WaitGroup
}

func NewOpsScheduledReportService(
	opsService *OpsService,
	userService *UserService,
	emailService *EmailService,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsScheduledReportService {
	lockOn := true
	if cfg != nil && strings.TrimSpace(cfg.RunMode) == config.RunModeSimple {
		lockOn = false
	}

	loc := time.Local
	if cfg != nil && strings.TrimSpace(cfg.Timezone) != "" {
		if parsed, err := time.LoadLocation(strings.TrimSpace(cfg.Timezone)); err == nil && parsed != nil {
			loc = parsed
		}
	}
	return &OpsScheduledReportService{
		opsService:   opsService,
		userService:  userService,
		emailService: emailService,
		redisClient:  redisClient,
		cfg:          cfg,

		instanceID:        uuid.NewString(),
		loc:               loc,
		distributedLockOn: lockOn,
		warnNoRedisOnce:   sync.Once{},
		startOnce:         sync.Once{},
		stopOnce:          sync.Once{},
		stopCtx:           nil,
		stop:              nil,
		wg:                sync.WaitGroup{},
	}
}

func (s *OpsScheduledReportService) Start() {
	s.StartWithContext(context.Background())
}

func (s *OpsScheduledReportService) StartWithContext(ctx context.Context) {
	if s == nil {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}
	if s.cfg != nil && !s.cfg.Ops.Enabled {
		return
	}
	if s.opsService == nil || s.emailService == nil {
		return
	}

	s.startOnce.Do(func() {
		s.stopCtx, s.stop = context.WithCancel(ctx)
		s.wg.Add(1)
		go s.run()
	})
}

func (s *OpsScheduledReportService) Stop() {
	if s == nil {
		return
	}
	s.stopOnce.Do(func() {
		if s.stop != nil {
			s.stop()
		}
	})
	s.wg.Wait()
}

func (s *OpsScheduledReportService) run() {
	defer s.wg.Done()

	ticker := time.NewTicker(opsScheduledReportTickInterval)
	defer ticker.Stop()

	s.runOnce()
	for {
		select {
		case <-ticker.C:
			s.runOnce()
		case <-s.stopCtx.Done():
			return
		}
	}
}

func (s *OpsScheduledReportService) runOnce() {
	if s == nil || s.opsService == nil || s.emailService == nil {
		return
	}

	startedAt := time.Now().UTC()
	runAt := startedAt

	ctx, cancel := context.WithTimeout(s.stopCtx, 60*time.Second)
	defer cancel()

	// Respect ops monitoring enabled switch.
	if !s.opsService.IsMonitoringEnabled(ctx) {
		return
	}

	release, ok := s.tryAcquireLeaderLock(ctx)
	if !ok {
		return
	}
	if release != nil {
		defer release()
	}

	now := time.Now()
	if s.loc != nil {
		now = now.In(s.loc)
	}

	reports := s.listScheduledReports(ctx, now)
	if len(reports) == 0 {
		return
	}

	for _, report := range reports {
		if report == nil || !report.Enabled {
			continue
		}
		if report.NextRunAt.After(now) {
			continue
		}

		if err := s.runReport(ctx, report, now); err != nil {
			s.recordHeartbeatError(runAt, time.Since(startedAt), err)
			return
		}
	}

	s.recordHeartbeatSuccess(runAt, time.Since(startedAt))
}

type opsScheduledReport struct {
	Name       string
	ReportType string
	Schedule   string
	Enabled    bool

	TimeRange time.Duration

	Recipients []string

	ErrorDigestMinCount             int
	AccountHealthErrorRateThreshold float64

	LastRunAt *time.Time
	NextRunAt time.Time
}

func (s *OpsScheduledReportService) listScheduledReports(ctx context.Context, now time.Time) []*opsScheduledReport {
	if s == nil || s.opsService == nil {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}

	emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx)
	if err != nil || emailCfg == nil {
		return nil
	}
	if !emailCfg.Report.Enabled {
		return nil
	}

	recipients := normalizeEmails(emailCfg.Report.Recipients)

	type reportDef struct {
		enabled   bool
		name      string
		kind      string
		timeRange time.Duration
		schedule  string
	}

	defs := []reportDef{
		{enabled: emailCfg.Report.DailySummaryEnabled, name: "日报", kind: "daily_summary", timeRange: 24 * time.Hour, schedule: emailCfg.Report.DailySummarySchedule},
		{enabled: emailCfg.Report.WeeklySummaryEnabled, name: "周报", kind: "weekly_summary", timeRange: 7 * 24 * time.Hour, schedule: emailCfg.Report.WeeklySummarySchedule},
		{enabled: emailCfg.Report.ErrorDigestEnabled, name: "错误摘要", kind: "error_digest", timeRange: 24 * time.Hour, schedule: emailCfg.Report.ErrorDigestSchedule},
		{enabled: emailCfg.Report.AccountHealthEnabled, name: "账号健康", kind: "account_health", timeRange: 24 * time.Hour, schedule: emailCfg.Report.AccountHealthSchedule},
	}

	out := make([]*opsScheduledReport, 0, len(defs))
	for _, d := range defs {
		if !d.enabled {
			continue
		}
		spec := strings.TrimSpace(d.schedule)
		if spec == "" {
			continue
		}
		sched, err := opsScheduledReportCronParser.Parse(spec)
		if err != nil {
			log.Printf("[OpsScheduledReport] invalid cron spec=%q for report=%s: %v", spec, d.kind, err)
			continue
		}

		lastRun := s.getLastRunAt(ctx, d.kind)
		base := lastRun
		if base.IsZero() {
			// Allow a schedule matching the current minute to trigger right after startup.
			base = now.Add(-1 * time.Minute)
		}
		next := sched.Next(base)
		if next.IsZero() {
			continue
		}

		var lastRunPtr *time.Time
		if !lastRun.IsZero() {
			lastCopy := lastRun
			lastRunPtr = &lastCopy
		}

		out = append(out, &opsScheduledReport{
			Name:       d.name,
			ReportType: d.kind,
			Schedule:   spec,
			Enabled:    true,

			TimeRange: d.timeRange,

			Recipients: recipients,

			ErrorDigestMinCount:             emailCfg.Report.ErrorDigestMinCount,
			AccountHealthErrorRateThreshold: emailCfg.Report.AccountHealthErrorRateThreshold,

			LastRunAt: lastRunPtr,
			NextRunAt: next,
		})
	}

	return out
}

func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsScheduledReport, now time.Time) error {
	if s == nil || s.opsService == nil || s.emailService == nil || report == nil {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}

	// Mark as "run" up-front so a broken SMTP config doesn't spam retries every minute.
	s.setLastRunAt(ctx, report.ReportType, now)

	content, err := s.generateReportHTML(ctx, report, now)
	if err != nil {
		return err
	}
	if strings.TrimSpace(content) == "" {
		// Skip sending when the report decides not to emit content (e.g., digest below min count).
		return nil
	}

	recipients := report.Recipients
	if len(recipients) == 0 && s.userService != nil {
		admin, err := s.userService.GetFirstAdmin(ctx)
		if err == nil && admin != nil && strings.TrimSpace(admin.Email) != "" {
			recipients = []string{strings.TrimSpace(admin.Email)}
		}
	}
	if len(recipients) == 0 {
		return nil
	}

	subject := fmt.Sprintf("[Ops Report] %s", strings.TrimSpace(report.Name))

	for _, to := range recipients {
		addr := strings.TrimSpace(to)
		if addr == "" {
			continue
		}
		if err := s.emailService.SendEmail(ctx, addr, subject, content); err != nil {
			// Ignore per-recipient failures; continue best-effort.
			continue
		}
	}
	return nil
}

func (s *OpsScheduledReportService) generateReportHTML(ctx context.Context, report *opsScheduledReport, now time.Time) (string, error) {
	if s == nil || s.opsService == nil || report == nil {
		return "", fmt.Errorf("service not initialized")
	}
	if report.TimeRange <= 0 {
		return "", fmt.Errorf("invalid time range")
	}

	end := now.UTC()
	start := end.Add(-report.TimeRange)

	switch strings.TrimSpace(report.ReportType) {
	case "daily_summary", "weekly_summary":
		overview, err := s.opsService.GetDashboardOverview(ctx, &OpsDashboardFilter{
			StartTime: start,
			EndTime:   end,
			Platform:  "",
			GroupID:   nil,
			QueryMode: OpsQueryModeAuto,
		})
		if err != nil {
			// If pre-aggregation isn't ready but the report is requested, fall back to raw.
			if strings.TrimSpace(report.ReportType) == "daily_summary" || strings.TrimSpace(report.ReportType) == "weekly_summary" {
				overview, err = s.opsService.GetDashboardOverview(ctx, &OpsDashboardFilter{
					StartTime: start,
					EndTime:   end,
					Platform:  "",
					GroupID:   nil,
					QueryMode: OpsQueryModeRaw,
				})
			}
			if err != nil {
				return "", err
			}
		}
		return buildOpsSummaryEmailHTML(report.Name, start, end, overview), nil
	case "error_digest":
		// Lightweight digest: list recent errors (status>=400) and breakdown by type.
		startTime := start
		endTime := end
		filter := &OpsErrorLogFilter{
			StartTime: &startTime,
			EndTime:   &endTime,
			Page:      1,
			PageSize:  100,
		}
		out, err := s.opsService.GetErrorLogs(ctx, filter)
		if err != nil {
			return "", err
		}
		if report.ErrorDigestMinCount > 0 && out != nil && out.Total < report.ErrorDigestMinCount {
			return "", nil
		}
		return buildOpsErrorDigestEmailHTML(report.Name, start, end, out), nil
	case "account_health":
		// Best-effort: use account availability (not error rate yet).
		avail, err := s.opsService.GetAccountAvailability(ctx, "", nil)
		if err != nil {
			return "", err
		}
		_ = report.AccountHealthErrorRateThreshold // reserved for future per-account error rate report
		return buildOpsAccountHealthEmailHTML(report.Name, start, end, avail), nil
	default:
		return "", fmt.Errorf("unknown report type: %s", report.ReportType)
	}
}

func buildOpsSummaryEmailHTML(title string, start, end time.Time, overview *OpsDashboardOverview) string {
	if overview == nil {
		return fmt.Sprintf("<h2>%s</h2><p>No data.</p>", htmlEscape(title))
	}

	latP50 := "-"
	latP99 := "-"
	if overview.Duration.P50 != nil {
		latP50 = fmt.Sprintf("%dms", *overview.Duration.P50)
	}
	if overview.Duration.P99 != nil {
		latP99 = fmt.Sprintf("%dms", *overview.Duration.P99)
	}

	ttftP50 := "-"
	ttftP99 := "-"
	if overview.TTFT.P50 != nil {
		ttftP50 = fmt.Sprintf("%dms", *overview.TTFT.P50)
	}
	if overview.TTFT.P99 != nil {
		ttftP99 = fmt.Sprintf("%dms", *overview.TTFT.P99)
	}

	return fmt.Sprintf(`
<h2>%s</h2>
<p><b>Period</b>: %s ~ %s (UTC)</p>
<ul>
<li><b>Total Requests</b>: %d</li>
<li><b>Success</b>: %d</li>
<li><b>Errors (SLA)</b>: %d</li>
<li><b>Business Limited</b>: %d</li>
<li><b>SLA</b>: %.2f%%</li>
<li><b>Error Rate</b>: %.2f%%</li>
<li><b>Upstream Error Rate (excl 429/529)</b>: %.2f%%</li>
<li><b>Upstream Errors</b>: excl429/529=%d, 429=%d, 529=%d</li>
<li><b>Latency</b>: p50=%s, p99=%s</li>
<li><b>TTFT</b>: p50=%s, p99=%s</li>
<li><b>Tokens</b>: %d</li>
<li><b>QPS</b>: current=%.1f, peak=%.1f, avg=%.1f</li>
<li><b>TPS</b>: current=%.1f, peak=%.1f, avg=%.1f</li>
</ul>
`,
		htmlEscape(strings.TrimSpace(title)),
		htmlEscape(start.UTC().Format(time.RFC3339)),
		htmlEscape(end.UTC().Format(time.RFC3339)),
		overview.RequestCountTotal,
		overview.SuccessCount,
		overview.ErrorCountSLA,
		overview.BusinessLimitedCount,
		overview.SLA*100,
		overview.ErrorRate*100,
		overview.UpstreamErrorRate*100,
		overview.UpstreamErrorCountExcl429529,
		overview.Upstream429Count,
		overview.Upstream529Count,
		htmlEscape(latP50),
		htmlEscape(latP99),
		htmlEscape(ttftP50),
		htmlEscape(ttftP99),
		overview.TokenConsumed,
		overview.QPS.Current,
		overview.QPS.Peak,
		overview.QPS.Avg,
		overview.TPS.Current,
		overview.TPS.Peak,
		overview.TPS.Avg,
	)
}

func buildOpsErrorDigestEmailHTML(title string, start, end time.Time, list *OpsErrorLogList) string {
	total := 0
	recent := []*OpsErrorLog{}
	if list != nil {
		total = list.Total
		recent = list.Errors
	}
	if len(recent) > 10 {
		recent = recent[:10]
	}

	rows := ""
	for _, item := range recent {
		if item == nil {
			continue
		}
		rows += fmt.Sprintf(
			"<tr><td>%s</td><td>%s</td><td>%d</td><td>%s</td></tr>",
			htmlEscape(item.CreatedAt.UTC().Format(time.RFC3339)),
			htmlEscape(item.Platform),
			item.StatusCode,
			htmlEscape(truncateString(item.Message, 180)),
		)
	}
	if rows == "" {
		rows = "<tr><td colspan=\"4\">No recent errors.</td></tr>"
	}

	return fmt.Sprintf(`
<h2>%s</h2>
<p><b>Period</b>: %s ~ %s (UTC)</p>
<p><b>Total Errors</b>: %d</p>
<h3>Recent</h3>
<table border="1" cellpadding="6" cellspacing="0" style="border-collapse:collapse;">
<thead><tr><th>Time</th><th>Platform</th><th>Status</th><th>Message</th></tr></thead>
<tbody>%s</tbody>
</table>
`,
		htmlEscape(strings.TrimSpace(title)),
		htmlEscape(start.UTC().Format(time.RFC3339)),
		htmlEscape(end.UTC().Format(time.RFC3339)),
		total,
		rows,
	)
}

func buildOpsAccountHealthEmailHTML(title string, start, end time.Time, avail *OpsAccountAvailability) string {
	total := 0
	available := 0
	rateLimited := 0
	hasError := 0

	if avail != nil && avail.Accounts != nil {
		for _, a := range avail.Accounts {
			if a == nil {
				continue
			}
			total++
			if a.IsAvailable {
				available++
			}
			if a.IsRateLimited {
				rateLimited++
			}
			if a.HasError {
				hasError++
			}
		}
	}

	return fmt.Sprintf(`
<h2>%s</h2>
<p><b>Period</b>: %s ~ %s (UTC)</p>
<ul>
<li><b>Total Accounts</b>: %d</li>
<li><b>Available</b>: %d</li>
<li><b>Rate Limited</b>: %d</li>
<li><b>Error</b>: %d</li>
</ul>
<p>Note: This report currently reflects account availability status only.</p>
`,
		htmlEscape(strings.TrimSpace(title)),
		htmlEscape(start.UTC().Format(time.RFC3339)),
		htmlEscape(end.UTC().Format(time.RFC3339)),
		total,
		available,
		rateLimited,
		hasError,
	)
}

func (s *OpsScheduledReportService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
	if s == nil || !s.distributedLockOn {
		return nil, true
	}
	if s.redisClient == nil {
		s.warnNoRedisOnce.Do(func() {
			log.Printf("[OpsScheduledReport] redis not configured; running without distributed lock")
		})
		return nil, true
	}
	if ctx == nil {
		ctx = context.Background()
	}

	key := opsScheduledReportLeaderLockKeyDefault
	ttl := opsScheduledReportLeaderLockTTLDefault
	if strings.TrimSpace(key) == "" {
		key = "ops:scheduled_reports:leader"
	}
	if ttl <= 0 {
		ttl = 5 * time.Minute
	}

	ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
	if err != nil {
		// Prefer fail-closed to avoid duplicate report sends when Redis is flaky.
		log.Printf("[OpsScheduledReport] leader lock SetNX failed; skipping this cycle: %v", err)
		return nil, false
	}
	if !ok {
		return nil, false
	}
	return func() {
		_, _ = opsScheduledReportReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
	}, true
}

func (s *OpsScheduledReportService) getLastRunAt(ctx context.Context, reportType string) time.Time {
	if s == nil || s.redisClient == nil {
		return time.Time{}
	}
	kind := strings.TrimSpace(reportType)
	if kind == "" {
		return time.Time{}
	}
	key := opsScheduledReportLastRunKeyPrefix + kind

	raw, err := s.redisClient.Get(ctx, key).Result()
	if err != nil || strings.TrimSpace(raw) == "" {
		return time.Time{}
	}
	sec, err := strconv.ParseInt(strings.TrimSpace(raw), 10, 64)
	if err != nil || sec <= 0 {
		return time.Time{}
	}
	last := time.Unix(sec, 0)
	// Cron schedules are interpreted in the configured timezone (s.loc). Ensure the base time
	// passed into cron.Next() uses the same location; otherwise the job will drift by timezone
	// offset (e.g. Asia/Shanghai default would run 8h later after the first execution).
	if s.loc != nil {
		return last.In(s.loc)
	}
	return last.UTC()
}

func (s *OpsScheduledReportService) setLastRunAt(ctx context.Context, reportType string, t time.Time) {
	if s == nil || s.redisClient == nil {
		return
	}
	kind := strings.TrimSpace(reportType)
	if kind == "" {
		return
	}
	if t.IsZero() {
		t = time.Now().UTC()
	}
	key := opsScheduledReportLastRunKeyPrefix + kind
	_ = s.redisClient.Set(ctx, key, strconv.FormatInt(t.UTC().Unix(), 10), 14*24*time.Hour).Err()
}

func (s *OpsScheduledReportService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) {
	if s == nil || s.opsService == nil || s.opsService.opsRepo == nil {
		return
	}
	now := time.Now().UTC()
	durMs := duration.Milliseconds()
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsScheduledReportJobName,
		LastRunAt:      &runAt,
		LastSuccessAt:  &now,
		LastDurationMs: &durMs,
	})
}

func (s *OpsScheduledReportService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
	if s == nil || s.opsService == nil || s.opsService.opsRepo == nil || err == nil {
		return
	}
	now := time.Now().UTC()
	durMs := duration.Milliseconds()
	msg := truncateString(err.Error(), 2048)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsScheduledReportJobName,
		LastRunAt:      &runAt,
		LastErrorAt:    &now,
		LastError:      &msg,
		LastDurationMs: &durMs,
	})
}

func normalizeEmails(in []string) []string {
	if len(in) == 0 {
		return nil
	}
	seen := make(map[string]struct{}, len(in))
	out := make([]string, 0, len(in))
	for _, raw := range in {
		addr := strings.ToLower(strings.TrimSpace(raw))
		if addr == "" {
			continue
		}
		if _, ok := seen[addr]; ok {
			continue
		}
		seen[addr] = struct{}{}
		out = append(out, addr)
	}
	return out
}
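A note on the scheduling semantics above: the parser accepts standard five-field cron specs (minute, hour, day-of-month, month, day-of-week; no seconds field), and NextRunAt is the next fire time after the last recorded run. The sketch below uses an illustrative spec and base time, not values from the service's configuration.

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// Same field set as opsScheduledReportCronParser: five fields, no seconds.
	parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)

	// Illustrative spec: every day at 09:00.
	sched, err := parser.Parse("0 9 * * *")
	if err != nil {
		panic(err)
	}

	// With no recorded last run, "one minute ago" as the base lets a spec that
	// matches the current minute fire right after startup.
	base := time.Now().Add(-1 * time.Minute)
	fmt.Println("next run:", sched.Next(base))
}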
@@ -132,6 +132,19 @@ func ProvideOpsCleanupService(
	return svc
}

+// ProvideOpsScheduledReportService creates and starts OpsScheduledReportService.
+func ProvideOpsScheduledReportService(
+	opsService *OpsService,
+	userService *UserService,
+	emailService *EmailService,
+	redisClient *redis.Client,
+	cfg *config.Config,
+) *OpsScheduledReportService {
+	svc := NewOpsScheduledReportService(opsService, userService, emailService, redisClient, cfg)
+	svc.Start()
+	return svc
+}
+
// ProviderSet is the Wire provider set for all services
var ProviderSet = wire.NewSet(
	// Core services
@@ -169,6 +182,7 @@ var ProviderSet = wire.NewSet(
	ProvideOpsAggregationService,
	ProvideOpsAlertEvaluatorService,
	ProvideOpsCleanupService,
+	ProvideOpsScheduledReportService,
	NewEmailService,
	ProvideEmailQueueService,
	NewTurnstileService,