fix(relay): 优化数据流处理

- 移除了 bufio 的无效使用
- 在 StreamScannerHandler 中增加了初始和最大缓冲区大小的常量设置
- 调整 StreamScannerHandler 中缓冲区大小,避免出现token too long报错
This commit is contained in:
quran
2025-04-10 16:56:16 +08:00
parent 8efa12b941
commit 9328b907f2
2 changed files with 7 additions and 10 deletions

View File

@@ -1,7 +1,6 @@
package dify package dify
import ( import (
"bufio"
"bytes" "bytes"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
@@ -213,12 +212,8 @@ func streamResponseDify2OpenAI(difyResponse DifyChunkChatCompletionResponse) *dt
func difyStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) { func difyStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
var responseText string var responseText string
usage := &dto.Usage{} usage := &dto.Usage{}
scanner := bufio.NewScanner(resp.Body)
scanner.Split(bufio.ScanLines)
var nodeToken int var nodeToken int
helper.SetEventStreamHeaders(c) helper.SetEventStreamHeaders(c)
helper.StreamScannerHandler(c, resp, info, func(data string) bool { helper.StreamScannerHandler(c, resp, info, func(data string) bool {
var difyResponse DifyChunkChatCompletionResponse var difyResponse DifyChunkChatCompletionResponse
err := json.Unmarshal([]byte(data), &difyResponse) err := json.Unmarshal([]byte(data), &difyResponse)
@@ -247,13 +242,10 @@ func difyStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.Re
} }
return true return true
}) })
if err := scanner.Err(); err != nil {
common.SysError("error reading stream: " + err.Error())
}
helper.Done(c) helper.Done(c)
err := resp.Body.Close() err := resp.Body.Close()
if err != nil { if err != nil {
//return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil // return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
common.SysError("close_response_body_failed: " + err.Error()) common.SysError("close_response_body_failed: " + err.Error())
} }
if usage.TotalTokens == 0 { if usage.TotalTokens == 0 {

View File

@@ -14,6 +14,11 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
const (
InitialScannerBufferSize = 1 << 20 // 1MB (1*1024*1024)
MaxScannerBufferSize = 10 << 20 // 10MB (10*1024*1024)
)
func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo, dataHandler func(data string) bool) { func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo, dataHandler func(data string) bool) {
if resp == nil { if resp == nil {
@@ -38,7 +43,7 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon
ticker.Stop() ticker.Stop()
close(stopChan) close(stopChan)
}() }()
scanner.Buffer(make([]byte, InitialScannerBufferSize), MaxScannerBufferSize)
scanner.Split(bufio.ScanLines) scanner.Split(bufio.ScanLines)
SetEventStreamHeaders(c) SetEventStreamHeaders(c)