waf.go 17 KB


  1. package task
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  9. "github.com/hashicorp/go-multierror"
  10. "go.uber.org/zap"
  11. "sync"
  12. "time"
  13. )
  14. // WafTask 定义了WAF相关的五个独立定时任务接口
  15. type WafTask interface {
  16. // 1. 同步即将到期(1天内)的套餐时间
  17. SynchronizationTime(ctx context.Context) error
  18. // 2. 停止所有已到期的套餐
  19. StopPlan(ctx context.Context) error
  20. // 3. 恢复7天内续费的套餐
  21. RecoverRecentPlan(ctx context.Context) error
  22. // 4. 清理过期超过7天且仍未续费的记录
  23. CleanUpStaleRecords(ctx context.Context) error
  24. // 5. 恢复超过7天后才续费的套餐
  25. RecoverStalePlan(ctx context.Context) error
  26. }
  27. func NewWafTask(
  28. webForWardingRep repository.WebForwardingRepository,
  29. tcpforwardingRep repository.TcpforwardingRepository,
  30. udpForWardingRep repository.UdpForWardingRepository,
  31. cdn service.CdnService,
  32. hostRep repository.HostRepository,
  33. globalLimitRep repository.GlobalLimitRepository,
  34. expiredRep repository.ExpiredRepository,
  35. task *Task,
  36. gatewayGroupIpRep repository.GateWayGroupIpRepository,
  37. tcp service.TcpforwardingService,
  38. udp service.UdpForWardingService,
  39. web service.WebForwardingService,
  40. ) WafTask {
  41. return &wafTask{
  42. Task: task,
  43. webForWardingRep: webForWardingRep,
  44. tcpforwardingRep: tcpforwardingRep,
  45. udpForWardingRep: udpForWardingRep,
  46. cdn: cdn,
  47. hostRep: hostRep,
  48. globalLimitRep: globalLimitRep,
  49. expiredRep: expiredRep,
  50. gatewayGroupIpRep: gatewayGroupIpRep,
  51. tcp: tcp,
  52. udp: udp,
  53. web: web,
  54. }
  55. }
  56. type wafTask struct {
  57. *Task
  58. webForWardingRep repository.WebForwardingRepository
  59. tcpforwardingRep repository.TcpforwardingRepository
  60. udpForWardingRep repository.UdpForWardingRepository
  61. cdn service.CdnService
  62. hostRep repository.HostRepository
  63. globalLimitRep repository.GlobalLimitRepository
  64. expiredRep repository.ExpiredRepository
  65. gatewayGroupIpRep repository.GateWayGroupIpRepository
  66. tcp service.TcpforwardingService
  67. udp service.UdpForWardingService
  68. web service.WebForwardingService
  69. }
  70. const (
  71. // 1天对应的秒数
  72. OneDaysInSeconds = 1 * 24 * 60 * 60
  73. // 7天对应的秒数
  74. SevenDaysInSeconds = 7 * 24 * 60 * 60
  75. )
  76. // RenewalRequest 续费操作请求结构体
  77. type RenewalRequest struct {
  78. HostId int
  79. PlanId int
  80. ExpiredAt int64
  81. }
  82. // =================================================================
  83. // =================== 原始辅助函数 (Helpers) =====================
  84. // =================================================================
  85. // 获取cdn web id
  86. func (t wafTask) GetCdnWebId(ctx context.Context,hostId int) ([]int, error) {
  87. tcpIds, err := t.tcpforwardingRep.GetTcpForwardingAllIdsByID(ctx, hostId)
  88. if err != nil {
  89. return nil, err
  90. }
  91. udpIds, err := t.udpForWardingRep.GetUdpForwardingWafUdpAllIds(ctx, hostId)
  92. if err != nil {
  93. return nil, err
  94. }
  95. webIds, err := t.webForWardingRep.GetWebForwardingWafWebAllIds(ctx, hostId)
  96. if err != nil {
  97. return nil, err
  98. }
  99. var ids []int
  100. ids = append(ids, tcpIds...)
  101. ids = append(ids, udpIds...)
  102. ids = append(ids, webIds...)
  103. return ids, nil
  104. }
  105. // BanServer 启用/禁用 网站 (并发执行)
  106. func (t wafTask) BanServer(ctx context.Context, ids []int, isBan bool) error {
  107. if len(ids) == 0 { return nil }
  108. var wg sync.WaitGroup
  109. errChan := make(chan error, len(ids))
  110. wg.Add(len(ids))
  111. for _, id := range ids {
  112. go func(id int) {
  113. defer wg.Done()
  114. if err := t.cdn.EditWebIsOn(ctx, int64(id), isBan); err != nil {
  115. errChan <- err
  116. }
  117. }(id)
  118. }
  119. wg.Wait()
  120. close(errChan)
  121. var result error
  122. for err := range errChan {
  123. result = multierror.Append(result, err)
  124. }
  125. return result
  126. }
  127. // EditExpired 统一的续费操作入口
  128. func (t wafTask) EditExpired(ctx context.Context, reqs []RenewalRequest) error {
  129. if len(reqs) == 0 { return nil }
  130. var globalLimitUpdates []struct { hostId int; expiredAt int64 }
  131. var planRenewals []struct { planId int; expiredAt int64 }
  132. for _, req := range reqs {
  133. globalLimitUpdates = append(globalLimitUpdates, struct{ hostId int; expiredAt int64 }{req.HostId, req.ExpiredAt})
  134. planRenewals = append(planRenewals, struct{ planId int; expiredAt int64 }{req.PlanId, req.ExpiredAt})
  135. }
  136. var result *multierror.Error
  137. if err := t.editGlobalLimitState(ctx, globalLimitUpdates, true); err != nil {
  138. result = multierror.Append(result, err)
  139. }
  140. if err := t.renewCdnPlan(ctx, planRenewals); err != nil {
  141. result = multierror.Append(result, err)
  142. }
  143. return result.ErrorOrNil()
  144. }
  145. // editGlobalLimitState 内部函数,用于更新数据库中的状态和时间
  146. func (t wafTask) editGlobalLimitState(ctx context.Context, req []struct { hostId int; expiredAt int64 }, state bool) error {
  147. var result *multierror.Error
  148. for _, v := range req {
  149. err := t.globalLimitRep.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{HostId: v.hostId, ExpiredAt: v.expiredAt, State: state})
  150. if err != nil { result = multierror.Append(result, err) }
  151. }
  152. return result.ErrorOrNil()
  153. }
  154. // renewCdnPlan 内部函数,用于调用CDN服务进行续费
  155. func (t wafTask) renewCdnPlan(ctx context.Context, req []struct { planId int; expiredAt int64 }) error {
  156. var result *multierror.Error
  157. for _, v := range req {
  158. err := t.cdn.RenewPlan(ctx, v1.RenewalPlan{
  159. UserPlanId: int64(v.planId), IsFree: true, DayTo: time.Unix(v.expiredAt, 0).Format("2006-01-02"),
  160. Period: "monthly", CountPeriod: 1, PeriodDayTo: time.Unix(v.expiredAt, 0).Format("2006-01-02"),
  161. })
  162. if err != nil { result = multierror.Append(result, err) }
  163. }
  164. return result.ErrorOrNil()
  165. }
  166. // =================================================================
  167. // =================== 1. 数据查找层 (Finders) =====================
  168. // =================================================================
  169. // findMismatchedExpirations 检查 WAF 和 Host 的到期时间差异。这是决策的核心。
  170. func (t *wafTask) findMismatchedExpirations(ctx context.Context, wafLimits []model.GlobalLimit) ([]RenewalRequest, error) {
  171. if len(wafLimits) == 0 { return nil, nil }
  172. wafExpiredMap := make(map[int]int64, len(wafLimits))
  173. wafPlanMap := make(map[int]int, len(wafLimits))
  174. var hostIds []int
  175. for _, limit := range wafLimits {
  176. hostIds = append(hostIds, limit.HostId)
  177. wafExpiredMap[limit.HostId] = limit.ExpiredAt
  178. wafPlanMap[limit.HostId] = limit.RuleId
  179. }
  180. hostExpirations, err := t.hostRep.GetExpireTimeByHostId(ctx, hostIds)
  181. if err != nil { return nil, fmt.Errorf("获取主机到期时间失败: %w", err) }
  182. hostExpiredMap := make(map[int]int64, len(hostExpirations))
  183. for _, h := range hostExpirations { hostExpiredMap[h.HostId] = h.ExpiredAt }
  184. var renewalRequests []RenewalRequest
  185. for hostId, wafExpiredTime := range wafExpiredMap {
  186. hostTime, ok := hostExpiredMap[hostId]
  187. if !ok || hostTime != wafExpiredTime {
  188. planId, planOk := wafPlanMap[hostId]
  189. if !planOk {
  190. t.logger.Warn("数据不一致:在waf_limits中找不到hostId对应的套餐ID", zap.Int("hostId", hostId))
  191. continue
  192. }
  193. renewalRequests = append(renewalRequests, RenewalRequest{HostId: hostId, ExpiredAt: hostTime, PlanId: planId})
  194. }
  195. }
  196. return renewalRequests, nil
  197. }
  198. // findAllCurrentlyExpiredPlans 查找所有当前时间点已经到期的WAF记录。
  199. func (t *wafTask) findAllCurrentlyExpiredPlans(ctx context.Context) ([]model.GlobalLimit, error) {
  200. return t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, 0)
  201. }
  202. // findRecentlyExpiredPlans (精确查找) 查找在过去7天内到期的WAF记录。
  203. func (t *wafTask) findRecentlyExpiredPlans(ctx context.Context) ([]model.GlobalLimit, error) {
  204. sevenDaysAgo := time.Now().Add(-7 * 24 * time.Hour).Unix()
  205. now := time.Now().Unix()
  206. return t.globalLimitRep.GetGlobalLimitsByExpirationRange(ctx, sevenDaysAgo, now)
  207. }
  208. // findStaleExpiredPlans (精确查找) 查找7天前或更早就已到期的WAF记录。
  209. func (t *wafTask) findStaleExpiredPlans(ctx context.Context) ([]model.GlobalLimit, error) {
  210. sevenDaysAgoOffset := int64(-1 * SevenDaysInSeconds)
  211. return t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, sevenDaysAgoOffset)
  212. }
  213. // =================================================================
  214. // =================== 2. 业务决策层 (Filters) =====================
  215. // =================================================================
  216. // filterCleanablePlans (精确决策) 从长期过期的列表中,筛选出确认未续费且需要被清理的记录。
  217. func (t *wafTask) filterCleanablePlans(ctx context.Context, staleLimits []model.GlobalLimit) ([]model.GlobalLimit, error) {
  218. renewedStalePlans, err := t.findMismatchedExpirations(ctx, staleLimits)
  219. if err != nil {
  220. return nil, fmt.Errorf("决策[清理]: 检查续费状态失败: %w", err)
  221. }
  222. renewedHostIds := make(map[int]struct{}, len(renewedStalePlans))
  223. for _, req := range renewedStalePlans {
  224. renewedHostIds[req.HostId] = struct{}{}
  225. }
  226. var plansToClean []model.GlobalLimit
  227. for _, limit := range staleLimits {
  228. if _, found := renewedHostIds[limit.HostId]; !found {
  229. plansToClean = append(plansToClean, limit)
  230. }
  231. }
  232. return plansToClean, nil
  233. }
  234. // =================================================================
  235. // ============== 3. 业务执行层 (Executors & Public API) =============
  236. // =================================================================
  237. // executePlanRecovery (可重用) 负责恢复套餐的所有步骤。
  238. func (t *wafTask) executePlanRecovery(ctx context.Context, renewalRequests []RenewalRequest, taskName string,key repository.PlanListType) error {
  239. t.logger.Info(fmt.Sprintf("开始执行[%s]套餐恢复流程", taskName), zap.Int("数量", len(renewalRequests)))
  240. var hostIds []int
  241. for _, req := range renewalRequests {
  242. hostIds = append(hostIds, req.HostId)
  243. }
  244. var allErrors *multierror.Error
  245. for _, v := range renewalRequests {
  246. webIds, err := t.GetCdnWebId(ctx, v.HostId)
  247. if err != nil {
  248. allErrors = multierror.Append(allErrors, fmt.Errorf("执行[%s]-获取webId失败: %w", taskName, err))
  249. }
  250. if err := t.BanServer(ctx, webIds, true); err != nil {
  251. allErrors = multierror.Append(allErrors, fmt.Errorf("执行[%s]-封禁webId失败: %w", taskName, err))
  252. }
  253. }
  254. if err := t.EditExpired(ctx, renewalRequests); err != nil {
  255. allErrors = multierror.Append(allErrors, fmt.Errorf("执行[%s]-同步续费信息失败: %w", taskName, err))
  256. }
  257. planIdsToRecover := make([]int64, len(hostIds))
  258. for i, id := range hostIds { planIdsToRecover[i] = int64(id) }
  259. if err := t.expiredRep.RemovePlans(ctx, key, planIdsToRecover...); err != nil {
  260. allErrors = multierror.Append(allErrors, fmt.Errorf("执行[%s]-移除Redis关闭标记失败: %w", taskName, err))
  261. }
  262. return allErrors.ErrorOrNil()
  263. }
  264. // 1. SynchronizationTime 同步即将到期(1天内)的套餐时间
  265. func (t *wafTask) SynchronizationTime(ctx context.Context) error {
  266. wafLimits, err := t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, OneDaysInSeconds)
  267. if err != nil { return fmt.Errorf("执行[同步]-查找失败: %w", err) }
  268. renewalRequests, err := t.findMismatchedExpirations(ctx, wafLimits)
  269. if err != nil { return fmt.Errorf("执行[同步]-决策失败: %w", err) }
  270. if len(renewalRequests) > 0 {
  271. t.logger.Info("发现记录需要同步到期时间。", zap.Int("数量", len(renewalRequests)))
  272. return t.EditExpired(ctx, renewalRequests)
  273. }
  274. return nil
  275. }
  276. // 2. StopPlan (已优化) 停止所有已到期的套餐
  277. func (t *wafTask) StopPlan(ctx context.Context) error {
  278. // 1. 查找所有理论上已到期的记录
  279. expiredLimits, err := t.findAllCurrentlyExpiredPlans(ctx)
  280. if err != nil { return fmt.Errorf("执行[停止]-查找失败: %w", err) }
  281. if len(expiredLimits) == 0 { return nil }
  282. // 2. 决策 - 第1步:检查这些记录中是否已有续费但未同步的
  283. renewalRequests, err := t.findMismatchedExpirations(ctx, expiredLimits)
  284. if err != nil { return fmt.Errorf("执行[停止]-决策检查续费失败: %w", err) }
  285. renewedHostIds := make(map[int]struct{}, len(renewalRequests))
  286. for _, req := range renewalRequests {
  287. renewedHostIds[req.HostId] = struct{}{}
  288. }
  289. // 2. 决策 - 第2步:筛选出真正需要停止的记录
  290. var plansToClose []model.GlobalLimit
  291. for _, limit := range expiredLimits {
  292. if _, found := renewedHostIds[limit.HostId]; found {
  293. t.logger.Info("发现已到期但刚续费的套餐,跳过停止操作", zap.Int("hostId", limit.HostId))
  294. continue
  295. }
  296. isClosed, err := t.expiredRep.IsPlanInList(ctx, repository.ClosedPlansList, int64(limit.HostId))
  297. if err != nil {
  298. t.logger.Error("决策[停止]:检查套餐是否已关闭失败", zap.Int("hostId", limit.HostId), zap.Error(err))
  299. continue
  300. }
  301. if !isClosed {
  302. plansToClose = append(plansToClose, limit)
  303. }
  304. }
  305. if len(plansToClose) == 0 {
  306. t.logger.Info("没有需要停止的套餐(可能均已续费或已关闭)")
  307. return nil
  308. }
  309. // 3. 执行停止操作
  310. t.logger.Info("开始关闭到期的WAF服务", zap.Int("数量", len(plansToClose)))
  311. var hostIds []int
  312. for _, limit := range plansToClose {
  313. hostIds = append(hostIds, limit.HostId)
  314. }
  315. for _, hostId := range hostIds {
  316. webIds, err := t.GetCdnWebId(ctx, hostId)
  317. if err != nil { return fmt.Errorf("执行[停止]-获取cdn_web_id失败: %w", err) }
  318. if err := t.BanServer(ctx, webIds, false); err != nil {
  319. return fmt.Errorf("执行[停止]-禁用服务失败: %w", err)
  320. }
  321. }
  322. closedPlanIds := make([]int64, len(hostIds))
  323. for i, id := range hostIds { closedPlanIds[i] = int64(id) }
  324. if err := t.expiredRep.AddPlans(ctx, repository.ClosedPlansList, closedPlanIds...); err != nil {
  325. return fmt.Errorf("执行[停止]-标记为已关闭失败: %w", err)
  326. }
  327. return nil
  328. }
  329. // 3. RecoverRecentPlan 恢复7天内续费的套餐
  330. func (t *wafTask) RecoverRecentPlan(ctx context.Context) error {
  331. recentlyExpiredLimits, err := t.findRecentlyExpiredPlans(ctx)
  332. if err != nil { return fmt.Errorf("执行[近期恢复]-查找失败: %w", err) }
  333. if len(recentlyExpiredLimits) == 0 { return nil }
  334. renewalRequests, err := t.findMismatchedExpirations(ctx, recentlyExpiredLimits)
  335. if err != nil { return fmt.Errorf("执行[近期恢复]-决策失败: %w", err) }
  336. if len(renewalRequests) == 0 {
  337. t.logger.Info("在近期过期的套餐中,没有发现已续费的")
  338. return nil
  339. }
  340. return t.executePlanRecovery(ctx, renewalRequests, "近期恢复",repository.ClosedPlansList)
  341. }
  342. // 4. CleanUpStaleRecords 清理过期超过7天且仍未续费的记录
  343. func (t *wafTask) CleanUpStaleRecords(ctx context.Context) error {
  344. staleLimits, err := t.findStaleExpiredPlans(ctx)
  345. if err != nil { return fmt.Errorf("执行[清理]-查找失败: %w", err) }
  346. if len(staleLimits) == 0 { return nil }
  347. plansToClean, err := t.filterCleanablePlans(ctx, staleLimits)
  348. if err != nil { return fmt.Errorf("执行[清理]-决策失败: %w", err) }
  349. if len(plansToClean) == 0 {
  350. t.logger.Info("没有长期未续费的记录需要清理")
  351. return nil
  352. }
  353. t.logger.Info("开始清理长期未续费的WAF记录", zap.Int("数量", len(plansToClean)))
  354. var planIdsToClean []int64
  355. for _, limit := range plansToClean {
  356. planIdsToClean = append(planIdsToClean, int64(limit.HostId))
  357. }
  358. if err := t.expiredRep.RemovePlans(ctx, repository.ClosedPlansList, planIdsToClean...); err != nil {
  359. return fmt.Errorf("执行[清理]-从Redis移除关闭标记失败: %w", err)
  360. }
  361. if err := t.expiredRep.AddPlans(ctx, repository.ExpiringSoonPlansList, planIdsToClean...); err != nil {
  362. return fmt.Errorf("执行[清理]-从Redis移除过期标记失败: %w", err)
  363. }
  364. // 在这里可以添加从数据库删除或调用CDN API彻底删除的逻辑
  365. for _, limit := range plansToClean {
  366. err = t.globalLimitRep.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{
  367. HostId: limit.HostId,
  368. GatewayGroupId: limit.GatewayGroupId,
  369. State: true,
  370. })
  371. if err != nil {
  372. return fmt.Errorf("执行[清理]-更新套餐状态失败: %w", err)
  373. }
  374. tcpIds, err := t.tcpforwardingRep.GetTcpForwardingAllIdsByID(ctx, limit.HostId)
  375. if err != nil {
  376. return err
  377. }
  378. udpIds, err := t.udpForWardingRep.GetUdpForwardingWafUdpAllIds(ctx, limit.HostId)
  379. if err != nil {
  380. return err
  381. }
  382. webIds, err := t.webForWardingRep.GetWebForwardingWafWebAllIds(ctx, limit.HostId)
  383. if err != nil {
  384. return err
  385. }
  386. err = t.tcp.DeleteTcpForwarding(ctx, v1.DeleteTcpForwardingRequest{
  387. Ids: tcpIds,
  388. Uid: 0,
  389. HostId: limit.HostId,
  390. })
  391. if err != nil {
  392. return err
  393. }
  394. err = t.udp.DeleteUdpForwarding(ctx, udpIds)
  395. if err != nil {
  396. return err
  397. }
  398. err = t.web.DeleteWebForwarding(ctx, webIds)
  399. if err != nil {
  400. return err
  401. }
  402. }
  403. return nil
  404. }
  405. // 5. RecoverStalePlan 恢复超过7天后才续费的套餐
  406. func (t *wafTask) RecoverStalePlan(ctx context.Context) error {
  407. staleLimits, err := t.findStaleExpiredPlans(ctx)
  408. if err != nil { return fmt.Errorf("执行[长期恢复]-查找失败: %w", err) }
  409. if len(staleLimits) == 0 { return nil }
  410. renewalRequests, err := t.findMismatchedExpirations(ctx, staleLimits)
  411. if err != nil { return fmt.Errorf("执行[长期恢复]-决策失败: %w", err) }
  412. if len(renewalRequests) == 0 {
  413. t.logger.Info("在长期过期的套餐中,没有发现已续费的")
  414. return nil
  415. }
  416. return t.executePlanRecovery(ctx, renewalRequests, "长期恢复",repository.ExpiringSoonPlansList)
  417. }