webforwarding.go 16 KB


  1. package waf
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  9. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn"
  10. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  11. "golang.org/x/sync/errgroup"
  12. "sort"
  13. )
  14. type WebForwardingService interface {
  15. GetWebForwarding(ctx context.Context, req v1.GetForwardingRequest) (v1.WebForwardingDataRequest, error)
  16. GetWebForwardingWafWebAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.WebForwardingDataRequest, error)
  17. AddWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) (int, error)
  18. EditWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) error
  19. DeleteWebForwarding(ctx context.Context, req v1.DeleteWebForwardingRequest) error
  20. }
  21. func NewWebForwardingService(
  22. service *service.Service,
  23. required service.RequiredService,
  24. webForwardingRepository waf.WebForwardingRepository,
  25. crawler service.CrawlerService,
  26. parser service.ParserService,
  27. wafformatter WafFormatterService,
  28. aoDun service.AoDunService,
  29. mq *rabbitmq.RabbitMQ,
  30. gatewayIp GatewayipService,
  31. globalLimitRep waf.GlobalLimitRepository,
  32. cdn flexCdn.CdnService,
  33. proxy flexCdn.ProxyService,
  34. sslCert flexCdn.SslCertService,
  35. websocket flexCdn.WebsocketService,
  36. cc CcService,
  37. ccIpList CcIpListService,
  38. aidedWeb AidedWebService,
  39. ) WebForwardingService {
  40. return &webForwardingService{
  41. Service: service,
  42. webForwardingRepository: webForwardingRepository,
  43. required: required,
  44. parser: parser,
  45. crawler: crawler,
  46. wafformatter: wafformatter,
  47. aoDun: aoDun,
  48. mq: mq,
  49. gatewayIp: gatewayIp,
  50. cdn: cdn,
  51. globalLimitRep: globalLimitRep,
  52. proxy: proxy,
  53. sslCert: sslCert,
  54. websocket: websocket,
  55. cc: cc,
  56. ccIpList: ccIpList,
  57. aidedWeb: aidedWeb,
  58. }
  59. }
  60. type webForwardingService struct {
  61. *service.Service
  62. webForwardingRepository waf.WebForwardingRepository
  63. required service.RequiredService
  64. parser service.ParserService
  65. crawler service.CrawlerService
  66. wafformatter WafFormatterService
  67. aoDun service.AoDunService
  68. mq *rabbitmq.RabbitMQ
  69. gatewayIp GatewayipService
  70. cdn flexCdn.CdnService
  71. globalLimitRep waf.GlobalLimitRepository
  72. proxy flexCdn.ProxyService
  73. sslCert flexCdn.SslCertService
  74. websocket flexCdn.WebsocketService
  75. cc CcService
  76. ccIpList CcIpListService
  77. aidedWeb AidedWebService
  78. }
  79. func (s *webForwardingService) GetWebForwarding(ctx context.Context, req v1.GetForwardingRequest) (v1.WebForwardingDataRequest, error) {
  80. var webForwarding model.WebForwarding
  81. var backend model.WebForwardingRule
  82. g, gCtx := errgroup.WithContext(ctx)
  83. g.Go(func() error {
  84. res, e := s.webForwardingRepository.GetWebForwarding(gCtx, int64(req.Id))
  85. if e != nil {
  86. // 直接返回错误,errgroup 会捕获它
  87. return fmt.Errorf("GetWebForwarding failed: %w", e)
  88. }
  89. if res != nil {
  90. webForwarding = *res
  91. }
  92. return nil
  93. })
  94. g.Go(func() error {
  95. res, e := s.webForwardingRepository.GetWebForwardingIpsByID(ctx, req.Id)
  96. if e != nil {
  97. return fmt.Errorf("GetWebForwardingByID failed: %w", e)
  98. }
  99. if res != nil {
  100. backend = *res
  101. }
  102. return nil
  103. })
  104. if err := g.Wait(); err != nil {
  105. return v1.WebForwardingDataRequest{}, err
  106. }
  107. return v1.WebForwardingDataRequest{
  108. Id: webForwarding.Id,
  109. Port: webForwarding.Port,
  110. Domain: webForwarding.Domain,
  111. IsHttps: webForwarding.IsHttps,
  112. Comment: webForwarding.Comment,
  113. BackendList: backend.BackendList,
  114. HttpsKey: webForwarding.HttpsKey,
  115. HttpsCert: webForwarding.HttpsCert,
  116. Proxy: webForwarding.Proxy,
  117. CcConfig: v1.CcConfigRequest{
  118. IsOn: webForwarding.Cc,
  119. ThresholdMethod: webForwarding.ThresholdMethod,
  120. Level: webForwarding.Level,
  121. Limit5s: webForwarding.Limit5s,
  122. Limit60s: webForwarding.Limit60s,
  123. Limit300s: webForwarding.Limit300s,
  124. },
  125. }, nil
  126. }
  127. // AddWebForwarding 添加Web转发配置
  128. // 该函数负责创建完整的Web转发配置,包括:
  129. // 1. 数据验证和预处理
  130. // 2. SSL证书管理
  131. // 3. CDN网站创建和配置
  132. // 4. 源站服务器添加
  133. // 5. 各种功能开启(WebSocket、Proxy、日志、CC防护等)
  134. // 6. 数据库记录保存
  135. // 7. 白名单任务发布
  136. func (s *webForwardingService) AddWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) (int, error) {
  137. // 1. 数据准备和验证
  138. require, formData, err := s.aidedWeb.PrepareWafData(ctx, req)
  139. if err != nil {
  140. return 0, err
  141. }
  142. if err := s.aidedWeb.ValidateAddRequest(ctx, req, require); err != nil {
  143. return 0, err
  144. }
  145. // 2. 处理SSL证书
  146. if err := s.aidedWeb.ProcessSSLCertificate(ctx, req, require.CdnUid); err != nil {
  147. return 0, err
  148. }
  149. // 3. 创建CDN网站
  150. webId, err := s.aidedWeb.CreateCdnWebsite(ctx, formData)
  151. if err != nil {
  152. return 0, err
  153. }
  154. // 4. 配置WebSocket
  155. if err := s.aidedWeb.ConfigureWebsocket(ctx, webId); err != nil {
  156. return 0, err
  157. }
  158. // 5. 添加源站到网站
  159. cdnOriginIds, err := s.aidedWeb.AddOriginsToWebsite(ctx, req, webId)
  160. if err != nil {
  161. return 0, err
  162. }
  163. // 6. 配置各种功能
  164. if err := s.aidedWeb.ConfigureProxyProtocol(ctx, req.WebForwardingData.Proxy, webId); err != nil {
  165. return 0, err
  166. }
  167. if err := s.aidedWeb.EditLog(ctx, webId); err != nil {
  168. return 0, err
  169. }
  170. if err := s.aidedWeb.ConfigureCCProtection(ctx, req.WebForwardingData.CcConfig, webId); err != nil {
  171. return 0, err
  172. }
  173. if err := s.aidedWeb.ConfigureWafFirewall(ctx, webId, require.GroupId); err != nil {
  174. return 0, err
  175. }
  176. // 7. 保存到数据库
  177. id, err := s.aidedWeb.SaveToDatabase(ctx, req, require, webId, cdnOriginIds)
  178. if err != nil {
  179. return 0, err
  180. }
  181. // 8. 处理异步任务
  182. s.aidedWeb.ProcessAsyncTasks(ctx, req, require)
  183. return id, nil
  184. }
  185. func (s *webForwardingService) EditWebForwarding(ctx context.Context, req *v1.WebForwardingRequest) error {
  186. // 1. 获取原始数据
  187. oldData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(req.WebForwardingData.Id))
  188. if err != nil {
  189. return fmt.Errorf("获取原始Web转发数据失败: %w", err)
  190. }
  191. if s.aidedWeb.ValidateDeletePermission(oldData.HostId, req.HostId) != nil {
  192. return fmt.Errorf("用户权限不足")
  193. }
  194. // 继承旧的证书ID和策略ID,以便后续逻辑处理
  195. req.WebForwardingData.SslCertId = int64(oldData.SslCertId)
  196. req.WebForwardingData.SslPolicyId = int64(oldData.SslPolicyId)
  197. // 2. 准备WAF数据和基础验证
  198. require, formData, err := s.aidedWeb.PrepareWafData(ctx, req)
  199. if err != nil {
  200. return err
  201. }
  202. if err := s.aidedWeb.ValidateEditRequest(ctx, req); err != nil {
  203. return err
  204. }
  205. // 3. 处理SSL证书更新
  206. if err := s.aidedWeb.ProcessSSLCertificateUpdate(ctx, req, oldData, require.CdnUid); err != nil {
  207. return err
  208. }
  209. // 4. 更新核心CDN配置(端口、协议、域名、备注等)
  210. if err := s.aidedWeb.UpdateCdnConfiguration(ctx, req, oldData, require.Tag, formData); err != nil {
  211. return err
  212. }
  213. // 5. 更新Proxy Protocol配置
  214. if oldData.Proxy != req.WebForwardingData.Proxy {
  215. if err := s.aidedWeb.ConfigureProxyProtocol(ctx, req.WebForwardingData.Proxy, int64(oldData.CdnWebId)); err != nil {
  216. return err
  217. }
  218. }
  219. // 6. 更新CC防护配置
  220. if err := s.aidedWeb.ConfigureCCProtection(ctx, req.WebForwardingData.CcConfig, int64(oldData.CdnWebId)); err != nil {
  221. return err
  222. }
  223. // 7. 处理域名白名单变更
  224. if err := s.aidedWeb.ProcessDomainWhitelistChanges(ctx, req, oldData, require); err != nil {
  225. return err
  226. }
  227. // 8. 获取后端IP规则数据
  228. ipData, err := s.webForwardingRepository.GetWebForwardingIpsByID(ctx, req.WebForwardingData.Id)
  229. if err != nil {
  230. return fmt.Errorf("获取Web转发IP规则失败: %w", err)
  231. }
  232. // 9. 处理源站IP白名单变更
  233. if err := s.aidedWeb.ProcessIpWhitelistChanges(ctx, req, ipData); err != nil {
  234. return err
  235. }
  236. // 10. 更新CDN上的源站服务器
  237. if err := s.aidedWeb.UpdateOriginServers(ctx, req, oldData, ipData); err != nil {
  238. return err
  239. }
  240. // 11. 更新本地数据库记录
  241. if err := s.aidedWeb.UpdateDatabaseRecords(ctx, req, oldData, require, ipData); err != nil {
  242. return err
  243. }
  244. return nil
  245. }
  246. // DeleteWebForwarding 批量删除Web转发配置
  247. // 该函数遍历ID列表,对每个ID执行完整的、独立的删除流程
  248. func (s *webForwardingService) DeleteWebForwarding(ctx context.Context, req v1.DeleteWebForwardingRequest) error {
  249. for _, id := range req.Ids {
  250. if err := s.deleteSingleWebForwarding(ctx, id, req.HostId, req.Uid); err != nil {
  251. // 增加错误上下文,方便定位问题
  252. return fmt.Errorf("删除Web转发配置失败 ID:%d, %w", id, err)
  253. }
  254. }
  255. return nil
  256. }
  257. // deleteSingleWebForwarding 删除单个Web转发配置的完整流程
  258. func (s *webForwardingService) deleteSingleWebForwarding(ctx context.Context, id int, hostId int, uid int) error {
  259. // 1. 获取并验证数据
  260. oldData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(id))
  261. if err != nil {
  262. return fmt.Errorf("获取Web转发数据失败: %w", err)
  263. }
  264. if err := s.aidedWeb.ValidateDeletePermission(oldData.HostId, hostId); err != nil {
  265. return err
  266. }
  267. // 2. 删除CDN服务器
  268. if err := s.aidedWeb.DeleteCdnServer(ctx, oldData.CdnWebId); err != nil {
  269. return err
  270. }
  271. // 3. 处理域名白名单清理
  272. if err := s.aidedWeb.ProcessDeleteDomainWhitelist(ctx, oldData, uid); err != nil {
  273. return err
  274. }
  275. // 4. 处理IP白名单清理
  276. if err := s.aidedWeb.ProcessDeleteIpWhitelist(ctx, id); err != nil {
  277. return err
  278. }
  279. // 5. 清理SSL证书
  280. if err := s.aidedWeb.CleanupSSLCertificate(ctx, oldData); err != nil {
  281. return err
  282. }
  283. // 6. 清理数据库记录
  284. if err := s.aidedWeb.CleanupDatabaseRecords(ctx, id); err != nil {
  285. return err
  286. }
  287. return nil
  288. }
  289. func (s *webForwardingService) GetWebForwardingWafWebAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.WebForwardingDataRequest, error) {
  290. type CombinedResult struct {
  291. Id int
  292. Forwarding *model.WebForwarding
  293. BackendRule *model.WebForwardingRule
  294. Err error // 如果此ID的处理出错,则携带错误
  295. }
  296. g, gCtx := errgroup.WithContext(ctx)
  297. ids, err := s.webForwardingRepository.GetWebForwardingWafWebAllIds(gCtx, req.HostId)
  298. if err != nil {
  299. return nil, fmt.Errorf("GetWebForwardingWafWebAllIds failed: %w", err)
  300. }
  301. if len(ids) == 0 {
  302. return nil, nil // 没有ID,直接返回空切片
  303. }
  304. // 创建一个通道来接收每个ID的处理结果
  305. // 通道缓冲区大小设为ID数量,这样发送者不会因为接收者慢而阻塞(在所有goroutine都启动后)
  306. resultsChan := make(chan CombinedResult, len(ids))
  307. for _, idVal := range ids {
  308. currentID := idVal // 捕获循环变量
  309. g.Go(func() error {
  310. var wf *model.WebForwarding
  311. var bk *model.WebForwardingRule
  312. var localErr error
  313. // 1. 获取 WebForwarding 信息
  314. wf, localErr = s.webForwardingRepository.GetWebForwarding(gCtx, int64(currentID))
  315. if localErr != nil {
  316. // 发送错误到通道,并由 errgroup 捕获
  317. // errgroup 会处理第一个非nil错误,并取消其他 goroutine
  318. resultsChan <- CombinedResult{Id: currentID, Err: fmt.Errorf("GetWebForwarding for id %d failed: %w", currentID, localErr)}
  319. return localErr // 返回错误给 errgroup
  320. }
  321. if wf == nil { // 正常情况下,如果没错误,wf不应为nil,但防御性检查
  322. localErr = fmt.Errorf("GetWebForwarding for id %d returned nil data without error", currentID)
  323. resultsChan <- CombinedResult{Id: currentID, Err: localErr}
  324. return localErr
  325. }
  326. // 2. 获取 Backend IP 信息
  327. // 注意:这里我们允许 GetWebForwardingIpsByID 可能返回 nil 数据(例如没有规则)而不是错误
  328. // 如果它也可能返回错误,则处理方式与上面类似
  329. bk, localErr = s.webForwardingRepository.GetWebForwardingIpsByID(gCtx, currentID)
  330. if localErr != nil {
  331. // 如果获取IP信息失败是一个致命错误,则也应返回错误
  332. // 如果允许部分成功(比如有WebForwarding但没有IP信息),则可以不将此视为errgroup的错误
  333. // 这里假设它也是一个需要errgroup捕获的错误
  334. resultsChan <- CombinedResult{Id: currentID, Forwarding: wf, Err: fmt.Errorf("GetWebForwardingIpsByID for id %d failed: %w", currentID, localErr)}
  335. return localErr // 返回错误给 errgroup
  336. }
  337. // bk 可能是 nil 如果没有错误且没有规则,这取决于业务逻辑
  338. // 发送成功的结果到通道
  339. resultsChan <- CombinedResult{Id: currentID, Forwarding: wf, BackendRule: bk}
  340. return nil // 此goroutine成功
  341. })
  342. }
  343. // 等待所有goroutine完成
  344. groupErr := g.Wait()
  345. // 关闭通道,表示所有发送者都已完成
  346. // 这一步很重要,这样下面的 range 循环才能正常结束
  347. close(resultsChan)
  348. // 如果 errgroup 捕获到任何错误,优先返回该错误
  349. if groupErr != nil {
  350. // 虽然errgroup已经出错了,但通道中可能已经有一些结果(来自出错前成功或出错的goroutine)
  351. // 我们需要排空通道以避免goroutine泄漏(如果它们在发送时阻塞)
  352. // 但由于我们优先返回groupErr,这些结果将被丢弃。
  353. // 在这种设计下,通常任何一个子任务失败都会导致整个操作失败。
  354. return nil, groupErr
  355. }
  356. // 如果没有错误,收集所有成功的结果
  357. finalResults := make([]v1.WebForwardingDataRequest, 0, len(ids))
  358. for res := range resultsChan {
  359. // 再次检查通道中的错误,尽管 errgroup 应该已经捕获了
  360. // 但这是一种更细致的错误处理,以防万一有goroutine在errgroup.Wait()前发送了错误但未被errgroup捕获
  361. // (理论上,如果goroutine返回了错误,errgroup会处理)
  362. // 主要目的是处理 res.forwarding 为 nil 的情况 (如果上面允许不返回错误)
  363. if res.Err != nil {
  364. // 如果到这里还有错误,说明逻辑可能有问题,或者我们决定忽略某些类型的子错误
  365. // 在此示例中,因为 g.Wait() 没有错误,所以这里的 res.err 应该是nil
  366. // 如果不是,那么可能是goroutine在return nil前发送了带有错误的res。
  367. // 严格来说,如果errgroup没有错误,这里res.err也应该是nil
  368. // 但以防万一,我们可以记录日志
  369. return nil, fmt.Errorf("received error from goroutine for ID %d: %w", res.Id, res.Err)
  370. }
  371. if res.Forwarding == nil {
  372. return nil, fmt.Errorf("received nil forwarding from goroutine for ID %d", res.Id)
  373. }
  374. dataReq := v1.WebForwardingDataRequest{
  375. Id: res.Forwarding.Id,
  376. Port: res.Forwarding.Port,
  377. Domain: res.Forwarding.Domain,
  378. IsHttps: res.Forwarding.IsHttps,
  379. Comment: res.Forwarding.Comment,
  380. HttpsKey: res.Forwarding.HttpsKey,
  381. HttpsCert: res.Forwarding.HttpsCert,
  382. Proxy: res.Forwarding.Proxy,
  383. CcConfig: v1.CcConfigRequest{
  384. IsOn: res.Forwarding.Cc,
  385. ThresholdMethod: res.Forwarding.ThresholdMethod,
  386. Level: res.Forwarding.Level,
  387. Limit5s: res.Forwarding.Limit5s,
  388. Limit60s: res.Forwarding.Limit60s,
  389. Limit300s: res.Forwarding.Limit300s,
  390. },
  391. }
  392. if res.BackendRule != nil { // 只有当 BackendRule 存在时才填充相关字段
  393. dataReq.BackendList = res.BackendRule.BackendList
  394. }
  395. finalResults = append(finalResults, dataReq)
  396. }
  397. sort.Slice(finalResults, func(i, j int) bool {
  398. return finalResults[i].Id > finalResults[j].Id
  399. })
  400. return finalResults, nil
  401. }