globallimit.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  8. "github.com/spf13/cast"
  9. "github.com/spf13/viper"
  10. "strconv"
  11. "sync"
  12. "github.com/sourcegraph/conc"
  13. )
  14. type GlobalLimitService interface {
  15. GetGlobalLimit(ctx context.Context, id int64) (*model.GlobalLimit, error)
  16. AddGlobalLimit(ctx context.Context, req v1.GlobalLimitRequest) error
  17. EditGlobalLimit(ctx context.Context, req v1.GlobalLimitRequest) error
  18. DeleteGlobalLimit(ctx context.Context, req v1.GlobalLimitRequest) error
  19. }
  20. func NewGlobalLimitService(
  21. service *Service,
  22. globalLimitRepository repository.GlobalLimitRepository,
  23. duedate DuedateService,
  24. crawler CrawlerService,
  25. conf *viper.Viper,
  26. required RequiredService,
  27. parser ParserService,
  28. host HostService,
  29. tcpLimit TcpLimitService,
  30. udpLimit UdpLimitService,
  31. webLimit WebLimitService,
  32. gateWayGroup GatewayGroupService,
  33. hostRep repository.HostRepository,
  34. ) GlobalLimitService {
  35. return &globalLimitService{
  36. Service: service,
  37. globalLimitRepository: globalLimitRepository,
  38. duedate: duedate,
  39. crawler: crawler,
  40. Url: conf.GetString("crawler.Url"),
  41. required: required,
  42. parser: parser,
  43. host: host,
  44. tcpLimit: tcpLimit,
  45. udpLimit: udpLimit,
  46. webLimit: webLimit,
  47. gateWayGroup: gateWayGroup,
  48. hostRep: hostRep,
  49. }
  50. }
  51. type globalLimitService struct {
  52. *Service
  53. globalLimitRepository repository.GlobalLimitRepository
  54. duedate DuedateService
  55. crawler CrawlerService
  56. Url string
  57. required RequiredService
  58. parser ParserService
  59. host HostService
  60. tcpLimit TcpLimitService
  61. udpLimit UdpLimitService
  62. webLimit WebLimitService
  63. gateWayGroup GatewayGroupService
  64. hostRep repository.HostRepository
  65. }
  66. func (s *globalLimitService) GlobalLimitRequire(ctx context.Context, req v1.GlobalLimitRequest) (res v1.GlobalLimitRequireResponse, err error) {
  67. isExist, err := s.globalLimitRepository.IsGlobalLimitExistByHostId(ctx, int64(req.HostId))
  68. if err != nil {
  69. return v1.GlobalLimitRequireResponse{}, err
  70. }
  71. if isExist {
  72. return v1.GlobalLimitRequireResponse{}, fmt.Errorf("配置限制已存在")
  73. }
  74. res.ExpiredAt, err = s.duedate.NextDueDate(ctx, req.Uid, req.HostId)
  75. if err != nil {
  76. return v1.GlobalLimitRequireResponse{}, err
  77. }
  78. configCount, err := s.host.GetGlobalLimitConfig(ctx, req.HostId)
  79. if err != nil {
  80. return v1.GlobalLimitRequireResponse{}, fmt.Errorf("获取配置限制失败: %w", err)
  81. }
  82. res.Bps = configCount.Bps
  83. res.MaxBytesMonth = configCount.MaxBytesMonth
  84. domain, err := s.hostRep.GetDomainById(ctx, req.HostId)
  85. if err != nil {
  86. return v1.GlobalLimitRequireResponse{}, err
  87. }
  88. res.GlobalLimitName = strconv.Itoa(req.Uid) + "_" + strconv.Itoa(req.HostId) + "_" + domain
  89. return res, nil
  90. }
  91. func (s *globalLimitService) GetGlobalLimit(ctx context.Context, id int64) (*model.GlobalLimit, error) {
  92. return s.globalLimitRepository.GetGlobalLimit(ctx, id)
  93. }
  94. func (s *globalLimitService) AddGlobalLimit(ctx context.Context, req v1.GlobalLimitRequest) error {
  95. require, err := s.GlobalLimitRequire(ctx, req)
  96. if err != nil {
  97. return err
  98. }
  99. formData := map[string]interface{}{
  100. "tag": require.GlobalLimitName,
  101. "bps": require.Bps,
  102. "max_bytes_month": require.MaxBytesMonth,
  103. "expired_at": require.ExpiredAt,
  104. }
  105. respBody, err := s.required.SendForm(ctx, "admin/info/waf_common_limit/new", "admin/new/waf_common_limit", formData)
  106. if err != nil {
  107. return err
  108. }
  109. ruleIdBase, err := s.parser.GetRuleIdByColumnName(ctx, respBody, require.GlobalLimitName)
  110. if err != nil {
  111. return err
  112. }
  113. if ruleIdBase == "" {
  114. res, err := s.parser.ParseAlert(string(respBody))
  115. if err != nil {
  116. return err
  117. }
  118. return fmt.Errorf(res)
  119. }
  120. ruleId, err := cast.ToIntE(ruleIdBase)
  121. if err != nil {
  122. return err
  123. }
  124. // 使用conc库并发执行API调用
  125. var tcpLimitRuleId, udpLimitRuleId, webLimitRuleId int
  126. var mu sync.Mutex // 用于保护共享变量
  127. // 为每个并发调用创建独立的请求参数(深拷贝)
  128. // 避免共享同一个指针可能导致的数据竞争
  129. // 创建一个WaitGroup来协调多个并发任务
  130. wg := conc.NewWaitGroup()
  131. // 启动tcpLimit调用 - 使用独立的请求参数副本
  132. wg.Go(func() {
  133. // 为该goroutine创建独立的请求参数副本
  134. tcpLimitReq := &v1.GeneralLimitRequireRequest{
  135. Tag: require.GlobalLimitName,
  136. HostId: req.HostId,
  137. RuleId: ruleId,
  138. Uid: req.Uid,
  139. }
  140. result, e := s.tcpLimit.AddTcpLimit(ctx, tcpLimitReq)
  141. if e != nil {
  142. // 只在修改共享的错误变量时加锁
  143. mu.Lock()
  144. err = e
  145. mu.Unlock()
  146. } else {
  147. // 不需要加锁,因为tcpLimitRuleId只被这一个goroutine修改
  148. tcpLimitRuleId = result
  149. }
  150. })
  151. // 启动udpLimit调用 - 使用独立的请求参数副本
  152. wg.Go(func() {
  153. // 为该goroutine创建独立的请求参数副本
  154. udpLimitReq := &v1.GeneralLimitRequireRequest{
  155. Tag: require.GlobalLimitName,
  156. HostId: req.HostId,
  157. RuleId: ruleId,
  158. Uid: req.Uid,
  159. }
  160. result, e := s.udpLimit.AddUdpLimit(ctx, udpLimitReq)
  161. if e != nil {
  162. // 只在修改共享的错误变量时加锁
  163. mu.Lock()
  164. err = e
  165. mu.Unlock()
  166. } else {
  167. // 不需要加锁,因为udpLimitRuleId只被这一个goroutine修改
  168. udpLimitRuleId = result
  169. }
  170. })
  171. // 启动webLimit调用 - 使用独立的请求参数副本
  172. wg.Go(func() {
  173. // 为该goroutine创建独立的请求参数副本
  174. webLimitReq := &v1.GeneralLimitRequireRequest{
  175. Tag: require.GlobalLimitName,
  176. HostId: req.HostId,
  177. RuleId: ruleId,
  178. Uid: req.Uid,
  179. }
  180. result, e := s.webLimit.AddWebLimit(ctx, webLimitReq)
  181. if e != nil {
  182. // 只在修改共享的错误变量时加锁
  183. mu.Lock()
  184. err = e
  185. mu.Unlock()
  186. } else {
  187. // 不需要加锁,因为webLimitRuleId只被这一个goroutine修改
  188. webLimitRuleId = result
  189. }
  190. })
  191. // 等待所有调用完成
  192. wg.Wait()
  193. // 检查是否有错误发生
  194. if err != nil {
  195. return err
  196. }
  197. err = s.globalLimitRepository.AddGlobalLimit(ctx, &model.GlobalLimit{
  198. HostId: req.HostId,
  199. RuleId: cast.ToInt(ruleId),
  200. GlobalLimitName: require.GlobalLimitName,
  201. Comment: req.Comment,
  202. TcpLimitRuleId: tcpLimitRuleId,
  203. UdpLimitRuleId: udpLimitRuleId,
  204. WebLimitRuleId: webLimitRuleId,
  205. GatewayGroupId: 5,
  206. })
  207. if err != nil {
  208. return err
  209. }
  210. return nil
  211. }
  212. func (s *globalLimitService) EditGlobalLimit(ctx context.Context, req v1.GlobalLimitRequest) error {
  213. if err := s.globalLimitRepository.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{
  214. HostId: req.HostId,
  215. Comment: req.Comment,
  216. }); err != nil {
  217. return err
  218. }
  219. return nil
  220. }
  221. func (s *globalLimitService) DeleteGlobalLimit(ctx context.Context, req v1.GlobalLimitRequest) error {
  222. if err := s.globalLimitRepository.DeleteGlobalLimitByHostId(ctx, int64(req.HostId)); err != nil {
  223. return err
  224. }
  225. return nil
  226. }