wafoperations.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. package admin
  2. import (
  3. "context"
  4. "fmt"
  5. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  8. waf2 "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  9. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf"
  12. "github.com/hashicorp/go-multierror"
  13. "go.uber.org/zap"
  14. "sync"
  15. "time"
  16. )
  17. // WafOperationsService WAF通用操作服务接口
  18. type WafOperationsService interface {
  19. // 清理单个套餐的所有相关资源
  20. CleanupPlan(ctx context.Context, limit model.GlobalLimit) error
  21. // 批量恢复套餐服务
  22. RecoverPlans(ctx context.Context, limits []model.GlobalLimit, redisListKey repository.PlanListType) error
  23. // 获取主机关联的所有转发规则ID
  24. GetForwardingRuleIds(ctx context.Context, hostIds []int) ([]int, error)
  25. // 批量设置CDN网站状态
  26. SetCdnWebsitesState(ctx context.Context, ids []int, enable bool) error
  27. // 执行续费操作
  28. ExecuteRenewalActions(ctx context.Context, reqs []RenewalRequest) error
  29. }
  30. type RenewalRequest struct {
  31. HostId int
  32. ExpiredAt int64
  33. }
  34. func NewWafOperationsService(
  35. service *service.Service,
  36. webForWardingRep waf2.WebForwardingRepository,
  37. tcpforwardingRep waf2.TcpforwardingRepository,
  38. udpForWardingRep waf2.UdpForWardingRepository,
  39. cdn flexCdn.CdnService,
  40. hostRep repository.HostRepository,
  41. globalLimitRep waf2.GlobalLimitRepository,
  42. expiredRep repository.ExpiredRepository,
  43. gatewayIpRep waf2.GatewayipRepository,
  44. tcp waf.TcpforwardingService,
  45. udp waf.UdpForWardingService,
  46. web waf.WebForwardingService,
  47. buildAoDun waf.BuildAudunService,
  48. zzyBgp waf.ZzybgpService,
  49. ) WafOperationsService {
  50. return &wafOperationsService{
  51. Service: service,
  52. webForWardingRep: webForWardingRep,
  53. tcpforwardingRep: tcpforwardingRep,
  54. udpForWardingRep: udpForWardingRep,
  55. cdn: cdn,
  56. hostRep: hostRep,
  57. globalLimitRep: globalLimitRep,
  58. expiredRep: expiredRep,
  59. gatewayIpRep: gatewayIpRep,
  60. tcp: tcp,
  61. udp: udp,
  62. web: web,
  63. buildAoDun: buildAoDun,
  64. zzyBgp: zzyBgp,
  65. }
  66. }
  67. type wafOperationsService struct {
  68. *service.Service
  69. webForWardingRep waf2.WebForwardingRepository
  70. tcpforwardingRep waf2.TcpforwardingRepository
  71. udpForWardingRep waf2.UdpForWardingRepository
  72. cdn flexCdn.CdnService
  73. hostRep repository.HostRepository
  74. globalLimitRep waf2.GlobalLimitRepository
  75. expiredRep repository.ExpiredRepository
  76. gatewayIpRep waf2.GatewayipRepository
  77. tcp waf.TcpforwardingService
  78. udp waf.UdpForWardingService
  79. web waf.WebForwardingService
  80. buildAoDun waf.BuildAudunService
  81. zzyBgp waf.ZzybgpService
  82. }
  83. // GetForwardingRuleIds 获取主机关联的所有转发规则ID
  84. func (s *wafOperationsService) GetForwardingRuleIds(ctx context.Context, hostIds []int) ([]int, error) {
  85. if len(hostIds) == 0 {
  86. return nil, nil
  87. }
  88. var ids []int
  89. var result *multierror.Error
  90. // 获取TCP转发规则ID
  91. tcpIds, err := s.tcpforwardingRep.GetTcpAll(ctx, hostIds)
  92. if err != nil {
  93. result = multierror.Append(result, fmt.Errorf("获取TCP转发规则失败: %w", err))
  94. }
  95. ids = append(ids, tcpIds...)
  96. // 获取UDP转发规则ID
  97. udpIds, err := s.udpForWardingRep.GetUdpAll(ctx, hostIds)
  98. if err != nil {
  99. result = multierror.Append(result, fmt.Errorf("获取UDP转发规则失败: %w", err))
  100. }
  101. ids = append(ids, udpIds...)
  102. // 获取Web转发规则ID
  103. webIds, err := s.webForWardingRep.GetWebAll(ctx, hostIds)
  104. if err != nil {
  105. result = multierror.Append(result, fmt.Errorf("获取Web转发规则失败: %w", err))
  106. }
  107. ids = append(ids, webIds...)
  108. return ids, result.ErrorOrNil()
  109. }
  110. // SetCdnWebsitesState 批量设置CDN网站状态(并发执行)
  111. func (s *wafOperationsService) SetCdnWebsitesState(ctx context.Context, ids []int, enable bool) error {
  112. if len(ids) == 0 {
  113. return nil
  114. }
  115. var wg sync.WaitGroup
  116. errChan := make(chan error, len(ids))
  117. wg.Add(len(ids))
  118. for _, id := range ids {
  119. go func(id int) {
  120. defer wg.Done()
  121. // cdn.EditWebIsOn 的第二个参数 enable: true=启用, false=禁用
  122. if err := s.cdn.EditWebIsOn(ctx, int64(id), enable); err != nil {
  123. errChan <- fmt.Errorf("设置CDN网站状态失败(ID:%d): %w", id, err)
  124. }
  125. }(id)
  126. }
  127. wg.Wait()
  128. close(errChan)
  129. var result *multierror.Error
  130. for err := range errChan {
  131. result = multierror.Append(result, err)
  132. }
  133. return result.ErrorOrNil()
  134. }
  135. // ExecuteRenewalActions 执行续费操作,包括更新DB和调用CDN API
  136. // 该方法并发更新数据库中的套餐状态,将到期时间和状态同步到GlobalLimit表
  137. // 主要用于套餐续费后,需要将最新的到期时间从主机表同步到WAF套餐表
  138. //
  139. // 执行流程:
  140. // 1. 参数校验,如果没有续费请求则直接返回
  141. // 2. 创建goroutine池,每个续费请求对应一个goroutine
  142. // 3. 并发调用数据库更新操作,提高处理效率
  143. // 4. 使用互斥锁保护错误收集,避免并发写入冲突
  144. // 5. 等待所有更新操作完成,返回聚合的错误信息
  145. //
  146. // 并发安全:
  147. // - 使用sync.Mutex保护共享的错误收集器
  148. // - 每个goroutine独立处理一个续费请求,避免数据竞争
  149. // - 使用WaitGroup确保所有操作完成后才返回
  150. //
  151. // 参数:
  152. // - ctx: 上下文对象,用于控制请求生命周期和传递trace信息
  153. // - reqs: 续费请求列表,包含HostId和新的到期时间
  154. //
  155. // 返回:
  156. // - error: 更新过程中的任何错误,如果部分失败会包含所有失败的详细信息
  157. func (s *wafOperationsService) ExecuteRenewalActions(ctx context.Context, reqs []RenewalRequest) error {
  158. // 参数校验:如果没有续费请求,直接返回成功
  159. if len(reqs) == 0 {
  160. return nil
  161. }
  162. // 并发控制和错误收集初始化
  163. var allErrors *multierror.Error
  164. var wg sync.WaitGroup
  165. var mu sync.Mutex // 保护allErrors的并发写入
  166. wg.Add(len(reqs))
  167. // 为每个续费请求创建一个goroutine进行并发处理
  168. for _, req := range reqs {
  169. go func(r RenewalRequest) {
  170. defer wg.Done()
  171. // 更新数据库中的套餐状态
  172. // 将State设置为true表示套餐处于激活状态
  173. // ExpiredAt更新为最新的到期时间
  174. err := s.globalLimitRep.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{
  175. HostId: r.HostId, // 主机ID,用于定位具体的套餐
  176. ExpiredAt: r.ExpiredAt, // 新的到期时间戳
  177. State: true, // 激活状态,表示套餐可用
  178. })
  179. if err != nil {
  180. // 线程安全的错误收集
  181. mu.Lock()
  182. allErrors = multierror.Append(allErrors, fmt.Errorf("更新主机%d续费状态失败: %w", r.HostId, err))
  183. mu.Unlock()
  184. }
  185. }(req)
  186. }
  187. // 等待所有更新操作完成
  188. wg.Wait()
  189. return allErrors.ErrorOrNil()
  190. }
  191. // CleanupPlan 清理单个套餐的所有相关资源
  192. // 该方法执行套餐过期后的完整清理流程,包括删除转发规则、重置防护设置、清理网络配置等
  193. // 这是一个复合操作,涉及多个子系统的协调,确保套餐相关的所有资源都被正确清理
  194. //
  195. // 清理步骤(按执行顺序):
  196. // 1. 从Redis "停止列表" 中移除该套餐(因为即将转移到 "已清理列表")
  197. // 2. 删除TCP转发规则 - 清理所有TCP端口转发配置
  198. // 3. 删除UDP转发规则 - 清理所有UDP端口转发配置
  199. // 4. 删除Web转发规则 - 清理所有HTTP/HTTPS转发配置
  200. // 5. 重置BGP防护设置 - 将防护等级重置为默认值(10)
  201. // 6. 清除带宽限制 - 移除小防火墙的带宽限制配置
  202. // 7. 清理网关IP配置 - 删除该主机关联的所有网关IP
  203. // 8. 将套餐标记为"已清理" - 添加到Redis "已清理列表"
  204. //
  205. // 错误处理策略:
  206. // - 使用multierror收集所有步骤的错误,不会因单个步骤失败而中断整个流程
  207. // - 只有在前面所有步骤都成功的情况下,才执行最终的网关IP清理和Redis标记
  208. // - 记录详细的日志信息,便于问题排查和监控
  209. //
  210. // 幂等性保证:
  211. // - 该方法可以安全地重复调用,不会产生副作用
  212. // - 如果某个资源已经被清理,相关操作会优雅地处理
  213. //
  214. // 参数:
  215. // - ctx: 上下文对象,用于控制请求生命周期和传递trace信息
  216. // - limit: 需要清理的套餐信息,包含HostId、Uid等关键字段
  217. //
  218. // 返回:
  219. // - error: 清理过程中的任何错误,使用multierror聚合多个错误
  220. func (s *wafOperationsService) CleanupPlan(ctx context.Context, limit model.GlobalLimit) error {
  221. var allErrors *multierror.Error
  222. hostId := int64(limit.HostId)
  223. // 记录清理开始的日志,便于监控和调试
  224. s.Logger.Info("开始清理套餐资源",
  225. zap.Int("hostId", limit.HostId),
  226. zap.Int("uid", limit.Uid),
  227. zap.String("operation", "cleanup_plan"))
  228. // 步骤1: 从Redis "停止列表" 中移除该套餐
  229. // 这是状态转换的第一步,表示套餐即将从 "已停止" 状态转换到 "已清理" 状态
  230. if err := s.expiredRep.RemovePlans(ctx, repository.ClosedPlansList, hostId); err != nil {
  231. allErrors = multierror.Append(allErrors, fmt.Errorf("从停止列表移除失败: %w", err))
  232. s.Logger.Warn("从停止列表移除失败", zap.Int64("hostId", hostId), zap.Error(err))
  233. }
  234. // 步骤2: 删除TCP转发规则
  235. // TCP转发规则通常用于游戏服务器、数据库等需要TCP连接的服务
  236. // 需要先获取所有关联的TCP规则ID,然后批量删除
  237. tcpIds, err := s.tcpforwardingRep.GetTcpForwardingAllIdsByID(ctx, limit.HostId)
  238. if err != nil {
  239. allErrors = multierror.Append(allErrors, fmt.Errorf("获取TCP转发规则失败: %w", err))
  240. s.Logger.Warn("获取TCP转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err))
  241. } else if len(tcpIds) > 0 {
  242. s.Logger.Info("开始删除TCP转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(tcpIds)))
  243. if err := s.tcp.DeleteTcpForwarding(ctx, v1.DeleteTcpForwardingRequest{
  244. Ids: tcpIds, // 需要删除的TCP规则ID列表
  245. HostId: limit.HostId, // 主机ID,用于权限验证
  246. Uid: limit.Uid, // 用户ID,用于权限验证
  247. }); err != nil {
  248. allErrors = multierror.Append(allErrors, fmt.Errorf("删除TCP转发规则失败: %w", err))
  249. s.Logger.Error("删除TCP转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err))
  250. } else {
  251. s.Logger.Info("成功删除TCP转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(tcpIds)))
  252. }
  253. } else {
  254. s.Logger.Debug("该主机没有TCP转发规则需要删除", zap.Int("hostId", limit.HostId))
  255. }
  256. // 步骤3: 删除UDP转发规则
  257. // UDP转发规则通常用于游戏服务器、DNS服务等需要UDP连接的服务
  258. // UDP协议的特点是无连接,但在防护场景下同样需要转发规则
  259. udpIds, err := s.udpForWardingRep.GetUdpForwardingWafUdpAllIds(ctx, limit.HostId)
  260. if err != nil {
  261. allErrors = multierror.Append(allErrors, fmt.Errorf("获取UDP转发规则失败: %w", err))
  262. s.Logger.Warn("获取UDP转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err))
  263. } else if len(udpIds) > 0 {
  264. s.Logger.Info("开始删除UDP转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(udpIds)))
  265. if err := s.udp.DeleteUdpForwarding(ctx, v1.DeleteUdpForwardingRequest{
  266. Ids: udpIds, // 需要删除的UDP规则ID列表
  267. HostId: limit.HostId, // 主机ID,用于权限验证
  268. Uid: limit.Uid, // 用户ID,用于权限验证
  269. }); err != nil {
  270. allErrors = multierror.Append(allErrors, fmt.Errorf("删除UDP转发规则失败: %w", err))
  271. s.Logger.Error("删除UDP转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err))
  272. } else {
  273. s.Logger.Info("成功删除UDP转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(udpIds)))
  274. }
  275. } else {
  276. s.Logger.Debug("该主机没有UDP转发规则需要删除", zap.Int("hostId", limit.HostId))
  277. }
  278. // 步骤4: 删除Web转发规则
  279. // Web转发规则用于HTTP/HTTPS网站服务,是最常见的转发类型
  280. // 包括域名解析、SSL证书、负载均衡等复杂配置
  281. webIds, err := s.webForWardingRep.GetWebForwardingWafWebAllIds(ctx, limit.HostId)
  282. if err != nil {
  283. allErrors = multierror.Append(allErrors, fmt.Errorf("获取Web转发规则失败: %w", err))
  284. s.Logger.Warn("获取Web转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err))
  285. } else if len(webIds) > 0 {
  286. s.Logger.Info("开始删除Web转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(webIds)))
  287. if err := s.web.DeleteWebForwarding(ctx, v1.DeleteWebForwardingRequest{
  288. Ids: webIds, // 需要删除的Web规则ID列表
  289. HostId: limit.HostId, // 主机ID,用于权限验证
  290. Uid: limit.Uid, // 用户ID,用于权限验证
  291. }); err != nil {
  292. allErrors = multierror.Append(allErrors, fmt.Errorf("删除Web转发规则失败: %w", err))
  293. s.Logger.Error("删除Web转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err))
  294. } else {
  295. s.Logger.Info("成功删除Web转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(webIds)))
  296. }
  297. } else {
  298. s.Logger.Debug("该主机没有Web转发规则需要删除", zap.Int("hostId", limit.HostId))
  299. }
  300. // 步骤5: 重置BGP防护设置
  301. // 将防护等级重置为默认值(10),这通常是最低的防护级别
  302. // BGP防护是网络层面的DDoS防护,重置后将停止高级防护功能
  303. s.Logger.Info("开始重置BGP防护设置", zap.Int64("hostId", hostId), zap.Int("defenseLevel", 10))
  304. if err := s.zzyBgp.SetDefense(ctx, hostId, 10); err != nil {
  305. allErrors = multierror.Append(allErrors, fmt.Errorf("重置BGP防护设置失败: %w", err))
  306. s.Logger.Error("重置BGP防护设置失败", zap.Int64("hostId", hostId), zap.Error(err))
  307. } else {
  308. s.Logger.Info("成功重置BGP防护设置", zap.Int64("hostId", hostId))
  309. }
  310. // 步骤6: 清除小防火墙带宽限制
  311. // 移除奥盾防护系统中设置的带宽限制配置
  312. // "del" 操作表示删除该主机的所有带宽限制规则
  313. s.Logger.Info("开始清除小防火墙带宽限制", zap.Int64("hostId", hostId))
  314. if err := s.buildAoDun.Bandwidth(ctx, hostId, "del"); err != nil {
  315. allErrors = multierror.Append(allErrors, fmt.Errorf("清除带宽限制失败: %w", err))
  316. s.Logger.Error("清除带宽限制失败", zap.Int64("hostId", hostId), zap.Error(err))
  317. } else {
  318. s.Logger.Info("成功清除小防火墙带宽限制", zap.Int64("hostId", hostId))
  319. }
  320. // 步骤7: 执行最终清理操作(仅在前面步骤都成功时执行)
  321. // 这是一个关键的设计决策:只有在所有资源清理都成功的情况下,
  322. // 才执行网关IP清理和状态标记,确保数据一致性
  323. if allErrors.ErrorOrNil() == nil {
  324. s.Logger.Info("前置清理步骤全部成功,开始执行最终清理操作", zap.Int64("hostId", hostId))
  325. // 步骤7a: 清理网关IP配置
  326. // 删除该主机在网关系统中的所有IP配置,断开网络连接
  327. if err := s.gatewayIpRep.CleanIPByHostId(ctx, []int64{hostId}); err != nil {
  328. allErrors = multierror.Append(allErrors, fmt.Errorf("清理网关IP失败: %w", err))
  329. s.Logger.Error("清理网关IP失败", zap.Int64("hostId", hostId), zap.Error(err))
  330. } else {
  331. s.Logger.Info("成功清理网关IP", zap.Int64("hostId", hostId))
  332. }
  333. // 步骤7b: 将套餐标记为"已清理"状态
  334. // 添加到ExpiringSoonPlansList(已清理列表),表示清理流程完成
  335. // 这个标记用于防止重复清理和状态跟踪
  336. if err := s.expiredRep.AddPlans(ctx, repository.ExpiringSoonPlansList, hostId); err != nil {
  337. allErrors = multierror.Append(allErrors, fmt.Errorf("标记为已清理失败: %w", err))
  338. s.Logger.Error("标记为已清理失败", zap.Int64("hostId", hostId), zap.Error(err))
  339. } else {
  340. s.Logger.Info("成功标记套餐为已清理状态", zap.Int64("hostId", hostId))
  341. }
  342. } else {
  343. // 如果前面的步骤有失败,记录警告日志,不执行最终清理
  344. s.Logger.Warn("由于前置清理步骤存在错误,跳过最终清理操作",
  345. zap.Int64("hostId", hostId),
  346. zap.Error(allErrors.ErrorOrNil()))
  347. }
  348. // 记录最终的清理结果
  349. if allErrors.ErrorOrNil() != nil {
  350. s.Logger.Error("清理套餐资源失败",
  351. zap.Int("hostId", limit.HostId),
  352. zap.Int("uid", limit.Uid),
  353. zap.Error(allErrors.ErrorOrNil()))
  354. } else {
  355. s.Logger.Info("成功清理套餐资源",
  356. zap.Int("hostId", limit.HostId),
  357. zap.Int("uid", limit.Uid),
  358. zap.String("status", "completed"))
  359. }
  360. return allErrors.ErrorOrNil()
  361. }
  362. // RecoverPlans 批量恢复套餐服务
  363. func (s *wafOperationsService) RecoverPlans(ctx context.Context, limits []model.GlobalLimit, redisListKey repository.PlanListType) error {
  364. if len(limits) == 0 {
  365. return nil
  366. }
  367. // 1. 检查哪些套餐需要恢复(已续费且未过期)
  368. var hostIdsToCheck []int64
  369. for _, limit := range limits {
  370. hostIdsToCheck = append(hostIdsToCheck, int64(limit.HostId))
  371. }
  372. // 2. 获取最新的主机到期时间
  373. hostExpirations, err := s.hostRep.GetExpireTimeByHostId(ctx, hostIdsToCheck)
  374. if err != nil {
  375. return fmt.Errorf("获取主机到期时间失败: %w", err)
  376. }
  377. hostExpiredMap := make(map[int]int64, len(hostExpirations))
  378. for _, h := range hostExpirations {
  379. hostExpiredMap[h.HostId] = h.ExpiredAt
  380. }
  381. // 3. 筛选出需要恢复的套餐
  382. var renewalRequests []RenewalRequest
  383. var hostIdsToRecover []int
  384. now := time.Now().Unix()
  385. for _, limit := range limits {
  386. if hostTime, ok := hostExpiredMap[limit.HostId]; ok && hostTime > now {
  387. renewalRequests = append(renewalRequests, RenewalRequest{
  388. HostId: limit.HostId,
  389. ExpiredAt: hostTime,
  390. })
  391. hostIdsToRecover = append(hostIdsToRecover, limit.HostId)
  392. }
  393. }
  394. if len(renewalRequests) == 0 {
  395. s.Logger.Info("没有需要恢复的套餐")
  396. return nil
  397. }
  398. s.Logger.Info("开始恢复已续费的WAF服务",
  399. zap.Int("数量", len(renewalRequests)),
  400. zap.Any("套餐内容", renewalRequests))
  401. var allErrors *multierror.Error
  402. // 4. 启用CDN服务
  403. webIds, err := s.GetForwardingRuleIds(ctx, hostIdsToRecover)
  404. if err != nil {
  405. allErrors = multierror.Append(allErrors, fmt.Errorf("获取转发规则ID失败: %w", err))
  406. } else {
  407. if err := s.SetCdnWebsitesState(ctx, webIds, true); err != nil {
  408. allErrors = multierror.Append(allErrors, fmt.Errorf("启用CDN服务失败: %w", err))
  409. }
  410. }
  411. // 5. 同步续费信息到数据库
  412. if err := s.ExecuteRenewalActions(ctx, renewalRequests); err != nil {
  413. allErrors = multierror.Append(allErrors, fmt.Errorf("同步续费信息失败: %w", err))
  414. }
  415. // 步骤7: 状态清理 - 从Redis相关列表中移除停止/清理标记
  416. // 将hostId转换为int64类型以符合Redis操作接口要求
  417. planIdsToRecover := make([]int64, len(hostIdsToRecover))
  418. for i, id := range hostIdsToRecover {
  419. planIdsToRecover[i] = int64(id)
  420. }
  421. s.Logger.Info("开始从Redis列表移除状态标记",
  422. zap.String("listKey", string(redisListKey)),
  423. zap.Int("套餐数量", len(planIdsToRecover)))
  424. if err := s.expiredRep.RemovePlans(ctx, redisListKey, planIdsToRecover...); err != nil {
  425. allErrors = multierror.Append(allErrors, fmt.Errorf("从Redis列表移除标记失败: %w", err))
  426. s.Logger.Error("从Redis列表移除标记失败", zap.Error(err))
  427. } else {
  428. s.Logger.Info("成功从Redis列表移除状态标记")
  429. }
  430. // 记录最终的恢复结果
  431. if allErrors.ErrorOrNil() != nil {
  432. s.Logger.Error("恢复套餐服务部分失败",
  433. zap.Int("成功数量", len(renewalRequests)),
  434. zap.Error(allErrors.ErrorOrNil()))
  435. } else {
  436. s.Logger.Info("成功恢复套餐服务",
  437. zap.Int("恢复数量", len(renewalRequests)),
  438. zap.String("status", "completed"))
  439. }
  440. return allErrors.ErrorOrNil()
  441. }