From 99dc3b59bc54ca8512478397b8ad69bb8b3834fa Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Fri, 30 Jan 2026 14:08:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E8=B4=A6=E5=8F=B7):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=20Sora=20=E8=B4=A6=E5=8F=B7=E5=8F=8C=E8=A1=A8=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E4=B8=8E=E5=88=9B=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 sora_accounts 表与 accounts.extra GIN 索引\n- OpenAI OAuth 支持同时创建 Sora 账号并同步配置\n- Token 刷新同步关联 Sora 账号凭证与扩展表\n- 增加 Sora 账号连通性测试与前端开关文案 --- backend/cmd/server/wire_gen.go | 5 +- backend/internal/repository/account_repo.go | 61 ++++++++++++ .../internal/repository/sora_account_repo.go | 98 +++++++++++++++++++ backend/internal/repository/wire.go | 1 + backend/internal/server/api_contract_test.go | 2 +- backend/internal/service/account_service.go | 3 + .../service/account_service_delete_test.go | 4 + .../internal/service/account_test_service.go | 73 ++++++++++++++ backend/internal/service/admin_service.go | 15 +++ backend/internal/service/domain_constants.go | 1 + .../service/gateway_multiplatform_test.go | 3 + .../service/gemini_multiplatform_test.go | 3 + .../internal/service/sora_account_service.go | 40 ++++++++ .../internal/service/token_refresh_service.go | 16 ++- backend/internal/service/token_refresher.go | 79 ++++++++++++++- backend/internal/service/wire.go | 3 + .../045_add_accounts_extra_index.sql | 13 +++ backend/migrations/046_add_sora_accounts.sql | 24 +++++ .../components/account/CreateAccountModal.vue | 95 +++++++++++++++++- frontend/src/i18n/locales/en.ts | 6 +- frontend/src/i18n/locales/zh.ts | 6 +- 21 files changed, 542 insertions(+), 9 deletions(-) create mode 100644 backend/internal/repository/sora_account_repo.go create mode 100644 backend/internal/service/sora_account_service.go create mode 100644 backend/migrations/045_add_accounts_extra_index.sql create mode 100644 backend/migrations/046_add_sora_accounts.sql diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 7b22a31e..b8668665 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -86,10 +86,11 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { dashboardHandler := admin.NewDashboardHandler(dashboardService, dashboardAggregationService) schedulerCache := repository.NewSchedulerCache(redisClient) accountRepository := repository.NewAccountRepository(client, db, schedulerCache) + soraAccountRepository := repository.NewSoraAccountRepository(db) proxyRepository := repository.NewProxyRepository(client, db) proxyExitInfoProber := repository.NewProxyExitInfoProber(configConfig) proxyLatencyCache := repository.NewProxyLatencyCache(redisClient) - adminService := service.NewAdminService(userRepository, groupRepository, accountRepository, proxyRepository, apiKeyRepository, redeemCodeRepository, billingCacheService, proxyExitInfoProber, proxyLatencyCache, apiKeyAuthCacheInvalidator) + adminService := service.NewAdminService(userRepository, groupRepository, accountRepository, soraAccountRepository, proxyRepository, apiKeyRepository, redeemCodeRepository, billingCacheService, proxyExitInfoProber, proxyLatencyCache, apiKeyAuthCacheInvalidator) adminUserHandler := admin.NewUserHandler(adminService) groupHandler := admin.NewGroupHandler(adminService) claudeOAuthClient := repository.NewClaudeOAuthClient() @@ -176,7 +177,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig) opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig) opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig) - tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, configConfig) + tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, soraAccountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, configConfig) accountExpiryService := service.ProvideAccountExpiryService(accountRepository) v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, schedulerSnapshotService, tokenRefreshService, accountExpiryService, usageCleanupService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService) application := &Application{ diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go index c11c079b..5edc4f6d 100644 --- a/backend/internal/repository/account_repo.go +++ b/backend/internal/repository/account_repo.go @@ -1553,3 +1553,64 @@ func joinClauses(clauses []string, sep string) string { func itoa(v int) string { return strconv.Itoa(v) } + +// FindByExtraField 根据 extra 字段中的键值对查找账号。 +// 该方法限定 platform='sora',避免误查询其他平台的账号。 +// 使用 PostgreSQL JSONB @> 操作符进行高效查询(需要 GIN 索引支持)。 +// +// 应用场景:查找通过 linked_openai_account_id 关联的 Sora 账号。 +// +// FindByExtraField finds accounts by key-value pairs in the extra field. +// Limited to platform='sora' to avoid querying accounts from other platforms. +// Uses PostgreSQL JSONB @> operator for efficient queries (requires GIN index). +// +// Use case: Finding Sora accounts linked via linked_openai_account_id. +func (r *accountRepository) FindByExtraField(ctx context.Context, key string, value interface{}) ([]service.Account, error) { + accounts, err := r.client.Account.Query(). + Where( + dbaccount.PlatformEQ("sora"), // 限定平台为 sora + dbaccount.DeletedAtIsNil(), + func(s *entsql.Selector) { + path := sqljson.Path(key) + switch v := value.(type) { + case string: + preds := []*entsql.Predicate{sqljson.ValueEQ(dbaccount.FieldExtra, v, path)} + if parsed, err := strconv.ParseInt(v, 10, 64); err == nil { + preds = append(preds, sqljson.ValueEQ(dbaccount.FieldExtra, parsed, path)) + } + if len(preds) == 1 { + s.Where(preds[0]) + } else { + s.Where(entsql.Or(preds...)) + } + case int: + s.Where(entsql.Or( + sqljson.ValueEQ(dbaccount.FieldExtra, v, path), + sqljson.ValueEQ(dbaccount.FieldExtra, strconv.Itoa(v), path), + )) + case int64: + s.Where(entsql.Or( + sqljson.ValueEQ(dbaccount.FieldExtra, v, path), + sqljson.ValueEQ(dbaccount.FieldExtra, strconv.FormatInt(v, 10), path), + )) + case json.Number: + if parsed, err := v.Int64(); err == nil { + s.Where(entsql.Or( + sqljson.ValueEQ(dbaccount.FieldExtra, parsed, path), + sqljson.ValueEQ(dbaccount.FieldExtra, v.String(), path), + )) + } else { + s.Where(sqljson.ValueEQ(dbaccount.FieldExtra, v.String(), path)) + } + default: + s.Where(sqljson.ValueEQ(dbaccount.FieldExtra, value, path)) + } + }, + ). + All(ctx) + if err != nil { + return nil, translatePersistenceError(err, service.ErrAccountNotFound, nil) + } + + return r.accountsToService(ctx, accounts) +} diff --git a/backend/internal/repository/sora_account_repo.go b/backend/internal/repository/sora_account_repo.go new file mode 100644 index 00000000..e0ec6073 --- /dev/null +++ b/backend/internal/repository/sora_account_repo.go @@ -0,0 +1,98 @@ +package repository + +import ( + "context" + "database/sql" + "errors" + + "github.com/Wei-Shaw/sub2api/internal/service" +) + +// soraAccountRepository 实现 service.SoraAccountRepository 接口。 +// 使用原生 SQL 操作 sora_accounts 表,因为该表不在 Ent ORM 管理范围内。 +// +// 设计说明: +// - sora_accounts 表是独立迁移创建的,不通过 Ent Schema 管理 +// - 使用 ON CONFLICT (account_id) DO UPDATE 实现 Upsert 语义 +// - 与 accounts 主表通过外键关联,ON DELETE CASCADE 确保级联删除 +type soraAccountRepository struct { + sql *sql.DB +} + +// NewSoraAccountRepository 创建 Sora 账号扩展表仓储实例 +func NewSoraAccountRepository(sqlDB *sql.DB) service.SoraAccountRepository { + return &soraAccountRepository{sql: sqlDB} +} + +// Upsert 创建或更新 Sora 账号扩展信息 +// 使用 PostgreSQL ON CONFLICT ... DO UPDATE 实现原子性 upsert +func (r *soraAccountRepository) Upsert(ctx context.Context, accountID int64, updates map[string]any) error { + accessToken, accessOK := updates["access_token"].(string) + refreshToken, refreshOK := updates["refresh_token"].(string) + sessionToken, sessionOK := updates["session_token"].(string) + + if !accessOK || accessToken == "" || !refreshOK || refreshToken == "" { + if !sessionOK { + return errors.New("缺少 access_token/refresh_token,且未提供可更新字段") + } + result, err := r.sql.ExecContext(ctx, ` + UPDATE sora_accounts + SET session_token = CASE WHEN $2 = '' THEN session_token ELSE $2 END, + updated_at = NOW() + WHERE account_id = $1 + `, accountID, sessionToken) + if err != nil { + return err + } + rows, err := result.RowsAffected() + if err != nil { + return err + } + if rows == 0 { + return errors.New("sora_accounts 记录不存在,无法仅更新 session_token") + } + return nil + } + + _, err := r.sql.ExecContext(ctx, ` + INSERT INTO sora_accounts (account_id, access_token, refresh_token, session_token, created_at, updated_at) + VALUES ($1, $2, $3, $4, NOW(), NOW()) + ON CONFLICT (account_id) DO UPDATE SET + access_token = EXCLUDED.access_token, + refresh_token = EXCLUDED.refresh_token, + session_token = CASE WHEN EXCLUDED.session_token = '' THEN sora_accounts.session_token ELSE EXCLUDED.session_token END, + updated_at = NOW() + `, accountID, accessToken, refreshToken, sessionToken) + return err +} + +// GetByAccountID 根据账号 ID 获取 Sora 扩展信息 +func (r *soraAccountRepository) GetByAccountID(ctx context.Context, accountID int64) (*service.SoraAccount, error) { + rows, err := r.sql.QueryContext(ctx, ` + SELECT account_id, access_token, refresh_token, COALESCE(session_token, '') + FROM sora_accounts + WHERE account_id = $1 + `, accountID) + if err != nil { + return nil, err + } + defer rows.Close() + + if !rows.Next() { + return nil, nil // 记录不存在 + } + + var sa service.SoraAccount + if err := rows.Scan(&sa.AccountID, &sa.AccessToken, &sa.RefreshToken, &sa.SessionToken); err != nil { + return nil, err + } + return &sa, nil +} + +// Delete 删除 Sora 账号扩展信息 +func (r *soraAccountRepository) Delete(ctx context.Context, accountID int64) error { + _, err := r.sql.ExecContext(ctx, ` + DELETE FROM sora_accounts WHERE account_id = $1 + `, accountID) + return err +} diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index 7a8d85f4..929eb22b 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -53,6 +53,7 @@ var ProviderSet = wire.NewSet( NewAPIKeyRepository, NewGroupRepository, NewAccountRepository, + NewSoraAccountRepository, // Sora 账号扩展表仓储 NewProxyRepository, NewRedeemCodeRepository, NewPromoCodeRepository, diff --git a/backend/internal/server/api_contract_test.go b/backend/internal/server/api_contract_test.go index 4d1b4be2..f3eebd41 100644 --- a/backend/internal/server/api_contract_test.go +++ b/backend/internal/server/api_contract_test.go @@ -594,7 +594,7 @@ func newContractDeps(t *testing.T) *contractDeps { settingRepo := newStubSettingRepo() settingService := service.NewSettingService(settingRepo, cfg) - adminService := service.NewAdminService(userRepo, groupRepo, &accountRepo, proxyRepo, apiKeyRepo, redeemRepo, nil, nil, nil, nil) + adminService := service.NewAdminService(userRepo, groupRepo, &accountRepo, nil, proxyRepo, apiKeyRepo, redeemRepo, nil, nil, nil, nil) authHandler := handler.NewAuthHandler(cfg, nil, userService, settingService, nil) apiKeyHandler := handler.NewAPIKeyHandler(apiKeyService) usageHandler := handler.NewUsageHandler(usageService, apiKeyService) diff --git a/backend/internal/service/account_service.go b/backend/internal/service/account_service.go index 90365d2f..4befc996 100644 --- a/backend/internal/service/account_service.go +++ b/backend/internal/service/account_service.go @@ -25,6 +25,9 @@ type AccountRepository interface { // GetByCRSAccountID finds an account previously synced from CRS. // Returns (nil, nil) if not found. GetByCRSAccountID(ctx context.Context, crsAccountID string) (*Account, error) + // FindByExtraField 根据 extra 字段中的键值对查找账号(限定 platform='sora') + // 用于查找通过 linked_openai_account_id 关联的 Sora 账号 + FindByExtraField(ctx context.Context, key string, value interface{}) ([]Account, error) Update(ctx context.Context, account *Account) error Delete(ctx context.Context, id int64) error diff --git a/backend/internal/service/account_service_delete_test.go b/backend/internal/service/account_service_delete_test.go index e5eabfc6..f4e03e8e 100644 --- a/backend/internal/service/account_service_delete_test.go +++ b/backend/internal/service/account_service_delete_test.go @@ -54,6 +54,10 @@ func (s *accountRepoStub) GetByCRSAccountID(ctx context.Context, crsAccountID st panic("unexpected GetByCRSAccountID call") } +func (s *accountRepoStub) FindByExtraField(ctx context.Context, key string, value interface{}) ([]Account, error) { + panic("unexpected FindByExtraField call") +} + func (s *accountRepoStub) Update(ctx context.Context, account *Account) error { panic("unexpected Update call") } diff --git a/backend/internal/service/account_test_service.go b/backend/internal/service/account_test_service.go index 46376c69..f80a2af8 100644 --- a/backend/internal/service/account_test_service.go +++ b/backend/internal/service/account_test_service.go @@ -31,6 +31,7 @@ var sseDataPrefix = regexp.MustCompile(`^data:\s*`) const ( testClaudeAPIURL = "https://api.anthropic.com/v1/messages" chatgptCodexAPIURL = "https://chatgpt.com/backend-api/codex/responses" + soraMeAPIURL = "https://sora.chatgpt.com/backend/me" // Sora 用户信息接口,用于测试连接 ) // TestEvent represents a SSE event for account testing @@ -163,6 +164,10 @@ func (s *AccountTestService) TestAccountConnection(c *gin.Context, accountID int return s.testAntigravityAccountConnection(c, account, modelID) } + if account.Platform == PlatformSora { + return s.testSoraAccountConnection(c, account) + } + return s.testClaudeAccountConnection(c, account, modelID) } @@ -461,6 +466,74 @@ func (s *AccountTestService) testGeminiAccountConnection(c *gin.Context, account return s.processGeminiStream(c, resp.Body) } +// testSoraAccountConnection 测试 Sora 账号的连接 +// 调用 /backend/me 接口验证 access_token 有效性(不需要 Sentinel Token) +func (s *AccountTestService) testSoraAccountConnection(c *gin.Context, account *Account) error { + ctx := c.Request.Context() + + authToken := account.GetCredential("access_token") + if authToken == "" { + return s.sendErrorAndEnd(c, "No access token available") + } + + // Set SSE headers + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("X-Accel-Buffering", "no") + c.Writer.Flush() + + // Send test_start event + s.sendEvent(c, TestEvent{Type: "test_start", Model: "sora"}) + + req, err := http.NewRequestWithContext(ctx, "GET", soraMeAPIURL, nil) + if err != nil { + return s.sendErrorAndEnd(c, "Failed to create request") + } + + // 使用 Sora 客户端标准请求头(参考 sora2api) + req.Header.Set("Authorization", "Bearer "+authToken) + req.Header.Set("User-Agent", "Sora/1.2026.007 (Android 15; 24122RKC7C; build 2600700)") + req.Header.Set("Accept", "application/json") + + // Get proxy URL + proxyURL := "" + if account.ProxyID != nil && account.Proxy != nil { + proxyURL = account.Proxy.URL() + } + + resp, err := s.httpUpstream.DoWithTLS(req, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled()) + if err != nil { + return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error())) + } + defer func() { _ = resp.Body.Close() }() + + body, _ := io.ReadAll(resp.Body) + + if resp.StatusCode != http.StatusOK { + return s.sendErrorAndEnd(c, fmt.Sprintf("Sora API returned %d: %s", resp.StatusCode, string(body))) + } + + // 解析 /me 响应,提取用户信息 + var meResp map[string]any + if err := json.Unmarshal(body, &meResp); err != nil { + // 能收到 200 就说明 token 有效 + s.sendEvent(c, TestEvent{Type: "content", Text: "Sora connection OK (token valid)"}) + } else { + // 尝试提取用户名或邮箱信息 + info := "Sora connection OK" + if name, ok := meResp["name"].(string); ok && name != "" { + info = fmt.Sprintf("Sora connection OK - User: %s", name) + } else if email, ok := meResp["email"].(string); ok && email != "" { + info = fmt.Sprintf("Sora connection OK - Email: %s", email) + } + s.sendEvent(c, TestEvent{Type: "content", Text: info}) + } + + s.sendEvent(c, TestEvent{Type: "test_complete", Success: true}) + return nil +} + // testAntigravityAccountConnection tests an Antigravity account's connection // 支持 Claude 和 Gemini 两种协议,使用非流式请求 func (s *AccountTestService) testAntigravityAccountConnection(c *gin.Context, account *Account, modelID string) error { diff --git a/backend/internal/service/admin_service.go b/backend/internal/service/admin_service.go index 0afa0716..398de0e0 100644 --- a/backend/internal/service/admin_service.go +++ b/backend/internal/service/admin_service.go @@ -272,6 +272,7 @@ type adminServiceImpl struct { userRepo UserRepository groupRepo GroupRepository accountRepo AccountRepository + soraAccountRepo SoraAccountRepository // Sora 账号扩展表仓储 proxyRepo ProxyRepository apiKeyRepo APIKeyRepository redeemCodeRepo RedeemCodeRepository @@ -286,6 +287,7 @@ func NewAdminService( userRepo UserRepository, groupRepo GroupRepository, accountRepo AccountRepository, + soraAccountRepo SoraAccountRepository, proxyRepo ProxyRepository, apiKeyRepo APIKeyRepository, redeemCodeRepo RedeemCodeRepository, @@ -298,6 +300,7 @@ func NewAdminService( userRepo: userRepo, groupRepo: groupRepo, accountRepo: accountRepo, + soraAccountRepo: soraAccountRepo, proxyRepo: proxyRepo, apiKeyRepo: apiKeyRepo, redeemCodeRepo: redeemCodeRepo, @@ -862,6 +865,18 @@ func (s *adminServiceImpl) CreateAccount(ctx context.Context, input *CreateAccou return nil, err } + // 如果是 Sora 平台账号,自动创建 sora_accounts 扩展表记录 + if account.Platform == PlatformSora && s.soraAccountRepo != nil { + soraUpdates := map[string]any{ + "access_token": account.GetCredential("access_token"), + "refresh_token": account.GetCredential("refresh_token"), + } + if err := s.soraAccountRepo.Upsert(ctx, account.ID, soraUpdates); err != nil { + // 只记录警告日志,不阻塞账号创建 + log.Printf("[AdminService] 创建 sora_accounts 记录失败: account_id=%d err=%v", account.ID, err) + } + } + // 绑定分组 if len(groupIDs) > 0 { if err := s.accountRepo.BindGroups(ctx, account.ID, groupIDs); err != nil { diff --git a/backend/internal/service/domain_constants.go b/backend/internal/service/domain_constants.go index 3bb63ffa..31c576ed 100644 --- a/backend/internal/service/domain_constants.go +++ b/backend/internal/service/domain_constants.go @@ -22,6 +22,7 @@ const ( PlatformOpenAI = "openai" PlatformGemini = "gemini" PlatformAntigravity = "antigravity" + PlatformSora = "sora" ) // Account type constants diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index 26eb24e4..d9ae6709 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -77,6 +77,9 @@ func (m *mockAccountRepoForPlatform) Create(ctx context.Context, account *Accoun func (m *mockAccountRepoForPlatform) GetByCRSAccountID(ctx context.Context, crsAccountID string) (*Account, error) { return nil, nil } +func (m *mockAccountRepoForPlatform) FindByExtraField(ctx context.Context, key string, value interface{}) ([]Account, error) { + return nil, nil +} func (m *mockAccountRepoForPlatform) Update(ctx context.Context, account *Account) error { return nil } diff --git a/backend/internal/service/gemini_multiplatform_test.go b/backend/internal/service/gemini_multiplatform_test.go index c63a020c..a2c6f937 100644 --- a/backend/internal/service/gemini_multiplatform_test.go +++ b/backend/internal/service/gemini_multiplatform_test.go @@ -66,6 +66,9 @@ func (m *mockAccountRepoForGemini) Create(ctx context.Context, account *Account) func (m *mockAccountRepoForGemini) GetByCRSAccountID(ctx context.Context, crsAccountID string) (*Account, error) { return nil, nil } +func (m *mockAccountRepoForGemini) FindByExtraField(ctx context.Context, key string, value interface{}) ([]Account, error) { + return nil, nil +} func (m *mockAccountRepoForGemini) Update(ctx context.Context, account *Account) error { return nil } func (m *mockAccountRepoForGemini) Delete(ctx context.Context, id int64) error { return nil } func (m *mockAccountRepoForGemini) List(ctx context.Context, params pagination.PaginationParams) ([]Account, *pagination.PaginationResult, error) { diff --git a/backend/internal/service/sora_account_service.go b/backend/internal/service/sora_account_service.go new file mode 100644 index 00000000..eccc1acf --- /dev/null +++ b/backend/internal/service/sora_account_service.go @@ -0,0 +1,40 @@ +package service + +import "context" + +// SoraAccountRepository Sora 账号扩展表仓储接口 +// 用于管理 sora_accounts 表,与 accounts 主表形成双表结构。 +// +// 设计说明: +// - sora_accounts 表存储 Sora 账号的 OAuth 凭证副本 +// - Sora gateway 优先读取此表的字段以获得更好的查询性能 +// - 主表 accounts 通过 credentials JSON 字段也存储相同信息 +// - Token 刷新时需要同时更新两个表以保持数据一致性 +type SoraAccountRepository interface { + // Upsert 创建或更新 Sora 账号扩展信息 + // accountID: 关联的 accounts.id + // updates: 要更新的字段,支持 access_token、refresh_token、session_token + // + // 如果记录不存在则创建,存在则更新。 + // 用于: + // 1. 创建 Sora 账号时初始化扩展表 + // 2. Token 刷新时同步更新扩展表 + Upsert(ctx context.Context, accountID int64, updates map[string]any) error + + // GetByAccountID 根据账号 ID 获取 Sora 扩展信息 + // 返回 nil, nil 表示记录不存在(非错误) + GetByAccountID(ctx context.Context, accountID int64) (*SoraAccount, error) + + // Delete 删除 Sora 账号扩展信息 + // 通常由外键 ON DELETE CASCADE 自动处理,此方法用于手动清理 + Delete(ctx context.Context, accountID int64) error +} + +// SoraAccount Sora 账号扩展信息 +// 对应 sora_accounts 表,存储 Sora 账号的 OAuth 凭证副本 +type SoraAccount struct { + AccountID int64 // 关联的 accounts.id + AccessToken string // OAuth access_token + RefreshToken string // OAuth refresh_token + SessionToken string // Session token(可选,用于 ST→AT 兜底) +} diff --git a/backend/internal/service/token_refresh_service.go b/backend/internal/service/token_refresh_service.go index 7364bd33..797ab721 100644 --- a/backend/internal/service/token_refresh_service.go +++ b/backend/internal/service/token_refresh_service.go @@ -15,6 +15,7 @@ import ( // 定期检查并刷新即将过期的token type TokenRefreshService struct { accountRepo AccountRepository + soraAccountRepo SoraAccountRepository // Sora 扩展表仓储,用于双表同步 refreshers []TokenRefresher cfg *config.TokenRefreshConfig cacheInvalidator TokenCacheInvalidator @@ -43,7 +44,7 @@ func NewTokenRefreshService( // 注册平台特定的刷新器 s.refreshers = []TokenRefresher{ NewClaudeTokenRefresher(oauthService), - NewOpenAITokenRefresher(openaiOAuthService), + NewOpenAITokenRefresher(openaiOAuthService, accountRepo), NewGeminiTokenRefresher(geminiOAuthService), NewAntigravityTokenRefresher(antigravityOAuthService), } @@ -51,6 +52,19 @@ func NewTokenRefreshService( return s } +// SetSoraAccountRepo 设置 Sora 账号扩展表仓储 +// 用于在 OpenAI Token 刷新时同步更新 sora_accounts 表 +// 需要在 Start() 之前调用 +func (s *TokenRefreshService) SetSoraAccountRepo(repo SoraAccountRepository) { + s.soraAccountRepo = repo + // 将 soraAccountRepo 注入到 OpenAITokenRefresher + for _, refresher := range s.refreshers { + if openaiRefresher, ok := refresher.(*OpenAITokenRefresher); ok { + openaiRefresher.SetSoraAccountRepo(repo) + } + } +} + // Start 启动后台刷新服务 func (s *TokenRefreshService) Start() { if !s.cfg.Enabled { diff --git a/backend/internal/service/token_refresher.go b/backend/internal/service/token_refresher.go index 214a290a..807524fd 100644 --- a/backend/internal/service/token_refresher.go +++ b/backend/internal/service/token_refresher.go @@ -2,6 +2,7 @@ package service import ( "context" + "log" "strconv" "time" ) @@ -82,16 +83,26 @@ func (r *ClaudeTokenRefresher) Refresh(ctx context.Context, account *Account) (m // OpenAITokenRefresher 处理 OpenAI OAuth token刷新 type OpenAITokenRefresher struct { - openaiOAuthService *OpenAIOAuthService + openaiOAuthService *OpenAIOAuthService + accountRepo AccountRepository + soraAccountRepo SoraAccountRepository // Sora 扩展表仓储,用于双表同步 } // NewOpenAITokenRefresher 创建 OpenAI token刷新器 -func NewOpenAITokenRefresher(openaiOAuthService *OpenAIOAuthService) *OpenAITokenRefresher { +func NewOpenAITokenRefresher(openaiOAuthService *OpenAIOAuthService, accountRepo AccountRepository) *OpenAITokenRefresher { return &OpenAITokenRefresher{ openaiOAuthService: openaiOAuthService, + accountRepo: accountRepo, } } +// SetSoraAccountRepo 设置 Sora 账号扩展表仓储 +// 用于在 Token 刷新时同步更新 sora_accounts 表 +// 如果未设置,syncLinkedSoraAccounts 只会更新 accounts.credentials +func (r *OpenAITokenRefresher) SetSoraAccountRepo(repo SoraAccountRepository) { + r.soraAccountRepo = repo +} + // CanRefresh 检查是否能处理此账号 // 只处理 openai 平台的 oauth 类型账号 func (r *OpenAITokenRefresher) CanRefresh(account *Account) bool { @@ -112,6 +123,7 @@ func (r *OpenAITokenRefresher) NeedsRefresh(account *Account, refreshWindow time // Refresh 执行token刷新 // 保留原有credentials中的所有字段,只更新token相关字段 +// 刷新成功后,异步同步关联的 Sora 账号 func (r *OpenAITokenRefresher) Refresh(ctx context.Context, account *Account) (map[string]any, error) { tokenInfo, err := r.openaiOAuthService.RefreshAccountToken(ctx, account) if err != nil { @@ -128,5 +140,68 @@ func (r *OpenAITokenRefresher) Refresh(ctx context.Context, account *Account) (m } } + // 异步同步关联的 Sora 账号(不阻塞主流程) + if r.accountRepo != nil { + go r.syncLinkedSoraAccounts(context.Background(), account.ID, newCredentials) + } + return newCredentials, nil } + +// syncLinkedSoraAccounts 同步关联的 Sora 账号的 token(双表同步) +// 该方法异步执行,失败只记录日志,不影响主流程 +// +// 同步策略: +// 1. 更新 accounts.credentials(主表) +// 2. 更新 sora_accounts 扩展表(如果 soraAccountRepo 已设置) +// +// 超时控制:30 秒,防止数据库阻塞导致 goroutine 泄漏 +func (r *OpenAITokenRefresher) syncLinkedSoraAccounts(ctx context.Context, openaiAccountID int64, newCredentials map[string]any) { + // 添加超时控制,防止 goroutine 泄漏 + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + // 1. 查找所有关联的 Sora 账号(限定 platform='sora') + soraAccounts, err := r.accountRepo.FindByExtraField(ctx, "linked_openai_account_id", openaiAccountID) + if err != nil { + log.Printf("[TokenSync] 查找关联 Sora 账号失败: openai_account_id=%d err=%v", openaiAccountID, err) + return + } + + if len(soraAccounts) == 0 { + // 没有关联的 Sora 账号,直接返回 + return + } + + // 2. 同步更新每个 Sora 账号的双表数据 + for _, soraAccount := range soraAccounts { + // 2.1 更新 accounts.credentials(主表) + soraAccount.Credentials["access_token"] = newCredentials["access_token"] + soraAccount.Credentials["refresh_token"] = newCredentials["refresh_token"] + if expiresAt, ok := newCredentials["expires_at"]; ok { + soraAccount.Credentials["expires_at"] = expiresAt + } + + if err := r.accountRepo.Update(ctx, &soraAccount); err != nil { + log.Printf("[TokenSync] 更新 Sora accounts 表失败: sora_account_id=%d openai_account_id=%d err=%v", + soraAccount.ID, openaiAccountID, err) + continue + } + + // 2.2 更新 sora_accounts 扩展表(如果仓储已设置) + if r.soraAccountRepo != nil { + soraUpdates := map[string]any{ + "access_token": newCredentials["access_token"], + "refresh_token": newCredentials["refresh_token"], + } + if err := r.soraAccountRepo.Upsert(ctx, soraAccount.ID, soraUpdates); err != nil { + log.Printf("[TokenSync] 更新 sora_accounts 表失败: account_id=%d openai_account_id=%d err=%v", + soraAccount.ID, openaiAccountID, err) + // 继续处理其他账号,不中断 + } + } + + log.Printf("[TokenSync] 成功同步 Sora 账号 token: sora_account_id=%d openai_account_id=%d dual_table=%v", + soraAccount.ID, openaiAccountID, r.soraAccountRepo != nil) + } +} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index b210286d..73d23025 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -39,6 +39,7 @@ func ProvideEmailQueueService(emailService *EmailService) *EmailQueueService { // ProvideTokenRefreshService creates and starts TokenRefreshService func ProvideTokenRefreshService( accountRepo AccountRepository, + soraAccountRepo SoraAccountRepository, // Sora 扩展表仓储,用于双表同步 oauthService *OAuthService, openaiOAuthService *OpenAIOAuthService, geminiOAuthService *GeminiOAuthService, @@ -47,6 +48,8 @@ func ProvideTokenRefreshService( cfg *config.Config, ) *TokenRefreshService { svc := NewTokenRefreshService(accountRepo, oauthService, openaiOAuthService, geminiOAuthService, antigravityOAuthService, cacheInvalidator, cfg) + // 注入 Sora 账号扩展表仓储,用于 OpenAI Token 刷新时同步 sora_accounts 表 + svc.SetSoraAccountRepo(soraAccountRepo) svc.Start() return svc } diff --git a/backend/migrations/045_add_accounts_extra_index.sql b/backend/migrations/045_add_accounts_extra_index.sql new file mode 100644 index 00000000..05414062 --- /dev/null +++ b/backend/migrations/045_add_accounts_extra_index.sql @@ -0,0 +1,13 @@ +-- Migration: 045_add_accounts_extra_index +-- 为 accounts.extra 字段添加 GIN 索引,优化 FindByExtraField 查询性能 +-- 用于支持通过 extra 字段中的 linked_openai_account_id 快速查找关联的 Sora 账号 + +CREATE INDEX IF NOT EXISTS idx_accounts_extra_gin +ON accounts USING GIN (extra); + +-- 查询示例(使用 @> 操作符) +-- EXPLAIN ANALYZE +-- SELECT * FROM accounts +-- WHERE platform = 'sora' +-- AND extra @> '{"linked_openai_account_id": 123}'::jsonb +-- AND deleted_at IS NULL; diff --git a/backend/migrations/046_add_sora_accounts.sql b/backend/migrations/046_add_sora_accounts.sql new file mode 100644 index 00000000..62f98718 --- /dev/null +++ b/backend/migrations/046_add_sora_accounts.sql @@ -0,0 +1,24 @@ +-- Migration: 046_add_sora_accounts +-- 新增 sora_accounts 扩展表,存储 Sora 账号的 OAuth 凭证 +-- 与 accounts 主表形成双表结构: +-- - accounts: 统一账号管理和调度 +-- - sora_accounts: Sora gateway 快速读取和资格校验 +-- +-- 设计说明: +-- - account_id 为主键,外键关联 accounts.id +-- - ON DELETE CASCADE 确保删除账号时自动清理扩展表 +-- - access_token/refresh_token 与 accounts.credentials 保持同步 + +CREATE TABLE IF NOT EXISTS sora_accounts ( + account_id BIGINT PRIMARY KEY, + access_token TEXT NOT NULL, + refresh_token TEXT NOT NULL, + session_token TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT fk_sora_accounts_account_id + FOREIGN KEY (account_id) REFERENCES accounts(id) + ON DELETE CASCADE +); + +-- 索引说明:主键已自动创建唯一索引,无需额外创建 idx_sora_accounts_account_id diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index 144241ff..0e81a717 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -1482,6 +1482,32 @@
+ +
+ +
+ ([]) const customErrorCodeInput = ref(null) const interceptWarmupRequests = ref(false) const autoPauseOnExpired = ref(true) +const enableSoraOnOpenAIOAuth = ref(false) // OpenAI OAuth 时同时启用 Sora const mixedScheduling = ref(false) // For antigravity accounts: enable mixed scheduling const tempUnschedEnabled = ref(false) const tempUnschedRules = ref([]) @@ -2334,6 +2361,7 @@ const resetForm = () => { customErrorCodeInput.value = null interceptWarmupRequests.value = false autoPauseOnExpired.value = true + enableSoraOnOpenAIOAuth.value = false // Reset quota control state windowCostEnabled.value = false windowCostLimit.value = null @@ -2509,7 +2537,72 @@ const handleOpenAIExchange = async (authCode: string) => { const credentials = openaiOAuth.buildCredentials(tokenInfo) const extra = openaiOAuth.buildExtraInfo(tokenInfo) - await createAccountAndFinish('openai', 'oauth', credentials, extra) + + // 应用临时不可调度配置 + if (!applyTempUnschedConfig(credentials)) { + return + } + + // 1. 创建 OpenAI 账号 + const openaiAccount = await adminAPI.accounts.create({ + name: form.name, + notes: form.notes, + platform: 'openai', + type: 'oauth', + credentials, + extra, + proxy_id: form.proxy_id, + concurrency: form.concurrency, + priority: form.priority, + rate_multiplier: form.rate_multiplier, + group_ids: form.group_ids, + expires_at: form.expires_at, + auto_pause_on_expired: autoPauseOnExpired.value + }) + + appStore.showSuccess(t('admin.accounts.accountCreated')) + + // 2. 如果启用了 Sora,同时创建 Sora 账号 + if (enableSoraOnOpenAIOAuth.value) { + try { + // Sora 使用相同的 OAuth credentials + const soraCredentials = { + access_token: credentials.access_token, + refresh_token: credentials.refresh_token, + expires_at: credentials.expires_at + } + + // 建立关联关系 + const soraExtra = { + ...extra, + linked_openai_account_id: String(openaiAccount.id) + } + + await adminAPI.accounts.create({ + name: `${form.name} (Sora)`, + notes: form.notes, + platform: 'sora', + type: 'oauth', + credentials: soraCredentials, + extra: soraExtra, + proxy_id: form.proxy_id, + concurrency: form.concurrency, + priority: form.priority, + rate_multiplier: form.rate_multiplier, + group_ids: form.group_ids, + expires_at: form.expires_at, + auto_pause_on_expired: autoPauseOnExpired.value + }) + + appStore.showSuccess(t('admin.accounts.soraAccountCreated')) + } catch (error: any) { + console.error('创建 Sora 账号失败:', error) + appStore.showWarning(t('admin.accounts.soraAccountFailed')) + } + } + + emit('created') + handleClose() } catch (error: any) { openaiOAuth.error.value = error.response?.data?.detail || t('admin.accounts.oauth.authFailed') appStore.showError(openaiOAuth.error.value) diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index e293491b..a1403a8e 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -1245,7 +1245,9 @@ export default { // OpenAI specific hints openai: { baseUrlHint: 'Leave default for official OpenAI API', - apiKeyHint: 'Your OpenAI API Key' + apiKeyHint: 'Your OpenAI API Key', + enableSora: 'Enable Sora simultaneously', + enableSoraHint: 'Sora uses the same OpenAI account. Enable to create Sora account simultaneously.' }, modelRestriction: 'Model Restriction (Optional)', modelWhitelist: 'Model Whitelist', @@ -1337,6 +1339,8 @@ export default { creating: 'Creating...', updating: 'Updating...', accountCreated: 'Account created successfully', + soraAccountCreated: 'Sora account created simultaneously', + soraAccountFailed: 'Failed to create Sora account, please add manually later', accountUpdated: 'Account updated successfully', failedToCreate: 'Failed to create account', failedToUpdate: 'Failed to update account', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index dbeb3819..7b85ca64 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -1380,7 +1380,9 @@ export default { // OpenAI specific hints openai: { baseUrlHint: '留空使用官方 OpenAI API', - apiKeyHint: '您的 OpenAI API Key' + apiKeyHint: '您的 OpenAI API Key', + enableSora: '同时启用 Sora', + enableSoraHint: 'Sora 使用相同的 OpenAI 账号,开启后将同时创建 Sora 平台账号' }, modelRestriction: '模型限制(可选)', modelWhitelist: '模型白名单', @@ -1469,6 +1471,8 @@ export default { creating: '创建中...', updating: '更新中...', accountCreated: '账号创建成功', + soraAccountCreated: 'Sora 账号已同时创建', + soraAccountFailed: 'Sora 账号创建失败,请稍后手动添加', accountUpdated: '账号更新成功', failedToCreate: '创建账号失败', failedToUpdate: '更新账号失败',