From 84cc651b46a7fd273c14bffedeaa4b9419e72770 Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Thu, 12 Feb 2026 17:42:29 +0800 Subject: [PATCH] =?UTF-8?q?fix(logger):=20=E4=BF=AE=E5=A4=8D=20caller=20?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=E4=B8=8E=20OpsSystemLogSink=20=E5=81=9C?= =?UTF-8?q?=E6=AD=A2=E5=88=B7=E7=9B=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复点: - zap logger 不再强制 AddCallerSkip(1),确保 caller 指向真实调用点 - slog handler 避免重复写 time 字段 - OpsSystemLogSink 优先从字段 component 识别业务组件;停止时 drain 队列并用可用 ctx 刷盘 补充:新增/完善对应单测 --- backend/internal/pkg/logger/logger.go | 1 - backend/internal/pkg/logger/logger_test.go | 63 +++++++++++++ backend/internal/pkg/logger/slog_handler.go | 3 +- .../internal/pkg/logger/slog_handler_test.go | 90 +++++++++++++++++++ .../internal/service/ops_system_log_sink.go | 47 ++++++++-- .../service/ops_system_log_sink_test.go | 59 ++++++++++++ 6 files changed, 253 insertions(+), 10 deletions(-) create mode 100644 backend/internal/pkg/logger/slog_handler_test.go diff --git a/backend/internal/pkg/logger/logger.go b/backend/internal/pkg/logger/logger.go index 57e6fd1f..3bb32b4d 100644 --- a/backend/internal/pkg/logger/logger.go +++ b/backend/internal/pkg/logger/logger.go @@ -247,7 +247,6 @@ func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, error) { if stacktraceLevel <= zapcore.FatalLevel { zapOpts = append(zapOpts, zap.AddStacktrace(stacktraceLevel)) } - zapOpts = append(zapOpts, zap.AddCallerSkip(1)) logger := zap.New(core, zapOpts...).With( zap.String("service", options.ServiceName), diff --git a/backend/internal/pkg/logger/logger_test.go b/backend/internal/pkg/logger/logger_test.go index 75c85a9d..74aae061 100644 --- a/backend/internal/pkg/logger/logger_test.go +++ b/backend/internal/pkg/logger/logger_test.go @@ -1,6 +1,7 @@ package logger import ( + "encoding/json" "io" "os" "path/filepath" @@ -127,3 +128,65 @@ func TestInit_FileOutputFailureDowngrade(t *testing.T) { t.Fatalf("stderr should contain fallback warning, got: %s", string(stderrBytes)) } } + +func TestInit_CallerShouldPointToCallsite(t *testing.T) { + origStdout := os.Stdout + origStderr := os.Stderr + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + t.Fatalf("create stdout pipe: %v", err) + } + _, stderrW, err := os.Pipe() + if err != nil { + t.Fatalf("create stderr pipe: %v", err) + } + os.Stdout = stdoutW + os.Stderr = stderrW + t.Cleanup(func() { + os.Stdout = origStdout + os.Stderr = origStderr + _ = stdoutR.Close() + _ = stdoutW.Close() + _ = stderrW.Close() + }) + + if err := Init(InitOptions{ + Level: "info", + Format: "json", + ServiceName: "sub2api", + Environment: "test", + Caller: true, + Output: OutputOptions{ + ToStdout: true, + ToFile: false, + }, + Sampling: SamplingOptions{Enabled: false}, + }); err != nil { + t.Fatalf("Init() error: %v", err) + } + + L().Info("caller-check") + Sync() + _ = stdoutW.Close() + logBytes, _ := io.ReadAll(stdoutR) + + var line string + for _, item := range strings.Split(string(logBytes), "\n") { + if strings.Contains(item, "caller-check") { + line = item + break + } + } + if line == "" { + t.Fatalf("log output missing caller-check: %s", string(logBytes)) + } + + var payload map[string]any + if err := json.Unmarshal([]byte(line), &payload); err != nil { + t.Fatalf("parse log json failed: %v, line=%s", err, line) + } + caller, _ := payload["caller"].(string) + if !strings.Contains(caller, "logger_test.go:") { + t.Fatalf("caller should point to this test file, got: %s", caller) + } +} diff --git a/backend/internal/pkg/logger/slog_handler.go b/backend/internal/pkg/logger/slog_handler.go index 47c80a6d..562b8341 100644 --- a/backend/internal/pkg/logger/slog_handler.go +++ b/backend/internal/pkg/logger/slog_handler.go @@ -41,8 +41,7 @@ func (h *slogZapHandler) Enabled(_ context.Context, level slog.Level) bool { } func (h *slogZapHandler) Handle(_ context.Context, record slog.Record) error { - fields := make([]zap.Field, 0, len(h.attrs)+record.NumAttrs()+4) - fields = append(fields, zap.Time("time", record.Time)) + fields := make([]zap.Field, 0, len(h.attrs)+record.NumAttrs()+3) fields = append(fields, slogAttrsToZapFields(h.groups, h.attrs)...) record.Attrs(func(attr slog.Attr) bool { fields = append(fields, slogAttrToZapField(h.groups, attr)) diff --git a/backend/internal/pkg/logger/slog_handler_test.go b/backend/internal/pkg/logger/slog_handler_test.go new file mode 100644 index 00000000..632bcd32 --- /dev/null +++ b/backend/internal/pkg/logger/slog_handler_test.go @@ -0,0 +1,90 @@ +package logger + +import ( + "context" + "log/slog" + "testing" + "time" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type captureState struct { + writes []capturedWrite +} + +type capturedWrite struct { + entry zapcore.Entry + fields []zapcore.Field +} + +type captureCore struct { + state *captureState + withFields []zapcore.Field +} + +func newCaptureCore() *captureCore { + return &captureCore{state: &captureState{}} +} + +func (c *captureCore) Enabled(zapcore.Level) bool { + return true +} + +func (c *captureCore) With(fields []zapcore.Field) zapcore.Core { + nextFields := make([]zapcore.Field, 0, len(c.withFields)+len(fields)) + nextFields = append(nextFields, c.withFields...) + nextFields = append(nextFields, fields...) + return &captureCore{ + state: c.state, + withFields: nextFields, + } +} + +func (c *captureCore) Check(entry zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + return ce.AddCore(entry, c) +} + +func (c *captureCore) Write(entry zapcore.Entry, fields []zapcore.Field) error { + allFields := make([]zapcore.Field, 0, len(c.withFields)+len(fields)) + allFields = append(allFields, c.withFields...) + allFields = append(allFields, fields...) + c.state.writes = append(c.state.writes, capturedWrite{ + entry: entry, + fields: allFields, + }) + return nil +} + +func (c *captureCore) Sync() error { + return nil +} + +func TestSlogZapHandler_Handle_DoesNotAppendTimeField(t *testing.T) { + core := newCaptureCore() + handler := newSlogZapHandler(zap.New(core)) + + record := slog.NewRecord(time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC), slog.LevelInfo, "hello", 0) + record.AddAttrs(slog.String("component", "http.access")) + + if err := handler.Handle(context.Background(), record); err != nil { + t.Fatalf("handle slog record: %v", err) + } + if len(core.state.writes) != 1 { + t.Fatalf("write calls = %d, want 1", len(core.state.writes)) + } + + var hasComponent bool + for _, field := range core.state.writes[0].fields { + if field.Key == "time" { + t.Fatalf("unexpected duplicate time field in slog adapter output") + } + if field.Key == "component" { + hasComponent = true + } + } + if !hasComponent { + t.Fatalf("component field should be preserved") + } +} diff --git a/backend/internal/service/ops_system_log_sink.go b/backend/internal/service/ops_system_log_sink.go index 65fa9e3f..c50a30d5 100644 --- a/backend/internal/service/ops_system_log_sink.go +++ b/backend/internal/service/ops_system_log_sink.go @@ -79,6 +79,13 @@ func (s *OpsSystemLogSink) WriteLogEvent(event *logger.LogEvent) { if s == nil || event == nil || !s.shouldIndex(event) { return } + if s.ctx != nil { + select { + case <-s.ctx.Done(): + return + default: + } + } select { case s.queue <- event: @@ -95,6 +102,12 @@ func (s *OpsSystemLogSink) shouldIndex(event *logger.LogEvent) bool { } component := strings.ToLower(strings.TrimSpace(event.Component)) + // zap 的 LoggerName 往往为空或不等于业务组件名;业务组件名通常以字段 component 透传。 + if event.Fields != nil { + if fc := strings.ToLower(strings.TrimSpace(asString(event.Fields["component"]))); fc != "" { + component = fc + } + } if strings.Contains(component, "http.access") { return true } @@ -111,12 +124,12 @@ func (s *OpsSystemLogSink) run() { defer ticker.Stop() batch := make([]*logger.LogEvent, 0, s.batchSize) - flush := func() { + flush := func(baseCtx context.Context) { if len(batch) == 0 { return } started := time.Now() - inserted, err := s.flushBatch(batch) + inserted, err := s.flushBatch(baseCtx, batch) delay := time.Since(started) if err != nil { atomic.AddUint64(&s.writeFailed, uint64(len(batch))) @@ -131,11 +144,28 @@ func (s *OpsSystemLogSink) run() { } batch = batch[:0] } + drainAndFlush := func() { + for { + select { + case item := <-s.queue: + if item == nil { + continue + } + batch = append(batch, item) + if len(batch) >= s.batchSize { + flush(context.Background()) + } + default: + flush(context.Background()) + return + } + } + } for { select { case <-s.ctx.Done(): - flush() + drainAndFlush() return case item := <-s.queue: if item == nil { @@ -143,15 +173,15 @@ func (s *OpsSystemLogSink) run() { } batch = append(batch, item) if len(batch) >= s.batchSize { - flush() + flush(s.ctx) } case <-ticker.C: - flush() + flush(s.ctx) } } } -func (s *OpsSystemLogSink) flushBatch(batch []*logger.LogEvent) (int, error) { +func (s *OpsSystemLogSink) flushBatch(baseCtx context.Context, batch []*logger.LogEvent) (int, error) { inputs := make([]*OpsInsertSystemLogInput, 0, len(batch)) for _, event := range batch { if event == nil { @@ -205,7 +235,10 @@ func (s *OpsSystemLogSink) flushBatch(batch []*logger.LogEvent) (int, error) { if len(inputs) == 0 { return 0, nil } - ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) + if baseCtx == nil || baseCtx.Err() != nil { + baseCtx = context.Background() + } + ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second) defer cancel() inserted, err := s.opsRepo.BatchInsertSystemLogs(ctx, inputs) if err != nil { diff --git a/backend/internal/service/ops_system_log_sink_test.go b/backend/internal/service/ops_system_log_sink_test.go index 335ffea4..12a2ec0c 100644 --- a/backend/internal/service/ops_system_log_sink_test.go +++ b/backend/internal/service/ops_system_log_sink_test.go @@ -36,11 +36,29 @@ func TestOpsSystemLogSink_ShouldIndex(t *testing.T) { event: &logger.LogEvent{Level: "info", Component: "http.access"}, want: true, }, + { + name: "access component from fields (real zap path)", + event: &logger.LogEvent{ + Level: "info", + Component: "", + Fields: map[string]any{"component": "http.access"}, + }, + want: true, + }, { name: "audit component", event: &logger.LogEvent{Level: "info", Component: "audit.log_config_change"}, want: true, }, + { + name: "audit component from fields (real zap path)", + event: &logger.LogEvent{ + Level: "info", + Component: "", + Fields: map[string]any{"component": "audit.log_config_change"}, + }, + want: true, + }, { name: "plain info", event: &logger.LogEvent{Level: "info", Component: "app"}, @@ -205,6 +223,47 @@ func TestOpsSystemLogSink_FlushFailureUpdatesHealth(t *testing.T) { t.Fatalf("write_failed_count not updated") } +func TestOpsSystemLogSink_StopFlushUsesActiveContextAndDrainsQueue(t *testing.T) { + var inserted int64 + var canceledCtxCalls int64 + repo := &opsRepoMock{ + BatchInsertSystemLogsFn: func(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) { + if err := ctx.Err(); err != nil { + atomic.AddInt64(&canceledCtxCalls, 1) + return 0, err + } + atomic.AddInt64(&inserted, int64(len(inputs))) + return int64(len(inputs)), nil + }, + } + + sink := NewOpsSystemLogSink(repo) + sink.batchSize = 200 + sink.flushInterval = time.Hour + sink.Start() + + sink.WriteLogEvent(&logger.LogEvent{ + Time: time.Now().UTC(), + Level: "warn", + Component: "app", + Message: "pending-on-shutdown", + Fields: map[string]any{"component": "http.access"}, + }) + + sink.Stop() + + if got := atomic.LoadInt64(&inserted); got != 1 { + t.Fatalf("inserted = %d, want 1", got) + } + if got := atomic.LoadInt64(&canceledCtxCalls); got != 0 { + t.Fatalf("canceled ctx calls = %d, want 0", got) + } + health := sink.Health() + if health.WrittenCount != 1 { + t.Fatalf("written_count = %d, want 1", health.WrittenCount) + } +} + type stringerValue string func (s stringerValue) String() string { return string(s) }