waflog.go 17 KB


  1. package admin
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  7. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  9. adminRep "github.com/go-nunu/nunu-layout-advanced/internal/repository/admin"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  12. "github.com/go-nunu/nunu-layout-advanced/pkg/excel"
  13. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  14. amqp "github.com/rabbitmq/amqp091-go"
  15. "go.uber.org/zap"
  16. "net/http"
  17. "strings"
  18. "time"
  19. )
  20. // ApiDescriptionMap API描述映射
  21. var ApiDescriptionMap = map[string]string{
  22. "/webForward/add": "添加web",
  23. "/webForward/edit": "修改web",
  24. "/webForward/delete": "删除web",
  25. "/tcpForward/add": "添加tcp",
  26. "/tcpForward/edit": "修改tcp",
  27. "/tcpForward/delete": "删除tcp",
  28. "/udpForward/add": "添加udp",
  29. "/udpForward/edit": "修改udp",
  30. "/udpForward/delete": "删除udp",
  31. "/globalLimit/add": "添加实例",
  32. "/globalLimit/edit": "修改实例",
  33. "/globalLimit/delete": "删除实例",
  34. "/allowAndDeny/add": "添加黑白名单",
  35. "/allowAndDeny/edit": "修改黑白名单",
  36. "/allowAndDeny/delete": "删除黑白名单",
  37. "/cc/editState": "删除CC黑名单",
  38. "/ccIpList/add": "添加CC白名单",
  39. "/ccIpList/edit": "修改CC白名单",
  40. "/ccIpList/delete": "删除CC白名单",
  41. "分配网关组": "分配网关组",
  42. }
  43. type WafLogService interface {
  44. GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
  45. GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
  46. AddWafLog(ctx context.Context, req adminApi.WafLog) error
  47. BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error
  48. PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
  49. SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error
  50. GetApiDescriptions(ctx context.Context) map[string]string
  51. }
  52. func NewWafLogService(
  53. service *service.Service,
  54. wafLogRepository adminRep.WafLogRepository,
  55. globalLimitRepository waf.GlobalLimitRepository,
  56. mq *rabbitmq.RabbitMQ,
  57. wafLogDataCleanService WafLogDataCleanService,
  58. ) WafLogService {
  59. return &wafLogService{
  60. Service: service,
  61. wafLogRepository: wafLogRepository,
  62. globalLimitRepository: globalLimitRepository,
  63. mq : mq,
  64. wafLogDataCleanService: wafLogDataCleanService,
  65. }
  66. }
  67. type wafLogService struct {
  68. *service.Service
  69. wafLogRepository adminRep.WafLogRepository
  70. globalLimitRepository waf.GlobalLimitRepository
  71. mq *rabbitmq.RabbitMQ
  72. wafLogDataCleanService WafLogDataCleanService
  73. }
  74. func (s *wafLogService) getFirstPathSegment(path string) (segment []string, ok bool) {
  75. // 1. 为了统一处理,先去掉路径最前面的 "/"
  76. // 这样 "/v1/admin" 会变成 "v1/admin",而 "v1/admin" 保持不变
  77. trimmedPath := strings.TrimPrefix(path, "/")
  78. // 如果去掉 "/" 后字符串为空(比如原路径是 "/" 或 ""),则无法提取
  79. if trimmedPath == "" {
  80. return nil, false
  81. }
  82. // 2. 使用 "/" 作为分隔符来切割字符串
  83. // "v1/admin/menus" 会被切割成一个字符串切片 (slice): ["v1", "admin", "menus"]
  84. parts := strings.Split(trimmedPath, "/")
  85. // 3. 只要切片不为空,第一个元素就是我们需要的值
  86. // len(parts) > 0 这个检查可以保证程序不会因为空切片而出错
  87. if len(parts) > 0 {
  88. return parts, true
  89. }
  90. return nil, false
  91. }
  92. func (s *wafLogService) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
  93. return s.wafLogRepository.GetWafLog(ctx, id)
  94. }
  95. func (s *wafLogService) GetWafLogList(ctx context.Context,req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
  96. return s.wafLogRepository.GetWafLogList(ctx, req)
  97. }
  98. func (s *wafLogService) AddWafLog(ctx context.Context, req adminApi.WafLog) error {
  99. if req.Api != "" {
  100. api := strings.TrimPrefix(req.Api, "/v1")
  101. if _, ok := ApiDescriptionMap[api]; ok {
  102. req.ApiName = ApiDescriptionMap[api]
  103. }
  104. apiType, ok := s.getFirstPathSegment(req.Api)
  105. if ok {
  106. req.ApiType = apiType[len(apiType)-1]
  107. }
  108. }
  109. userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
  110. if err != nil {
  111. return err
  112. }
  113. req.Name = userInfo.Username
  114. extraData, err := json.Marshal(req.ExtraData)
  115. if err != nil {
  116. return err
  117. }
  118. req.RequestIp = userInfo.LastLoginIp
  119. return s.wafLogRepository.AddWafLog(ctx, &model.WafLog{
  120. Uid: req.Uid,
  121. Name: req.Name,
  122. RequestIp: req.RequestIp,
  123. RuleId: req.RuleId,
  124. HostId: req.HostId,
  125. UserAgent: req.UserAgent,
  126. Api: req.Api,
  127. ApiType: req.ApiType,
  128. ApiName: req.ApiName,
  129. ExtraData: extraData,
  130. })
  131. }
  132. func (s *wafLogService) BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error {
  133. if len(reqs) == 0 {
  134. return nil
  135. }
  136. wafLogs := make([]*model.WafLog, 0, len(reqs))
  137. for _, req := range reqs {
  138. if req.Api != "" {
  139. api := strings.TrimPrefix(req.Api, "/v1")
  140. if _, ok := ApiDescriptionMap[api]; ok {
  141. req.ApiName = ApiDescriptionMap[api]
  142. }
  143. apiType, ok := s.getFirstPathSegment(req.Api)
  144. if ok {
  145. req.ApiType = apiType[len(apiType)-1]
  146. }
  147. }
  148. userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
  149. if err != nil {
  150. s.Logger.Error("获取用户信息失败", zap.Error(err), zap.Int("uid", req.Uid))
  151. continue
  152. }
  153. req.Name = userInfo.Username
  154. req.RequestIp = userInfo.LastLoginIp
  155. extraData, err := json.Marshal(req.ExtraData)
  156. if err != nil {
  157. s.Logger.Error("序列化额外数据失败", zap.Error(err))
  158. continue
  159. }
  160. wafLogs = append(wafLogs, &model.WafLog{
  161. Uid: req.Uid,
  162. Name: req.Name,
  163. RequestIp: req.RequestIp,
  164. RuleId: req.RuleId,
  165. HostId: req.HostId,
  166. UserAgent: req.UserAgent,
  167. Api: req.Api,
  168. ApiType: req.ApiType,
  169. ApiName: req.ApiName,
  170. ExtraData: extraData,
  171. })
  172. }
  173. // 调用repository层的批量插入方法
  174. return s.wafLogRepository.BatchAddWafLog(ctx, wafLogs)
  175. }
  176. func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
  177. payload := &req
  178. // Serialize the message
  179. msgBody, err := json.Marshal(payload)
  180. if err != nil {
  181. s.Logger.Error("序列化 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  182. return
  183. }
  184. // Get task configuration
  185. taskCfg, ok := s.mq.GetTaskConfig("waf_log")
  186. if !ok {
  187. s.Logger.Error("无法获取“waf_Log”任务配置")
  188. return
  189. }
  190. // Construct the routing key dynamically based on the action
  191. routingKey := fmt.Sprintf("wafLog.%s", "add")
  192. // Construct the amqp.Publishing message
  193. publishingMsg := amqp.Publishing{
  194. ContentType: "application/json",
  195. Body: msgBody,
  196. DeliveryMode: amqp.Persistent, // Persistent message
  197. }
  198. // Publish the message
  199. err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
  200. if err != nil {
  201. s.Logger.Error("发布 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  202. } else {
  203. s.Logger.Info("已成功发布 WafLog 任务消息", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  204. }
  205. }
  206. func (s *wafLogService) ExPortWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]adminApi.ExportWafLogRes, error) {
  207. // 获取原始数据
  208. data, err := s.wafLogRepository.ExportWafLog(ctx, req)
  209. if err != nil {
  210. return nil, err
  211. }
  212. // 使用优化后的转换方法,避免N+1查询
  213. return s.convertRawDataToExportResults(ctx, data)
  214. }
  215. // SmartExportWafLog 智能导出WAF日志为Excel
  216. func (s *wafLogService) SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error {
  217. // 1. 先获取总数量用于智能选择传输方式
  218. count, err := s.wafLogRepository.GetWafLogExportCount(ctx, req)
  219. if err != nil {
  220. return fmt.Errorf("获取导出数据总数失败: %w", err)
  221. }
  222. // 2. 智能选择导出方式
  223. // 估算每行数据大小约200字节(包含用户名、IP、API名称、域名等字段)
  224. exportType := excel.SmartExport(count, 200)
  225. // 3. 设置Excel表头映射
  226. headers := []string{"name", "request_ip", "host_id", "rule_id", "api_name", "addr_backend_list", "domain", "comment", "custom_host", "expose_addr", "created_at"}
  227. headerMap := map[string]string{
  228. "name": "用户名",
  229. "request_ip": "请求IP",
  230. "host_id": "主机ID",
  231. "rule_id": "规则ID",
  232. "api_name": "API名称",
  233. "addr_backend_list": "后端地址",
  234. "domain": "域名",
  235. "comment": "备注",
  236. "custom_host": "回源地址",
  237. "expose_addr": "暴露地址",
  238. "created_at": "创建时间",
  239. }
  240. // 4. 创建Excel生成器
  241. generator := excel.NewExcelGenerator("WAF日志", headers, headerMap)
  242. if err := generator.WriteHeaders(); err != nil {
  243. return fmt.Errorf("写入Excel表头失败: %w", err)
  244. }
  245. // 5. 根据导出类型选择不同的处理方式
  246. switch exportType {
  247. case excel.ExportTypeNormal:
  248. return s.normalExportWafLog(ctx, req, generator, w)
  249. case excel.ExportTypeStream:
  250. return s.streamExportWafLog(ctx, req, generator, w)
  251. case excel.ExportTypeChunk:
  252. return s.chunkExportWafLog(ctx, req, w, count)
  253. default:
  254. return s.normalExportWafLog(ctx, req, generator, w)
  255. }
  256. }
  257. // normalExportWafLog 普通导出(小文件)
  258. func (s *wafLogService) normalExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
  259. // 获取所有数据(已经优化了批量查询)
  260. exportData, err := s.ExPortWafLog(ctx, req)
  261. if err != nil {
  262. return fmt.Errorf("获取导出数据失败: %w", err)
  263. }
  264. // 转换数据格式
  265. data := make([]map[string]interface{}, 0, len(exportData))
  266. for _, item := range exportData {
  267. row := map[string]interface{}{
  268. "name": item.Name,
  269. "request_ip": item.RequestIp,
  270. "host_id": item.HostId,
  271. "rule_id": item.RuleId,
  272. "api_name": item.ApiName,
  273. "addr_backend_list": s.formatBackendList(item.AddrBackendList),
  274. "domain": item.Domain,
  275. "comment": item.Comment,
  276. "custom_host": s.formatExposeAddr(item.CustomHost),
  277. "expose_addr": s.formatExposeAddr(item.ExposeAddr),
  278. "created_at": item.CreatedAt,
  279. }
  280. data = append(data, row)
  281. }
  282. // 写入数据
  283. if err := generator.WriteRows(data); err != nil {
  284. return fmt.Errorf("写入Excel数据失败: %w", err)
  285. }
  286. // 普通导出
  287. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  288. return excel.NormalExport(generator, w, excel.TransferOption{
  289. FileName: fileName,
  290. ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  291. })
  292. }
  293. // streamExportWafLog 流式导出(大文件)
  294. func (s *wafLogService) streamExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
  295. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  296. // 设置响应头
  297. w.Header().Set("Content-Type", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
  298. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", fileName))
  299. w.Header().Set("Transfer-Encoding", "chunked")
  300. // 分批处理数据,每批1000条
  301. pageSize := 1000
  302. page := 1
  303. for {
  304. // 使用分页导出方法
  305. exportData, err := s.wafLogRepository.ExportWafLogWithPagination(ctx, req, page, pageSize)
  306. if err != nil {
  307. return fmt.Errorf("获取第%d页数据失败: %w", page, err)
  308. }
  309. // 转换为导出格式(复用原有的ExPortWafLog逻辑)
  310. exportResults, err := s.convertRawDataToExportResults(ctx, exportData)
  311. if err != nil {
  312. return fmt.Errorf("转换导出数据失败: %w", err)
  313. }
  314. if len(exportResults) == 0 {
  315. break // 没有更多数据
  316. }
  317. // 转换并写入当前批次数据
  318. for _, item := range exportResults {
  319. row := map[string]interface{}{
  320. "name": item.Name,
  321. "request_ip": item.RequestIp,
  322. "host_id": item.HostId,
  323. "api_name": item.ApiName,
  324. "addr_backend_list": s.formatBackendList(item.AddrBackendList),
  325. "domain": item.Domain,
  326. "comment": item.Comment,
  327. "custom_host": s.formatExposeAddr(item.CustomHost),
  328. "expose_addr": s.formatExposeAddr(item.ExposeAddr),
  329. "created_at": item.CreatedAt,
  330. }
  331. if err := generator.WriteRow(row); err != nil {
  332. return fmt.Errorf("写入第%d页数据失败: %w", page, err)
  333. }
  334. }
  335. // 如果当前批次数据少于页大小,说明已经是最后一页
  336. if len(exportResults) < pageSize {
  337. break
  338. }
  339. page++
  340. }
  341. // 流式导出
  342. return excel.StreamExport(generator, w, excel.TransferOption{
  343. FileName: fileName,
  344. ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  345. })
  346. }
  347. // chunkExportWafLog 分块导出(超大文件)
  348. func (s *wafLogService) chunkExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter, totalRecords int) error {
  349. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  350. pageSize := 5000 // 每个分块5000条记录
  351. // 分块导出需要前端配合实现
  352. excel.ChunkExport(w, excel.TransferOption{
  353. FileName: fileName,
  354. ContentType: "application/json", // 返回分块信息
  355. }, totalRecords, pageSize)
  356. return nil
  357. }
  358. // formatBackendList 格式化后端地址列表
  359. func (s *wafLogService) formatBackendList(backendList interface{}) string {
  360. if backendList == nil {
  361. return ""
  362. }
  363. switch v := backendList.(type) {
  364. case string:
  365. return v
  366. case []string:
  367. return strings.Join(v, ", ")
  368. default:
  369. // 对于其他类型,先转换为字符串再处理
  370. str := fmt.Sprintf("%v", v)
  371. if strings.Contains(str, " ") && !strings.Contains(str, "\n") {
  372. parts := strings.Fields(str)
  373. if len(parts) > 1 {
  374. return strings.Join(parts, ", ")
  375. }
  376. }
  377. return str
  378. }
  379. }
  380. // formatExposeAddr 格式化暴露地址
  381. func (s *wafLogService) formatExposeAddr(exposeAddr []string) string {
  382. if len(exposeAddr) == 0 {
  383. return ""
  384. }
  385. return strings.Join(exposeAddr, ", ")
  386. }
  387. // convertRawDataToExportResults 将原始数据转换为导出结果(复用原有的ExPortWafLog逻辑)
  388. func (s *wafLogService) convertRawDataToExportResults(ctx context.Context, rawData []model.WafLog) ([]adminApi.ExportWafLogRes, error) {
  389. if len(rawData) == 0 {
  390. return []adminApi.ExportWafLogRes{}, nil
  391. }
  392. // 批量准备逻辑保持不变...
  393. hostIds := make([]int64, 0, len(rawData))
  394. uids := make([]int64, 0, len(rawData))
  395. maxCreatedAt := time.Time{}
  396. for _, v := range rawData {
  397. hostIds = append(hostIds, int64(v.HostId))
  398. uids = append(uids, int64(v.Uid))
  399. if v.CreatedAt.After(maxCreatedAt) {
  400. maxCreatedAt = v.CreatedAt
  401. }
  402. }
  403. gatewayMap, err := s.wafLogRepository.BatchGetWafLogGateWayIps(ctx, hostIds, uids, maxCreatedAt)
  404. if err != nil {
  405. s.Logger.Warn("批量获取网关组失败,降级为单个查询", zap.Error(err))
  406. gatewayMap = make(map[string]model.WafLog)
  407. }
  408. var res []adminApi.ExportWafLogRes
  409. for _, v := range rawData {
  410. // --- 核心改动:一行代码完成所有数据清洗 ---
  411. cleanedData := s.wafLogDataCleanService.ParseWafLogExtraData(v.ExtraData, v.ApiName)
  412. // 网关 IP 获取逻辑保持不变,但使用清洗后的 port
  413. var exposeAddr []string
  414. key := fmt.Sprintf("%d_%d", v.HostId, v.Uid)
  415. if gatewayModel, exists := gatewayMap[key]; exists {
  416. var gateWayIps []string
  417. if err := json.Unmarshal(gatewayModel.ExtraData, &gateWayIps); err == nil && len(gateWayIps) > 0 && cleanedData.Port != "" {
  418. for _, ip := range gateWayIps {
  419. exposeAddr = append(exposeAddr, ip+":"+cleanedData.Port)
  420. }
  421. }
  422. } else {
  423. // 降级查询逻辑...
  424. gateWayIpModel, err := s.wafLogRepository.GetWafLogGateWayIp(ctx, int64(v.HostId), int64(v.Uid), v.CreatedAt)
  425. if err == nil {
  426. var gateWayIps []string
  427. if err := json.Unmarshal(gateWayIpModel.ExtraData, &gateWayIps); err == nil && len(gateWayIps) > 0 && cleanedData.Port != "" {
  428. for _, ip := range gateWayIps {
  429. exposeAddr = append(exposeAddr, ip+":"+cleanedData.Port)
  430. }
  431. }
  432. }
  433. }
  434. // 构造结果,代码更清晰
  435. res = append(res, adminApi.ExportWafLogRes{
  436. Name: v.Name,
  437. RequestIp: v.RequestIp,
  438. HostId: v.HostId,
  439. RuleId: v.RuleId,
  440. ApiName: v.ApiName,
  441. AddrBackendList: cleanedData.AddrBackendList,
  442. Domain: cleanedData.Domain,
  443. Comment: cleanedData.Comment,
  444. CustomHost: cleanedData.CustomHost,
  445. ExposeAddr: exposeAddr,
  446. CreatedAt: v.CreatedAt,
  447. })
  448. }
  449. return res, nil
  450. }
  451. // GetApiDescriptions 获取API描述映射
  452. func (s *wafLogService) GetApiDescriptions(ctx context.Context) map[string]string {
  453. return ApiDescriptionMap
  454. }