package admin import ( "context" "fmt" v1 "github.com/go-nunu/nunu-layout-advanced/api/v1" "github.com/go-nunu/nunu-layout-advanced/internal/model" "github.com/go-nunu/nunu-layout-advanced/internal/repository" waf2 "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf" "github.com/go-nunu/nunu-layout-advanced/internal/service" "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn" "github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf" "github.com/hashicorp/go-multierror" "go.uber.org/zap" "sync" "time" ) // WafOperationsService WAF通用操作服务接口 type WafOperationsService interface { // 清理单个套餐的所有相关资源 CleanupPlan(ctx context.Context, limit model.GlobalLimit) error // 批量恢复套餐服务 RecoverPlans(ctx context.Context, limits []model.GlobalLimit, redisListKey repository.PlanListType) error // 获取主机关联的所有转发规则ID GetForwardingRuleIds(ctx context.Context, hostIds []int) ([]int, error) // 批量设置CDN网站状态 SetCdnWebsitesState(ctx context.Context, ids []int, enable bool) error // 执行续费操作 ExecuteRenewalActions(ctx context.Context, reqs []RenewalRequest) error } type RenewalRequest struct { HostId int ExpiredAt int64 } func NewWafOperationsService( service *service.Service, webForWardingRep waf2.WebForwardingRepository, tcpforwardingRep waf2.TcpforwardingRepository, udpForWardingRep waf2.UdpForWardingRepository, cdn flexCdn.CdnService, hostRep repository.HostRepository, globalLimitRep waf2.GlobalLimitRepository, expiredRep repository.ExpiredRepository, gatewayIpRep waf2.GatewayipRepository, tcp waf.TcpforwardingService, udp waf.UdpForWardingService, web waf.WebForwardingService, buildAoDun waf.BuildAudunService, zzyBgp waf.ZzybgpService, ) WafOperationsService { return &wafOperationsService{ Service: service, webForWardingRep: webForWardingRep, tcpforwardingRep: tcpforwardingRep, udpForWardingRep: udpForWardingRep, cdn: cdn, hostRep: hostRep, globalLimitRep: globalLimitRep, expiredRep: expiredRep, gatewayIpRep: gatewayIpRep, tcp: tcp, udp: udp, web: web, buildAoDun: buildAoDun, zzyBgp: zzyBgp, } } type wafOperationsService struct { *service.Service webForWardingRep waf2.WebForwardingRepository tcpforwardingRep waf2.TcpforwardingRepository udpForWardingRep waf2.UdpForWardingRepository cdn flexCdn.CdnService hostRep repository.HostRepository globalLimitRep waf2.GlobalLimitRepository expiredRep repository.ExpiredRepository gatewayIpRep waf2.GatewayipRepository tcp waf.TcpforwardingService udp waf.UdpForWardingService web waf.WebForwardingService buildAoDun waf.BuildAudunService zzyBgp waf.ZzybgpService } // GetForwardingRuleIds 获取主机关联的所有转发规则ID func (s *wafOperationsService) GetForwardingRuleIds(ctx context.Context, hostIds []int) ([]int, error) { if len(hostIds) == 0 { return nil, nil } var ids []int var result *multierror.Error // 获取TCP转发规则ID tcpIds, err := s.tcpforwardingRep.GetTcpAll(ctx, hostIds) if err != nil { result = multierror.Append(result, fmt.Errorf("获取TCP转发规则失败: %w", err)) } ids = append(ids, tcpIds...) // 获取UDP转发规则ID udpIds, err := s.udpForWardingRep.GetUdpAll(ctx, hostIds) if err != nil { result = multierror.Append(result, fmt.Errorf("获取UDP转发规则失败: %w", err)) } ids = append(ids, udpIds...) // 获取Web转发规则ID webIds, err := s.webForWardingRep.GetWebAll(ctx, hostIds) if err != nil { result = multierror.Append(result, fmt.Errorf("获取Web转发规则失败: %w", err)) } ids = append(ids, webIds...) return ids, result.ErrorOrNil() } // SetCdnWebsitesState 批量设置CDN网站状态(并发执行) func (s *wafOperationsService) SetCdnWebsitesState(ctx context.Context, ids []int, enable bool) error { if len(ids) == 0 { return nil } var wg sync.WaitGroup errChan := make(chan error, len(ids)) wg.Add(len(ids)) for _, id := range ids { go func(id int) { defer wg.Done() // cdn.EditWebIsOn 的第二个参数 enable: true=启用, false=禁用 if err := s.cdn.EditWebIsOn(ctx, int64(id), enable); err != nil { errChan <- fmt.Errorf("设置CDN网站状态失败(ID:%d): %w", id, err) } }(id) } wg.Wait() close(errChan) var result *multierror.Error for err := range errChan { result = multierror.Append(result, err) } return result.ErrorOrNil() } // ExecuteRenewalActions 执行续费操作,包括更新DB和调用CDN API // 该方法并发更新数据库中的套餐状态,将到期时间和状态同步到GlobalLimit表 // 主要用于套餐续费后,需要将最新的到期时间从主机表同步到WAF套餐表 // // 执行流程: // 1. 参数校验,如果没有续费请求则直接返回 // 2. 创建goroutine池,每个续费请求对应一个goroutine // 3. 并发调用数据库更新操作,提高处理效率 // 4. 使用互斥锁保护错误收集,避免并发写入冲突 // 5. 等待所有更新操作完成,返回聚合的错误信息 // // 并发安全: // - 使用sync.Mutex保护共享的错误收集器 // - 每个goroutine独立处理一个续费请求,避免数据竞争 // - 使用WaitGroup确保所有操作完成后才返回 // // 参数: // - ctx: 上下文对象,用于控制请求生命周期和传递trace信息 // - reqs: 续费请求列表,包含HostId和新的到期时间 // // 返回: // - error: 更新过程中的任何错误,如果部分失败会包含所有失败的详细信息 func (s *wafOperationsService) ExecuteRenewalActions(ctx context.Context, reqs []RenewalRequest) error { // 参数校验:如果没有续费请求,直接返回成功 if len(reqs) == 0 { return nil } // 并发控制和错误收集初始化 var allErrors *multierror.Error var wg sync.WaitGroup var mu sync.Mutex // 保护allErrors的并发写入 wg.Add(len(reqs)) // 为每个续费请求创建一个goroutine进行并发处理 for _, req := range reqs { go func(r RenewalRequest) { defer wg.Done() // 更新数据库中的套餐状态 // 将State设置为true表示套餐处于激活状态 // ExpiredAt更新为最新的到期时间 err := s.globalLimitRep.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{ HostId: r.HostId, // 主机ID,用于定位具体的套餐 ExpiredAt: r.ExpiredAt, // 新的到期时间戳 State: true, // 激活状态,表示套餐可用 }) if err != nil { // 线程安全的错误收集 mu.Lock() allErrors = multierror.Append(allErrors, fmt.Errorf("更新主机%d续费状态失败: %w", r.HostId, err)) mu.Unlock() } }(req) } // 等待所有更新操作完成 wg.Wait() return allErrors.ErrorOrNil() } // CleanupPlan 清理单个套餐的所有相关资源 // 该方法执行套餐过期后的完整清理流程,包括删除转发规则、重置防护设置、清理网络配置等 // 这是一个复合操作,涉及多个子系统的协调,确保套餐相关的所有资源都被正确清理 // // 清理步骤(按执行顺序): // 1. 从Redis "停止列表" 中移除该套餐(因为即将转移到 "已清理列表") // 2. 删除TCP转发规则 - 清理所有TCP端口转发配置 // 3. 删除UDP转发规则 - 清理所有UDP端口转发配置 // 4. 删除Web转发规则 - 清理所有HTTP/HTTPS转发配置 // 5. 重置BGP防护设置 - 将防护等级重置为默认值(10) // 6. 清除带宽限制 - 移除小防火墙的带宽限制配置 // 7. 清理网关IP配置 - 删除该主机关联的所有网关IP // 8. 将套餐标记为"已清理" - 添加到Redis "已清理列表" // // 错误处理策略: // - 使用multierror收集所有步骤的错误,不会因单个步骤失败而中断整个流程 // - 只有在前面所有步骤都成功的情况下,才执行最终的网关IP清理和Redis标记 // - 记录详细的日志信息,便于问题排查和监控 // // 幂等性保证: // - 该方法可以安全地重复调用,不会产生副作用 // - 如果某个资源已经被清理,相关操作会优雅地处理 // // 参数: // - ctx: 上下文对象,用于控制请求生命周期和传递trace信息 // - limit: 需要清理的套餐信息,包含HostId、Uid等关键字段 // // 返回: // - error: 清理过程中的任何错误,使用multierror聚合多个错误 func (s *wafOperationsService) CleanupPlan(ctx context.Context, limit model.GlobalLimit) error { var allErrors *multierror.Error hostId := int64(limit.HostId) // 记录清理开始的日志,便于监控和调试 s.Logger.Info("开始清理套餐资源", zap.Int("hostId", limit.HostId), zap.Int("uid", limit.Uid), zap.String("operation", "cleanup_plan")) // 步骤1: 从Redis "停止列表" 中移除该套餐 // 这是状态转换的第一步,表示套餐即将从 "已停止" 状态转换到 "已清理" 状态 if err := s.expiredRep.RemovePlans(ctx, repository.ClosedPlansList, hostId); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("从停止列表移除失败: %w", err)) s.Logger.Warn("从停止列表移除失败", zap.Int64("hostId", hostId), zap.Error(err)) } // 步骤2: 删除TCP转发规则 // TCP转发规则通常用于游戏服务器、数据库等需要TCP连接的服务 // 需要先获取所有关联的TCP规则ID,然后批量删除 tcpIds, err := s.tcpforwardingRep.GetTcpForwardingAllIdsByID(ctx, limit.HostId) if err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("获取TCP转发规则失败: %w", err)) s.Logger.Warn("获取TCP转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err)) } else if len(tcpIds) > 0 { s.Logger.Info("开始删除TCP转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(tcpIds))) if err := s.tcp.DeleteTcpForwarding(ctx, v1.DeleteTcpForwardingRequest{ Ids: tcpIds, // 需要删除的TCP规则ID列表 HostId: limit.HostId, // 主机ID,用于权限验证 Uid: limit.Uid, // 用户ID,用于权限验证 }); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("删除TCP转发规则失败: %w", err)) s.Logger.Error("删除TCP转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err)) } else { s.Logger.Info("成功删除TCP转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(tcpIds))) } } else { s.Logger.Debug("该主机没有TCP转发规则需要删除", zap.Int("hostId", limit.HostId)) } // 步骤3: 删除UDP转发规则 // UDP转发规则通常用于游戏服务器、DNS服务等需要UDP连接的服务 // UDP协议的特点是无连接,但在防护场景下同样需要转发规则 udpIds, err := s.udpForWardingRep.GetUdpForwardingWafUdpAllIds(ctx, limit.HostId) if err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("获取UDP转发规则失败: %w", err)) s.Logger.Warn("获取UDP转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err)) } else if len(udpIds) > 0 { s.Logger.Info("开始删除UDP转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(udpIds))) if err := s.udp.DeleteUdpForwarding(ctx, v1.DeleteUdpForwardingRequest{ Ids: udpIds, // 需要删除的UDP规则ID列表 HostId: limit.HostId, // 主机ID,用于权限验证 Uid: limit.Uid, // 用户ID,用于权限验证 }); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("删除UDP转发规则失败: %w", err)) s.Logger.Error("删除UDP转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err)) } else { s.Logger.Info("成功删除UDP转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(udpIds))) } } else { s.Logger.Debug("该主机没有UDP转发规则需要删除", zap.Int("hostId", limit.HostId)) } // 步骤4: 删除Web转发规则 // Web转发规则用于HTTP/HTTPS网站服务,是最常见的转发类型 // 包括域名解析、SSL证书、负载均衡等复杂配置 webIds, err := s.webForWardingRep.GetWebForwardingWafWebAllIds(ctx, limit.HostId) if err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("获取Web转发规则失败: %w", err)) s.Logger.Warn("获取Web转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err)) } else if len(webIds) > 0 { s.Logger.Info("开始删除Web转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(webIds))) if err := s.web.DeleteWebForwarding(ctx, v1.DeleteWebForwardingRequest{ Ids: webIds, // 需要删除的Web规则ID列表 HostId: limit.HostId, // 主机ID,用于权限验证 Uid: limit.Uid, // 用户ID,用于权限验证 }); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("删除Web转发规则失败: %w", err)) s.Logger.Error("删除Web转发规则失败", zap.Int("hostId", limit.HostId), zap.Error(err)) } else { s.Logger.Info("成功删除Web转发规则", zap.Int("hostId", limit.HostId), zap.Int("count", len(webIds))) } } else { s.Logger.Debug("该主机没有Web转发规则需要删除", zap.Int("hostId", limit.HostId)) } // 步骤5: 重置BGP防护设置 // 将防护等级重置为默认值(10),这通常是最低的防护级别 // BGP防护是网络层面的DDoS防护,重置后将停止高级防护功能 s.Logger.Info("开始重置BGP防护设置", zap.Int64("hostId", hostId), zap.Int("defenseLevel", 10)) if err := s.zzyBgp.SetDefense(ctx, hostId, 10); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("重置BGP防护设置失败: %w", err)) s.Logger.Error("重置BGP防护设置失败", zap.Int64("hostId", hostId), zap.Error(err)) } else { s.Logger.Info("成功重置BGP防护设置", zap.Int64("hostId", hostId)) } // 步骤6: 清除小防火墙带宽限制 // 移除奥盾防护系统中设置的带宽限制配置 // "del" 操作表示删除该主机的所有带宽限制规则 s.Logger.Info("开始清除小防火墙带宽限制", zap.Int64("hostId", hostId)) if err := s.buildAoDun.Bandwidth(ctx, hostId, "del"); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("清除带宽限制失败: %w", err)) s.Logger.Error("清除带宽限制失败", zap.Int64("hostId", hostId), zap.Error(err)) } else { s.Logger.Info("成功清除小防火墙带宽限制", zap.Int64("hostId", hostId)) } // 步骤7: 执行最终清理操作(仅在前面步骤都成功时执行) // 这是一个关键的设计决策:只有在所有资源清理都成功的情况下, // 才执行网关IP清理和状态标记,确保数据一致性 if allErrors.ErrorOrNil() == nil { s.Logger.Info("前置清理步骤全部成功,开始执行最终清理操作", zap.Int64("hostId", hostId)) // 步骤7a: 清理网关IP配置 // 删除该主机在网关系统中的所有IP配置,断开网络连接 if err := s.gatewayIpRep.CleanIPByHostId(ctx, []int64{hostId}); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("清理网关IP失败: %w", err)) s.Logger.Error("清理网关IP失败", zap.Int64("hostId", hostId), zap.Error(err)) } else { s.Logger.Info("成功清理网关IP", zap.Int64("hostId", hostId)) } // 步骤7b: 将套餐标记为"已清理"状态 // 添加到ExpiringSoonPlansList(已清理列表),表示清理流程完成 // 这个标记用于防止重复清理和状态跟踪 if err := s.expiredRep.AddPlans(ctx, repository.ExpiringSoonPlansList, hostId); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("标记为已清理失败: %w", err)) s.Logger.Error("标记为已清理失败", zap.Int64("hostId", hostId), zap.Error(err)) } else { s.Logger.Info("成功标记套餐为已清理状态", zap.Int64("hostId", hostId)) } } else { // 如果前面的步骤有失败,记录警告日志,不执行最终清理 s.Logger.Warn("由于前置清理步骤存在错误,跳过最终清理操作", zap.Int64("hostId", hostId), zap.Error(allErrors.ErrorOrNil())) } // 记录最终的清理结果 if allErrors.ErrorOrNil() != nil { s.Logger.Error("清理套餐资源失败", zap.Int("hostId", limit.HostId), zap.Int("uid", limit.Uid), zap.Error(allErrors.ErrorOrNil())) } else { s.Logger.Info("成功清理套餐资源", zap.Int("hostId", limit.HostId), zap.Int("uid", limit.Uid), zap.String("status", "completed")) } return allErrors.ErrorOrNil() } // RecoverPlans 批量恢复套餐服务 func (s *wafOperationsService) RecoverPlans(ctx context.Context, limits []model.GlobalLimit, redisListKey repository.PlanListType) error { if len(limits) == 0 { return nil } // 1. 检查哪些套餐需要恢复(已续费且未过期) var hostIdsToCheck []int64 for _, limit := range limits { hostIdsToCheck = append(hostIdsToCheck, int64(limit.HostId)) } // 2. 获取最新的主机到期时间 hostExpirations, err := s.hostRep.GetExpireTimeByHostId(ctx, hostIdsToCheck) if err != nil { return fmt.Errorf("获取主机到期时间失败: %w", err) } hostExpiredMap := make(map[int]int64, len(hostExpirations)) for _, h := range hostExpirations { hostExpiredMap[h.HostId] = h.ExpiredAt } // 3. 筛选出需要恢复的套餐 var renewalRequests []RenewalRequest var hostIdsToRecover []int now := time.Now().Unix() for _, limit := range limits { if hostTime, ok := hostExpiredMap[limit.HostId]; ok && hostTime > now { renewalRequests = append(renewalRequests, RenewalRequest{ HostId: limit.HostId, ExpiredAt: hostTime, }) hostIdsToRecover = append(hostIdsToRecover, limit.HostId) } } if len(renewalRequests) == 0 { s.Logger.Info("没有需要恢复的套餐") return nil } s.Logger.Info("开始恢复已续费的WAF服务", zap.Int("数量", len(renewalRequests)), zap.Any("套餐内容", renewalRequests)) var allErrors *multierror.Error // 4. 启用CDN服务 webIds, err := s.GetForwardingRuleIds(ctx, hostIdsToRecover) if err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("获取转发规则ID失败: %w", err)) } else { if err := s.SetCdnWebsitesState(ctx, webIds, true); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("启用CDN服务失败: %w", err)) } } // 5. 同步续费信息到数据库 if err := s.ExecuteRenewalActions(ctx, renewalRequests); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("同步续费信息失败: %w", err)) } // 步骤7: 状态清理 - 从Redis相关列表中移除停止/清理标记 // 将hostId转换为int64类型以符合Redis操作接口要求 planIdsToRecover := make([]int64, len(hostIdsToRecover)) for i, id := range hostIdsToRecover { planIdsToRecover[i] = int64(id) } s.Logger.Info("开始从Redis列表移除状态标记", zap.String("listKey", string(redisListKey)), zap.Int("套餐数量", len(planIdsToRecover))) if err := s.expiredRep.RemovePlans(ctx, redisListKey, planIdsToRecover...); err != nil { allErrors = multierror.Append(allErrors, fmt.Errorf("从Redis列表移除标记失败: %w", err)) s.Logger.Error("从Redis列表移除标记失败", zap.Error(err)) } else { s.Logger.Info("成功从Redis列表移除状态标记") } // 记录最终的恢复结果 if allErrors.ErrorOrNil() != nil { s.Logger.Error("恢复套餐服务部分失败", zap.Int("成功数量", len(renewalRequests)), zap.Error(allErrors.ErrorOrNil())) } else { s.Logger.Info("成功恢复套餐服务", zap.Int("恢复数量", len(renewalRequests)), zap.String("status", "completed")) } return allErrors.ErrorOrNil() }