package admin

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"time"

	v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
	adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
	"github.com/go-nunu/nunu-layout-advanced/internal/model"
	adminRep "github.com/go-nunu/nunu-layout-advanced/internal/repository/admin"
	"github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
	"github.com/go-nunu/nunu-layout-advanced/internal/service"
	"github.com/go-nunu/nunu-layout-advanced/pkg/excel"
	"github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
	amqp "github.com/rabbitmq/amqp091-go"
	"go.uber.org/zap"
)

// ApiDescriptionMap maps an API path (with any leading "/v1" removed) to the
// human-readable description stored in the log's ApiName field.
var ApiDescriptionMap = map[string]string{
	"/webForward/add":      "添加web",
	"/webForward/edit":     "修改web",
	"/webForward/delete":   "删除web",
	"/tcpForward/add":      "添加tcp",
	"/tcpForward/edit":     "修改tcp",
	"/tcpForward/delete":   "删除tcp",
	"/udpForward/add":      "添加udp",
	"/udpForward/edit":     "修改udp",
	"/udpForward/delete":   "删除udp",
	"/globalLimit/add":     "添加实例",
	"/globalLimit/edit":    "修改实例",
	"/globalLimit/delete":  "删除实例",
	"/allowAndDeny/add":    "添加黑白名单",
	"/allowAndDeny/edit":   "修改黑白名单",
	"/allowAndDeny/delete": "删除黑白名单",
	"/cc/editState":        "删除CC黑名单",
	"/ccIpList/add":        "添加CC白名单",
	"/ccIpList/edit":       "修改CC白名单",
	"/ccIpList/delete":     "删除CC白名单",
	"分配网关组":                "分配网关组",
}

// WafLogService is the set of WAF log operations exposed by this package:
// lookup, listing, single and batch insertion, asynchronous publication to the
// message queue, Excel export, and access to the API description table.
type WafLogService interface {
	GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
	GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
	AddWafLog(ctx context.Context, req adminApi.WafLog) error
	BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error
	PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
	SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error
	GetApiDescriptions(ctx context.Context) map[string]string
}

// NewWafLogService wires the given dependencies into a WafLogService.
func NewWafLogService(
	service *service.Service,
	wafLogRepository adminRep.WafLogRepository,
	globalLimitRepository waf.GlobalLimitRepository,
	mq *rabbitmq.RabbitMQ,
	wafLogDataCleanService WafLogDataCleanService,
) WafLogService {
	return &wafLogService{
		Service:                service,
		wafLogRepository:       wafLogRepository,
		globalLimitRepository:  globalLimitRepository,
		mq:                     mq,
		wafLogDataCleanService: wafLogDataCleanService,
	}
}

// wafLogService is the default WafLogService implementation.
type wafLogService struct {
	*service.Service
	wafLogRepository       adminRep.WafLogRepository
	globalLimitRepository  waf.GlobalLimitRepository
	mq                     *rabbitmq.RabbitMQ
	wafLogDataCleanService WafLogDataCleanService
}

// getFirstPathSegment strips one leading "/" from path and splits the rest on
// "/". Despite its name it returns every segment, not only the first; the
// callers in this file use the last one. ok is false when nothing is left
// after trimming (path is "" or "/").
func (s *wafLogService) getFirstPathSegment(path string) (segment []string, ok bool) {
	trimmed := strings.TrimPrefix(path, "/")
	if trimmed == "" {
		return nil, false
	}
	// strings.Split on a non-empty string always yields at least one element,
	// so no further length check is required.
	return strings.Split(trimmed, "/"), true
}

// GetWafLog returns the log record with the given id from the repository.
func (s *wafLogService) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
	return s.wafLogRepository.GetWafLog(ctx, id)
}

// GetWafLogList returns one page of log records matching req.
func (s *wafLogService) GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
	return s.wafLogRepository.GetWafLogList(ctx, req)
}

// fillAPIInfo derives ApiName and ApiType from req.Api. ApiName is taken from
// ApiDescriptionMap using the path without its "/v1" prefix and is left
// untouched when the path is unknown. ApiType is set to the last path segment.
// A request with an empty Api is left unchanged.
func (s *wafLogService) fillAPIInfo(req *adminApi.WafLog) {
	if req.Api == "" {
		return
	}
	if name, ok := ApiDescriptionMap[strings.TrimPrefix(req.Api, "/v1")]; ok {
		req.ApiName = name
	}
	if segments, ok := s.getFirstPathSegment(req.Api); ok {
		req.ApiType = segments[len(segments)-1]
	}
}

// toModel copies the persisted fields of req into a new model.WafLog, using
// extraData as the already-serialized ExtraData column.
func (s *wafLogService) toModel(req *adminApi.WafLog, extraData []byte) *model.WafLog {
	return &model.WafLog{
		Uid:       req.Uid,
		Name:      req.Name,
		RequestIp: req.RequestIp,
		RuleId:    req.RuleId,
		HostId:    req.HostId,
		UserAgent: req.UserAgent,
		Api:       req.Api,
		ApiType:   req.ApiType,
		ApiName:   req.ApiName,
		ExtraData: extraData,
	}
}

// AddWafLog enriches req and stores it as a single log record.
//
// Name and RequestIp are overwritten with the Username and LastLoginIp of the
// user identified by req.Uid. An error from the user lookup, from serializing
// ExtraData, or from the repository insert is returned to the caller.
func (s *wafLogService) AddWafLog(ctx context.Context, req adminApi.WafLog) error {
	s.fillAPIInfo(&req)

	userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
	if err != nil {
		return err
	}
	req.Name = userInfo.Username

	extraData, err := json.Marshal(req.ExtraData)
	if err != nil {
		return err
	}
	req.RequestIp = userInfo.LastLoginIp

	return s.wafLogRepository.AddWafLog(ctx, s.toModel(&req, extraData))
}

// BatchAddWafLog enriches every request in reqs and inserts the results with a
// single repository call. The requests are modified in place.
//
// This is best-effort per entry: a request whose user lookup or ExtraData
// serialization fails is logged and skipped rather than failing the batch.
// An empty reqs is a no-op.
func (s *wafLogService) BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error {
	if len(reqs) == 0 {
		return nil
	}

	wafLogs := make([]*model.WafLog, 0, len(reqs))
	for _, req := range reqs {
		s.fillAPIInfo(req)

		userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
		if err != nil {
			s.Logger.Error("获取用户信息失败", zap.Error(err), zap.Int("uid", req.Uid))
			continue
		}
		req.Name = userInfo.Username
		req.RequestIp = userInfo.LastLoginIp

		extraData, err := json.Marshal(req.ExtraData)
		if err != nil {
			s.Logger.Error("序列化额外数据失败", zap.Error(err))
			continue
		}

		wafLogs = append(wafLogs, s.toModel(req, extraData))
	}

	// Hand the surviving records to the repository's batch insert.
	return s.wafLogRepository.BatchAddWafLog(ctx, wafLogs)
}

// PublishIpWafLogTask serializes req as JSON and publishes it as a persistent
// message on the exchange configured for the "waf_log" task, with routing key
// "wafLog.add". Failures are logged and not returned.
func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
	const routingKey = "wafLog.add"

	// Marshal through a pointer, exactly as before, so that a pointer-receiver
	// MarshalJSON on the request type (if any) is still honoured.
	msgBody, err := json.Marshal(&req)
	if err != nil {
		s.Logger.Error("序列化 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", req.HostId), zap.Int("uid", req.Uid), zap.Any("req", req))
		return
	}

	taskCfg, ok := s.mq.GetTaskConfig("waf_log")
	if !ok {
		s.Logger.Error("无法获取“waf_Log”任务配置")
		return
	}

	publishingMsg := amqp.Publishing{
		ContentType:  "application/json",
		Body:         msgBody,
		DeliveryMode: amqp.Persistent, // persistent message
	}

	if err := s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg); err != nil {
		s.Logger.Error("发布 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", req.HostId), zap.Int("uid", req.Uid), zap.Any("req", req))
		return
	}
	s.Logger.Info("已成功发布 WafLog 任务消息", zap.Int("hostId", req.HostId), zap.Int("uid", req.Uid), zap.Any("req", req))
}

// ExPortWafLog loads every record matching req from the repository and
// converts them into export rows.
func (s *wafLogService) ExPortWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]adminApi.ExportWafLogRes, error) {
	data, err := s.wafLogRepository.ExportWafLog(ctx, req)
	if err != nil {
		return nil, err
	}
	return s.convertRawDataToExportResults(ctx, data)
}

// SmartExportWafLog exports the logs matching req to w as an Excel workbook.
//
// The matching record count is fetched first and passed to excel.SmartExport,
// together with an estimated row size of 200 bytes, to choose between the
// normal, streaming and chunked export strategies.
func (s *wafLogService) SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error {
	count, err := s.wafLogRepository.GetWafLogExportCount(ctx, req)
	if err != nil {
		return fmt.Errorf("获取导出数据总数失败: %w", err)
	}

	// Roughly 200 bytes per row (user name, IP, API name, domain, ...).
	exportType := excel.SmartExport(count, 200)

	// The chunked path never touches a generator, so take it before building one.
	if exportType == excel.ExportTypeChunk {
		return s.chunkExportWafLog(ctx, req, w, count)
	}

	headers := []string{"name", "request_ip", "host_id", "rule_id", "api_name", "addr_backend_list", "domain", "comment", "custom_host", "expose_addr", "created_at"}
	headerMap := map[string]string{
		"name":              "用户名",
		"request_ip":        "请求IP",
		"host_id":           "主机ID",
		"rule_id":           "规则ID",
		"api_name":          "API名称",
		"addr_backend_list": "后端地址",
		"domain":            "域名",
		"comment":           "备注",
		"custom_host":       "回源地址",
		"expose_addr":       "暴露地址",
		"created_at":        "创建时间",
	}

	generator := excel.NewExcelGenerator("WAF日志", headers, headerMap)
	if err := generator.WriteHeaders(); err != nil {
		return fmt.Errorf("写入Excel表头失败: %w", err)
	}

	if exportType == excel.ExportTypeStream {
		return s.streamExportWafLog(ctx, req, generator, w)
	}
	// excel.ExportTypeNormal and any unrecognised type use the normal export.
	return s.normalExportWafLog(ctx, req, generator, w)
}

// exportRow converts one export result into the header-keyed row map handed to
// the Excel generator. Every export path uses it so that they all emit the
// same set of columns.
func (s *wafLogService) exportRow(item *adminApi.ExportWafLogRes) map[string]any {
	return map[string]any{
		"name":              item.Name,
		"request_ip":        item.RequestIp,
		"host_id":           item.HostId,
		"rule_id":           item.RuleId,
		"api_name":          item.ApiName,
		"addr_backend_list": s.formatBackendList(item.AddrBackendList),
		"domain":            item.Domain,
		"comment":           item.Comment,
		"custom_host":       s.formatExposeAddr(item.CustomHost),
		"expose_addr":       s.formatExposeAddr(item.ExposeAddr),
		"created_at":        item.CreatedAt,
	}
}

// normalExportWafLog loads all matching records, writes them to generator in
// one call and sends the workbook to w via excel.NormalExport.
func (s *wafLogService) normalExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
	exportData, err := s.ExPortWafLog(ctx, req)
	if err != nil {
		return fmt.Errorf("获取导出数据失败: %w", err)
	}

	rows := make([]map[string]any, 0, len(exportData))
	for i := range exportData {
		rows = append(rows, s.exportRow(&exportData[i]))
	}

	if err := generator.WriteRows(rows); err != nil {
		return fmt.Errorf("写入Excel数据失败: %w", err)
	}

	fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
	return excel.NormalExport(generator, w, excel.TransferOption{
		FileName:    fileName,
		ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
	})
}

// streamExportWafLog reads the matching records page by page (1000 per page),
// writes each row to generator as it goes, and finally sends the workbook to w
// via excel.StreamExport.
//
// Paging stops at the first empty page or the first page shorter than the
// page size.
func (s *wafLogService) streamExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
	const pageSize = 1000

	fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))

	w.Header().Set("Content-Type", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
	w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", fileName))
	w.Header().Set("Transfer-Encoding", "chunked")

	for page := 1; ; page++ {
		exportData, err := s.wafLogRepository.ExportWafLogWithPagination(ctx, req, page, pageSize)
		if err != nil {
			return fmt.Errorf("获取第%d页数据失败: %w", page, err)
		}

		exportResults, err := s.convertRawDataToExportResults(ctx, exportData)
		if err != nil {
			return fmt.Errorf("转换导出数据失败: %w", err)
		}
		if len(exportResults) == 0 {
			break // no more data
		}

		for i := range exportResults {
			if err := generator.WriteRow(s.exportRow(&exportResults[i])); err != nil {
				return fmt.Errorf("写入第%d页数据失败: %w", page, err)
			}
		}

		// A short page means this was the last one.
		if len(exportResults) < pageSize {
			break
		}
	}

	return excel.StreamExport(generator, w, excel.TransferOption{
		FileName:    fileName,
		ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
	})
}

// chunkExportWafLog delegates to excel.ChunkExport with a chunk size of 5000
// records and a JSON content type; it always returns nil. The original note
// says this mode needs cooperation from the front end.
func (s *wafLogService) chunkExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter, totalRecords int) error {
	const pageSize = 5000 // records per chunk

	fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
	excel.ChunkExport(w, excel.TransferOption{
		FileName:    fileName,
		ContentType: "application/json", // the response carries chunk information
	}, totalRecords, pageSize)
	return nil
}

// formatBackendList renders a backend address list as a single string.
//
// nil yields "", a string is returned unchanged and a []string is joined with
// ", ". Any other value is formatted with %v; if that text contains spaces but
// no newline and splits into more than one whitespace-separated field, the
// fields are joined with ", ", otherwise the %v text is returned as is.
func (s *wafLogService) formatBackendList(backendList any) string {
	if backendList == nil {
		return ""
	}

	switch v := backendList.(type) {
	case string:
		return v
	case []string:
		return strings.Join(v, ", ")
	default:
		str := fmt.Sprintf("%v", v)
		if strings.Contains(str, " ") && !strings.Contains(str, "\n") {
			if parts := strings.Fields(str); len(parts) > 1 {
				return strings.Join(parts, ", ")
			}
		}
		return str
	}
}

// formatExposeAddr joins the addresses with ", ". An empty or nil slice yields
// "", which strings.Join already guarantees.
func (s *wafLogService) formatExposeAddr(exposeAddr []string) string {
	return strings.Join(exposeAddr, ", ")
}

// gatewayExposeAddrs decodes raw as a JSON array of gateway IPs and returns
// "ip:port" for each of them. It returns nil when raw does not decode, the
// list is empty, or port is empty.
func (s *wafLogService) gatewayExposeAddrs(raw []byte, port string) []string {
	if port == "" {
		return nil
	}
	var ips []string
	if err := json.Unmarshal(raw, &ips); err != nil || len(ips) == 0 {
		return nil
	}
	addrs := make([]string, 0, len(ips))
	for _, ip := range ips {
		addrs = append(addrs, ip+":"+port)
	}
	return addrs
}

// convertRawDataToExportResults turns stored log records into export rows.
//
// Gateway records are fetched for all rows in one batch query, keyed by
// "<hostId>_<uid>". A row whose key is missing from that result (including
// every row when the batch query itself fails) falls back to a per-row query;
// an error from that fallback just leaves the row's ExposeAddr empty.
// ExtraData is parsed by the data-clean service, and the port it reports is
// combined with the gateway IPs to form ExposeAddr.
func (s *wafLogService) convertRawDataToExportResults(ctx context.Context, rawData []model.WafLog) ([]adminApi.ExportWafLogRes, error) {
	if len(rawData) == 0 {
		return []adminApi.ExportWafLogRes{}, nil
	}

	// Collect the batch-query arguments: every host id and uid, plus the
	// latest creation time among the rows.
	hostIds := make([]int64, 0, len(rawData))
	uids := make([]int64, 0, len(rawData))
	var maxCreatedAt time.Time
	for i := range rawData {
		v := &rawData[i]
		hostIds = append(hostIds, int64(v.HostId))
		uids = append(uids, int64(v.Uid))
		if v.CreatedAt.After(maxCreatedAt) {
			maxCreatedAt = v.CreatedAt
		}
	}

	gatewayMap, err := s.wafLogRepository.BatchGetWafLogGateWayIps(ctx, hostIds, uids, maxCreatedAt)
	if err != nil {
		s.Logger.Warn("批量获取网关组失败,降级为单个查询", zap.Error(err))
		gatewayMap = make(map[string]model.WafLog)
	}

	res := make([]adminApi.ExportWafLogRes, 0, len(rawData))
	for i := range rawData {
		v := &rawData[i]
		cleanedData := s.wafLogDataCleanService.ParseWafLogExtraData(v.ExtraData, v.ApiName)

		var exposeAddr []string
		key := fmt.Sprintf("%d_%d", v.HostId, v.Uid)
		if gatewayModel, exists := gatewayMap[key]; exists {
			exposeAddr = s.gatewayExposeAddrs(gatewayModel.ExtraData, cleanedData.Port)
		} else if gateWayIpModel, err := s.wafLogRepository.GetWafLogGateWayIp(ctx, int64(v.HostId), int64(v.Uid), v.CreatedAt); err == nil {
			// Per-row fallback; a lookup error is deliberately ignored here.
			exposeAddr = s.gatewayExposeAddrs(gateWayIpModel.ExtraData, cleanedData.Port)
		}

		res = append(res, adminApi.ExportWafLogRes{
			Name:            v.Name,
			RequestIp:       v.RequestIp,
			HostId:          v.HostId,
			RuleId:          v.RuleId,
			ApiName:         v.ApiName,
			AddrBackendList: cleanedData.AddrBackendList,
			Domain:          cleanedData.Domain,
			Comment:         cleanedData.Comment,
			CustomHost:      cleanedData.CustomHost,
			ExposeAddr:      exposeAddr,
			CreatedAt:       v.CreatedAt,
		})
	}

	return res, nil
}

// GetApiDescriptions returns ApiDescriptionMap. The package-level map itself
// is returned, not a copy.
func (s *wafLogService) GetApiDescriptions(ctx context.Context) map[string]string {
	return ApiDescriptionMap
}