fix(logger): 修复 caller 字段与 OpsSystemLogSink 停止刷盘

修复点:

- zap logger 不再强制 AddCallerSkip(1),确保 caller 指向真实调用点

- slog handler 避免重复写 time 字段

- OpsSystemLogSink 优先从字段 component 识别业务组件;停止时 drain 队列并用可用 ctx 刷盘

补充:新增/完善对应单测
This commit is contained in:
yangjianbo
2026-02-12 17:42:29 +08:00
parent b7243660c4
commit 84cc651b46
6 changed files with 253 additions and 10 deletions

View File

@@ -79,6 +79,13 @@ func (s *OpsSystemLogSink) WriteLogEvent(event *logger.LogEvent) {
if s == nil || event == nil || !s.shouldIndex(event) {
return
}
if s.ctx != nil {
select {
case <-s.ctx.Done():
return
default:
}
}
select {
case s.queue <- event:
@@ -95,6 +102,12 @@ func (s *OpsSystemLogSink) shouldIndex(event *logger.LogEvent) bool {
}
component := strings.ToLower(strings.TrimSpace(event.Component))
// zap 的 LoggerName 往往为空或不等于业务组件名;业务组件名通常以字段 component 透传。
if event.Fields != nil {
if fc := strings.ToLower(strings.TrimSpace(asString(event.Fields["component"]))); fc != "" {
component = fc
}
}
if strings.Contains(component, "http.access") {
return true
}
@@ -111,12 +124,12 @@ func (s *OpsSystemLogSink) run() {
defer ticker.Stop()
batch := make([]*logger.LogEvent, 0, s.batchSize)
flush := func() {
flush := func(baseCtx context.Context) {
if len(batch) == 0 {
return
}
started := time.Now()
inserted, err := s.flushBatch(batch)
inserted, err := s.flushBatch(baseCtx, batch)
delay := time.Since(started)
if err != nil {
atomic.AddUint64(&s.writeFailed, uint64(len(batch)))
@@ -131,11 +144,28 @@ func (s *OpsSystemLogSink) run() {
}
batch = batch[:0]
}
drainAndFlush := func() {
for {
select {
case item := <-s.queue:
if item == nil {
continue
}
batch = append(batch, item)
if len(batch) >= s.batchSize {
flush(context.Background())
}
default:
flush(context.Background())
return
}
}
}
for {
select {
case <-s.ctx.Done():
flush()
drainAndFlush()
return
case item := <-s.queue:
if item == nil {
@@ -143,15 +173,15 @@ func (s *OpsSystemLogSink) run() {
}
batch = append(batch, item)
if len(batch) >= s.batchSize {
flush()
flush(s.ctx)
}
case <-ticker.C:
flush()
flush(s.ctx)
}
}
}
func (s *OpsSystemLogSink) flushBatch(batch []*logger.LogEvent) (int, error) {
func (s *OpsSystemLogSink) flushBatch(baseCtx context.Context, batch []*logger.LogEvent) (int, error) {
inputs := make([]*OpsInsertSystemLogInput, 0, len(batch))
for _, event := range batch {
if event == nil {
@@ -205,7 +235,10 @@ func (s *OpsSystemLogSink) flushBatch(batch []*logger.LogEvent) (int, error) {
if len(inputs) == 0 {
return 0, nil
}
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
if baseCtx == nil || baseCtx.Err() != nil {
baseCtx = context.Background()
}
ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
defer cancel()
inserted, err := s.opsRepo.BatchInsertSystemLogs(ctx, inputs)
if err != nil {