udpforwarding.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package udp
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  9. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn"
  11. waf2 "github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf/common"
  12. "golang.org/x/sync/errgroup"
  13. "sort"
  14. )
  15. type UdpForWardingService interface {
  16. GetUdpForWarding(ctx context.Context,req v1.GetForwardingRequest) (v1.UdpForwardingDataRequest, error)
  17. AddUdpForwarding(ctx context.Context, req *v1.UdpForwardingRequest) (int, error)
  18. EditUdpForwarding(ctx context.Context, req *v1.UdpForwardingRequest) error
  19. DeleteUdpForwarding(ctx context.Context, req v1.DeleteUdpForwardingRequest) error
  20. GetUdpForwardingWafUdpAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.UdpForwardingDataRequest, error)
  21. }
  22. func NewUdpForWardingService(
  23. service *service.Service,
  24. udpForWardingRepository waf.UdpForWardingRepository,
  25. required service.RequiredService,
  26. parser service.ParserService,
  27. crawler service.CrawlerService,
  28. globalRep waf.GlobalLimitRepository,
  29. hostRep repository.HostRepository,
  30. wafformatter waf2.WafFormatterService,
  31. cdn flexCdn.CdnService,
  32. proxy flexCdn.ProxyService,
  33. aidedUdp AidedUdpService,
  34. ) UdpForWardingService {
  35. return &udpForWardingService{
  36. Service: service,
  37. udpForWardingRepository: udpForWardingRepository,
  38. required: required,
  39. parser: parser,
  40. crawler: crawler,
  41. globalRep: globalRep,
  42. hostRep: hostRep,
  43. wafformatter: wafformatter,
  44. cdn: cdn,
  45. proxy: proxy,
  46. aidedUdp: aidedUdp,
  47. }
  48. }
  49. type udpForWardingService struct {
  50. *service.Service
  51. udpForWardingRepository waf.UdpForWardingRepository
  52. required service.RequiredService
  53. parser service.ParserService
  54. crawler service.CrawlerService
  55. globalRep waf.GlobalLimitRepository
  56. hostRep repository.HostRepository
  57. wafformatter waf2.WafFormatterService
  58. cdn flexCdn.CdnService
  59. proxy flexCdn.ProxyService
  60. aidedUdp AidedUdpService
  61. }
  62. // GetUdpForWarding 获取单个UDP转发配置详情
  63. // 该函数根据ID同时查询主记录和规则记录,并合并返回完整的配置信息
  64. func (s *udpForWardingService) GetUdpForWarding(ctx context.Context, req v1.GetForwardingRequest) (v1.UdpForwardingDataRequest, error) {
  65. // 参数验证
  66. if req.Id <= 0 {
  67. return v1.UdpForwardingDataRequest{}, fmt.Errorf("非法的ID参数: %d", req.Id)
  68. }
  69. var udpForWarding model.UdpForWarding
  70. var backend model.UdpForwardingRule
  71. var err error
  72. // 并发查询主记录和规则记录以提高性能
  73. g, gCtx := errgroup.WithContext(ctx)
  74. g.Go(func() error {
  75. res, e := s.udpForWardingRepository.GetUdpForWarding(gCtx, int64(req.Id))
  76. if e != nil {
  77. return fmt.Errorf("查询UDP转发主记录失败 ID:%d, %w", req.Id, e)
  78. }
  79. if res != nil {
  80. udpForWarding = *res
  81. }
  82. return nil
  83. })
  84. g.Go(func() error {
  85. res, e := s.udpForWardingRepository.GetUdpForwardingIpsByID(gCtx, req.Id)
  86. if e != nil {
  87. return fmt.Errorf("查询UDP转发规则记录失败 ID:%d, %w", req.Id, e)
  88. }
  89. if res != nil {
  90. backend = *res
  91. }
  92. return nil
  93. })
  94. if err = g.Wait(); err != nil {
  95. return v1.UdpForwardingDataRequest{}, err
  96. }
  97. // 检查是否找到主记录
  98. if udpForWarding.Id == 0 {
  99. return v1.UdpForwardingDataRequest{}, fmt.Errorf("UDP转发配置不存在 ID:%d", req.Id)
  100. }
  101. return v1.UdpForwardingDataRequest{
  102. Id: udpForWarding.Id,
  103. Port: udpForWarding.Port,
  104. BackendList: backend.BackendList,
  105. Comment: udpForWarding.Comment,
  106. Proxy: udpForWarding.Proxy,
  107. }, nil
  108. }
  109. // AddUdpForwarding 添加 UDP 转发配置
  110. // 该函数完成 UDP 转发的完整创建流程:验证、创建 CDN、添加源站、配置代理、保存数据、处理异步任务
  111. func (s *udpForWardingService) AddUdpForwarding(ctx context.Context, req *v1.UdpForwardingRequest) (int, error) {
  112. // 1. 数据准备和验证
  113. require, formData, err := s.aidedUdp.PrepareWafData(ctx, req)
  114. if err != nil {
  115. return 0, err
  116. }
  117. if err := s.aidedUdp.ValidateAddRequest(ctx, req, require); err != nil {
  118. return 0, err
  119. }
  120. // 2. 创建CDN网站
  121. udpId, err := s.aidedUdp.CreateCdnWebsite(ctx, formData)
  122. if err != nil {
  123. return 0, err
  124. }
  125. // 3. 添加源站
  126. cdnOriginIds, err := s.aidedUdp.AddOriginsToWebsite(ctx, req, udpId)
  127. if err != nil {
  128. return 0, err
  129. }
  130. // 4. 配置代理协议
  131. if err := s.aidedUdp.ConfigureProxyProtocol(ctx, req, udpId); err != nil {
  132. return 0, err
  133. }
  134. // 5. 保存到数据库
  135. id, err := s.aidedUdp.SaveToDatabase(ctx, req, require, udpId, cdnOriginIds)
  136. if err != nil {
  137. return 0, err
  138. }
  139. // 6. 处理异步任务
  140. s.aidedUdp.ProcessAsyncTasks(req)
  141. return id, nil
  142. }
  143. // EditUdpForwarding 编辑 UDP 转发配置
  144. // 该函数完成 UDP 转发的完整编辑流程:验证、更新 CDN、处理IP白名单、更新源站、更新数据库
  145. func (s *udpForWardingService) EditUdpForwarding(ctx context.Context, req *v1.UdpForwardingRequest) error {
  146. // 1. 数据准备和验证
  147. require, formData, err := s.aidedUdp.PrepareWafData(ctx, req)
  148. if err != nil {
  149. return err
  150. }
  151. oldData, err := s.udpForWardingRepository.GetUdpForWarding(ctx, int64(req.UdpForwardingData.Id))
  152. if err != nil {
  153. return fmt.Errorf("获取原始数据失败: %w", err)
  154. }
  155. if err := s.aidedUdp.ValidateEditRequest(ctx, req, require, oldData); err != nil {
  156. return err
  157. }
  158. // 2. 更新CDN配置
  159. if err := s.aidedUdp.UpdateCdnConfiguration(ctx, req, oldData, require, formData); err != nil {
  160. return err
  161. }
  162. // 3. 获取IP数据并处理白名单
  163. ipData, err := s.udpForWardingRepository.GetUdpForwardingIpsByID(ctx, req.UdpForwardingData.Id)
  164. if err != nil {
  165. return fmt.Errorf("获取IP数据失败: %w", err)
  166. }
  167. if err := s.aidedUdp.ProcessIpWhitelistChanges(ctx, req, ipData); err != nil {
  168. return err
  169. }
  170. // 4. 更新源站配置
  171. if err := s.aidedUdp.UpdateOriginServers(ctx, req, oldData, ipData); err != nil {
  172. return err
  173. }
  174. // 5. 更新数据库记录
  175. if err := s.aidedUdp.UpdateDatabaseRecords(ctx, req, oldData, require, ipData); err != nil {
  176. return err
  177. }
  178. return nil
  179. }
  180. // DeleteUdpForwarding 批量删除 UDP 转发配置
  181. // 该函数支持批量删除多个 UDP 转发配置,对每个配置都执行完整的删除流程
  182. func (s *udpForWardingService) DeleteUdpForwarding(ctx context.Context, req v1.DeleteUdpForwardingRequest) error {
  183. // 批量删除处理
  184. for _, id := range req.Ids {
  185. if err := s.deleteSingleUdpForwarding(ctx, id, req.HostId); err != nil {
  186. return fmt.Errorf("删除UDP转发配置失败 ID:%d, %w", id, err)
  187. }
  188. }
  189. return nil
  190. }
  191. // deleteSingleUdpForwarding 删除单个 UDP 转发配置
  192. // 该函数完成单个配置的完整删除流程:权限验证、删除 CDN、清理IP白名单、清理数据库
  193. func (s *udpForWardingService) deleteSingleUdpForwarding(ctx context.Context, id int, hostId int) error {
  194. // 1. 获取原始数据并验证权限
  195. oldData, err := s.udpForWardingRepository.GetUdpForWarding(ctx, int64(id))
  196. if err != nil {
  197. return fmt.Errorf("获取UDP转发数据失败: %w", err)
  198. }
  199. if err := s.aidedUdp.ValidateDeletePermission(oldData, hostId); err != nil {
  200. return err
  201. }
  202. // 2. 删除CDN服务器
  203. if err := s.aidedUdp.DeleteCdnServer(ctx, oldData.CdnWebId); err != nil {
  204. return err
  205. }
  206. // 3. 处理IP白名单清理
  207. if err := s.aidedUdp.ProcessDeleteIpWhitelist(ctx, id); err != nil {
  208. return err
  209. }
  210. // 4. 清理数据库记录
  211. if err := s.aidedUdp.CleanupDatabaseRecords(ctx, id); err != nil {
  212. return err
  213. }
  214. return nil
  215. }
  216. // GetUdpForwardingWafUdpAllIps 获取指定主机的所有 UDP 转发配置列表
  217. // 该函数使用并发查询优化性能,同时获取多个配置的详细信息并按ID降序排列
  218. func (s *udpForWardingService) GetUdpForwardingWafUdpAllIps(ctx context.Context, req v1.GetForwardingRequest) ([]v1.UdpForwardingDataRequest, error) {
  219. type CombinedResult struct {
  220. Id int
  221. Forwarding *model.UdpForWarding
  222. BackendRule *model.UdpForwardingRule
  223. Err error
  224. }
  225. g, gCtx := errgroup.WithContext(ctx)
  226. ids, err := s.udpForWardingRepository.GetUdpForwardingWafUdpAllIds(gCtx, req.HostId)
  227. if err != nil {
  228. return nil, fmt.Errorf("获取UDP转发ID列表失败: %w", err)
  229. }
  230. if len(ids) == 0 {
  231. return nil, nil
  232. }
  233. resChan := make(chan CombinedResult, len(ids))
  234. g.Go(func() error {
  235. for _, idVal := range ids {
  236. currentID := idVal
  237. g.Go(func() error {
  238. var wf *model.UdpForWarding
  239. var bk *model.UdpForwardingRule
  240. var localErr error
  241. wf, localErr = s.udpForWardingRepository.GetUdpForWarding(gCtx, int64(currentID))
  242. if localErr != nil {
  243. resChan <- CombinedResult{Id: currentID, Err: localErr}
  244. return localErr
  245. }
  246. bk, localErr = s.udpForWardingRepository.GetUdpForwardingIpsByID(gCtx, currentID)
  247. if localErr != nil {
  248. resChan <- CombinedResult{Id: currentID, Err: localErr}
  249. return localErr
  250. }
  251. resChan <- CombinedResult{Id: currentID, Forwarding: wf, BackendRule: bk}
  252. return nil
  253. })
  254. }
  255. return nil
  256. })
  257. groupErr := g.Wait()
  258. close(resChan)
  259. if groupErr != nil {
  260. return nil, groupErr
  261. }
  262. res := make([]v1.UdpForwardingDataRequest, 0, len(ids))
  263. for r := range resChan {
  264. if r.Err != nil {
  265. return nil, fmt.Errorf("处理ID %d 时出错: %w", r.Id, r.Err)
  266. }
  267. if r.Forwarding == nil {
  268. return nil, fmt.Errorf("ID %d 对应的转发配置为空", r.Id)
  269. }
  270. dataReq := v1.UdpForwardingDataRequest{
  271. Id: r.Forwarding.Id,
  272. Port: r.Forwarding.Port,
  273. Comment: r.Forwarding.Comment,
  274. Proxy: r.Forwarding.Proxy,
  275. }
  276. if r.BackendRule != nil {
  277. dataReq.BackendList = r.BackendRule.BackendList
  278. }
  279. res = append(res, dataReq)
  280. }
  281. sort.Slice(res, func(i, j int) bool {
  282. return res[i].Id > res[j].Id
  283. })
  284. return res, nil
  285. }