fix(仪表盘): 添加聚合和回填操作的执行时间日志
This commit is contained in:
@@ -134,6 +134,7 @@ func (s *DashboardAggregationService) runScheduledAggregation() {
|
|||||||
}
|
}
|
||||||
defer atomic.StoreInt32(&s.running, 0)
|
defer atomic.StoreInt32(&s.running, 0)
|
||||||
|
|
||||||
|
jobStart := time.Now().UTC()
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@@ -162,9 +163,16 @@ func (s *DashboardAggregationService) runScheduledAggregation() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.repo.UpdateAggregationWatermark(ctx, now); err != nil {
|
updateErr := s.repo.UpdateAggregationWatermark(ctx, now)
|
||||||
log.Printf("[DashboardAggregation] 更新水位失败: %v", err)
|
if updateErr != nil {
|
||||||
|
log.Printf("[DashboardAggregation] 更新水位失败: %v", updateErr)
|
||||||
}
|
}
|
||||||
|
log.Printf("[DashboardAggregation] 聚合完成 (start=%s end=%s duration=%s watermark_updated=%t)",
|
||||||
|
start.Format(time.RFC3339),
|
||||||
|
now.Format(time.RFC3339),
|
||||||
|
time.Since(jobStart).String(),
|
||||||
|
updateErr == nil,
|
||||||
|
)
|
||||||
|
|
||||||
s.maybeCleanupRetention(ctx, now)
|
s.maybeCleanupRetention(ctx, now)
|
||||||
}
|
}
|
||||||
@@ -175,6 +183,7 @@ func (s *DashboardAggregationService) backfillRange(ctx context.Context, start,
|
|||||||
}
|
}
|
||||||
defer atomic.StoreInt32(&s.running, 0)
|
defer atomic.StoreInt32(&s.running, 0)
|
||||||
|
|
||||||
|
jobStart := time.Now().UTC()
|
||||||
startUTC := start.UTC()
|
startUTC := start.UTC()
|
||||||
endUTC := end.UTC()
|
endUTC := end.UTC()
|
||||||
if !endUTC.After(startUTC) {
|
if !endUTC.After(startUTC) {
|
||||||
@@ -193,9 +202,16 @@ func (s *DashboardAggregationService) backfillRange(ctx context.Context, start,
|
|||||||
cursor = windowEnd
|
cursor = windowEnd
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.repo.UpdateAggregationWatermark(ctx, endUTC); err != nil {
|
updateErr := s.repo.UpdateAggregationWatermark(ctx, endUTC)
|
||||||
log.Printf("[DashboardAggregation] 更新水位失败: %v", err)
|
if updateErr != nil {
|
||||||
|
log.Printf("[DashboardAggregation] 更新水位失败: %v", updateErr)
|
||||||
}
|
}
|
||||||
|
log.Printf("[DashboardAggregation] 回填聚合完成 (start=%s end=%s duration=%s watermark_updated=%t)",
|
||||||
|
startUTC.Format(time.RFC3339),
|
||||||
|
endUTC.Format(time.RFC3339),
|
||||||
|
time.Since(jobStart).String(),
|
||||||
|
updateErr == nil,
|
||||||
|
)
|
||||||
|
|
||||||
s.maybeCleanupRetention(ctx, endUTC)
|
s.maybeCleanupRetention(ctx, endUTC)
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user