|
- package admin
- import (
- "context"
- "encoding/json"
- "fmt"
- v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
- adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
- "github.com/go-nunu/nunu-layout-advanced/internal/model"
- adminRep "github.com/go-nunu/nunu-layout-advanced/internal/repository/admin"
- "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
- "github.com/go-nunu/nunu-layout-advanced/internal/service"
- "github.com/go-nunu/nunu-layout-advanced/pkg/excel"
- "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
- amqp "github.com/rabbitmq/amqp091-go"
- "go.uber.org/zap"
- "net/http"
- "strings"
- "time"
- )
// ApiDescriptionMap maps an API identifier to the human-readable operation
// name stored with a WAF log record. Keys are request paths without the
// leading "/v1" version prefix, except for the last entry, whose key is the
// operation name itself.
var ApiDescriptionMap = map[string]string{
	// Web forwarding rules.
	"/webForward/add":    "添加web",
	"/webForward/edit":   "修改web",
	"/webForward/delete": "删除web",

	// TCP forwarding rules.
	"/tcpForward/add":    "添加tcp",
	"/tcpForward/edit":   "修改tcp",
	"/tcpForward/delete": "删除tcp",

	// UDP forwarding rules.
	"/udpForward/add":    "添加udp",
	"/udpForward/edit":   "修改udp",
	"/udpForward/delete": "删除udp",

	// Instances (global limits).
	"/globalLimit/add":    "添加实例",
	"/globalLimit/edit":   "修改实例",
	"/globalLimit/delete": "删除实例",

	// Allow / deny lists.
	"/allowAndDeny/add":    "添加黑白名单",
	"/allowAndDeny/edit":   "修改黑白名单",
	"/allowAndDeny/delete": "删除黑白名单",

	// CC protection lists.
	"/cc/editState":    "删除CC黑名单",
	"/ccIpList/add":    "添加CC白名单",
	"/ccIpList/edit":   "修改CC白名单",
	"/ccIpList/delete": "删除CC白名单",

	// Not a request path: this entry is keyed by the operation name.
	"分配网关组": "分配网关组",
}
- type WafLogService interface {
- GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
- GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
- AddWafLog(ctx context.Context, req adminApi.WafLog) error
- BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error
- PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
- SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error
- GetApiDescriptions(ctx context.Context) map[string]string
- }
- func NewWafLogService(
- service *service.Service,
- wafLogRepository adminRep.WafLogRepository,
- globalLimitRepository waf.GlobalLimitRepository,
- mq *rabbitmq.RabbitMQ,
- wafLogDataCleanService WafLogDataCleanService,
- ) WafLogService {
- return &wafLogService{
- Service: service,
- wafLogRepository: wafLogRepository,
- globalLimitRepository: globalLimitRepository,
- mq : mq,
- wafLogDataCleanService: wafLogDataCleanService,
- }
- }
- type wafLogService struct {
- *service.Service
- wafLogRepository adminRep.WafLogRepository
- globalLimitRepository waf.GlobalLimitRepository
- mq *rabbitmq.RabbitMQ
- wafLogDataCleanService WafLogDataCleanService
- }
- func (s *wafLogService) getFirstPathSegment(path string) (segment []string, ok bool) {
- // 1. 为了统一处理,先去掉路径最前面的 "/"
- // 这样 "/v1/admin" 会变成 "v1/admin",而 "v1/admin" 保持不变
- trimmedPath := strings.TrimPrefix(path, "/")
- // 如果去掉 "/" 后字符串为空(比如原路径是 "/" 或 ""),则无法提取
- if trimmedPath == "" {
- return nil, false
- }
- // 2. 使用 "/" 作为分隔符来切割字符串
- // "v1/admin/menus" 会被切割成一个字符串切片 (slice): ["v1", "admin", "menus"]
- parts := strings.Split(trimmedPath, "/")
- // 3. 只要切片不为空,第一个元素就是我们需要的值
- // len(parts) > 0 这个检查可以保证程序不会因为空切片而出错
- if len(parts) > 0 {
- return parts, true
- }
- return nil, false
- }
- func (s *wafLogService) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
- return s.wafLogRepository.GetWafLog(ctx, id)
- }
- func (s *wafLogService) GetWafLogList(ctx context.Context,req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
- return s.wafLogRepository.GetWafLogList(ctx, req)
- }
- func (s *wafLogService) AddWafLog(ctx context.Context, req adminApi.WafLog) error {
- if req.Api != "" {
- api := strings.TrimPrefix(req.Api, "/v1")
- if _, ok := ApiDescriptionMap[api]; ok {
- req.ApiName = ApiDescriptionMap[api]
- }
- apiType, ok := s.getFirstPathSegment(req.Api)
- if ok {
- req.ApiType = apiType[len(apiType)-1]
- }
- }
- userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
- if err != nil {
- return err
- }
- req.Name = userInfo.Username
- extraData, err := json.Marshal(req.ExtraData)
- if err != nil {
- return err
- }
- req.RequestIp = userInfo.LastLoginIp
- return s.wafLogRepository.AddWafLog(ctx, &model.WafLog{
- Uid: req.Uid,
- Name: req.Name,
- RequestIp: req.RequestIp,
- RuleId: req.RuleId,
- HostId: req.HostId,
- UserAgent: req.UserAgent,
- Api: req.Api,
- ApiType: req.ApiType,
- ApiName: req.ApiName,
- ExtraData: extraData,
- })
- }
- func (s *wafLogService) BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error {
- if len(reqs) == 0 {
- return nil
- }
-
- wafLogs := make([]*model.WafLog, 0, len(reqs))
-
- for _, req := range reqs {
- if req.Api != "" {
- api := strings.TrimPrefix(req.Api, "/v1")
- if _, ok := ApiDescriptionMap[api]; ok {
- req.ApiName = ApiDescriptionMap[api]
- }
-
- apiType, ok := s.getFirstPathSegment(req.Api)
- if ok {
- req.ApiType = apiType[len(apiType)-1]
- }
- }
-
- userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
- if err != nil {
- s.Logger.Error("获取用户信息失败", zap.Error(err), zap.Int("uid", req.Uid))
- continue
- }
-
- req.Name = userInfo.Username
- req.RequestIp = userInfo.LastLoginIp
- extraData, err := json.Marshal(req.ExtraData)
- if err != nil {
- s.Logger.Error("序列化额外数据失败", zap.Error(err))
- continue
- }
-
- wafLogs = append(wafLogs, &model.WafLog{
- Uid: req.Uid,
- Name: req.Name,
- RequestIp: req.RequestIp,
- RuleId: req.RuleId,
- HostId: req.HostId,
- UserAgent: req.UserAgent,
- Api: req.Api,
- ApiType: req.ApiType,
- ApiName: req.ApiName,
- ExtraData: extraData,
- })
- }
-
- // 调用repository层的批量插入方法
- return s.wafLogRepository.BatchAddWafLog(ctx, wafLogs)
- }
- func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
- payload := &req
- // Serialize the message
- msgBody, err := json.Marshal(payload)
- if err != nil {
- s.Logger.Error("序列化 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
- return
- }
- // Get task configuration
- taskCfg, ok := s.mq.GetTaskConfig("waf_log")
- if !ok {
- s.Logger.Error("无法获取“waf_Log”任务配置")
- return
- }
- // Construct the routing key dynamically based on the action
- routingKey := fmt.Sprintf("wafLog.%s", "add")
- // Construct the amqp.Publishing message
- publishingMsg := amqp.Publishing{
- ContentType: "application/json",
- Body: msgBody,
- DeliveryMode: amqp.Persistent, // Persistent message
- }
- // Publish the message
- err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
- if err != nil {
- s.Logger.Error("发布 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
- } else {
- s.Logger.Info("已成功发布 WafLog 任务消息", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
- }
- }
- func (s *wafLogService) ExPortWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]adminApi.ExportWafLogRes, error) {
- // 获取原始数据
- data, err := s.wafLogRepository.ExportWafLog(ctx, req)
- if err != nil {
- return nil, err
- }
- // 使用优化后的转换方法,避免N+1查询
- return s.convertRawDataToExportResults(ctx, data)
- }
- // SmartExportWafLog 智能导出WAF日志为Excel
- func (s *wafLogService) SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error {
- // 1. 先获取总数量用于智能选择传输方式
- count, err := s.wafLogRepository.GetWafLogExportCount(ctx, req)
- if err != nil {
- return fmt.Errorf("获取导出数据总数失败: %w", err)
- }
- // 2. 智能选择导出方式
- // 估算每行数据大小约200字节(包含用户名、IP、API名称、域名等字段)
- exportType := excel.SmartExport(count, 200)
-
- // 3. 设置Excel表头映射
- headers := []string{"name", "request_ip", "host_id", "rule_id", "api_name", "addr_backend_list", "domain", "comment", "custom_host", "expose_addr", "created_at"}
- headerMap := map[string]string{
- "name": "用户名",
- "request_ip": "请求IP",
- "host_id": "主机ID",
- "rule_id": "规则ID",
- "api_name": "API名称",
- "addr_backend_list": "后端地址",
- "domain": "域名",
- "comment": "备注",
- "custom_host": "回源地址",
- "expose_addr": "暴露地址",
- "created_at": "创建时间",
- }
- // 4. 创建Excel生成器
- generator := excel.NewExcelGenerator("WAF日志", headers, headerMap)
- if err := generator.WriteHeaders(); err != nil {
- return fmt.Errorf("写入Excel表头失败: %w", err)
- }
- // 5. 根据导出类型选择不同的处理方式
- switch exportType {
- case excel.ExportTypeNormal:
- return s.normalExportWafLog(ctx, req, generator, w)
- case excel.ExportTypeStream:
- return s.streamExportWafLog(ctx, req, generator, w)
- case excel.ExportTypeChunk:
- return s.chunkExportWafLog(ctx, req, w, count)
- default:
- return s.normalExportWafLog(ctx, req, generator, w)
- }
- }
- // normalExportWafLog 普通导出(小文件)
- func (s *wafLogService) normalExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
- // 获取所有数据(已经优化了批量查询)
- exportData, err := s.ExPortWafLog(ctx, req)
- if err != nil {
- return fmt.Errorf("获取导出数据失败: %w", err)
- }
- // 转换数据格式
- data := make([]map[string]interface{}, 0, len(exportData))
- for _, item := range exportData {
- row := map[string]interface{}{
- "name": item.Name,
- "request_ip": item.RequestIp,
- "host_id": item.HostId,
- "rule_id": item.RuleId,
- "api_name": item.ApiName,
- "addr_backend_list": s.formatBackendList(item.AddrBackendList),
- "domain": item.Domain,
- "comment": item.Comment,
- "custom_host": s.formatExposeAddr(item.CustomHost),
- "expose_addr": s.formatExposeAddr(item.ExposeAddr),
- "created_at": item.CreatedAt,
- }
- data = append(data, row)
- }
- // 写入数据
- if err := generator.WriteRows(data); err != nil {
- return fmt.Errorf("写入Excel数据失败: %w", err)
- }
- // 普通导出
- fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
- return excel.NormalExport(generator, w, excel.TransferOption{
- FileName: fileName,
- ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- })
- }
- // streamExportWafLog 流式导出(大文件)
- func (s *wafLogService) streamExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
- fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
-
- // 设置响应头
- w.Header().Set("Content-Type", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
- w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", fileName))
- w.Header().Set("Transfer-Encoding", "chunked")
- // 分批处理数据,每批1000条
- pageSize := 1000
- page := 1
- for {
- // 使用分页导出方法
- exportData, err := s.wafLogRepository.ExportWafLogWithPagination(ctx, req, page, pageSize)
- if err != nil {
- return fmt.Errorf("获取第%d页数据失败: %w", page, err)
- }
- // 转换为导出格式(复用原有的ExPortWafLog逻辑)
- exportResults, err := s.convertRawDataToExportResults(ctx, exportData)
- if err != nil {
- return fmt.Errorf("转换导出数据失败: %w", err)
- }
- if len(exportResults) == 0 {
- break // 没有更多数据
- }
- // 转换并写入当前批次数据
- for _, item := range exportResults {
- row := map[string]interface{}{
- "name": item.Name,
- "request_ip": item.RequestIp,
- "host_id": item.HostId,
- "api_name": item.ApiName,
- "addr_backend_list": s.formatBackendList(item.AddrBackendList),
- "domain": item.Domain,
- "comment": item.Comment,
- "custom_host": s.formatExposeAddr(item.CustomHost),
- "expose_addr": s.formatExposeAddr(item.ExposeAddr),
- "created_at": item.CreatedAt,
- }
-
- if err := generator.WriteRow(row); err != nil {
- return fmt.Errorf("写入第%d页数据失败: %w", page, err)
- }
- }
- // 如果当前批次数据少于页大小,说明已经是最后一页
- if len(exportResults) < pageSize {
- break
- }
- page++
- }
- // 流式导出
- return excel.StreamExport(generator, w, excel.TransferOption{
- FileName: fileName,
- ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- })
- }
- // chunkExportWafLog 分块导出(超大文件)
- func (s *wafLogService) chunkExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter, totalRecords int) error {
- fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
- pageSize := 5000 // 每个分块5000条记录
-
- // 分块导出需要前端配合实现
- excel.ChunkExport(w, excel.TransferOption{
- FileName: fileName,
- ContentType: "application/json", // 返回分块信息
- }, totalRecords, pageSize)
-
- return nil
- }
- // formatBackendList 格式化后端地址列表
- func (s *wafLogService) formatBackendList(backendList interface{}) string {
- if backendList == nil {
- return ""
- }
-
- switch v := backendList.(type) {
- case string:
- return v
- case []string:
- return strings.Join(v, ", ")
- default:
- // 对于其他类型,先转换为字符串再处理
- str := fmt.Sprintf("%v", v)
- if strings.Contains(str, " ") && !strings.Contains(str, "\n") {
- parts := strings.Fields(str)
- if len(parts) > 1 {
- return strings.Join(parts, ", ")
- }
- }
- return str
- }
- }
- // formatExposeAddr 格式化暴露地址
- func (s *wafLogService) formatExposeAddr(exposeAddr []string) string {
- if len(exposeAddr) == 0 {
- return ""
- }
- return strings.Join(exposeAddr, ", ")
- }
- // convertRawDataToExportResults 将原始数据转换为导出结果(复用原有的ExPortWafLog逻辑)
- func (s *wafLogService) convertRawDataToExportResults(ctx context.Context, rawData []model.WafLog) ([]adminApi.ExportWafLogRes, error) {
- if len(rawData) == 0 {
- return []adminApi.ExportWafLogRes{}, nil
- }
- // 批量准备逻辑保持不变...
- hostIds := make([]int64, 0, len(rawData))
- uids := make([]int64, 0, len(rawData))
- maxCreatedAt := time.Time{}
- for _, v := range rawData {
- hostIds = append(hostIds, int64(v.HostId))
- uids = append(uids, int64(v.Uid))
- if v.CreatedAt.After(maxCreatedAt) {
- maxCreatedAt = v.CreatedAt
- }
- }
- gatewayMap, err := s.wafLogRepository.BatchGetWafLogGateWayIps(ctx, hostIds, uids, maxCreatedAt)
- if err != nil {
- s.Logger.Warn("批量获取网关组失败,降级为单个查询", zap.Error(err))
- gatewayMap = make(map[string]model.WafLog)
- }
- var res []adminApi.ExportWafLogRes
- for _, v := range rawData {
- // --- 核心改动:一行代码完成所有数据清洗 ---
- cleanedData := s.wafLogDataCleanService.ParseWafLogExtraData(v.ExtraData, v.ApiName)
- // 网关 IP 获取逻辑保持不变,但使用清洗后的 port
- var exposeAddr []string
- key := fmt.Sprintf("%d_%d", v.HostId, v.Uid)
- if gatewayModel, exists := gatewayMap[key]; exists {
- var gateWayIps []string
- if err := json.Unmarshal(gatewayModel.ExtraData, &gateWayIps); err == nil && len(gateWayIps) > 0 && cleanedData.Port != "" {
- for _, ip := range gateWayIps {
- exposeAddr = append(exposeAddr, ip+":"+cleanedData.Port)
- }
- }
- } else {
- // 降级查询逻辑...
- gateWayIpModel, err := s.wafLogRepository.GetWafLogGateWayIp(ctx, int64(v.HostId), int64(v.Uid), v.CreatedAt)
- if err == nil {
- var gateWayIps []string
- if err := json.Unmarshal(gateWayIpModel.ExtraData, &gateWayIps); err == nil && len(gateWayIps) > 0 && cleanedData.Port != "" {
- for _, ip := range gateWayIps {
- exposeAddr = append(exposeAddr, ip+":"+cleanedData.Port)
- }
- }
- }
- }
- // 构造结果,代码更清晰
- res = append(res, adminApi.ExportWafLogRes{
- Name: v.Name,
- RequestIp: v.RequestIp,
- HostId: v.HostId,
- RuleId: v.RuleId,
- ApiName: v.ApiName,
- AddrBackendList: cleanedData.AddrBackendList,
- Domain: cleanedData.Domain,
- Comment: cleanedData.Comment,
- CustomHost: cleanedData.CustomHost,
- ExposeAddr: exposeAddr,
- CreatedAt: v.CreatedAt,
- })
- }
- return res, nil
- }
- // GetApiDescriptions 获取API描述映射
- func (s *wafLogService) GetApiDescriptions(ctx context.Context) map[string]string {
- return ApiDescriptionMap
- }
|