feat: Add context-aware goroutine pool for safer concurrent operations
This commit is contained in:
25
common/gopool.go
Normal file
25
common/gopool.go
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/bytedance/gopkg/util/gopool"
|
||||||
|
"math"
|
||||||
|
)
|
||||||
|
|
||||||
|
var relayGoPool gopool.Pool
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
relayGoPool = gopool.NewPool("gopool.RelayPool", math.MaxInt32, gopool.NewConfig())
|
||||||
|
relayGoPool.SetPanicHandler(func(ctx context.Context, i interface{}) {
|
||||||
|
//check ctx.Value("stop_chan").(chan bool)
|
||||||
|
if stopChan, ok := ctx.Value("stop_chan").(chan bool); ok {
|
||||||
|
SafeSendBool(stopChan, true)
|
||||||
|
}
|
||||||
|
SysError(fmt.Sprintf("panic in gopool.RelayPool: %v", i))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func CtxGo(ctx context.Context, f func()) {
|
||||||
|
relayGoPool.CtxGo(ctx, f)
|
||||||
|
}
|
||||||
@@ -3,6 +3,7 @@ package openai
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@@ -120,13 +121,16 @@ func OaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.Rel
|
|||||||
ticker := time.NewTicker(streamingTimeout)
|
ticker := time.NewTicker(streamingTimeout)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
stopChan := make(chan bool)
|
stopChan := make(chan bool, 2)
|
||||||
defer close(stopChan)
|
defer close(stopChan)
|
||||||
var (
|
var (
|
||||||
lastStreamData string
|
lastStreamData string
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
)
|
)
|
||||||
gopool.Go(func() {
|
|
||||||
|
ctx := context.WithValue(context.Background(), "stop_chan", stopChan)
|
||||||
|
|
||||||
|
common.CtxGo(ctx, func() {
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
//info.SetFirstResponseTime()
|
//info.SetFirstResponseTime()
|
||||||
ticker.Reset(time.Duration(constant.StreamingTimeout) * time.Second)
|
ticker.Reset(time.Duration(constant.StreamingTimeout) * time.Second)
|
||||||
|
|||||||
Reference in New Issue
Block a user