- 邮件发送改为异步队列处理,避免并发导致发送失败 - 新增 Email 维度限流(30秒冷却期),防止邮件轰炸 - Token 验证使用常量时间比较,防止时序攻击 - 重构代码消除冗余,提取公共验证逻辑
141 lines
3.4 KiB
Go
141 lines
3.4 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Task type constants
|
|
const (
|
|
TaskTypeVerifyCode = "verify_code"
|
|
TaskTypePasswordReset = "password_reset"
|
|
)
|
|
|
|
// EmailTask 邮件发送任务
|
|
type EmailTask struct {
|
|
Email string
|
|
SiteName string
|
|
TaskType string // "verify_code" or "password_reset"
|
|
ResetURL string // Only used for password_reset task type
|
|
}
|
|
|
|
// EmailQueueService 异步邮件队列服务
|
|
type EmailQueueService struct {
|
|
emailService *EmailService
|
|
taskChan chan EmailTask
|
|
wg sync.WaitGroup
|
|
stopChan chan struct{}
|
|
workers int
|
|
}
|
|
|
|
// NewEmailQueueService 创建邮件队列服务
|
|
func NewEmailQueueService(emailService *EmailService, workers int) *EmailQueueService {
|
|
if workers <= 0 {
|
|
workers = 3 // 默认3个工作协程
|
|
}
|
|
|
|
service := &EmailQueueService{
|
|
emailService: emailService,
|
|
taskChan: make(chan EmailTask, 100), // 缓冲100个任务
|
|
stopChan: make(chan struct{}),
|
|
workers: workers,
|
|
}
|
|
|
|
// 启动工作协程
|
|
service.start()
|
|
|
|
return service
|
|
}
|
|
|
|
// start 启动工作协程
|
|
func (s *EmailQueueService) start() {
|
|
for i := 0; i < s.workers; i++ {
|
|
s.wg.Add(1)
|
|
go s.worker(i)
|
|
}
|
|
log.Printf("[EmailQueue] Started %d workers", s.workers)
|
|
}
|
|
|
|
// worker 工作协程
|
|
func (s *EmailQueueService) worker(id int) {
|
|
defer s.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case task := <-s.taskChan:
|
|
s.processTask(id, task)
|
|
case <-s.stopChan:
|
|
log.Printf("[EmailQueue] Worker %d stopping", id)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// processTask 处理任务
|
|
func (s *EmailQueueService) processTask(workerID int, task EmailTask) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
switch task.TaskType {
|
|
case TaskTypeVerifyCode:
|
|
if err := s.emailService.SendVerifyCode(ctx, task.Email, task.SiteName); err != nil {
|
|
log.Printf("[EmailQueue] Worker %d failed to send verify code to %s: %v", workerID, task.Email, err)
|
|
} else {
|
|
log.Printf("[EmailQueue] Worker %d sent verify code to %s", workerID, task.Email)
|
|
}
|
|
case TaskTypePasswordReset:
|
|
if err := s.emailService.SendPasswordResetEmailWithCooldown(ctx, task.Email, task.SiteName, task.ResetURL); err != nil {
|
|
log.Printf("[EmailQueue] Worker %d failed to send password reset to %s: %v", workerID, task.Email, err)
|
|
} else {
|
|
log.Printf("[EmailQueue] Worker %d sent password reset to %s", workerID, task.Email)
|
|
}
|
|
default:
|
|
log.Printf("[EmailQueue] Worker %d unknown task type: %s", workerID, task.TaskType)
|
|
}
|
|
}
|
|
|
|
// EnqueueVerifyCode 将验证码发送任务加入队列
|
|
func (s *EmailQueueService) EnqueueVerifyCode(email, siteName string) error {
|
|
task := EmailTask{
|
|
Email: email,
|
|
SiteName: siteName,
|
|
TaskType: TaskTypeVerifyCode,
|
|
}
|
|
|
|
select {
|
|
case s.taskChan <- task:
|
|
log.Printf("[EmailQueue] Enqueued verify code task for %s", email)
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("email queue is full")
|
|
}
|
|
}
|
|
|
|
// EnqueuePasswordReset 将密码重置邮件任务加入队列
|
|
func (s *EmailQueueService) EnqueuePasswordReset(email, siteName, resetURL string) error {
|
|
task := EmailTask{
|
|
Email: email,
|
|
SiteName: siteName,
|
|
TaskType: TaskTypePasswordReset,
|
|
ResetURL: resetURL,
|
|
}
|
|
|
|
select {
|
|
case s.taskChan <- task:
|
|
log.Printf("[EmailQueue] Enqueued password reset task for %s", email)
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("email queue is full")
|
|
}
|
|
}
|
|
|
|
// Stop 停止队列服务
|
|
func (s *EmailQueueService) Stop() {
|
|
close(s.stopChan)
|
|
s.wg.Wait()
|
|
log.Println("[EmailQueue] All workers stopped")
|
|
}
|