waf.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. package task
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  8. waf2 "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  9. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf"
  11. "github.com/hashicorp/go-multierror"
  12. "go.uber.org/zap"
  13. "sync"
  14. "time"
  15. )
  16. // WafTask 定义了WAF相关的五个独立定时任务接口
  17. type WafTask interface {
  18. // 1. 同步即将到期(1天内)的套餐时间
  19. SynchronizationTime(ctx context.Context) error
  20. // 2. 停止所有已到期的套餐
  21. StopPlan(ctx context.Context) error
  22. // 3. 恢复7天内续费的套餐
  23. RecoverRecentPlan(ctx context.Context) error
  24. // 4. 清理过期超过7天且仍未续费的记录
  25. CleanUpStaleRecords(ctx context.Context) error
  26. // 5. 恢复超过7天后才续费的套餐
  27. RecoverStalePlan(ctx context.Context) error
  28. }
  29. // =================================================================
  30. // =================== 结构体与构造函数 ==========================
  31. // =================================================================
  32. func NewWafTask(
  33. webForWardingRep waf2.WebForwardingRepository,
  34. tcpforwardingRep waf2.TcpforwardingRepository,
  35. udpForWardingRep waf2.UdpForWardingRepository,
  36. cdn flexCdn.CdnService,
  37. hostRep repository.HostRepository,
  38. globalLimitRep waf2.GlobalLimitRepository,
  39. expiredRep repository.ExpiredRepository,
  40. task *Task,
  41. gatewayIpRep waf2.GatewayipRepository,
  42. tcp waf.TcpforwardingService,
  43. udp waf.UdpForWardingService,
  44. web waf.WebForwardingService,
  45. buildAoDun waf.BuildAudunService,
  46. zzyBgp waf.ZzybgpService,
  47. ) WafTask {
  48. return &wafTask{
  49. Task: task,
  50. webForWardingRep: webForWardingRep,
  51. tcpforwardingRep: tcpforwardingRep,
  52. udpForWardingRep: udpForWardingRep,
  53. cdn: cdn,
  54. hostRep: hostRep,
  55. globalLimitRep: globalLimitRep,
  56. expiredRep: expiredRep,
  57. gatewayIpRep: gatewayIpRep,
  58. tcp: tcp,
  59. udp: udp,
  60. web: web,
  61. buildAoDun: buildAoDun,
  62. zzyBgp : zzyBgp,
  63. }
  64. }
  65. type wafTask struct {
  66. *Task
  67. webForWardingRep waf2.WebForwardingRepository
  68. tcpforwardingRep waf2.TcpforwardingRepository
  69. udpForWardingRep waf2.UdpForWardingRepository
  70. cdn flexCdn.CdnService
  71. hostRep repository.HostRepository
  72. globalLimitRep waf2.GlobalLimitRepository
  73. expiredRep repository.ExpiredRepository
  74. gatewayIpRep waf2.GatewayipRepository
  75. tcp waf.TcpforwardingService
  76. udp waf.UdpForWardingService
  77. web waf.WebForwardingService
  78. buildAoDun waf.BuildAudunService
  79. zzyBgp waf.ZzybgpService
  80. }
  81. const (
  82. SynchronousInSeconds = 7 * 24 * 60 * 60
  83. SevenDaysInSeconds = 7 * 24 * 60 * 60
  84. )
  85. type RenewalRequest struct {
  86. HostId int
  87. ExpiredAt int64
  88. }
  89. // =================================================================
  90. // =================== 核心辅助函数 (Core Helpers) =================
  91. // =================================================================
  92. // wrapTaskError 统一封装任务错误信息,方便日志和调试
  93. func (t *wafTask) wrapTaskError(taskName, step string, err error) error {
  94. if err == nil {
  95. return nil
  96. }
  97. return fmt.Errorf("执行[%s]-%s失败: %w", taskName, step, err)
  98. }
  99. // getCdnWebIdsByHostIds (原GetCdnWebId) 根据hostId列表获取所有关联的转发规则ID
  100. func (t *wafTask) getCdnWebIdsByHostIds(ctx context.Context, hostIds []int) ([]int, error) {
  101. if len(hostIds) == 0 {
  102. return nil, nil
  103. }
  104. var ids []int
  105. var result *multierror.Error
  106. tcpIds, err := t.tcpforwardingRep.GetTcpAll(ctx, hostIds)
  107. if err != nil {
  108. result = multierror.Append(result, err)
  109. }
  110. ids = append(ids, tcpIds...)
  111. udpIds, err := t.udpForWardingRep.GetUdpAll(ctx, hostIds)
  112. if err != nil {
  113. result = multierror.Append(result, err)
  114. }
  115. ids = append(ids, udpIds...)
  116. webIds, err := t.webForWardingRep.GetWebAll(ctx, hostIds)
  117. if err != nil {
  118. result = multierror.Append(result, err)
  119. }
  120. ids = append(ids, webIds...)
  121. return ids, result.ErrorOrNil()
  122. }
  123. // setCdnWebsitesState (原BanServer) 启用或禁用一组CDN网站 (并发执行)
  124. func (t *wafTask) setCdnWebsitesState(ctx context.Context, ids []int, enable bool) error {
  125. if len(ids) == 0 {
  126. return nil
  127. }
  128. var wg sync.WaitGroup
  129. errChan := make(chan error, len(ids))
  130. wg.Add(len(ids))
  131. for _, id := range ids {
  132. go func(id int) {
  133. defer wg.Done()
  134. // cdn.EditWebIsOn 的第二个参数 isBan, false=启用, true=禁用
  135. // 所以 enable=true 对应 isBan=false
  136. if err := t.cdn.EditWebIsOn(ctx, int64(id), enable); err != nil {
  137. errChan <- err
  138. }
  139. }(id)
  140. }
  141. wg.Wait()
  142. close(errChan)
  143. var result *multierror.Error
  144. for err := range errChan {
  145. result = multierror.Append(result, err)
  146. }
  147. return result.ErrorOrNil()
  148. }
  149. // executeRenewalActions (原EditExpired) 执行续费操作,包括更新DB和调用CDN API
  150. func (t *wafTask) executeRenewalActions(ctx context.Context, reqs []RenewalRequest) error {
  151. if len(reqs) == 0 {
  152. return nil
  153. }
  154. var allErrors *multierror.Error
  155. var wg sync.WaitGroup
  156. wg.Add(len(reqs))
  157. var mu sync.Mutex
  158. for _, req := range reqs {
  159. go func(r RenewalRequest) {
  160. defer wg.Done()
  161. // 更新数据库状态
  162. err := t.globalLimitRep.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{HostId: r.HostId, ExpiredAt: r.ExpiredAt, State: true})
  163. if err != nil {
  164. mu.Lock() // 在修改前加锁
  165. allErrors = multierror.Append(allErrors, err)
  166. mu.Unlock() // 修改后解锁
  167. return // 如果DB更新失败,不继续调用CDN API
  168. }
  169. }(req)
  170. }
  171. wg.Wait()
  172. return allErrors.ErrorOrNil()
  173. }
  174. // =================================================================
  175. // =================== 1. 数据查找与决策层 ==========================
  176. // =================================================================
  177. // findPlansNeedingSync (原findMismatchedExpirations) 检查WAF和Host的到期时间差异,返回需要同步的请求
  178. func (t *wafTask) findPlansNeedingSync(ctx context.Context, wafLimits []model.GlobalLimit) ([]RenewalRequest, error) {
  179. if len(wafLimits) == 0 {
  180. return nil, nil
  181. }
  182. wafExpiredMap := make(map[int]int64, len(wafLimits))
  183. var hostIds []int
  184. for _, limit := range wafLimits {
  185. hostIds = append(hostIds, limit.HostId)
  186. wafExpiredMap[limit.HostId] = limit.ExpiredAt
  187. }
  188. hostExpirations, err := t.hostRep.GetExpireTimeByHostId(ctx, hostIds)
  189. if err != nil {
  190. return nil, fmt.Errorf("获取主机到期时间失败: %w", err)
  191. }
  192. hostExpiredMap := make(map[int]int64, len(hostExpirations))
  193. for _, h := range hostExpirations {
  194. hostExpiredMap[h.HostId] = h.ExpiredAt
  195. }
  196. var renewalRequests []RenewalRequest
  197. for hostId, wafExpiredTime := range wafExpiredMap {
  198. hostTime, ok := hostExpiredMap[hostId]
  199. if !ok || hostTime != wafExpiredTime {
  200. renewalRequests = append(renewalRequests, RenewalRequest{HostId: hostId, ExpiredAt: hostTime})
  201. }
  202. }
  203. return renewalRequests, nil
  204. }
  205. // findAllCurrentlyExpiredWAFPlans (原findAllCurrentlyExpiredPlans) 查找所有当前时间点已经到期的WAF记录
  206. func (t *wafTask) findAllCurrentlyExpiredWAFPlans(ctx context.Context) ([]model.GlobalLimit, error) {
  207. return t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, 0)
  208. }
  209. // findRecentlyExpiredWAFPlans (原findRecentlyExpiredPlans) 查找在过去7天内到期的WAF记录
  210. func (t *wafTask) findRecentlyExpiredWAFPlans(ctx context.Context) ([]model.GlobalLimit, error) {
  211. sevenDaysAgo := time.Now().Add(-7 * 24 * time.Hour).Unix()
  212. now := time.Now().Unix()
  213. return t.globalLimitRep.GetGlobalLimitsByExpirationRange(ctx, sevenDaysAgo, now)
  214. }
  215. // findStaleWAFPlans (原findStaleExpiredPlans) 查找7天前或更早就已到期的WAF记录
  216. func (t *wafTask) findStaleWAFPlans(ctx context.Context) ([]model.GlobalLimit, error) {
  217. sevenDaysAgoOffset := int64(-1 * SevenDaysInSeconds)
  218. return t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, sevenDaysAgoOffset)
  219. }
  220. // =================================================================
  221. // ============== 2. 业务执行与公共API层 ===========================
  222. // =================================================================
  223. // SynchronizationTime 同步即将到期(1天内)的套餐时间
  224. func (t *wafTask) SynchronizationTime(ctx context.Context) error {
  225. taskName := "同步到期时间"
  226. wafLimits, err := t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, SynchronousInSeconds)
  227. if err != nil {
  228. return t.wrapTaskError(taskName, "查找失败", err)
  229. }
  230. if len(wafLimits) == 0 {
  231. return nil
  232. }
  233. renewalRequests, err := t.findPlansNeedingSync(ctx, wafLimits)
  234. if err != nil {
  235. return t.wrapTaskError(taskName, "决策失败", err)
  236. }
  237. if len(renewalRequests) > 0 {
  238. t.logger.Info("发现记录需要同步到期时间", zap.String("task", taskName), zap.Int("数量", len(renewalRequests)), zap.Any("套餐内容", renewalRequests))
  239. return t.wrapTaskError(taskName, "执行同步", t.executeRenewalActions(ctx, renewalRequests))
  240. }
  241. return nil
  242. }
  243. // StopPlan 停止所有已到期的套餐
  244. func (t *wafTask) StopPlan(ctx context.Context) error {
  245. taskName := "停止到期套餐"
  246. // 1. 查找所有理论上已到期的记录
  247. expiredLimits, err := t.findAllCurrentlyExpiredWAFPlans(ctx)
  248. if err != nil {
  249. return t.wrapTaskError(taskName, "查找失败", err)
  250. }
  251. if len(expiredLimits) == 0 {
  252. return nil
  253. }
  254. // 2. 决策 - 第1步:检查这些记录中是否已有续费但未同步的
  255. renewalRequests, err := t.findPlansNeedingSync(ctx, expiredLimits)
  256. if err != nil {
  257. return t.wrapTaskError(taskName, "决策检查续费", err)
  258. }
  259. renewedHostIds := make(map[int]struct{}, len(renewalRequests))
  260. for _, req := range renewalRequests {
  261. if req.ExpiredAt > time.Now().Unix() {
  262. renewedHostIds[req.HostId] = struct{}{}
  263. }
  264. }
  265. // 2. 决策 - 第2步:筛选出真正需要停止的记录
  266. var plansToClose []model.GlobalLimit
  267. for _, limit := range expiredLimits {
  268. if _, found := renewedHostIds[limit.HostId]; found {
  269. t.logger.Info("发现已到期但刚续费的套餐,跳过停止操作", zap.String("task", taskName), zap.Int("hostId", limit.HostId))
  270. continue
  271. }
  272. isClosed, err := t.expiredRep.IsPlanInList(ctx, repository.ClosedPlansList, int64(limit.HostId))
  273. if err != nil {
  274. t.logger.Error("决策[停止]:检查Redis套餐状态失败", zap.String("task", taskName), zap.Int("hostId", limit.HostId), zap.Error(err))
  275. continue
  276. }
  277. if !isClosed {
  278. plansToClose = append(plansToClose, limit)
  279. }
  280. }
  281. if len(plansToClose) == 0 {
  282. t.logger.Info("没有需要停止的套餐(可能均已续费或已在停止列表)", zap.String("task", taskName))
  283. return nil
  284. }
  285. // 3. 执行停止操作
  286. t.logger.Info("开始关闭到期的WAF服务", zap.String("task", taskName), zap.Int("数量", len(plansToClose)), zap.Any("套餐内容", renewalRequests))
  287. var hostIds []int
  288. for _, limit := range plansToClose {
  289. hostIds = append(hostIds, limit.HostId)
  290. }
  291. var allErrors *multierror.Error
  292. webIds, err := t.getCdnWebIdsByHostIds(ctx, hostIds)
  293. if err != nil {
  294. allErrors = multierror.Append(allErrors, fmt.Errorf("获取cdn_web_id失败: %w", err))
  295. } else {
  296. if err := t.setCdnWebsitesState(ctx, webIds, false); err != nil { // enable=false
  297. allErrors = multierror.Append(allErrors, fmt.Errorf("禁用服务失败: %w", err))
  298. }
  299. }
  300. closedPlanIds := make([]int64, len(hostIds))
  301. for i, id := range hostIds {
  302. closedPlanIds[i] = int64(id)
  303. }
  304. if err := t.expiredRep.AddPlans(ctx, repository.ClosedPlansList, closedPlanIds...); err != nil {
  305. allErrors = multierror.Append(allErrors, fmt.Errorf("标记为已关闭失败: %w", err))
  306. }
  307. return t.wrapTaskError(taskName, "执行停止", allErrors.ErrorOrNil())
  308. }
  309. // _recoverPlans 是一个统一的、可重用的套餐恢复流程
  310. func (t *wafTask) _recoverPlans(ctx context.Context, limitsToCheck []model.GlobalLimit, taskName string, redisListKey repository.PlanListType) error {
  311. if len(limitsToCheck) == 0 {
  312. return nil
  313. }
  314. requestsToSync, err := t.findPlansNeedingSync(ctx, limitsToCheck)
  315. if err != nil {
  316. return t.wrapTaskError(taskName, "决策检查续费状态", err)
  317. }
  318. var finalRecoveryRequests []RenewalRequest
  319. for _, req := range requestsToSync {
  320. if req.ExpiredAt > time.Now().Unix() {
  321. finalRecoveryRequests = append(finalRecoveryRequests, req)
  322. }
  323. }
  324. if len(finalRecoveryRequests) == 0 {
  325. t.logger.Info("在检查范围内未发现已续费的套餐", zap.String("task", taskName))
  326. return nil
  327. }
  328. t.logger.Info("开始恢复已续费的WAF服务", zap.String("task", taskName), zap.Int("数量", len(finalRecoveryRequests)), zap.Any("套餐内容", finalRecoveryRequests))
  329. var hostIdsToRecover []int
  330. for _, req := range finalRecoveryRequests {
  331. hostIdsToRecover = append(hostIdsToRecover, req.HostId)
  332. }
  333. var allErrors *multierror.Error
  334. webIds, err := t.getCdnWebIdsByHostIds(ctx, hostIdsToRecover)
  335. if err != nil {
  336. allErrors = multierror.Append(allErrors, fmt.Errorf("获取webId失败: %w", err))
  337. } else {
  338. if err := t.setCdnWebsitesState(ctx, webIds, true); err != nil { // enable=true
  339. allErrors = multierror.Append(allErrors, fmt.Errorf("启用web服务失败: %w", err))
  340. }
  341. }
  342. if err := t.executeRenewalActions(ctx, finalRecoveryRequests); err != nil {
  343. allErrors = multierror.Append(allErrors, fmt.Errorf("同步续费信息失败: %w", err))
  344. }
  345. planIdsToRecover := make([]int64, len(hostIdsToRecover))
  346. for i, id := range hostIdsToRecover {
  347. planIdsToRecover[i] = int64(id)
  348. }
  349. // 从指定的Redis列表中移除标记 (ClosedPlansList 或 ExpiringSoonPlansList)
  350. if err := t.expiredRep.RemovePlans(ctx, redisListKey, planIdsToRecover...); err != nil {
  351. allErrors = multierror.Append(allErrors, fmt.Errorf("从Redis列表 '%s' 移除标记失败: %w", redisListKey, err))
  352. }
  353. return t.wrapTaskError(taskName, "执行恢复", allErrors.ErrorOrNil())
  354. }
  355. // 3. RecoverRecentPlan 恢复7天内续费的套餐
  356. func (t *wafTask) RecoverRecentPlan(ctx context.Context) error {
  357. taskName := "恢复近期到期套餐"
  358. recentlyExpiredLimits, err := t.findRecentlyExpiredWAFPlans(ctx)
  359. if err != nil {
  360. return t.wrapTaskError(taskName, "查找近期到期记录", err)
  361. }
  362. return t._recoverPlans(ctx, recentlyExpiredLimits, taskName, repository.ClosedPlansList)
  363. }
  364. // 4. CleanUpStaleRecords 清理过期超过7天且仍未续费的记录
  365. func (t *wafTask) CleanUpStaleRecords(ctx context.Context) error {
  366. taskName := "清理陈旧记录"
  367. // 1. 从数据库查找所有陈旧的记录作为候选
  368. candidateLimits, err := t.findStaleWAFPlans(ctx)
  369. if err != nil {
  370. return t.wrapTaskError(taskName, "查找陈旧记录", err)
  371. }
  372. if len(candidateLimits) == 0 {
  373. return nil
  374. }
  375. // 2. [CORRECTION] 幂等性检查: 过滤掉那些已经被标记为“已清理”的记录
  376. // 根据您的定义,`ExpiringSoonPlansList` 就是已清理列表。
  377. var uncleanedLimits []model.GlobalLimit
  378. for _, limit := range candidateLimits {
  379. isAlreadyCleaned, err := t.expiredRep.IsPlanInList(ctx, repository.ExpiringSoonPlansList, int64(limit.HostId))
  380. if err != nil {
  381. t.logger.Error("检查Redis清理状态失败,跳过", zap.String("task", taskName), zap.Int("hostId", limit.HostId), zap.Error(err))
  382. continue
  383. }
  384. if !isAlreadyCleaned {
  385. uncleanedLimits = append(uncleanedLimits, limit)
  386. }
  387. }
  388. if len(uncleanedLimits) == 0 {
  389. t.logger.Info("没有需要清理的新套餐(可能均已清理)", zap.String("task", taskName))
  390. return nil
  391. }
  392. // 3. [性能优化] 批量获取未清理记录的真实到期时间
  393. uncleanedHostIds := make([]int, len(uncleanedLimits))
  394. for i, limit := range uncleanedLimits {
  395. uncleanedHostIds[i] = limit.HostId
  396. }
  397. hostExpirations, err := t.hostRep.GetExpireTimeByHostId(ctx, uncleanedHostIds)
  398. if err != nil {
  399. return t.wrapTaskError(taskName, "批量获取主机到期时间", err)
  400. }
  401. hostExpiredMap := make(map[int]int64, len(hostExpirations))
  402. for _, h := range hostExpirations {
  403. hostExpiredMap[h.HostId] = h.ExpiredAt
  404. }
  405. // 4. 决策:筛选出最终需要清理的记录(未在最后一刻续费)
  406. var plansToClean []model.GlobalLimit
  407. now := time.Now().Unix()
  408. for _, limit := range uncleanedLimits {
  409. hostExpiredTime, ok := hostExpiredMap[limit.HostId]
  410. // 清理条件:主机记录不存在,或者主机记录的到期时间是过去时
  411. if !ok || hostExpiredTime <= now {
  412. plansToClean = append(plansToClean, limit)
  413. }
  414. }
  415. if len(plansToClean) == 0 {
  416. t.logger.Info("没有长期未续费的记录需要清理(可能均已续费)", zap.String("task", taskName))
  417. return nil
  418. }
  419. // 5. 并发执行清理操作
  420. t.logger.Info("开始清理长期未续费的WAF记录", zap.String("task", taskName), zap.Int("数量", len(plansToClean)), zap.Any("套餐内容", plansToClean))
  421. var wg sync.WaitGroup
  422. errChan := make(chan error, len(plansToClean))
  423. wg.Add(len(plansToClean))
  424. for _, limit := range plansToClean {
  425. go func(l model.GlobalLimit) {
  426. defer wg.Done()
  427. if err := t.executeSinglePlanCleanup(ctx, l); err != nil {
  428. errChan <- fmt.Errorf("清理hostId %d 失败: %w", l.HostId, err)
  429. }
  430. }(limit)
  431. }
  432. wg.Wait()
  433. close(errChan)
  434. var allErrors *multierror.Error
  435. for err := range errChan {
  436. allErrors = multierror.Append(allErrors, err)
  437. }
  438. return t.wrapTaskError(taskName, "执行清理", allErrors.ErrorOrNil())
  439. }
  440. // executeSinglePlanCleanup 执行对单个套餐的完整清理操作,方便并发调用
  441. func (t *wafTask) executeSinglePlanCleanup(ctx context.Context, limit model.GlobalLimit) error {
  442. var allErrors *multierror.Error
  443. hostId := int64(limit.HostId)
  444. // 从“停止列表”中移除,因为它即将被归档到“已清理列表”
  445. if err := t.expiredRep.RemovePlans(ctx, repository.ClosedPlansList, hostId); err != nil {
  446. allErrors = multierror.Append(allErrors, err)
  447. }
  448. // 删除关联的转发规则...
  449. tcpIds, err := t.tcpforwardingRep.GetTcpForwardingAllIdsByID(ctx, limit.HostId)
  450. if err != nil {
  451. allErrors = multierror.Append(allErrors, err)
  452. } else if len(tcpIds) > 0 {
  453. if err := t.tcp.DeleteTcpForwarding(ctx, v1.DeleteTcpForwardingRequest{Ids: tcpIds, HostId: limit.HostId,Uid: limit.Uid}); err != nil {
  454. allErrors = multierror.Append(allErrors, err)
  455. }
  456. }
  457. udpIds, err := t.udpForWardingRep.GetUdpForwardingWafUdpAllIds(ctx, limit.HostId)
  458. if err != nil {
  459. allErrors = multierror.Append(allErrors, err)
  460. } else if len(udpIds) > 0 {
  461. if err := t.udp.DeleteUdpForwarding(ctx, v1.DeleteUdpForwardingRequest{Ids: udpIds, HostId: limit.HostId,Uid: limit.Uid}); err != nil {
  462. allErrors = multierror.Append(allErrors, err)
  463. }
  464. }
  465. webIds, err := t.webForWardingRep.GetWebForwardingWafWebAllIds(ctx, limit.HostId)
  466. if err != nil {
  467. allErrors = multierror.Append(allErrors, err)
  468. } else if len(webIds) > 0 {
  469. if err := t.web.DeleteWebForwarding(ctx, v1.DeleteWebForwardingRequest{Ids: webIds, HostId: limit.HostId,Uid: limit.Uid}); err != nil {
  470. allErrors = multierror.Append(allErrors, err)
  471. }
  472. }
  473. // 重置防护
  474. err = t.zzyBgp.SetDefense(ctx, hostId, 10)
  475. if err != nil {
  476. return err
  477. }
  478. // 清除小防火墙带宽限制
  479. if err := t.buildAoDun.Bandwidth(ctx, hostId, "del"); err != nil {
  480. allErrors = multierror.Append(allErrors, err)
  481. }
  482. // 只有在上述所有步骤都没有出错的情况下,才执行最终的数据库更新和Redis标记
  483. if allErrors.ErrorOrNil() == nil {
  484. err := t.gatewayIpRep.CleanIPByHostId(ctx, []int64{hostId})
  485. if err != nil {
  486. allErrors = multierror.Append(allErrors, err)
  487. }
  488. // [CORRECTION] 幂等性保障:将此hostId标记为“已清理”,即添加到 `ExpiringSoonPlansList`
  489. if err := t.expiredRep.AddPlans(ctx, repository.ExpiringSoonPlansList, hostId); err != nil {
  490. allErrors = multierror.Append(allErrors, fmt.Errorf("将hostId %d标记为已清理失败: %w", hostId, err))
  491. }
  492. }
  493. return allErrors.ErrorOrNil()
  494. }
  495. // 5. RecoverStalePlan 恢复超过7天后才续费的套餐
  496. func (t *wafTask) RecoverStalePlan(ctx context.Context) error {
  497. taskName := "恢复长期到期套餐"
  498. staleLimits, err := t.findStaleWAFPlans(ctx)
  499. if err != nil {
  500. return t.wrapTaskError(taskName, "查找陈旧记录", err)
  501. }
  502. // [CORRECTION] 当恢复一个被清理过的套餐时,需要从“已清理列表”(`ExpiringSoonPlansList`)中移除它。
  503. return t._recoverPlans(ctx, staleLimits, taskName, repository.ExpiringSoonPlansList)
  504. }