From cbdf26bf2cead42288934304ff79ae8c6921bed5 Mon Sep 17 00:00:00 2001 From: "1808837298@qq.com" <1808837298@qq.com> Date: Tue, 4 Mar 2025 18:42:34 +0800 Subject: [PATCH] feat: Add context-aware goroutine pool for safer concurrent operations --- common/gopool.go | 25 +++++++++++++++++++++++++ relay/channel/openai/relay-openai.go | 8 ++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) create mode 100644 common/gopool.go diff --git a/common/gopool.go b/common/gopool.go new file mode 100644 index 00000000..91fc62cd --- /dev/null +++ b/common/gopool.go @@ -0,0 +1,25 @@ +package common + +import ( + "context" + "fmt" + "github.com/bytedance/gopkg/util/gopool" + "math" +) + +var relayGoPool gopool.Pool + +func init() { + relayGoPool = gopool.NewPool("gopool.RelayPool", math.MaxInt32, gopool.NewConfig()) + relayGoPool.SetPanicHandler(func(ctx context.Context, i interface{}) { + //check ctx.Value("stop_chan").(chan bool) + if stopChan, ok := ctx.Value("stop_chan").(chan bool); ok { + SafeSendBool(stopChan, true) + } + SysError(fmt.Sprintf("panic in gopool.RelayPool: %v", i)) + }) +} + +func CtxGo(ctx context.Context, f func()) { + relayGoPool.CtxGo(ctx, f) +} diff --git a/relay/channel/openai/relay-openai.go b/relay/channel/openai/relay-openai.go index 11811dc4..696fa3a0 100644 --- a/relay/channel/openai/relay-openai.go +++ b/relay/channel/openai/relay-openai.go @@ -3,6 +3,7 @@ package openai import ( "bufio" "bytes" + "context" "encoding/json" "fmt" "io" @@ -120,13 +121,16 @@ func OaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.Rel ticker := time.NewTicker(streamingTimeout) defer ticker.Stop() - stopChan := make(chan bool) + stopChan := make(chan bool, 2) defer close(stopChan) var ( lastStreamData string mu sync.Mutex ) - gopool.Go(func() { + + ctx := context.WithValue(context.Background(), "stop_chan", stopChan) + + common.CtxGo(ctx, func() { for scanner.Scan() { //info.SetFirstResponseTime() ticker.Reset(time.Duration(constant.StreamingTimeout) * time.Second)