feat: Add user notification settings with quota warning and multiple notification methods
- Implement user notification settings with email and webhook options - Add new user settings for quota warning threshold and notification preferences - Create backend API and database support for user notification configuration - Enhance frontend personal settings with notification configuration UI - Support custom notification email and webhook URL - Add service layer for sending user notifications
This commit is contained in:
@@ -3,13 +3,11 @@ package model
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"one-api/common"
|
||||
"strings"
|
||||
|
||||
"github.com/bytedance/gopkg/util/gopool"
|
||||
"gorm.io/gorm"
|
||||
"one-api/common"
|
||||
relaycommon "one-api/relay/common"
|
||||
"one-api/setting"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Token struct {
|
||||
@@ -322,80 +320,3 @@ func decreaseTokenQuota(id int, quota int) (err error) {
|
||||
).Error
|
||||
return err
|
||||
}
|
||||
|
||||
func PreConsumeTokenQuota(relayInfo *relaycommon.RelayInfo, quota int) error {
|
||||
if quota < 0 {
|
||||
return errors.New("quota 不能为负数!")
|
||||
}
|
||||
if relayInfo.IsPlayground {
|
||||
return nil
|
||||
}
|
||||
//if relayInfo.TokenUnlimited {
|
||||
// return nil
|
||||
//}
|
||||
token, err := GetTokenById(relayInfo.TokenId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !relayInfo.TokenUnlimited && token.RemainQuota < quota {
|
||||
return errors.New("令牌额度不足")
|
||||
}
|
||||
err = DecreaseTokenQuota(relayInfo.TokenId, relayInfo.TokenKey, quota)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func PostConsumeQuota(relayInfo *relaycommon.RelayInfo, userQuota int, quota int, preConsumedQuota int, sendEmail bool) (err error) {
|
||||
|
||||
if quota > 0 {
|
||||
err = DecreaseUserQuota(relayInfo.UserId, quota)
|
||||
} else {
|
||||
err = IncreaseUserQuota(relayInfo.UserId, -quota)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !relayInfo.IsPlayground {
|
||||
if quota > 0 {
|
||||
err = DecreaseTokenQuota(relayInfo.TokenId, relayInfo.TokenKey, quota)
|
||||
} else {
|
||||
err = IncreaseTokenQuota(relayInfo.TokenId, relayInfo.TokenKey, -quota)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if sendEmail {
|
||||
if (quota + preConsumedQuota) != 0 {
|
||||
quotaTooLow := userQuota >= common.QuotaRemindThreshold && userQuota-(quota+preConsumedQuota) < common.QuotaRemindThreshold
|
||||
noMoreQuota := userQuota-(quota+preConsumedQuota) <= 0
|
||||
if quotaTooLow || noMoreQuota {
|
||||
go func() {
|
||||
email, err := GetUserEmail(relayInfo.UserId)
|
||||
if err != nil {
|
||||
common.SysError("failed to fetch user email: " + err.Error())
|
||||
}
|
||||
prompt := "您的额度即将用尽"
|
||||
if noMoreQuota {
|
||||
prompt = "您的额度已用尽"
|
||||
}
|
||||
if email != "" {
|
||||
topUpLink := fmt.Sprintf("%s/topup", setting.ServerAddress)
|
||||
err = common.SendEmail(prompt, email,
|
||||
fmt.Sprintf("%s,当前剩余额度为 %d,为了不影响您的使用,请及时充值。<br/>充值链接:<a href='%s'>%s</a>", prompt, userQuota, topUpLink, topUpLink))
|
||||
if err != nil {
|
||||
common.SysError("failed to send email" + err.Error())
|
||||
}
|
||||
common.SysLog("user quota is low, consumed quota: " + strconv.Itoa(quota) + ", user quota: " + strconv.Itoa(userQuota))
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"one-api/common"
|
||||
@@ -38,6 +39,7 @@ type User struct {
|
||||
InviterId int `json:"inviter_id" gorm:"type:int;column:inviter_id;index"`
|
||||
DeletedAt gorm.DeletedAt `gorm:"index"`
|
||||
LinuxDOId string `json:"linux_do_id" gorm:"column:linux_do_id;index"`
|
||||
Setting string `json:"setting" gorm:"type:text;column:setting"`
|
||||
}
|
||||
|
||||
func (user *User) GetAccessToken() string {
|
||||
@@ -51,6 +53,22 @@ func (user *User) SetAccessToken(token string) {
|
||||
user.AccessToken = &token
|
||||
}
|
||||
|
||||
func (user *User) GetSetting() map[string]interface{} {
|
||||
if user.Setting == "" {
|
||||
return nil
|
||||
}
|
||||
return common.StrToMap(user.Setting)
|
||||
}
|
||||
|
||||
func (user *User) SetSetting(setting map[string]interface{}) {
|
||||
settingBytes, err := json.Marshal(setting)
|
||||
if err != nil {
|
||||
common.SysError("failed to marshal setting: " + err.Error())
|
||||
return
|
||||
}
|
||||
user.Setting = string(settingBytes)
|
||||
}
|
||||
|
||||
// CheckUserExistOrDeleted check if user exist or deleted, if not exist, return false, nil, if deleted or exist, return true, nil
|
||||
func CheckUserExistOrDeleted(username string, email string) (bool, error) {
|
||||
var user User
|
||||
@@ -315,8 +333,8 @@ func (user *User) Update(updatePassword bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 更新缓存
|
||||
return updateUserCache(user.Id, user.Username, user.Group, user.Quota, user.Status)
|
||||
// Update cache
|
||||
return updateUserCache(*user)
|
||||
}
|
||||
|
||||
func (user *User) Edit(updatePassword bool) error {
|
||||
@@ -344,8 +362,8 @@ func (user *User) Edit(updatePassword bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 更新缓存
|
||||
return updateUserCache(user.Id, user.Username, user.Group, user.Quota, user.Status)
|
||||
// Update cache
|
||||
return updateUserCache(*user)
|
||||
}
|
||||
|
||||
func (user *User) Delete() error {
|
||||
@@ -371,8 +389,8 @@ func (user *User) HardDelete() error {
|
||||
// ValidateAndFill check password & user status
|
||||
func (user *User) ValidateAndFill() (err error) {
|
||||
// When querying with struct, GORM will only query with non-zero fields,
|
||||
// that means if your field’s value is 0, '', false or other zero values,
|
||||
// it won’t be used to build query conditions
|
||||
// that means if your field's value is 0, '', false or other zero values,
|
||||
// it won't be used to build query conditions
|
||||
password := user.Password
|
||||
username := strings.TrimSpace(user.Username)
|
||||
if username == "" || password == "" {
|
||||
@@ -531,7 +549,6 @@ func GetUserQuota(id int, fromDB bool) (quota int, err error) {
|
||||
return quota, nil
|
||||
}
|
||||
// Don't return error - fall through to DB
|
||||
//common.SysError("failed to get user quota from cache: " + err.Error())
|
||||
}
|
||||
fromDB = true
|
||||
err = DB.Model(&User{}).Where("id = ?", id).Select("quota").Find("a).Error
|
||||
@@ -580,6 +597,35 @@ func GetUserGroup(id int, fromDB bool) (group string, err error) {
|
||||
return group, nil
|
||||
}
|
||||
|
||||
// GetUserSetting gets setting from Redis first, falls back to DB if needed
|
||||
func GetUserSetting(id int, fromDB bool) (settingMap map[string]interface{}, err error) {
|
||||
var setting string
|
||||
defer func() {
|
||||
// Update Redis cache asynchronously on successful DB read
|
||||
if shouldUpdateRedis(fromDB, err) {
|
||||
gopool.Go(func() {
|
||||
if err := updateUserSettingCache(id, setting); err != nil {
|
||||
common.SysError("failed to update user setting cache: " + err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
if !fromDB && common.RedisEnabled {
|
||||
setting, err := getUserSettingCache(id)
|
||||
if err == nil {
|
||||
return setting, nil
|
||||
}
|
||||
// Don't return error - fall through to DB
|
||||
}
|
||||
fromDB = true
|
||||
err = DB.Model(&User{}).Where("id = ?", id).Select("setting").Find(&setting).Error
|
||||
if err != nil {
|
||||
return map[string]interface{}{}, err
|
||||
}
|
||||
|
||||
return common.StrToMap(setting), nil
|
||||
}
|
||||
|
||||
func IncreaseUserQuota(id int, quota int) (err error) {
|
||||
if quota < 0 {
|
||||
return errors.New("quota 不能为负数!")
|
||||
@@ -725,10 +771,10 @@ func IsLinuxDOIdAlreadyTaken(linuxDOId string) bool {
|
||||
return !errors.Is(err, gorm.ErrRecordNotFound)
|
||||
}
|
||||
|
||||
func (u *User) FillUserByLinuxDOId() error {
|
||||
if u.LinuxDOId == "" {
|
||||
func (user *User) FillUserByLinuxDOId() error {
|
||||
if user.LinuxDOId == "" {
|
||||
return errors.New("linux do id is empty")
|
||||
}
|
||||
err := DB.Where("linux_do_id = ?", u.LinuxDOId).First(u).Error
|
||||
err := DB.Where("linux_do_id = ?", user.LinuxDOId).First(user).Error
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,206 +1,210 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"one-api/common"
|
||||
"one-api/constant"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/bytedance/gopkg/util/gopool"
|
||||
)
|
||||
|
||||
// Change UserCache struct to userCache
|
||||
type userCache struct {
|
||||
// UserCache struct remains the same as it represents the cached data structure
|
||||
type UserCache struct {
|
||||
Id int `json:"id"`
|
||||
Group string `json:"group"`
|
||||
Email string `json:"email"`
|
||||
Quota int `json:"quota"`
|
||||
Status int `json:"status"`
|
||||
Role int `json:"role"`
|
||||
Username string `json:"username"`
|
||||
Setting string `json:"setting"`
|
||||
}
|
||||
|
||||
// Rename all exported functions to private ones
|
||||
// invalidateUserCache clears all user related cache
|
||||
func (user *UserCache) GetSetting() map[string]interface{} {
|
||||
if user.Setting == "" {
|
||||
return nil
|
||||
}
|
||||
return common.StrToMap(user.Setting)
|
||||
}
|
||||
|
||||
func (user *UserCache) SetSetting(setting map[string]interface{}) {
|
||||
settingBytes, err := json.Marshal(setting)
|
||||
if err != nil {
|
||||
common.SysError("failed to marshal setting: " + err.Error())
|
||||
return
|
||||
}
|
||||
user.Setting = string(settingBytes)
|
||||
}
|
||||
|
||||
// getUserCacheKey returns the key for user cache
|
||||
func getUserCacheKey(userId int) string {
|
||||
return fmt.Sprintf("user:%d", userId)
|
||||
}
|
||||
|
||||
// invalidateUserCache clears user cache
|
||||
func invalidateUserCache(userId int) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
return common.RedisHDelObj(getUserCacheKey(userId))
|
||||
}
|
||||
|
||||
keys := []string{
|
||||
fmt.Sprintf(constant.UserGroupKeyFmt, userId),
|
||||
fmt.Sprintf(constant.UserQuotaKeyFmt, userId),
|
||||
fmt.Sprintf(constant.UserEnabledKeyFmt, userId),
|
||||
fmt.Sprintf(constant.UserUsernameKeyFmt, userId),
|
||||
// updateUserCache updates all user cache fields using hash
|
||||
func updateUserCache(user User) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
if err := common.RedisDel(key); err != nil {
|
||||
return fmt.Errorf("failed to delete cache key %s: %w", key, err)
|
||||
cache := &UserCache{
|
||||
Id: user.Id,
|
||||
Group: user.Group,
|
||||
Quota: user.Quota,
|
||||
Status: user.Status,
|
||||
Username: user.Username,
|
||||
Setting: user.Setting,
|
||||
Email: user.Email,
|
||||
}
|
||||
|
||||
return common.RedisHSetObj(
|
||||
getUserCacheKey(user.Id),
|
||||
cache,
|
||||
time.Duration(constant.UserId2QuotaCacheSeconds)*time.Second,
|
||||
)
|
||||
}
|
||||
|
||||
// GetUserCache gets complete user cache from hash
|
||||
func GetUserCache(userId int) (userCache *UserCache, err error) {
|
||||
var user *User
|
||||
var fromDB bool
|
||||
defer func() {
|
||||
// Update Redis cache asynchronously on successful DB read
|
||||
if shouldUpdateRedis(fromDB, err) && user != nil {
|
||||
gopool.Go(func() {
|
||||
if err := updateUserCache(*user); err != nil {
|
||||
common.SysError("failed to update user status cache: " + err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}()
|
||||
|
||||
// updateUserGroupCache updates user group cache
|
||||
func updateUserGroupCache(userId int, group string) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
return common.RedisSet(
|
||||
fmt.Sprintf(constant.UserGroupKeyFmt, userId),
|
||||
group,
|
||||
time.Duration(constant.UserId2QuotaCacheSeconds)*time.Second,
|
||||
)
|
||||
}
|
||||
|
||||
// updateUserQuotaCache updates user quota cache
|
||||
func updateUserQuotaCache(userId int, quota int) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
return common.RedisSet(
|
||||
fmt.Sprintf(constant.UserQuotaKeyFmt, userId),
|
||||
fmt.Sprintf("%d", quota),
|
||||
time.Duration(constant.UserId2QuotaCacheSeconds)*time.Second,
|
||||
)
|
||||
}
|
||||
|
||||
// updateUserStatusCache updates user status cache
|
||||
func updateUserStatusCache(userId int, userEnabled bool) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
enabled := "0"
|
||||
if userEnabled {
|
||||
enabled = "1"
|
||||
}
|
||||
return common.RedisSet(
|
||||
fmt.Sprintf(constant.UserEnabledKeyFmt, userId),
|
||||
enabled,
|
||||
time.Duration(constant.UserId2StatusCacheSeconds)*time.Second,
|
||||
)
|
||||
}
|
||||
|
||||
// updateUserNameCache updates username cache
|
||||
func updateUserNameCache(userId int, username string) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
return common.RedisSet(
|
||||
fmt.Sprintf(constant.UserUsernameKeyFmt, userId),
|
||||
username,
|
||||
time.Duration(constant.UserId2QuotaCacheSeconds)*time.Second,
|
||||
)
|
||||
}
|
||||
|
||||
// updateUserCache updates all user cache fields
|
||||
func updateUserCache(userId int, username string, userGroup string, quota int, status int) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
// Try getting from Redis first
|
||||
err = common.RedisHGetObj(getUserCacheKey(userId), &userCache)
|
||||
if err == nil {
|
||||
return userCache, nil
|
||||
}
|
||||
|
||||
if err := updateUserGroupCache(userId, userGroup); err != nil {
|
||||
return fmt.Errorf("update group cache: %w", err)
|
||||
}
|
||||
|
||||
if err := updateUserQuotaCache(userId, quota); err != nil {
|
||||
return fmt.Errorf("update quota cache: %w", err)
|
||||
}
|
||||
|
||||
if err := updateUserStatusCache(userId, status == common.UserStatusEnabled); err != nil {
|
||||
return fmt.Errorf("update status cache: %w", err)
|
||||
}
|
||||
|
||||
if err := updateUserNameCache(userId, username); err != nil {
|
||||
return fmt.Errorf("update username cache: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getUserGroupCache gets user group from cache
|
||||
func getUserGroupCache(userId int) (string, error) {
|
||||
if !common.RedisEnabled {
|
||||
return "", nil
|
||||
}
|
||||
return common.RedisGet(fmt.Sprintf(constant.UserGroupKeyFmt, userId))
|
||||
}
|
||||
|
||||
// getUserQuotaCache gets user quota from cache
|
||||
func getUserQuotaCache(userId int) (int, error) {
|
||||
if !common.RedisEnabled {
|
||||
return 0, nil
|
||||
}
|
||||
quotaStr, err := common.RedisGet(fmt.Sprintf(constant.UserQuotaKeyFmt, userId))
|
||||
// If Redis fails, get from DB
|
||||
fromDB = true
|
||||
user, err = GetUserById(userId, false)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return nil, err // Return nil and error if DB lookup fails
|
||||
}
|
||||
return strconv.Atoi(quotaStr)
|
||||
|
||||
// Create cache object from user data
|
||||
userCache = &UserCache{
|
||||
Id: user.Id,
|
||||
Group: user.Group,
|
||||
Quota: user.Quota,
|
||||
Status: user.Status,
|
||||
Username: user.Username,
|
||||
Setting: user.Setting,
|
||||
Email: user.Email,
|
||||
}
|
||||
|
||||
return userCache, nil
|
||||
}
|
||||
|
||||
// getUserStatusCache gets user status from cache
|
||||
func getUserStatusCache(userId int) (int, error) {
|
||||
if !common.RedisEnabled {
|
||||
return 0, nil
|
||||
}
|
||||
statusStr, err := common.RedisGet(fmt.Sprintf(constant.UserEnabledKeyFmt, userId))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return strconv.Atoi(statusStr)
|
||||
}
|
||||
|
||||
// getUserNameCache gets username from cache
|
||||
func getUserNameCache(userId int) (string, error) {
|
||||
if !common.RedisEnabled {
|
||||
return "", nil
|
||||
}
|
||||
return common.RedisGet(fmt.Sprintf(constant.UserUsernameKeyFmt, userId))
|
||||
}
|
||||
|
||||
// getUserCache gets complete user cache
|
||||
func getUserCache(userId int) (*userCache, error) {
|
||||
if !common.RedisEnabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
group, err := getUserGroupCache(userId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get group cache: %w", err)
|
||||
}
|
||||
|
||||
quota, err := getUserQuotaCache(userId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get quota cache: %w", err)
|
||||
}
|
||||
|
||||
status, err := getUserStatusCache(userId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get status cache: %w", err)
|
||||
}
|
||||
|
||||
username, err := getUserNameCache(userId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get username cache: %w", err)
|
||||
}
|
||||
|
||||
return &userCache{
|
||||
Id: userId,
|
||||
Group: group,
|
||||
Quota: quota,
|
||||
Status: status,
|
||||
Username: username,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Add atomic quota operations
|
||||
// Add atomic quota operations using hash fields
|
||||
func cacheIncrUserQuota(userId int, delta int64) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
key := fmt.Sprintf(constant.UserQuotaKeyFmt, userId)
|
||||
return common.RedisIncr(key, delta)
|
||||
return common.RedisHIncrBy(getUserCacheKey(userId), "Quota", delta)
|
||||
}
|
||||
|
||||
func cacheDecrUserQuota(userId int, delta int64) error {
|
||||
return cacheIncrUserQuota(userId, -delta)
|
||||
}
|
||||
|
||||
// Helper functions to get individual fields if needed
|
||||
func getUserGroupCache(userId int) (string, error) {
|
||||
cache, err := GetUserCache(userId)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return cache.Group, nil
|
||||
}
|
||||
|
||||
func getUserQuotaCache(userId int) (int, error) {
|
||||
cache, err := GetUserCache(userId)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return cache.Quota, nil
|
||||
}
|
||||
|
||||
func getUserStatusCache(userId int) (int, error) {
|
||||
cache, err := GetUserCache(userId)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return cache.Status, nil
|
||||
}
|
||||
|
||||
func getUserNameCache(userId int) (string, error) {
|
||||
cache, err := GetUserCache(userId)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return cache.Username, nil
|
||||
}
|
||||
|
||||
func getUserSettingCache(userId int) (map[string]interface{}, error) {
|
||||
setting := make(map[string]interface{})
|
||||
cache, err := GetUserCache(userId)
|
||||
if err != nil {
|
||||
return setting, err
|
||||
}
|
||||
return cache.GetSetting(), nil
|
||||
}
|
||||
|
||||
// New functions for individual field updates
|
||||
func updateUserStatusCache(userId int, status bool) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
statusInt := common.UserStatusEnabled
|
||||
if !status {
|
||||
statusInt = common.UserStatusDisabled
|
||||
}
|
||||
return common.RedisHSetField(getUserCacheKey(userId), "Status", fmt.Sprintf("%d", statusInt))
|
||||
}
|
||||
|
||||
func updateUserQuotaCache(userId int, quota int) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
return common.RedisHSetField(getUserCacheKey(userId), "Quota", fmt.Sprintf("%d", quota))
|
||||
}
|
||||
|
||||
func updateUserGroupCache(userId int, group string) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
return common.RedisHSetField(getUserCacheKey(userId), "Group", group)
|
||||
}
|
||||
|
||||
func updateUserNameCache(userId int, username string) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
return common.RedisHSetField(getUserCacheKey(userId), "Username", username)
|
||||
}
|
||||
|
||||
func updateUserSettingCache(userId int, setting string) error {
|
||||
if !common.RedisEnabled {
|
||||
return nil
|
||||
}
|
||||
return common.RedisHSetField(getUserCacheKey(userId), "Setting", setting)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user