From 136a46218bf4a36ded690dc9bc14104dc4d2cc6e Mon Sep 17 00:00:00 2001 From: CaIon <1808837298@qq.com> Date: Tue, 10 Jun 2025 03:42:23 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20fix(api=5Frequest):=20enhance=20?= =?UTF-8?q?ping=20keep-alive=20mechanism=20with=20error=20handling=20and?= =?UTF-8?q?=20timeout=20controls?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- relay/channel/api_request.go | 78 +++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 15 deletions(-) diff --git a/relay/channel/api_request.go b/relay/channel/api_request.go index 1d733bd4..c3da5134 100644 --- a/relay/channel/api_request.go +++ b/relay/channel/api_request.go @@ -109,6 +109,12 @@ func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.Canc gopool.Go(func() { defer func() { + // 增加panic恢复处理 + if r := recover(); r != nil { + if common2.DebugEnabled { + println("SSE ping goroutine panic recovered:", fmt.Sprintf("%v", r)) + } + } if common2.DebugEnabled { println("SSE ping goroutine stopped.") } @@ -119,19 +125,32 @@ func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.Canc } ticker := time.NewTicker(pingInterval) - // 退出时清理 ticker - defer ticker.Stop() + // 确保在任何情况下都清理ticker + defer func() { + ticker.Stop() + if common2.DebugEnabled { + println("SSE ping ticker stopped") + } + }() var pingMutex sync.Mutex if common2.DebugEnabled { println("SSE ping goroutine started") } + // 增加超时控制,防止goroutine长时间运行 + maxPingDuration := 120 * time.Minute // 最大ping持续时间 + pingTimeout := time.NewTimer(maxPingDuration) + defer pingTimeout.Stop() + for { select { // 发送 ping 数据 case <-ticker.C: if err := sendPingData(c, &pingMutex); err != nil { + if common2.DebugEnabled { + println("SSE ping error, stopping goroutine:", err.Error()) + } return } // 收到退出信号 @@ -140,6 +159,12 @@ func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.Canc // request 结束 case <-c.Request.Context().Done(): return + // 超时保护,防止goroutine无限运行 + case <-pingTimeout.C: + if common2.DebugEnabled { + println("SSE ping goroutine timeout, stopping") + } + return } } }) @@ -148,19 +173,34 @@ func startPingKeepAlive(c *gin.Context, pingInterval time.Duration) context.Canc } func sendPingData(c *gin.Context, mutex *sync.Mutex) error { - mutex.Lock() - defer mutex.Unlock() + // 增加超时控制,防止锁死等待 + done := make(chan error, 1) + go func() { + mutex.Lock() + defer mutex.Unlock() - err := helper.PingData(c) - if err != nil { - common2.LogError(c, "SSE ping error: "+err.Error()) + err := helper.PingData(c) + if err != nil { + common2.LogError(c, "SSE ping error: "+err.Error()) + done <- err + return + } + + if common2.DebugEnabled { + println("SSE ping data sent.") + } + done <- nil + }() + + // 设置发送ping数据的超时时间 + select { + case err := <-done: return err + case <-time.After(10 * time.Second): + return errors.New("SSE ping data send timeout") + case <-c.Request.Context().Done(): + return errors.New("request context cancelled during ping") } - - if common2.DebugEnabled { - println("SSE ping data sent.") - } - return nil } func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http.Response, error) { @@ -175,15 +215,23 @@ func doRequest(c *gin.Context, req *http.Request, info *common.RelayInfo) (*http client = service.GetHttpClient() } + var stopPinger context.CancelFunc if info.IsStream { helper.SetEventStreamHeaders(c) - // 处理流式请求的 ping 保活 generalSettings := operation_setting.GetGeneralSetting() if generalSettings.PingIntervalEnabled { pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second - stopPinger := startPingKeepAlive(c, pingInterval) - defer stopPinger() + stopPinger = startPingKeepAlive(c, pingInterval) + // 使用defer确保在任何情况下都能停止ping goroutine + defer func() { + if stopPinger != nil { + stopPinger() + if common2.DebugEnabled { + println("SSE ping goroutine stopped by defer") + } + } + }() } }