webforwarding.go 16 KB


  1. package waf
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  9. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn"
  10. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  11. "golang.org/x/sync/errgroup"
  12. "sort"
  13. )
  14. type WebForwardingService interface {
  15. GetWebForwarding(ctx context.Context, req v1.GetForwardingRequest) (v1.WebForwardingDataRequest, error)
  16. GetWebForwardingWafWebAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.WebForwardingDataRequest, error)
  17. AddWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) (int, error)
  18. EditWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) error
  19. DeleteWebForwarding(ctx context.Context, req v1.DeleteWebForwardingRequest) error
  20. }
  21. func NewWebForwardingService(
  22. service *service.Service,
  23. required service.RequiredService,
  24. webForwardingRepository waf.WebForwardingRepository,
  25. crawler service.CrawlerService,
  26. parser service.ParserService,
  27. wafformatter WafFormatterService,
  28. aoDun service.AoDunService,
  29. mq *rabbitmq.RabbitMQ,
  30. gatewayIp GatewayipService,
  31. globalLimitRep waf.GlobalLimitRepository,
  32. cdn flexCdn.CdnService,
  33. proxy flexCdn.ProxyService,
  34. sslCert flexCdn.SslCertService,
  35. websocket flexCdn.WebsocketService,
  36. cc CcService,
  37. ccIpList CcIpListService,
  38. aidedWeb AidedWebService,
  39. ) WebForwardingService {
  40. return &webForwardingService{
  41. Service: service,
  42. webForwardingRepository: webForwardingRepository,
  43. required: required,
  44. parser: parser,
  45. crawler: crawler,
  46. wafformatter: wafformatter,
  47. aoDun: aoDun,
  48. mq: mq,
  49. gatewayIp: gatewayIp,
  50. cdn: cdn,
  51. globalLimitRep: globalLimitRep,
  52. proxy: proxy,
  53. sslCert: sslCert,
  54. websocket: websocket,
  55. cc: cc,
  56. ccIpList: ccIpList,
  57. aidedWeb: aidedWeb,
  58. }
  59. }
  60. type webForwardingService struct {
  61. *service.Service
  62. webForwardingRepository waf.WebForwardingRepository
  63. required service.RequiredService
  64. parser service.ParserService
  65. crawler service.CrawlerService
  66. wafformatter WafFormatterService
  67. aoDun service.AoDunService
  68. mq *rabbitmq.RabbitMQ
  69. gatewayIp GatewayipService
  70. cdn flexCdn.CdnService
  71. globalLimitRep waf.GlobalLimitRepository
  72. proxy flexCdn.ProxyService
  73. sslCert flexCdn.SslCertService
  74. websocket flexCdn.WebsocketService
  75. cc CcService
  76. ccIpList CcIpListService
  77. aidedWeb AidedWebService
  78. }
  79. func (s *webForwardingService) GetWebForwarding(ctx context.Context, req v1.GetForwardingRequest) (v1.WebForwardingDataRequest, error) {
  80. var webForwarding model.WebForwarding
  81. var backend model.WebForwardingRule
  82. g, gCtx := errgroup.WithContext(ctx)
  83. g.Go(func() error {
  84. res, e := s.webForwardingRepository.GetWebForwarding(gCtx, int64(req.Id))
  85. if e != nil {
  86. // 直接返回错误,errgroup 会捕获它
  87. return fmt.Errorf("GetWebForwarding failed: %w", e)
  88. }
  89. if res != nil {
  90. webForwarding = *res
  91. }
  92. return nil
  93. })
  94. g.Go(func() error {
  95. res, e := s.webForwardingRepository.GetWebForwardingIpsByID(ctx, req.Id)
  96. if e != nil {
  97. return fmt.Errorf("GetWebForwardingByID failed: %w", e)
  98. }
  99. if res != nil {
  100. backend = *res
  101. }
  102. return nil
  103. })
  104. if err := g.Wait(); err != nil {
  105. return v1.WebForwardingDataRequest{}, err
  106. }
  107. return v1.WebForwardingDataRequest{
  108. Id: webForwarding.Id,
  109. Port: webForwarding.Port,
  110. Domain: webForwarding.Domain,
  111. IsHttps: webForwarding.IsHttps,
  112. Comment: webForwarding.Comment,
  113. BackendList: backend.BackendList,
  114. HttpsKey: webForwarding.HttpsKey,
  115. HttpsCert: webForwarding.HttpsCert,
  116. Proxy: webForwarding.Proxy,
  117. CcConfig: v1.CcConfigRequest{
  118. IsOn: webForwarding.Cc,
  119. ThresholdMethod: webForwarding.ThresholdMethod,
  120. Level: webForwarding.Level,
  121. Limit5s: webForwarding.Limit5s,
  122. Limit60s: webForwarding.Limit60s,
  123. Limit300s: webForwarding.Limit300s,
  124. },
  125. }, nil
  126. }
  127. // AddWebForwarding 添加Web转发配置
  128. // 该函数负责创建完整的Web转发配置,包括:
  129. // 1. 数据验证和预处理
  130. // 2. SSL证书管理
  131. // 3. CDN网站创建和配置
  132. // 4. 源站服务器添加
  133. // 5. 各种功能开启(WebSocket、Proxy、日志、CC防护等)
  134. // 6. 数据库记录保存
  135. // 7. 白名单任务发布
  136. func (s *webForwardingService) AddWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) (int, error) {
  137. // 1. 数据准备和验证
  138. require, formData, err := s.aidedWeb.PrepareWafData(ctx, req)
  139. if err != nil {
  140. return 0, err
  141. }
  142. if err := s.aidedWeb.ValidateAddRequest(ctx, req, require); err != nil {
  143. return 0, err
  144. }
  145. // 2. 处理SSL证书
  146. if err := s.aidedWeb.ProcessSSLCertificate(ctx, req, require.CdnUid); err != nil {
  147. return 0, err
  148. }
  149. // 3. 创建CDN网站
  150. webId, err := s.aidedWeb.CreateCdnWebsite(ctx, formData)
  151. if err != nil {
  152. return 0, err
  153. }
  154. // 4. 配置WebSocket
  155. if err := s.aidedWeb.ConfigureWebsocket(ctx, webId); err != nil {
  156. return 0, err
  157. }
  158. // 5. 添加源站到网站
  159. cdnOriginIds, err := s.aidedWeb.AddOriginsToWebsite(ctx, req, webId)
  160. if err != nil {
  161. return 0, err
  162. }
  163. // 6. 配置各种功能
  164. if err := s.aidedWeb.ConfigureProxyProtocol(ctx, req.WebForwardingData.Proxy, webId); err != nil {
  165. return 0, err
  166. }
  167. if err := s.aidedWeb.EditLog(ctx, webId); err != nil {
  168. return 0, err
  169. }
  170. if err := s.aidedWeb.ConfigureCCProtection(ctx, req.WebForwardingData.CcConfig, webId); err != nil {
  171. return 0, err
  172. }
  173. if err := s.aidedWeb.ConfigureWafFirewall(ctx, webId, require.GroupId); err != nil {
  174. return 0, err
  175. }
  176. // 7. 保存到数据库
  177. id, err := s.aidedWeb.SaveToDatabase(ctx, req, require, webId, cdnOriginIds)
  178. if err != nil {
  179. return 0, err
  180. }
  181. // 8. 处理异步任务
  182. s.aidedWeb.ProcessAsyncTasks(ctx, req, require)
  183. return id, nil
  184. }
  185. func (s *webForwardingService) EditWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) error {
  186. // 1. 获取原始数据
  187. oldData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(req.WebForwardingData.Id))
  188. if err != nil {
  189. return fmt.Errorf("获取原始Web转发数据失败: %w", err)
  190. }
  191. if s.aidedWeb.ValidateDeletePermission(oldData.HostId, req.HostId) != nil {
  192. return fmt.Errorf("用户权限不足")
  193. }
  194. // 继承旧的证书ID和策略ID,以便后续逻辑处理
  195. req.WebForwardingData.SslCertId = int64(oldData.SslCertId)
  196. req.WebForwardingData.SslPolicyId = int64(oldData.SslPolicyId)
  197. // 2. 准备WAF数据和基础验证
  198. require, formData, err := s.aidedWeb.PrepareWafData(ctx, req)
  199. if err != nil {
  200. return err
  201. }
  202. if err := s.aidedWeb.ValidateEditRequest(ctx, req); err != nil {
  203. return err
  204. }
  205. // 3. 处理SSL证书更新
  206. if err := s.aidedWeb.ProcessSSLCertificateUpdate(ctx, req, oldData, require.CdnUid); err != nil {
  207. return err
  208. }
  209. // 4. 更新核心CDN配置(端口、协议、域名、备注等)
  210. if err := s.aidedWeb.UpdateCdnConfiguration(ctx, req, oldData, require.Tag, formData); err != nil {
  211. return err
  212. }
  213. // 5. 更新Proxy Protocol配置
  214. if err := s.aidedWeb.ConfigureProxyProtocol(ctx, req.WebForwardingData.Proxy, int64(oldData.CdnWebId)); err != nil {
  215. return err
  216. }
  217. // 6. 更新CC防护配置
  218. if err := s.aidedWeb.ConfigureCCProtection(ctx, req.WebForwardingData.CcConfig, int64(oldData.CdnWebId)); err != nil {
  219. return err
  220. }
  221. // 7. 处理域名白名单变更
  222. if err := s.aidedWeb.ProcessDomainWhitelistChanges(ctx, req, oldData, require); err != nil {
  223. return err
  224. }
  225. // 8. 获取后端IP规则数据
  226. ipData, err := s.webForwardingRepository.GetWebForwardingIpsByID(ctx, req.WebForwardingData.Id)
  227. if err != nil {
  228. return fmt.Errorf("获取Web转发IP规则失败: %w", err)
  229. }
  230. // 9. 处理源站IP白名单变更
  231. if err := s.aidedWeb.ProcessIpWhitelistChanges(ctx, req, ipData); err != nil {
  232. return err
  233. }
  234. // 10. 更新CDN上的源站服务器
  235. if err := s.aidedWeb.UpdateOriginServers(ctx, req, oldData, ipData); err != nil {
  236. return err
  237. }
  238. // 11. 更新本地数据库记录
  239. if err := s.aidedWeb.UpdateDatabaseRecords(ctx, req, oldData, require, ipData); err != nil {
  240. return err
  241. }
  242. return nil
  243. }
  244. // DeleteWebForwarding 批量删除Web转发配置
  245. // 该函数遍历ID列表,对每个ID执行完整的、独立的删除流程
  246. func (s *webForwardingService) DeleteWebForwarding(ctx context.Context, req v1.DeleteWebForwardingRequest) error {
  247. for _, id := range req.Ids {
  248. if err := s.deleteSingleWebForwarding(ctx, id, req.HostId, req.Uid); err != nil {
  249. // 增加错误上下文,方便定位问题
  250. return fmt.Errorf("删除Web转发配置失败 ID:%d, %w", id, err)
  251. }
  252. }
  253. return nil
  254. }
  255. // deleteSingleWebForwarding 删除单个Web转发配置的完整流程
  256. func (s *webForwardingService) deleteSingleWebForwarding(ctx context.Context, id int, hostId int, uid int) error {
  257. // 1. 获取并验证数据
  258. oldData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(id))
  259. if err != nil {
  260. return fmt.Errorf("获取Web转发数据失败: %w", err)
  261. }
  262. if err := s.aidedWeb.ValidateDeletePermission(oldData.HostId, hostId); err != nil {
  263. return err
  264. }
  265. // 2. 删除CDN服务器
  266. if err := s.aidedWeb.DeleteCdnServer(ctx, oldData.CdnWebId); err != nil {
  267. return err
  268. }
  269. // 3. 处理域名白名单清理
  270. if err := s.aidedWeb.ProcessDeleteDomainWhitelist(ctx, oldData, uid); err != nil {
  271. return err
  272. }
  273. // 4. 处理IP白名单清理
  274. if err := s.aidedWeb.ProcessDeleteIpWhitelist(ctx, id); err != nil {
  275. return err
  276. }
  277. // 5. 清理SSL证书
  278. if err := s.aidedWeb.CleanupSSLCertificate(ctx, oldData); err != nil {
  279. return err
  280. }
  281. // 6. 清理数据库记录
  282. if err := s.aidedWeb.CleanupDatabaseRecords(ctx, id); err != nil {
  283. return err
  284. }
  285. return nil
  286. }
  287. func (s *webForwardingService) GetWebForwardingWafWebAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.WebForwardingDataRequest, error) {
  288. type CombinedResult struct {
  289. Id int
  290. Forwarding *model.WebForwarding
  291. BackendRule *model.WebForwardingRule
  292. Err error // 如果此ID的处理出错,则携带错误
  293. }
  294. g, gCtx := errgroup.WithContext(ctx)
  295. ids, err := s.webForwardingRepository.GetWebForwardingWafWebAllIds(gCtx, req.HostId)
  296. if err != nil {
  297. return nil, fmt.Errorf("GetWebForwardingWafWebAllIds failed: %w", err)
  298. }
  299. if len(ids) == 0 {
  300. return nil, nil // 没有ID,直接返回空切片
  301. }
  302. // 创建一个通道来接收每个ID的处理结果
  303. // 通道缓冲区大小设为ID数量,这样发送者不会因为接收者慢而阻塞(在所有goroutine都启动后)
  304. resultsChan := make(chan CombinedResult, len(ids))
  305. for _, idVal := range ids {
  306. currentID := idVal // 捕获循环变量
  307. g.Go(func() error {
  308. var wf *model.WebForwarding
  309. var bk *model.WebForwardingRule
  310. var localErr error
  311. // 1. 获取 WebForwarding 信息
  312. wf, localErr = s.webForwardingRepository.GetWebForwarding(gCtx, int64(currentID))
  313. if localErr != nil {
  314. // 发送错误到通道,并由 errgroup 捕获
  315. // errgroup 会处理第一个非nil错误,并取消其他 goroutine
  316. resultsChan <- CombinedResult{Id: currentID, Err: fmt.Errorf("GetWebForwarding for id %d failed: %w", currentID, localErr)}
  317. return localErr // 返回错误给 errgroup
  318. }
  319. if wf == nil { // 正常情况下,如果没错误,wf不应为nil,但防御性检查
  320. localErr = fmt.Errorf("GetWebForwarding for id %d returned nil data without error", currentID)
  321. resultsChan <- CombinedResult{Id: currentID, Err: localErr}
  322. return localErr
  323. }
  324. // 2. 获取 Backend IP 信息
  325. // 注意:这里我们允许 GetWebForwardingIpsByID 可能返回 nil 数据(例如没有规则)而不是错误
  326. // 如果它也可能返回错误,则处理方式与上面类似
  327. bk, localErr = s.webForwardingRepository.GetWebForwardingIpsByID(gCtx, currentID)
  328. if localErr != nil {
  329. // 如果获取IP信息失败是一个致命错误,则也应返回错误
  330. // 如果允许部分成功(比如有WebForwarding但没有IP信息),则可以不将此视为errgroup的错误
  331. // 这里假设它也是一个需要errgroup捕获的错误
  332. resultsChan <- CombinedResult{Id: currentID, Forwarding: wf, Err: fmt.Errorf("GetWebForwardingIpsByID for id %d failed: %w", currentID, localErr)}
  333. return localErr // 返回错误给 errgroup
  334. }
  335. // bk 可能是 nil 如果没有错误且没有规则,这取决于业务逻辑
  336. // 发送成功的结果到通道
  337. resultsChan <- CombinedResult{Id: currentID, Forwarding: wf, BackendRule: bk}
  338. return nil // 此goroutine成功
  339. })
  340. }
  341. // 等待所有goroutine完成
  342. groupErr := g.Wait()
  343. // 关闭通道,表示所有发送者都已完成
  344. // 这一步很重要,这样下面的 range 循环才能正常结束
  345. close(resultsChan)
  346. // 如果 errgroup 捕获到任何错误,优先返回该错误
  347. if groupErr != nil {
  348. // 虽然errgroup已经出错了,但通道中可能已经有一些结果(来自出错前成功或出错的goroutine)
  349. // 我们需要排空通道以避免goroutine泄漏(如果它们在发送时阻塞)
  350. // 但由于我们优先返回groupErr,这些结果将被丢弃。
  351. // 在这种设计下,通常任何一个子任务失败都会导致整个操作失败。
  352. return nil, groupErr
  353. }
  354. // 如果没有错误,收集所有成功的结果
  355. finalResults := make([]v1.WebForwardingDataRequest, 0, len(ids))
  356. for res := range resultsChan {
  357. // 再次检查通道中的错误,尽管 errgroup 应该已经捕获了
  358. // 但这是一种更细致的错误处理,以防万一有goroutine在errgroup.Wait()前发送了错误但未被errgroup捕获
  359. // (理论上,如果goroutine返回了错误,errgroup会处理)
  360. // 主要目的是处理 res.forwarding 为 nil 的情况 (如果上面允许不返回错误)
  361. if res.Err != nil {
  362. // 如果到这里还有错误,说明逻辑可能有问题,或者我们决定忽略某些类型的子错误
  363. // 在此示例中,因为 g.Wait() 没有错误,所以这里的 res.err 应该是nil
  364. // 如果不是,那么可能是goroutine在return nil前发送了带有错误的res。
  365. // 严格来说,如果errgroup没有错误,这里res.err也应该是nil
  366. // 但以防万一,我们可以记录日志
  367. return nil, fmt.Errorf("received error from goroutine for ID %d: %w", res.Id, res.Err)
  368. }
  369. if res.Forwarding == nil {
  370. return nil, fmt.Errorf("received nil forwarding from goroutine for ID %d", res.Id)
  371. }
  372. dataReq := v1.WebForwardingDataRequest{
  373. Id: res.Forwarding.Id,
  374. Port: res.Forwarding.Port,
  375. Domain: res.Forwarding.Domain,
  376. IsHttps: res.Forwarding.IsHttps,
  377. Comment: res.Forwarding.Comment,
  378. HttpsKey: res.Forwarding.HttpsKey,
  379. HttpsCert: res.Forwarding.HttpsCert,
  380. Proxy: res.Forwarding.Proxy,
  381. CcConfig: v1.CcConfigRequest{
  382. IsOn: res.Forwarding.Cc,
  383. ThresholdMethod: res.Forwarding.ThresholdMethod,
  384. Level: res.Forwarding.Level,
  385. Limit5s: res.Forwarding.Limit5s,
  386. Limit60s: res.Forwarding.Limit60s,
  387. Limit300s: res.Forwarding.Limit300s,
  388. },
  389. }
  390. if res.BackendRule != nil { // 只有当 BackendRule 存在时才填充相关字段
  391. dataReq.BackendList = res.BackendRule.BackendList
  392. }
  393. finalResults = append(finalResults, dataReq)
  394. }
  395. sort.Slice(finalResults, func(i, j int) bool {
  396. return finalResults[i].Id > finalResults[j].Id
  397. })
  398. return finalResults, nil
  399. }