webforwarding.go 16 KB


  1. package web
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  9. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn"
  10. waf2 "github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf/common"
  12. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  13. "golang.org/x/sync/errgroup"
  14. "sort"
  15. )
  16. type WebForwardingService interface {
  17. GetWebForwarding(ctx context.Context, req v1.GetForwardingRequest) (v1.WebForwardingDataRequest, error)
  18. GetWebForwardingWafWebAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.WebForwardingDataRequest, error)
  19. AddWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) (int, error)
  20. EditWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) error
  21. DeleteWebForwarding(ctx context.Context, req v1.DeleteWebForwardingRequest) error
  22. }
  23. func NewWebForwardingService(
  24. service *service.Service,
  25. required service.RequiredService,
  26. webForwardingRepository waf.WebForwardingRepository,
  27. crawler service.CrawlerService,
  28. parser service.ParserService,
  29. wafformatter common.WafFormatterService,
  30. aoDun service.AoDunService,
  31. mq *rabbitmq.RabbitMQ,
  32. gatewayIp common.GatewayipService,
  33. globalLimitRep waf.GlobalLimitRepository,
  34. cdn flexCdn.CdnService,
  35. proxy flexCdn.ProxyService,
  36. sslCert flexCdn.SslCertService,
  37. websocket flexCdn.WebsocketService,
  38. cc waf2.CcService,
  39. ccIpList waf2.CcIpListService,
  40. aidedWeb *AidedWebService,
  41. ) WebForwardingService {
  42. return &webForwardingService{
  43. Service: service,
  44. webForwardingRepository: webForwardingRepository,
  45. required: required,
  46. parser: parser,
  47. crawler: crawler,
  48. wafformatter: wafformatter,
  49. aoDun: aoDun,
  50. mq: mq,
  51. gatewayIp: gatewayIp,
  52. cdn: cdn,
  53. globalLimitRep: globalLimitRep,
  54. proxy: proxy,
  55. sslCert: sslCert,
  56. websocket: websocket,
  57. cc: cc,
  58. ccIpList: ccIpList,
  59. aidedWeb: aidedWeb,
  60. }
  61. }
  62. type webForwardingService struct {
  63. *service.Service
  64. webForwardingRepository waf.WebForwardingRepository
  65. required service.RequiredService
  66. parser service.ParserService
  67. crawler service.CrawlerService
  68. wafformatter common.WafFormatterService
  69. aoDun service.AoDunService
  70. mq *rabbitmq.RabbitMQ
  71. gatewayIp common.GatewayipService
  72. cdn flexCdn.CdnService
  73. globalLimitRep waf.GlobalLimitRepository
  74. proxy flexCdn.ProxyService
  75. sslCert flexCdn.SslCertService
  76. websocket flexCdn.WebsocketService
  77. cc waf2.CcService
  78. ccIpList waf2.CcIpListService
  79. aidedWeb *AidedWebService
  80. }
  81. func (s *webForwardingService) GetWebForwarding(ctx context.Context, req v1.GetForwardingRequest) (v1.WebForwardingDataRequest, error) {
  82. var webForwarding model.WebForwarding
  83. var backend model.WebForwardingRule
  84. g, gCtx := errgroup.WithContext(ctx)
  85. g.Go(func() error {
  86. res, e := s.webForwardingRepository.GetWebForwarding(gCtx, int64(req.Id))
  87. if e != nil {
  88. // 直接返回错误,errgroup 会捕获它
  89. return fmt.Errorf("GetWebForwarding failed: %w", e)
  90. }
  91. if res != nil {
  92. webForwarding = *res
  93. }
  94. return nil
  95. })
  96. g.Go(func() error {
  97. res, e := s.webForwardingRepository.GetWebForwardingIpsByID(ctx, req.Id)
  98. if e != nil {
  99. return fmt.Errorf("GetWebForwardingByID failed: %w", e)
  100. }
  101. if res != nil {
  102. backend = *res
  103. }
  104. return nil
  105. })
  106. if err := g.Wait(); err != nil {
  107. return v1.WebForwardingDataRequest{}, err
  108. }
  109. return v1.WebForwardingDataRequest{
  110. Id: webForwarding.Id,
  111. Port: webForwarding.Port,
  112. Domain: webForwarding.Domain,
  113. IsHttps: webForwarding.IsHttps,
  114. Comment: webForwarding.Comment,
  115. BackendList: backend.BackendList,
  116. HttpsKey: webForwarding.HttpsKey,
  117. HttpsCert: webForwarding.HttpsCert,
  118. Proxy: webForwarding.Proxy,
  119. CcConfig: v1.CcConfigRequest{
  120. IsOn: webForwarding.Cc,
  121. ThresholdMethod: webForwarding.ThresholdMethod,
  122. Level: webForwarding.Level,
  123. Limit5s: webForwarding.Limit5s,
  124. Limit60s: webForwarding.Limit60s,
  125. Limit300s: webForwarding.Limit300s,
  126. },
  127. }, nil
  128. }
  129. // AddWebForwarding 添加Web转发配置
  130. // 该函数负责创建完整的Web转发配置,包括:
  131. // 1. 数据验证和预处理
  132. // 2. SSL证书管理
  133. // 3. CDN网站创建和配置
  134. // 4. 源站服务器添加
  135. // 5. 各种功能开启(WebSocket、Proxy、日志、CC防护等)
  136. // 6. 数据库记录保存
  137. // 7. 白名单任务发布
  138. func (s *webForwardingService) AddWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) (int, error) {
  139. // 1. 数据准备和验证
  140. require, formData, err := s.aidedWeb.PrepareWafData(ctx, req)
  141. if err != nil {
  142. return 0, err
  143. }
  144. if err := s.aidedWeb.ValidateAddRequest(ctx, req, require); err != nil {
  145. return 0, err
  146. }
  147. // 2. 处理SSL证书
  148. if err := s.aidedWeb.ProcessSSLCertificate(ctx, req, require.CdnUid); err != nil {
  149. return 0, err
  150. }
  151. // 3. 创建CDN网站
  152. webId, err := s.aidedWeb.CreateCdnWebsite(ctx, formData)
  153. if err != nil {
  154. return 0, err
  155. }
  156. // 4. 配置WebSocket
  157. if err := s.aidedWeb.ConfigureWebsocket(ctx, webId); err != nil {
  158. return 0, err
  159. }
  160. // 5. 添加源站到网站
  161. cdnOriginIds, err := s.aidedWeb.AddOriginsToWebsite(ctx, req, webId)
  162. if err != nil {
  163. return 0, err
  164. }
  165. // 6. 配置各种功能
  166. if err := s.aidedWeb.ConfigureProxyProtocol(ctx, req.WebForwardingData.Proxy, webId); err != nil {
  167. return 0, err
  168. }
  169. if err := s.aidedWeb.EditLog(ctx, webId); err != nil {
  170. return 0, err
  171. }
  172. if err := s.aidedWeb.ConfigureCCProtection(ctx, req.WebForwardingData.CcConfig, webId); err != nil {
  173. return 0, err
  174. }
  175. if err := s.aidedWeb.ConfigureWafFirewall(ctx, webId, require.GroupId); err != nil {
  176. return 0, err
  177. }
  178. // 7. 保存到数据库
  179. id, err := s.aidedWeb.SaveToDatabase(ctx, req, require, webId, cdnOriginIds)
  180. if err != nil {
  181. return 0, err
  182. }
  183. // 8. 处理异步任务
  184. s.aidedWeb.ProcessAsyncTasks(ctx, req, require)
  185. return id, nil
  186. }
  187. func (s *webForwardingService) EditWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) error {
  188. // 1. 获取原始数据
  189. oldData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(req.WebForwardingData.Id))
  190. if err != nil {
  191. return fmt.Errorf("获取原始Web转发数据失败: %w", err)
  192. }
  193. if s.aidedWeb.ValidateDeletePermission(oldData.HostId, req.HostId) != nil {
  194. return fmt.Errorf("用户权限不足")
  195. }
  196. // 继承旧的证书ID和策略ID,以便后续逻辑处理
  197. req.WebForwardingData.SslCertId = int64(oldData.SslCertId)
  198. req.WebForwardingData.SslPolicyId = int64(oldData.SslPolicyId)
  199. // 2. 准备WAF数据和基础验证
  200. require, formData, err := s.aidedWeb.PrepareWafData(ctx, req)
  201. if err != nil {
  202. return err
  203. }
  204. if err := s.aidedWeb.ValidateEditRequest(ctx, req); err != nil {
  205. return err
  206. }
  207. // 3. 处理SSL证书更新
  208. if err := s.aidedWeb.ProcessSSLCertificateUpdate(ctx, req, oldData, require.CdnUid); err != nil {
  209. return err
  210. }
  211. // 4. 更新核心CDN配置(端口、协议、域名、备注等)
  212. if err := s.aidedWeb.UpdateCdnConfiguration(ctx, req, oldData, require.Tag, formData); err != nil {
  213. return err
  214. }
  215. // 5. 更新Proxy Protocol配置
  216. if err := s.aidedWeb.ConfigureProxyProtocol(ctx, req.WebForwardingData.Proxy, int64(oldData.CdnWebId)); err != nil {
  217. return err
  218. }
  219. // 6. 更新CC防护配置
  220. if err := s.aidedWeb.ConfigureCCProtection(ctx, req.WebForwardingData.CcConfig, int64(oldData.CdnWebId)); err != nil {
  221. return err
  222. }
  223. // 7. 处理域名白名单变更
  224. if err := s.aidedWeb.ProcessDomainWhitelistChanges(ctx, req, oldData, require); err != nil {
  225. return err
  226. }
  227. // 8. 获取后端IP规则数据
  228. ipData, err := s.webForwardingRepository.GetWebForwardingIpsByID(ctx, req.WebForwardingData.Id)
  229. if err != nil {
  230. return fmt.Errorf("获取Web转发IP规则失败: %w", err)
  231. }
  232. // 9. 处理源站IP白名单变更
  233. if err := s.aidedWeb.ProcessIpWhitelistChanges(ctx, req, ipData); err != nil {
  234. return err
  235. }
  236. // 10. 更新CDN上的源站服务器
  237. if err := s.aidedWeb.UpdateOriginServers(ctx, req, oldData, ipData); err != nil {
  238. return err
  239. }
  240. // 11. 更新本地数据库记录
  241. if err := s.aidedWeb.UpdateDatabaseRecords(ctx, req, require, ipData); err != nil {
  242. return err
  243. }
  244. return nil
  245. }
  246. // DeleteWebForwarding 批量删除Web转发配置
  247. // 该函数遍历ID列表,对每个ID执行完整的、独立的删除流程
  248. func (s *webForwardingService) DeleteWebForwarding(ctx context.Context, req v1.DeleteWebForwardingRequest) error {
  249. for _, id := range req.Ids {
  250. if err := s.deleteSingleWebForwarding(ctx, id, req.HostId, req.Uid); err != nil {
  251. // 增加错误上下文,方便定位问题
  252. return fmt.Errorf("删除Web转发配置失败 ID:%d, %w", id, err)
  253. }
  254. }
  255. return nil
  256. }
  257. // deleteSingleWebForwarding 删除单个Web转发配置的完整流程
  258. func (s *webForwardingService) deleteSingleWebForwarding(ctx context.Context, id int, hostId int, uid int) error {
  259. // 1. 获取并验证数据
  260. oldData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(id))
  261. if err != nil {
  262. return fmt.Errorf("获取Web转发数据失败: %w", err)
  263. }
  264. if err := s.aidedWeb.ValidateDeletePermission(oldData.HostId, hostId); err != nil {
  265. return err
  266. }
  267. // 2. 删除CDN服务器
  268. if err := s.aidedWeb.DeleteCdnServer(ctx, oldData.CdnWebId); err != nil {
  269. return err
  270. }
  271. // 3. 处理域名白名单清理
  272. if err := s.aidedWeb.ProcessDeleteDomainWhitelist(ctx, oldData, uid); err != nil {
  273. return err
  274. }
  275. // 4. 处理IP白名单清理
  276. if err := s.aidedWeb.ProcessDeleteIpWhitelist(ctx, id); err != nil {
  277. return err
  278. }
  279. // 5. 清理SSL证书
  280. if err := s.aidedWeb.CleanupSSLCertificate(ctx, oldData); err != nil {
  281. return err
  282. }
  283. // 6. 清理数据库记录
  284. if err := s.aidedWeb.CleanupDatabaseRecords(ctx, id); err != nil {
  285. return err
  286. }
  287. return nil
  288. }
  289. func (s *webForwardingService) GetWebForwardingWafWebAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.WebForwardingDataRequest, error) {
  290. type CombinedResult struct {
  291. Id int
  292. Forwarding *model.WebForwarding
  293. BackendRule *model.WebForwardingRule
  294. Err error // 如果此ID的处理出错,则携带错误
  295. }
  296. g, gCtx := errgroup.WithContext(ctx)
  297. ids, err := s.webForwardingRepository.GetWebForwardingWafWebAllIds(gCtx, req.HostId)
  298. if err != nil {
  299. return nil, fmt.Errorf("GetWebForwardingWafWebAllIds failed: %w", err)
  300. }
  301. if len(ids) == 0 {
  302. return nil, nil // 没有ID,直接返回空切片
  303. }
  304. // 创建一个通道来接收每个ID的处理结果
  305. // 通道缓冲区大小设为ID数量,这样发送者不会因为接收者慢而阻塞(在所有goroutine都启动后)
  306. resultsChan := make(chan CombinedResult, len(ids))
  307. for _, idVal := range ids {
  308. currentID := idVal // 捕获循环变量
  309. g.Go(func() error {
  310. var wf *model.WebForwarding
  311. var bk *model.WebForwardingRule
  312. var localErr error
  313. // 1. 获取 WebForwarding 信息
  314. wf, localErr = s.webForwardingRepository.GetWebForwarding(gCtx, int64(currentID))
  315. if localErr != nil {
  316. // 发送错误到通道,并由 errgroup 捕获
  317. // errgroup 会处理第一个非nil错误,并取消其他 goroutine
  318. resultsChan <- CombinedResult{Id: currentID, Err: fmt.Errorf("GetWebForwarding for id %d failed: %w", currentID, localErr)}
  319. return localErr // 返回错误给 errgroup
  320. }
  321. if wf == nil { // 正常情况下,如果没错误,wf不应为nil,但防御性检查
  322. localErr = fmt.Errorf("GetWebForwarding for id %d returned nil data without error", currentID)
  323. resultsChan <- CombinedResult{Id: currentID, Err: localErr}
  324. return localErr
  325. }
  326. // 2. 获取 Backend IP 信息
  327. // 注意:这里我们允许 GetWebForwardingIpsByID 可能返回 nil 数据(例如没有规则)而不是错误
  328. // 如果它也可能返回错误,则处理方式与上面类似
  329. bk, localErr = s.webForwardingRepository.GetWebForwardingIpsByID(gCtx, currentID)
  330. if localErr != nil {
  331. // 如果获取IP信息失败是一个致命错误,则也应返回错误
  332. // 如果允许部分成功(比如有WebForwarding但没有IP信息),则可以不将此视为errgroup的错误
  333. // 这里假设它也是一个需要errgroup捕获的错误
  334. resultsChan <- CombinedResult{Id: currentID, Forwarding: wf, Err: fmt.Errorf("GetWebForwardingIpsByID for id %d failed: %w", currentID, localErr)}
  335. return localErr // 返回错误给 errgroup
  336. }
  337. // bk 可能是 nil 如果没有错误且没有规则,这取决于业务逻辑
  338. // 发送成功的结果到通道
  339. resultsChan <- CombinedResult{Id: currentID, Forwarding: wf, BackendRule: bk}
  340. return nil // 此goroutine成功
  341. })
  342. }
  343. // 等待所有goroutine完成
  344. groupErr := g.Wait()
  345. // 关闭通道,表示所有发送者都已完成
  346. // 这一步很重要,这样下面的 range 循环才能正常结束
  347. close(resultsChan)
  348. // 如果 errgroup 捕获到任何错误,优先返回该错误
  349. if groupErr != nil {
  350. // 虽然errgroup已经出错了,但通道中可能已经有一些结果(来自出错前成功或出错的goroutine)
  351. // 我们需要排空通道以避免goroutine泄漏(如果它们在发送时阻塞)
  352. // 但由于我们优先返回groupErr,这些结果将被丢弃。
  353. // 在这种设计下,通常任何一个子任务失败都会导致整个操作失败。
  354. return nil, groupErr
  355. }
  356. // 如果没有错误,收集所有成功的结果
  357. finalResults := make([]v1.WebForwardingDataRequest, 0, len(ids))
  358. for res := range resultsChan {
  359. // 再次检查通道中的错误,尽管 errgroup 应该已经捕获了
  360. // 但这是一种更细致的错误处理,以防万一有goroutine在errgroup.Wait()前发送了错误但未被errgroup捕获
  361. // (理论上,如果goroutine返回了错误,errgroup会处理)
  362. // 主要目的是处理 res.forwarding 为 nil 的情况 (如果上面允许不返回错误)
  363. if res.Err != nil {
  364. // 如果到这里还有错误,说明逻辑可能有问题,或者我们决定忽略某些类型的子错误
  365. // 在此示例中,因为 g.Wait() 没有错误,所以这里的 res.err 应该是nil
  366. // 如果不是,那么可能是goroutine在return nil前发送了带有错误的res。
  367. // 严格来说,如果errgroup没有错误,这里res.err也应该是nil
  368. // 但以防万一,我们可以记录日志
  369. return nil, fmt.Errorf("received error from goroutine for ID %d: %w", res.Id, res.Err)
  370. }
  371. if res.Forwarding == nil {
  372. return nil, fmt.Errorf("received nil forwarding from goroutine for ID %d", res.Id)
  373. }
  374. dataReq := v1.WebForwardingDataRequest{
  375. Id: res.Forwarding.Id,
  376. Port: res.Forwarding.Port,
  377. Domain: res.Forwarding.Domain,
  378. IsHttps: res.Forwarding.IsHttps,
  379. Comment: res.Forwarding.Comment,
  380. HttpsKey: res.Forwarding.HttpsKey,
  381. HttpsCert: res.Forwarding.HttpsCert,
  382. Proxy: res.Forwarding.Proxy,
  383. CcConfig: v1.CcConfigRequest{
  384. IsOn: res.Forwarding.Cc,
  385. ThresholdMethod: res.Forwarding.ThresholdMethod,
  386. Level: res.Forwarding.Level,
  387. Limit5s: res.Forwarding.Limit5s,
  388. Limit60s: res.Forwarding.Limit60s,
  389. Limit300s: res.Forwarding.Limit300s,
  390. },
  391. }
  392. if res.BackendRule != nil { // 只有当 BackendRule 存在时才填充相关字段
  393. dataReq.BackendList = res.BackendRule.BackendList
  394. }
  395. finalResults = append(finalResults, dataReq)
  396. }
  397. sort.Slice(finalResults, func(i, j int) bool {
  398. return finalResults[i].Id > finalResults[j].Id
  399. })
  400. return finalResults, nil
  401. }