package waf import ( "context" "encoding/json" "fmt" "maps" "net" v1 "github.com/go-nunu/nunu-layout-advanced/api/v1" "github.com/go-nunu/nunu-layout-advanced/internal/model" "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf" "github.com/go-nunu/nunu-layout-advanced/internal/service" "github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn" ) // AidedUdpService UDP转发辅助服务接口 type AidedUdpService interface { // 验证相关 ValidateAddRequest(ctx context.Context, req *v1.UdpForwardingRequest, require RequireResponse) error ValidateEditRequest(ctx context.Context, req *v1.UdpForwardingRequest, require RequireResponse, oldData *model.UdpForWarding) error ValidateDeletePermission(oldData *model.UdpForWarding, hostId int) error // CDN操作相关 CreateCdnWebsite(ctx context.Context, formData v1.WebsiteSend) (int64, error) UpdateCdnConfiguration(ctx context.Context, req *v1.UdpForwardingRequest, oldData *model.UdpForWarding, require RequireResponse, formData v1.WebsiteSend) error DeleteCdnServer(ctx context.Context, cdnWebId int) error // 源站操作相关 AddOriginsToWebsite(ctx context.Context, req *v1.UdpForwardingRequest, udpId int64) (map[string]int64, error) UpdateOriginServers(ctx context.Context, req *v1.UdpForwardingRequest, oldData *model.UdpForWarding, ipData *model.UdpForwardingRule) error // 代理协议配置 ConfigureProxyProtocol(ctx context.Context, req *v1.UdpForwardingRequest, udpId int64) error // 异步任务处理 ProcessAsyncTasks(req *v1.UdpForwardingRequest) ProcessIpWhitelistChanges(ctx context.Context, req *v1.UdpForwardingRequest, ipData *model.UdpForwardingRule) error ProcessDeleteIpWhitelist(ctx context.Context, id int) error // 数据准备和配置 PrepareWafData(ctx context.Context, req *v1.UdpForwardingRequest) (RequireResponse, v1.WebsiteSend, error) BuildUdpListenConfig(gatewayIps []string, port string) ([]byte, error) // 模型构建 BuildUdpForwardingModel(req *v1.UdpForwardingDataRequest, ruleId int, require RequireResponse) *model.UdpForWarding BuildUdpRuleModel(reqData *v1.UdpForwardingDataRequest, require RequireResponse, localDbId int, cdnOriginIds map[string]int64) *model.UdpForwardingRule // 数据库操作 SaveToDatabase(ctx context.Context, req *v1.UdpForwardingRequest, require RequireResponse, udpId int64, cdnOriginIds map[string]int64) (int, error) UpdateDatabaseRecords(ctx context.Context, req *v1.UdpForwardingRequest, oldData *model.UdpForWarding, require RequireResponse, ipData *model.UdpForwardingRule) error CleanupDatabaseRecords(ctx context.Context, id int) error // 工具函数 ExtractIpsFromBackends(backends []string) []string } type aidedUdpService struct { *service.Service wafformatter WafFormatterService cdn flexCdn.CdnService proxy flexCdn.ProxyService globalRep waf.GlobalLimitRepository udpRepository waf.UdpForWardingRepository } func NewAidedUdpService( service *service.Service, wafformatter WafFormatterService, cdn flexCdn.CdnService, proxy flexCdn.ProxyService, globalRep waf.GlobalLimitRepository, udpRepository waf.UdpForWardingRepository, ) AidedUdpService { return &aidedUdpService{ Service: service, wafformatter: wafformatter, cdn: cdn, proxy: proxy, globalRep: globalRep, udpRepository: udpRepository, } } // ValidateAddRequest 验证添加 UDP 转发请求的合法性 // 该函数验证以下内容: // 1. 验证 WAF 端口数量限制(防止超出配额) // 2. 验证端口号是否已被占用(确保端口唯一性) func (s *aidedUdpService) ValidateAddRequest(ctx context.Context, req *v1.UdpForwardingRequest, require RequireResponse) error { // 验证端口数量限制 if err := s.wafformatter.validateWafPortCount(ctx, require.HostId); err != nil { return fmt.Errorf("端口数量验证失败: %w", err) } // 验证端口占用情况 if err := s.wafformatter.VerifyPort(ctx, "udp", int64(req.UdpForwardingData.Id), req.UdpForwardingData.Port, int64(require.HostId), ""); err != nil { return fmt.Errorf("端口 %s 已被占用或不合法: %w", req.UdpForwardingData.Port, err) } return nil } // ValidateEditRequest 验证编辑 UDP 转发请求的合法性 // 该函数仅在端口发生变更时才验证端口冲突,提高性能并避免不必要的检查 func (s *aidedUdpService) ValidateEditRequest(ctx context.Context, req *v1.UdpForwardingRequest, require RequireResponse, oldData *model.UdpForWarding) error { // 只有端口发生变更时才需要验证端口冲突 if oldData.Port != req.UdpForwardingData.Port { if err := s.wafformatter.VerifyPort(ctx, "udp", int64(req.UdpForwardingData.Id), req.UdpForwardingData.Port, int64(require.HostId), ""); err != nil { return fmt.Errorf("新端口 %s 已被占用或不合法: %w", req.UdpForwardingData.Port, err) } } return nil } // ValidateDeletePermission 验证删除 UDP 转发配置的权限 // 该函数确保用户只能删除属于自己主机的配置,防止越权操作 func (s *aidedUdpService) ValidateDeletePermission(oldData *model.UdpForWarding, hostId int) error { if oldData == nil { return fmt.Errorf("UDP转发配置数据不存在") } if oldData.HostId != hostId { return fmt.Errorf("用户权限不足,无法删除不属于自己主机的配置(主机ID: %d)", hostId) } return nil } // CreateCdnWebsite 在 CDN 系统中创建 UDP 代理网站 // 该函数调用 CDN 服务接口创建一个新的 UDP 代理网站,返回网站ID用于后续配置 func (s *aidedUdpService) CreateCdnWebsite(ctx context.Context, formData v1.WebsiteSend) (int64, error) { udpId, err := s.cdn.CreateWebsite(ctx, formData) if err != nil { return 0, fmt.Errorf("在CDN系统中创建 UDP 代理网站失败: %w", err) } if udpId <= 0 { return 0, fmt.Errorf("CDN系统返回了无效的网站ID: %d", udpId) } return udpId, nil } // UpdateCdnConfiguration 更新 CDN 系统中的 UDP 代理配置 // 该函数根据变更内容智能更新: // 1. 端口变更 - 更新网站监听配置 // 2. 名称变更 - 更新网站基本信息 // 3. 代理协议变更 - 开启/关闭 Proxy Protocol func (s *aidedUdpService) UpdateCdnConfiguration(ctx context.Context, req *v1.UdpForwardingRequest, oldData *model.UdpForWarding, require RequireResponse, formData v1.WebsiteSend) error { // 更新网站端口 if oldData.Port != req.UdpForwardingData.Port { if err := s.cdn.EditServerType(ctx, v1.EditWebsite{ Id: int64(oldData.CdnWebId), TypeJSON: formData.UdpJSON, }, "udp"); err != nil { return fmt.Errorf("更新网站端口失败: %w", err) } } // 更新网站名称 if oldData.Comment != req.UdpForwardingData.Comment { nodeId, err := s.globalRep.GetNodeId(ctx, oldData.CdnWebId) if err != nil { return fmt.Errorf("获取节点ID失败: %w", err) } if err := s.cdn.EditServerBasic(ctx, int64(oldData.CdnWebId), require.Tag, nodeId); err != nil { return fmt.Errorf("更新网站名称失败: %w", err) } } // 更新代理协议 if oldData.Proxy != req.UdpForwardingData.Proxy { if err := s.proxy.EditProxy(ctx, int64(oldData.CdnWebId), v1.ProxyProtocolJSON{ IsOn: req.UdpForwardingData.Proxy, Version: 1, }); err != nil { return fmt.Errorf("更新代理协议失败: %w", err) } } return nil } // DeleteCdnServer 从 CDN 系统中删除 UDP 代理网站 // 该函数删除 CDN 中的网站配置,释放端口资源和相关配置 func (s *aidedUdpService) DeleteCdnServer(ctx context.Context, cdnWebId int) error { if err := s.cdn.DelServer(ctx, int64(cdnWebId)); err != nil { return fmt.Errorf("删除CDN服务器失败: %w", err) } return nil } // AddOriginsToWebsite 为 UDP 代理网站添加后端源站 // 该函数执行以下操作: // 1. 批量创建后端服务器的源站记录 // 2. 将源站关联到 UDP 代理网站 // 3. 返回后端地址与源站ID的映射关系 func (s *aidedUdpService) AddOriginsToWebsite(ctx context.Context, req *v1.UdpForwardingRequest, udpId int64) (map[string]int64, error) { cdnOriginIds := make(map[string]int64) // 批量创建源站 for _, backend := range req.UdpForwardingData.BackendList { id, err := s.wafformatter.AddOrigin(ctx, v1.WebJson{ ApiType: "udp", BackendList: backend, Comment: req.UdpForwardingData.Comment, }) if err != nil { return nil, fmt.Errorf("添加源站失败 %s: %w", backend, err) } cdnOriginIds[backend] = id } // 批量关联源站到网站 for _, originId := range cdnOriginIds { if err := s.cdn.AddServerOrigin(ctx, udpId, originId); err != nil { return nil, fmt.Errorf("关联源站到网站失败: %w", err) } } return cdnOriginIds, nil } // UpdateOriginServers 更新 UDP 代理的后端源站配置 // 该函数智能对比新旧后端列表,只处理变更的部分: // 1. 添加新增的后端服务器 // 2. 删除不再需要的后端服务器 // 3. 更新源站ID映射关系 func (s *aidedUdpService) UpdateOriginServers(ctx context.Context, req *v1.UdpForwardingRequest, oldData *model.UdpForWarding, ipData *model.UdpForwardingRule) error { addOrigins, delOrigins := s.wafformatter.findIpDifferences(ipData.BackendList, req.UdpForwardingData.BackendList) // 添加新源站 addedIds := make(map[string]int64) for _, origin := range addOrigins { id, err := s.wafformatter.AddOrigin(ctx, v1.WebJson{ ApiType: "udp", BackendList: origin, Comment: req.UdpForwardingData.Comment, }) if err != nil { return fmt.Errorf("添加源站失败 %s: %w", origin, err) } addedIds[origin] = id } // 关联新源站到网站 for _, originId := range addedIds { if err := s.cdn.AddServerOrigin(ctx, int64(oldData.CdnWebId), originId); err != nil { return fmt.Errorf("关联源站到网站失败: %w", err) } } // 更新源站ID映射 maps.Copy(ipData.CdnOriginIds, addedIds) // 删除不需要的源站 for backend, originId := range ipData.CdnOriginIds { for _, delOrigin := range delOrigins { if backend == delOrigin { if err := s.cdn.DelServerOrigin(ctx, int64(oldData.CdnWebId), originId); err != nil { return fmt.Errorf("删除源站失败: %w", err) } delete(ipData.CdnOriginIds, backend) break } } } return nil } // ConfigureProxyProtocol 配置 UDP 代理的 Proxy Protocol 协议 // 该函数根据请求参数决定是否开启 Proxy Protocol,用于传递客户端真实IP func (s *aidedUdpService) ConfigureProxyProtocol(ctx context.Context, req *v1.UdpForwardingRequest, udpId int64) error { // 如果不需要开启 Proxy Protocol,直接返回 if !req.UdpForwardingData.Proxy { return nil } // 配置 Proxy Protocol v1 if err := s.proxy.EditProxy(ctx, udpId, v1.ProxyProtocolJSON{ IsOn: true, Version: 1, }); err != nil { return fmt.Errorf("开启 Proxy Protocol 失败: %w", err) } return nil } // ProcessAsyncTasks 处理 UDP 转发相关的异步任务 // 该函数主要处理新增后端服务器时的IP白名单添加任务,确保后端服务器能正常访问 func (s *aidedUdpService) ProcessAsyncTasks(req *v1.UdpForwardingRequest) { // 检查是否有后端服务器需要处理 if req == nil || len(req.UdpForwardingData.BackendList) == 0 { return } // 提取后端 IP 地址 ips := s.ExtractIpsFromBackends(req.UdpForwardingData.BackendList) if len(ips) > 0 { // 异步添加到白名单 go s.wafformatter.PublishIpWhitelistTask(ips, "add", "", "white") } } // ProcessIpWhitelistChanges 处理 UDP 转发编辑时的IP白名单变更 // 该函数对比新旧后端列表,智能处理IP白名单: // 1. 将新增的后端 IP 加入白名单 // 2. 将不再使用的后端 IP 从白名单移除 func (s *aidedUdpService) ProcessIpWhitelistChanges(ctx context.Context, req *v1.UdpForwardingRequest, ipData *model.UdpForwardingRule) error { addedIps, removedIps, err := s.wafformatter.WashEditWafIp(ctx, req.UdpForwardingData.BackendList, ipData.BackendList) if err != nil { return fmt.Errorf("处理IP变更失败: %w", err) } // 处理新增IP if len(addedIps) > 0 { go s.wafformatter.PublishIpWhitelistTask(addedIps, "add", "", "white") } // 处理移除IP if len(removedIps) > 0 { ipsToDelist, err := s.wafformatter.WashDelIps(ctx, removedIps) if err != nil { return fmt.Errorf("处理移除IP失败: %w", err) } if len(ipsToDelist) > 0 { go s.wafformatter.PublishIpWhitelistTask(ipsToDelist, "del", "0", "white") } } return nil } // ProcessDeleteIpWhitelist 处理 UDP 转发删除时的IP白名单清理 // 该函数在删除 UDP 转发配置时,智能清理不再需要的后端 IP 白名单条目 func (s *aidedUdpService) ProcessDeleteIpWhitelist(ctx context.Context, id int) error { ipData, err := s.udpRepository.GetUdpForwardingIpsByID(ctx, id) if err != nil { return fmt.Errorf("获取IP数据失败: %w", err) } if ipData == nil || len(ipData.BackendList) == 0 { return nil } ips, err := s.wafformatter.WashDeleteWafIp(ctx, ipData.BackendList) if err != nil { return fmt.Errorf("处理删除IP失败: %w", err) } if len(ips) > 0 { ipsToDelist, err := s.wafformatter.WashDelIps(ctx, ips) if err != nil { return fmt.Errorf("清理IP列表失败: %w", err) } if len(ipsToDelist) > 0 { go s.wafformatter.PublishIpWhitelistTask(ipsToDelist, "del", "0", "white") } } return nil } // PrepareWafData 准备WAF配置数据 // 该函数的作用是根据请求参数准备创建CDN网站所需的所有配置数据,包括: // 1. 获取全局配置信息(用户、主机、网关等) // 2. 构建 UDP 代理的监听配置 JSON // 3. 组装 CDN 创建网站的表单数据 func (s *aidedUdpService) PrepareWafData(ctx context.Context, req *v1.UdpForwardingRequest) (RequireResponse, v1.WebsiteSend, error) { // 获取全局配置信息,包括用户信息、网关IP等 require, err := s.wafformatter.Require(ctx, v1.GlobalRequire{ HostId: req.HostId, Uid: req.Uid, Comment: req.UdpForwardingData.Comment, }) if err != nil { return RequireResponse{}, v1.WebsiteSend{}, fmt.Errorf("获取全局配置信息失败: %w", err) } // 验证实例配置是否完整 if require.Uid == 0 { return RequireResponse{}, v1.WebsiteSend{}, fmt.Errorf("请先配置实例,确保用户信息和网关配置正确") } // 构建 UDP 监听配置 udpConfig, err := s.BuildUdpListenConfig(require.GatewayIps, req.UdpForwardingData.Port) if err != nil { return RequireResponse{}, v1.WebsiteSend{}, fmt.Errorf("构建 UDP 监听配置失败: %w", err) } // 组装 CDN 创建网站的表单数据 formData := v1.WebsiteSend{ UserId: int64(require.CdnUid), Type: "udpProxy", Name: require.Tag, Description: req.UdpForwardingData.Comment, UdpJSON: udpConfig, ServerGroupIds: []int64{int64(require.GroupId)}, NodeClusterId: 2, // 默认节点集群ID } return require, formData, nil } // BuildUdpListenConfig 构建 UDP 监听配置 JSON // 该函数将网关IP列表和端口转换为 CDN 所需的 JSON 配置格式 func (s *aidedUdpService) BuildUdpListenConfig(gatewayIps []string, port string) ([]byte, error) { if len(gatewayIps) == 0 { return nil, fmt.Errorf("网关IP列表不能为空") } // 验证端口字符串不为空 if port == "" { return nil, fmt.Errorf("端口号不能为空") } // 构建监听配置 var listenConfigs []v1.Listen for _, gatewayIp := range gatewayIps { if gatewayIp == "" { continue // 跳过空的IP } listenConfigs = append(listenConfigs, v1.Listen{ Protocol: "udp", Host: gatewayIp, Port: port, }) } if len(listenConfigs) == 0 { return nil, fmt.Errorf("没有有效的网关IP配置") } // 组装最终的 JSON 配置 udpJSON := v1.TypeJSON{ IsOn: true, Listen: listenConfigs, } byteData, err := json.Marshal(udpJSON) if err != nil { return nil, fmt.Errorf("序列化 UDP 配置失败: %w", err) } return byteData, nil } // BuildUdpForwardingModel 构建 UDP 转发主记录模型 // 该函数将请求数据转换为数据库模型,包含主机、CDN网站、端口等信息 func (s *aidedUdpService) BuildUdpForwardingModel(req *v1.UdpForwardingDataRequest, ruleId int, require RequireResponse) *model.UdpForWarding { return &model.UdpForWarding{ HostId: require.HostId, CdnWebId: ruleId, Port: req.Port, Comment: req.Comment, Proxy: req.Proxy, } } // BuildUdpRuleModel 构建 UDP 转发规则记录模型 // 该函数构建包含后端服务器列表和 CDN 源站ID映射的规则记录 func (s *aidedUdpService) BuildUdpRuleModel(reqData *v1.UdpForwardingDataRequest, require RequireResponse, localDbId int, cdnOriginIds map[string]int64) *model.UdpForwardingRule { return &model.UdpForwardingRule{ Uid: require.Uid, HostId: require.HostId, UdpId: localDbId, CdnOriginIds: cdnOriginIds, BackendList: reqData.BackendList, } } // SaveToDatabase 保存 UDP 转发配置到数据库 // 该函数分别保存主记录(基本信息)和规则记录(后端服务器和源站ID映射) func (s *aidedUdpService) SaveToDatabase(ctx context.Context, req *v1.UdpForwardingRequest, require RequireResponse, udpId int64, cdnOriginIds map[string]int64) (int, error) { // 保存主记录 udpModel := s.BuildUdpForwardingModel(&req.UdpForwardingData, int(udpId), require) id, err := s.udpRepository.AddUdpForwarding(ctx, udpModel) if err != nil { return 0, fmt.Errorf("保存UDP转发记录失败: %w", err) } // 保存规则记录 udpRuleModel := s.BuildUdpRuleModel(&req.UdpForwardingData, require, id, cdnOriginIds) if _, err := s.udpRepository.AddUdpForwardingIps(ctx, *udpRuleModel); err != nil { return 0, fmt.Errorf("保存UDP转发规则失败: %w", err) } return id, nil } // UpdateDatabaseRecords 更新数据库记录 // 该函数更新 UDP 转发的主记录和规则记录,同步最新的配置变更 func (s *aidedUdpService) UpdateDatabaseRecords(ctx context.Context, req *v1.UdpForwardingRequest, oldData *model.UdpForWarding, require RequireResponse, ipData *model.UdpForwardingRule) error { // 更新主记录 udpModel := s.BuildUdpForwardingModel(&req.UdpForwardingData, oldData.CdnWebId, require) udpModel.Id = req.UdpForwardingData.Id if err := s.udpRepository.EditUdpForwarding(ctx, udpModel); err != nil { return fmt.Errorf("更新UDP转发记录失败: %w", err) } // 更新规则记录 udpRuleModel := s.BuildUdpRuleModel(&req.UdpForwardingData, require, req.UdpForwardingData.Id, ipData.CdnOriginIds) if err := s.udpRepository.EditUdpForwardingIps(ctx, *udpRuleModel); err != nil { return fmt.Errorf("更新UDP转发规则失败: %w", err) } return nil } // CleanupDatabaseRecords 清理数据库记录 // 该函数删除 UDP 转发相关的所有数据库记录,包括主记录和规则记录 func (s *aidedUdpService) CleanupDatabaseRecords(ctx context.Context, id int) error { if err := s.udpRepository.DeleteUdpForwarding(ctx, int64(id)); err != nil { return fmt.Errorf("删除UDP转发主记录失败: %w", err) } if err := s.udpRepository.DeleteUdpForwardingIpsById(ctx, id); err != nil { return fmt.Errorf("删除UDP转发规则记录失败: %w", err) } return nil } // ExtractIpsFromBackends 从后端服务器地址列表中提取纯IP地址 // 该函数解析 "IP:端口" 格式的后端地址,提取出纯IP地址用于白名单处理 func (s *aidedUdpService) ExtractIpsFromBackends(backends []string) []string { var ips []string for _, backend := range backends { // 跳过空字符串 if backend == "" { continue } // 解析 "IP:端口" 格式 if ip, _, err := net.SplitHostPort(backend); err == nil && ip != "" { ips = append(ips, ip) } } return ips }