webforwarding.go 15 KB


  1. package waf
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  9. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn"
  10. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  11. "golang.org/x/sync/errgroup"
  12. "sort"
  13. )
  14. type WebForwardingService interface {
  15. GetWebForwarding(ctx context.Context, req v1.GetForwardingRequest) (v1.WebForwardingDataRequest, error)
  16. GetWebForwardingWafWebAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.WebForwardingDataRequest, error)
  17. AddWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) (int, error)
  18. EditWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) error
  19. DeleteWebForwarding(ctx context.Context, req v1.DeleteWebForwardingRequest) error
  20. }
  21. func NewWebForwardingService(
  22. service *service.Service,
  23. required service.RequiredService,
  24. webForwardingRepository waf.WebForwardingRepository,
  25. crawler service.CrawlerService,
  26. parser service.ParserService,
  27. wafformatter WafFormatterService,
  28. aoDun service.AoDunService,
  29. mq *rabbitmq.RabbitMQ,
  30. gatewayIp GatewayipService,
  31. globalLimitRep waf.GlobalLimitRepository,
  32. cdn flexCdn.CdnService,
  33. proxy flexCdn.ProxyService,
  34. sslCert flexCdn.SslCertService,
  35. websocket flexCdn.WebsocketService,
  36. cc CcService,
  37. ccIpList CcIpListService,
  38. aidedWeb AidedWebService,
  39. ) WebForwardingService {
  40. return &webForwardingService{
  41. Service: service,
  42. webForwardingRepository: webForwardingRepository,
  43. required: required,
  44. parser: parser,
  45. crawler: crawler,
  46. wafformatter: wafformatter,
  47. aoDun: aoDun,
  48. mq: mq,
  49. gatewayIp: gatewayIp,
  50. cdn: cdn,
  51. globalLimitRep: globalLimitRep,
  52. proxy: proxy,
  53. sslCert: sslCert,
  54. websocket: websocket,
  55. cc: cc,
  56. ccIpList: ccIpList,
  57. aidedWeb: aidedWeb,
  58. }
  59. }
  60. type webForwardingService struct {
  61. *service.Service
  62. webForwardingRepository waf.WebForwardingRepository
  63. required service.RequiredService
  64. parser service.ParserService
  65. crawler service.CrawlerService
  66. wafformatter WafFormatterService
  67. aoDun service.AoDunService
  68. mq *rabbitmq.RabbitMQ
  69. gatewayIp GatewayipService
  70. cdn flexCdn.CdnService
  71. globalLimitRep waf.GlobalLimitRepository
  72. proxy flexCdn.ProxyService
  73. sslCert flexCdn.SslCertService
  74. websocket flexCdn.WebsocketService
  75. cc CcService
  76. ccIpList CcIpListService
  77. aidedWeb AidedWebService
  78. }
  79. func (s *webForwardingService) GetWebForwarding(ctx context.Context, req v1.GetForwardingRequest) (v1.WebForwardingDataRequest, error) {
  80. var webForwarding model.WebForwarding
  81. var backend model.WebForwardingRule
  82. g, gCtx := errgroup.WithContext(ctx)
  83. g.Go(func() error {
  84. res, e := s.webForwardingRepository.GetWebForwarding(gCtx, int64(req.Id))
  85. if e != nil {
  86. // 直接返回错误,errgroup 会捕获它
  87. return fmt.Errorf("GetWebForwarding failed: %w", e)
  88. }
  89. if res != nil {
  90. webForwarding = *res
  91. }
  92. return nil
  93. })
  94. g.Go(func() error {
  95. res, e := s.webForwardingRepository.GetWebForwardingIpsByID(ctx, req.Id)
  96. if e != nil {
  97. return fmt.Errorf("GetWebForwardingByID failed: %w", e)
  98. }
  99. if res != nil {
  100. backend = *res
  101. }
  102. return nil
  103. })
  104. if err := g.Wait(); err != nil {
  105. return v1.WebForwardingDataRequest{}, err
  106. }
  107. return v1.WebForwardingDataRequest{
  108. Id: webForwarding.Id,
  109. Port: webForwarding.Port,
  110. Domain: webForwarding.Domain,
  111. IsHttps: webForwarding.IsHttps,
  112. Comment: webForwarding.Comment,
  113. BackendList: backend.BackendList,
  114. HttpsKey: webForwarding.HttpsKey,
  115. HttpsCert: webForwarding.HttpsCert,
  116. Proxy: webForwarding.Proxy,
  117. CcConfig: v1.CcConfigRequest{
  118. IsOn: webForwarding.Cc,
  119. ThresholdMethod: webForwarding.ThresholdMethod,
  120. Level: webForwarding.Level,
  121. Limit5s: webForwarding.Limit5s,
  122. Limit60s: webForwarding.Limit60s,
  123. Limit300s: webForwarding.Limit300s,
  124. },
  125. }, nil
  126. }
  127. // AddWebForwarding 添加Web转发配置
  128. // 该函数负责创建完整的Web转发配置,包括:
  129. // 1. 数据验证和预处理
  130. // 2. SSL证书管理
  131. // 3. CDN网站创建和配置
  132. // 4. 源站服务器添加
  133. // 5. 各种功能开启(WebSocket、Proxy、日志、CC防护等)
  134. // 6. 数据库记录保存
  135. // 7. 白名单任务发布
  136. func (s *webForwardingService) AddWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) (int, error) {
  137. // 1. 数据准备和验证
  138. require, formData, err := s.aidedWeb.PrepareWafData(ctx, req)
  139. if err != nil {
  140. return 0, err
  141. }
  142. if err := s.aidedWeb.ValidateAddRequest(ctx, req, require); err != nil {
  143. return 0, err
  144. }
  145. // 2. 处理SSL证书
  146. if err := s.aidedWeb.ProcessSSLCertificate(ctx, req, require, formData); err != nil {
  147. return 0, err
  148. }
  149. // 3. 创建CDN网站
  150. webId, err := s.aidedWeb.CreateCdnWebsite(ctx, req, require, formData)
  151. if err != nil {
  152. return 0, err
  153. }
  154. // 4. 配置WebSocket
  155. if err := s.aidedWeb.ConfigureWebsocket(ctx, webId); err != nil {
  156. return 0, err
  157. }
  158. // 5. 添加源站到网站
  159. cdnOriginIds, err := s.aidedWeb.AddOriginsToWebsite(ctx, req, webId)
  160. if err != nil {
  161. return 0, err
  162. }
  163. // 6. 配置各种功能
  164. if err := s.aidedWeb.ConfigureProxyProtocol(ctx, req, webId); err != nil {
  165. return 0, err
  166. }
  167. if err := s.aidedWeb.EditLog(ctx, webId); err != nil {
  168. return 0, err
  169. }
  170. if err := s.aidedWeb.ConfigureCCProtection(ctx, req, webId); err != nil {
  171. return 0, err
  172. }
  173. if err := s.aidedWeb.ConfigureWafFirewall(ctx, webId, require.GroupId); err != nil {
  174. return 0, err
  175. }
  176. // 7. 保存到数据库
  177. id, err := s.aidedWeb.SaveToDatabase(ctx, req, require, webId, cdnOriginIds)
  178. if err != nil {
  179. return 0, err
  180. }
  181. // 8. 处理异步任务
  182. s.aidedWeb.ProcessAsyncTasks(ctx, req, require)
  183. return id, nil
  184. }
  185. func (s *webForwardingService) EditWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) error {
  186. // 1. 获取原始数据
  187. oldData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(req.WebForwardingData.Id))
  188. if err != nil {
  189. return fmt.Errorf("获取原始Web转发数据失败: %w", err)
  190. }
  191. // 继承旧的证书ID和策略ID,以便后续逻辑处理
  192. req.WebForwardingData.SslCertId = int64(oldData.SslCertId)
  193. req.WebForwardingData.SslPolicyId = int64(oldData.SslPolicyId)
  194. // 2. 准备WAF数据和基础验证
  195. require, formData, err := s.aidedWeb.PrepareWafData(ctx, req)
  196. if err != nil {
  197. return err
  198. }
  199. if err := s.aidedWeb.ValidateEditRequest(ctx, req, require, oldData); err != nil {
  200. return err
  201. }
  202. // 3. 处理SSL证书更新
  203. if err := s.aidedWeb.ProcessSSLCertificateUpdate(ctx, req, oldData, require); err != nil {
  204. return err
  205. }
  206. // 4. 更新核心CDN配置(端口、协议、域名、备注等)
  207. if err := s.aidedWeb.UpdateCdnConfiguration(ctx, req, oldData, require, formData); err != nil {
  208. return err
  209. }
  210. // 5. 更新Proxy Protocol配置
  211. if oldData.Proxy != req.WebForwardingData.Proxy {
  212. if err := s.aidedWeb.ConfigureProxyProtocol(ctx, req, int64(oldData.CdnWebId)); err != nil {
  213. return err
  214. }
  215. }
  216. // 6. 更新CC防护配置
  217. if err := s.aidedWeb.ConfigureCCProtection(ctx, req, int64(oldData.CdnWebId)); err != nil {
  218. return err
  219. }
  220. // 7. 处理域名白名单变更
  221. if err := s.aidedWeb.ProcessDomainWhitelistChanges(ctx, req, oldData, require); err != nil {
  222. return err
  223. }
  224. // 8. 获取后端IP规则数据
  225. ipData, err := s.webForwardingRepository.GetWebForwardingIpsByID(ctx, req.WebForwardingData.Id)
  226. if err != nil {
  227. return fmt.Errorf("获取Web转发IP规则失败: %w", err)
  228. }
  229. // 9. 处理源站IP白名单变更
  230. if err := s.aidedWeb.ProcessIpWhitelistChanges(ctx, req, ipData); err != nil {
  231. return err
  232. }
  233. // 10. 更新CDN上的源站服务器
  234. if err := s.aidedWeb.UpdateOriginServers(ctx, req, oldData, ipData); err != nil {
  235. return err
  236. }
  237. // 11. 更新本地数据库记录
  238. if err := s.aidedWeb.UpdateDatabaseRecords(ctx, req, oldData, require, ipData); err != nil {
  239. return err
  240. }
  241. return nil
  242. }
  243. // DeleteWebForwarding 批量删除Web转发配置
  244. // 该函数遍历ID列表,对每个ID执行完整的、独立的删除流程
  245. func (s *webForwardingService) DeleteWebForwarding(ctx context.Context, req v1.DeleteWebForwardingRequest) error {
  246. for _, id := range req.Ids {
  247. if err := s.deleteSingleWebForwarding(ctx, id, req.HostId, req.Uid); err != nil {
  248. // 增加错误上下文,方便定位问题
  249. return fmt.Errorf("删除Web转发配置失败 ID:%d, %w", id, err)
  250. }
  251. }
  252. return nil
  253. }
  254. // deleteSingleWebForwarding 删除单个Web转发配置的完整流程
  255. func (s *webForwardingService) deleteSingleWebForwarding(ctx context.Context, id int, hostId int, uid int) error {
  256. // 1. 获取并验证数据
  257. oldData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(id))
  258. if err != nil {
  259. return fmt.Errorf("获取Web转发数据失败: %w", err)
  260. }
  261. if err := s.aidedWeb.ValidateDeletePermission(oldData, hostId); err != nil {
  262. return err
  263. }
  264. // 2. 删除CDN服务器
  265. if err := s.aidedWeb.DeleteCdnServer(ctx, oldData.CdnWebId); err != nil {
  266. return err
  267. }
  268. // 3. 处理域名白名单清理
  269. if err := s.aidedWeb.ProcessDeleteDomainWhitelist(ctx, oldData, uid); err != nil {
  270. return err
  271. }
  272. // 4. 处理IP白名单清理
  273. if err := s.aidedWeb.ProcessDeleteIpWhitelist(ctx, id); err != nil {
  274. return err
  275. }
  276. // 5. 清理SSL证书
  277. if err := s.aidedWeb.CleanupSSLCertificate(ctx, oldData); err != nil {
  278. return err
  279. }
  280. // 6. 清理数据库记录
  281. if err := s.aidedWeb.CleanupDatabaseRecords(ctx, id); err != nil {
  282. return err
  283. }
  284. return nil
  285. }
  286. func (s *webForwardingService) GetWebForwardingWafWebAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.WebForwardingDataRequest, error) {
  287. type CombinedResult struct {
  288. Id int
  289. Forwarding *model.WebForwarding
  290. BackendRule *model.WebForwardingRule
  291. Err error // 如果此ID的处理出错,则携带错误
  292. }
  293. g, gCtx := errgroup.WithContext(ctx)
  294. ids, err := s.webForwardingRepository.GetWebForwardingWafWebAllIds(gCtx, req.HostId)
  295. if err != nil {
  296. return nil, fmt.Errorf("GetWebForwardingWafWebAllIds failed: %w", err)
  297. }
  298. if len(ids) == 0 {
  299. return nil, nil // 没有ID,直接返回空切片
  300. }
  301. // 创建一个通道来接收每个ID的处理结果
  302. // 通道缓冲区大小设为ID数量,这样发送者不会因为接收者慢而阻塞(在所有goroutine都启动后)
  303. resultsChan := make(chan CombinedResult, len(ids))
  304. for _, idVal := range ids {
  305. currentID := idVal // 捕获循环变量
  306. g.Go(func() error {
  307. var wf *model.WebForwarding
  308. var bk *model.WebForwardingRule
  309. var localErr error
  310. // 1. 获取 WebForwarding 信息
  311. wf, localErr = s.webForwardingRepository.GetWebForwarding(gCtx, int64(currentID))
  312. if localErr != nil {
  313. // 发送错误到通道,并由 errgroup 捕获
  314. // errgroup 会处理第一个非nil错误,并取消其他 goroutine
  315. resultsChan <- CombinedResult{Id: currentID, Err: fmt.Errorf("GetWebForwarding for id %d failed: %w", currentID, localErr)}
  316. return localErr // 返回错误给 errgroup
  317. }
  318. if wf == nil { // 正常情况下,如果没错误,wf不应为nil,但防御性检查
  319. localErr = fmt.Errorf("GetWebForwarding for id %d returned nil data without error", currentID)
  320. resultsChan <- CombinedResult{Id: currentID, Err: localErr}
  321. return localErr
  322. }
  323. // 2. 获取 Backend IP 信息
  324. // 注意:这里我们允许 GetWebForwardingIpsByID 可能返回 nil 数据(例如没有规则)而不是错误
  325. // 如果它也可能返回错误,则处理方式与上面类似
  326. bk, localErr = s.webForwardingRepository.GetWebForwardingIpsByID(gCtx, currentID)
  327. if localErr != nil {
  328. // 如果获取IP信息失败是一个致命错误,则也应返回错误
  329. // 如果允许部分成功(比如有WebForwarding但没有IP信息),则可以不将此视为errgroup的错误
  330. // 这里假设它也是一个需要errgroup捕获的错误
  331. resultsChan <- CombinedResult{Id: currentID, Forwarding: wf, Err: fmt.Errorf("GetWebForwardingIpsByID for id %d failed: %w", currentID, localErr)}
  332. return localErr // 返回错误给 errgroup
  333. }
  334. // bk 可能是 nil 如果没有错误且没有规则,这取决于业务逻辑
  335. // 发送成功的结果到通道
  336. resultsChan <- CombinedResult{Id: currentID, Forwarding: wf, BackendRule: bk}
  337. return nil // 此goroutine成功
  338. })
  339. }
  340. // 等待所有goroutine完成
  341. groupErr := g.Wait()
  342. // 关闭通道,表示所有发送者都已完成
  343. // 这一步很重要,这样下面的 range 循环才能正常结束
  344. close(resultsChan)
  345. // 如果 errgroup 捕获到任何错误,优先返回该错误
  346. if groupErr != nil {
  347. // 虽然errgroup已经出错了,但通道中可能已经有一些结果(来自出错前成功或出错的goroutine)
  348. // 我们需要排空通道以避免goroutine泄漏(如果它们在发送时阻塞)
  349. // 但由于我们优先返回groupErr,这些结果将被丢弃。
  350. // 在这种设计下,通常任何一个子任务失败都会导致整个操作失败。
  351. return nil, groupErr
  352. }
  353. // 如果没有错误,收集所有成功的结果
  354. finalResults := make([]v1.WebForwardingDataRequest, 0, len(ids))
  355. for res := range resultsChan {
  356. // 再次检查通道中的错误,尽管 errgroup 应该已经捕获了
  357. // 但这是一种更细致的错误处理,以防万一有goroutine在errgroup.Wait()前发送了错误但未被errgroup捕获
  358. // (理论上,如果goroutine返回了错误,errgroup会处理)
  359. // 主要目的是处理 res.forwarding 为 nil 的情况 (如果上面允许不返回错误)
  360. if res.Err != nil {
  361. // 如果到这里还有错误,说明逻辑可能有问题,或者我们决定忽略某些类型的子错误
  362. // 在此示例中,因为 g.Wait() 没有错误,所以这里的 res.err 应该是nil
  363. // 如果不是,那么可能是goroutine在return nil前发送了带有错误的res。
  364. // 严格来说,如果errgroup没有错误,这里res.err也应该是nil
  365. // 但以防万一,我们可以记录日志
  366. return nil, fmt.Errorf("received error from goroutine for ID %d: %w", res.Id, res.Err)
  367. }
  368. if res.Forwarding == nil {
  369. return nil, fmt.Errorf("received nil forwarding from goroutine for ID %d", res.Id)
  370. }
  371. dataReq := v1.WebForwardingDataRequest{
  372. Id: res.Forwarding.Id,
  373. Port: res.Forwarding.Port,
  374. Domain: res.Forwarding.Domain,
  375. IsHttps: res.Forwarding.IsHttps,
  376. Comment: res.Forwarding.Comment,
  377. HttpsKey: res.Forwarding.HttpsKey,
  378. HttpsCert: res.Forwarding.HttpsCert,
  379. Proxy: res.Forwarding.Proxy,
  380. CcConfig: v1.CcConfigRequest{
  381. IsOn: res.Forwarding.Cc,
  382. ThresholdMethod: res.Forwarding.ThresholdMethod,
  383. Level: res.Forwarding.Level,
  384. Limit5s: res.Forwarding.Limit5s,
  385. Limit60s: res.Forwarding.Limit60s,
  386. Limit300s: res.Forwarding.Limit300s,
  387. },
  388. }
  389. if res.BackendRule != nil { // 只有当 BackendRule 存在时才填充相关字段
  390. dataReq.BackendList = res.BackendRule.BackendList
  391. }
  392. finalResults = append(finalResults, dataReq)
  393. }
  394. sort.Slice(finalResults, func(i, j int) bool {
  395. return finalResults[i].Id > finalResults[j].Id
  396. })
  397. return finalResults, nil
  398. }