feat: track authenticated user activity

This commit is contained in:
IanShaw027
2026-04-21 14:54:53 +08:00
parent 422f3449a2
commit ed01c59916
10 changed files with 254 additions and 26 deletions

View File

@@ -88,6 +88,9 @@ func (s *userRepoStubForGroupUpdate) GetLatestUsedAtByUserIDs(context.Context, [
func (s *userRepoStubForGroupUpdate) GetLatestUsedAtByUserID(context.Context, int64) (*time.Time, error) {
panic("unexpected")
}
func (s *userRepoStubForGroupUpdate) UpdateUserLastActiveAt(context.Context, int64, time.Time) error {
panic("unexpected")
}
func (s *userRepoStubForGroupUpdate) RemoveGroupFromUserAllowedGroups(context.Context, int64, int64) error {
panic("unexpected")
}

View File

@@ -107,6 +107,18 @@ func (s *userRepoStub) ListWithFilters(ctx context.Context, params pagination.Pa
panic("unexpected ListWithFilters call")
}
func (s *userRepoStub) GetLatestUsedAtByUserIDs(ctx context.Context, userIDs []int64) (map[int64]*time.Time, error) {
panic("unexpected GetLatestUsedAtByUserIDs call")
}
func (s *userRepoStub) GetLatestUsedAtByUserID(ctx context.Context, userID int64) (*time.Time, error) {
panic("unexpected GetLatestUsedAtByUserID call")
}
func (s *userRepoStub) UpdateUserLastActiveAt(ctx context.Context, userID int64, activeAt time.Time) error {
panic("unexpected UpdateUserLastActiveAt call")
}
func (s *userRepoStub) UpdateBalance(ctx context.Context, id int64, amount float64) error {
panic("unexpected UpdateBalance call")
}

View File

@@ -97,6 +97,10 @@ func (s *emailSyncRepoStub) GetLatestUsedAtByUserID(context.Context, int64) (*ti
return nil, nil
}
func (s *emailSyncRepoStub) UpdateUserLastActiveAt(context.Context, int64, time.Time) error {
return nil
}
func (s *emailSyncRepoStub) UpdateBalance(context.Context, int64, float64) error { return nil }
func (s *emailSyncRepoStub) DeductBalance(context.Context, int64, float64) error { return nil }

View File

@@ -19,10 +19,13 @@ import (
"log/slog"
"net/url"
"sort"
"strconv"
"strings"
"sync"
"time"
xdraw "golang.org/x/image/draw"
"golang.org/x/sync/singleflight"
)
var (
@@ -47,6 +50,8 @@ const (
notifyCodeUserRateWindow = 10 * time.Minute
defaultUserIdentityRedirect = "/settings/profile"
userLastActiveMinTouch = 10 * time.Minute
userLastActiveFailBackoff = 30 * time.Second
)
var (
@@ -82,6 +87,7 @@ type UserRepository interface {
ListWithFilters(ctx context.Context, params pagination.PaginationParams, filters UserListFilters) ([]User, *pagination.PaginationResult, error)
GetLatestUsedAtByUserIDs(ctx context.Context, userIDs []int64) (map[int64]*time.Time, error)
GetLatestUsedAtByUserID(ctx context.Context, userID int64) (*time.Time, error)
UpdateUserLastActiveAt(ctx context.Context, userID int64, activeAt time.Time) error
UpdateBalance(ctx context.Context, id int64, amount float64) error
DeductBalance(ctx context.Context, id int64, amount float64) error
@@ -192,6 +198,8 @@ type UserService struct {
settingRepo SettingRepository
authCacheInvalidator APIKeyAuthCacheInvalidator
billingCache BillingCache
lastActiveTouchL1 sync.Map
lastActiveTouchSF singleflight.Group
}
// NewUserService 创建用户服务实例
@@ -788,6 +796,66 @@ func (s *UserService) GetByID(ctx context.Context, id int64) (*User, error) {
return user, nil
}
// TouchLastActive 通过防抖更新 users.last_active_at减少鉴权热路径写放大。
// 该操作为尽力而为,不应中断正常请求。
func (s *UserService) TouchLastActive(ctx context.Context, userID int64) {
if s == nil || s.userRepo == nil || userID <= 0 {
return
}
user, err := s.userRepo.GetByID(ctx, userID)
if err != nil {
slog.Debug("skip touch user last active after load failure", "user_id", userID, "error", err)
return
}
s.TouchLastActiveForUser(ctx, user)
}
// TouchLastActiveForUser 使用已加载的用户信息更新 last_active_at避免重复读取数据库。
func (s *UserService) TouchLastActiveForUser(ctx context.Context, user *User) {
if s == nil || s.userRepo == nil || user == nil || user.ID <= 0 {
return
}
now := time.Now()
if userLastActiveFresh(user.LastActiveAt, now) {
return
}
if v, ok := s.lastActiveTouchL1.Load(user.ID); ok {
if nextAllowedAt, ok := v.(time.Time); ok && now.Before(nextAllowedAt) {
return
}
}
_, err, _ := s.lastActiveTouchSF.Do(strconv.FormatInt(user.ID, 10), func() (any, error) {
latest := time.Now()
if v, ok := s.lastActiveTouchL1.Load(user.ID); ok {
if nextAllowedAt, ok := v.(time.Time); ok && latest.Before(nextAllowedAt) {
return nil, nil
}
}
if userLastActiveFresh(user.LastActiveAt, latest) {
return nil, nil
}
if err := s.userRepo.UpdateUserLastActiveAt(ctx, user.ID, latest); err != nil {
s.lastActiveTouchL1.Store(user.ID, latest.Add(userLastActiveFailBackoff))
return nil, fmt.Errorf("touch user last active: %w", err)
}
s.lastActiveTouchL1.Store(user.ID, latest.Add(userLastActiveMinTouch))
return nil, nil
})
if err != nil {
slog.Warn("touch user last active failed", "user_id", user.ID, "error", err)
}
}
func userLastActiveFresh(lastActiveAt *time.Time, now time.Time) bool {
if lastActiveAt == nil {
return false
}
return now.Before(lastActiveAt.Add(userLastActiveMinTouch))
}
func (s *UserService) hydrateUserAvatar(ctx context.Context, user *User) error {
if s == nil || s.userRepo == nil || user == nil || user.ID == 0 {
return nil

View File

@@ -23,18 +23,21 @@ import (
// --- mock: UserRepository ---
type mockUserRepo struct {
updateBalanceErr error
updateBalanceFn func(ctx context.Context, id int64, amount float64) error
getByIDUser *User
getByIDErr error
updateFn func(ctx context.Context, user *User) error
updateCalls int
upsertAvatarFn func(ctx context.Context, userID int64, input UpsertUserAvatarInput) (*UserAvatar, error)
upsertAvatarArgs []UpsertUserAvatarInput
deleteAvatarFn func(ctx context.Context, userID int64) error
deleteAvatarIDs []int64
getAvatarFn func(ctx context.Context, userID int64) (*UserAvatar, error)
txCalls int
updateBalanceErr error
updateBalanceFn func(ctx context.Context, id int64, amount float64) error
getByIDUser *User
getByIDErr error
updateLastActiveErr error
updateLastActiveUserIDs []int64
updateLastActiveAt []time.Time
updateFn func(ctx context.Context, user *User) error
updateCalls int
upsertAvatarFn func(ctx context.Context, userID int64, input UpsertUserAvatarInput) (*UserAvatar, error)
upsertAvatarArgs []UpsertUserAvatarInput
deleteAvatarFn func(ctx context.Context, userID int64) error
deleteAvatarIDs []int64
getAvatarFn func(ctx context.Context, userID int64) (*UserAvatar, error)
txCalls int
}
type mockUserRepoTxKey struct{}
@@ -144,6 +147,11 @@ func (m *mockUserRepo) UpdateBalance(ctx context.Context, id int64, amount float
}
return m.updateBalanceErr
}
func (m *mockUserRepo) UpdateUserLastActiveAt(_ context.Context, userID int64, activeAt time.Time) error {
m.updateLastActiveUserIDs = append(m.updateLastActiveUserIDs, userID)
m.updateLastActiveAt = append(m.updateLastActiveAt, activeAt)
return m.updateLastActiveErr
}
func (m *mockUserRepo) DeductBalance(context.Context, int64, float64) error { return nil }
func (m *mockUserRepo) UpdateConcurrency(context.Context, int64, int) error { return nil }
func (m *mockUserRepo) ExistsByEmail(context.Context, string) (bool, error) { return false, nil }
@@ -288,6 +296,39 @@ func TestUpdateBalance_CacheFailure_DoesNotAffectReturn(t *testing.T) {
}, 2*time.Second, 10*time.Millisecond, "即使失败也应调用 InvalidateUserBalance")
}
func TestTouchLastActive_UpdatesWhenStale(t *testing.T) {
stale := time.Now().Add(-11 * time.Minute)
repo := &mockUserRepo{
getByIDUser: &User{
ID: 42,
LastActiveAt: &stale,
},
}
svc := NewUserService(repo, nil, nil, nil)
svc.TouchLastActive(context.Background(), 42)
require.Equal(t, []int64{42}, repo.updateLastActiveUserIDs)
require.Len(t, repo.updateLastActiveAt, 1)
require.WithinDuration(t, time.Now(), repo.updateLastActiveAt[0], 2*time.Second)
}
func TestTouchLastActive_SkipsWhenRecent(t *testing.T) {
recent := time.Now().Add(-time.Minute)
repo := &mockUserRepo{
getByIDUser: &User{
ID: 42,
LastActiveAt: &recent,
},
}
svc := NewUserService(repo, nil, nil, nil)
svc.TouchLastActive(context.Background(), 42)
require.Empty(t, repo.updateLastActiveUserIDs)
require.Empty(t, repo.updateLastActiveAt)
}
func TestUpdateBalance_RepoError_ReturnsError(t *testing.T) {
repo := &mockUserRepo{updateBalanceErr: errors.New("database error")}
cache := &mockBillingCache{}