waflog.go 19 KB


  1. package admin
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  7. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  9. adminRep "github.com/go-nunu/nunu-layout-advanced/internal/repository/admin"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  12. "github.com/go-nunu/nunu-layout-advanced/pkg/excel"
  13. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  14. amqp "github.com/rabbitmq/amqp091-go"
  15. "go.uber.org/zap"
  16. "net/http"
  17. "strings"
  18. "time"
  19. )
  20. // ApiDescriptionMap API描述映射
  21. var ApiDescriptionMap = map[string]string{
  22. "/webForward/get": "获取web详情",
  23. "/webForward/getList": "获取web列表",
  24. "/webForward/add": "添加web",
  25. "/webForward/edit": "修改web",
  26. "/webForward/delete": "删除web",
  27. "/tcpForward/add": "添加tcp",
  28. "/tcpForward/edit": "修改tcp",
  29. "/tcpForward/delete": "删除tcp",
  30. "/tcpForward/getList": "获取tcp列表",
  31. "/tcpForward/get": "获取tcp详情",
  32. "/udpForward/add": "添加udp",
  33. "/udpForward/edit": "修改udp",
  34. "/udpForward/delete": "删除udp",
  35. "/udpForward/getList": "获取udp列表",
  36. "/udpForward/get": "获取udp详情",
  37. "/globalLimit/add": "添加实例",
  38. "/globalLimit/edit": "修改实例",
  39. "/globalLimit/delete": "删除实例",
  40. "/allowAndDeny/get": "获取黑白名单详情",
  41. "/allowAndDeny/getList": "获取黑白名单列表",
  42. "/allowAndDeny/add": "添加黑白名单",
  43. "/allowAndDeny/edit": "修改黑白名单",
  44. "/allowAndDeny/delete": "删除黑白名单",
  45. "/cc/getList": "获取CC列表",
  46. "/cc/editState": "删除CC黑名单",
  47. "/ccIpList/getList": "获取CC白名单列表",
  48. "/ccIpList/add": "添加CC白名单",
  49. "/ccIpList/edit": "修改CC白名单",
  50. "/ccIpList/delete": "删除CC白名单",
  51. "分配网关组": "分配网关组",
  52. }
  53. type WafLogService interface {
  54. GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
  55. GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
  56. AddWafLog(ctx context.Context, req adminApi.WafLog) error
  57. BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error
  58. PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
  59. SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error
  60. GetApiDescriptions(ctx context.Context) map[string]string
  61. }
  62. func NewWafLogService(
  63. service *service.Service,
  64. wafLogRepository adminRep.WafLogRepository,
  65. globalLimitRepository waf.GlobalLimitRepository,
  66. mq *rabbitmq.RabbitMQ,
  67. ) WafLogService {
  68. return &wafLogService{
  69. Service: service,
  70. wafLogRepository: wafLogRepository,
  71. globalLimitRepository: globalLimitRepository,
  72. mq : mq,
  73. }
  74. }
  75. type wafLogService struct {
  76. *service.Service
  77. wafLogRepository adminRep.WafLogRepository
  78. globalLimitRepository waf.GlobalLimitRepository
  79. mq *rabbitmq.RabbitMQ
  80. }
  81. func (s *wafLogService) getFirstPathSegment(path string) (segment []string, ok bool) {
  82. // 1. 为了统一处理,先去掉路径最前面的 "/"
  83. // 这样 "/v1/admin" 会变成 "v1/admin",而 "v1/admin" 保持不变
  84. trimmedPath := strings.TrimPrefix(path, "/")
  85. // 如果去掉 "/" 后字符串为空(比如原路径是 "/" 或 ""),则无法提取
  86. if trimmedPath == "" {
  87. return nil, false
  88. }
  89. // 2. 使用 "/" 作为分隔符来切割字符串
  90. // "v1/admin/menus" 会被切割成一个字符串切片 (slice): ["v1", "admin", "menus"]
  91. parts := strings.Split(trimmedPath, "/")
  92. // 3. 只要切片不为空,第一个元素就是我们需要的值
  93. // len(parts) > 0 这个检查可以保证程序不会因为空切片而出错
  94. if len(parts) > 0 {
  95. return parts, true
  96. }
  97. return nil, false
  98. }
  99. func (s *wafLogService) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
  100. return s.wafLogRepository.GetWafLog(ctx, id)
  101. }
  102. func (s *wafLogService) GetWafLogList(ctx context.Context,req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
  103. return s.wafLogRepository.GetWafLogList(ctx, req)
  104. }
  105. func (s *wafLogService) AddWafLog(ctx context.Context, req adminApi.WafLog) error {
  106. if req.Api != "" {
  107. api := strings.TrimPrefix(req.Api, "/v1")
  108. if _, ok := ApiDescriptionMap[api]; ok {
  109. req.ApiName = ApiDescriptionMap[api]
  110. }
  111. apiType, ok := s.getFirstPathSegment(req.Api)
  112. if ok {
  113. req.ApiType = apiType[len(apiType)-1]
  114. }
  115. }
  116. userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
  117. if err != nil {
  118. return err
  119. }
  120. req.Name = userInfo.Username
  121. extraData, err := json.Marshal(req.ExtraData)
  122. if err != nil {
  123. return err
  124. }
  125. req.RequestIp = userInfo.LastLoginIp
  126. return s.wafLogRepository.AddWafLog(ctx, &model.WafLog{
  127. Uid: req.Uid,
  128. Name: req.Name,
  129. RequestIp: req.RequestIp,
  130. RuleId: req.RuleId,
  131. HostId: req.HostId,
  132. UserAgent: req.UserAgent,
  133. Api: req.Api,
  134. ApiType: req.ApiType,
  135. ApiName: req.ApiName,
  136. ExtraData: extraData,
  137. })
  138. }
  139. func (s *wafLogService) BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error {
  140. if len(reqs) == 0 {
  141. return nil
  142. }
  143. wafLogs := make([]*model.WafLog, 0, len(reqs))
  144. for _, req := range reqs {
  145. if req.Api != "" {
  146. api := strings.TrimPrefix(req.Api, "/v1")
  147. if _, ok := ApiDescriptionMap[api]; ok {
  148. req.ApiName = ApiDescriptionMap[api]
  149. }
  150. apiType, ok := s.getFirstPathSegment(req.Api)
  151. if ok {
  152. req.ApiType = apiType[len(apiType)-1]
  153. }
  154. }
  155. userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
  156. if err != nil {
  157. s.Logger.Error("获取用户信息失败", zap.Error(err), zap.Int("uid", req.Uid))
  158. continue
  159. }
  160. req.Name = userInfo.Username
  161. req.RequestIp = userInfo.LastLoginIp
  162. extraData, err := json.Marshal(req.ExtraData)
  163. if err != nil {
  164. s.Logger.Error("序列化额外数据失败", zap.Error(err))
  165. continue
  166. }
  167. wafLogs = append(wafLogs, &model.WafLog{
  168. Uid: req.Uid,
  169. Name: req.Name,
  170. RequestIp: req.RequestIp,
  171. RuleId: req.RuleId,
  172. HostId: req.HostId,
  173. UserAgent: req.UserAgent,
  174. Api: req.Api,
  175. ApiType: req.ApiType,
  176. ApiName: req.ApiName,
  177. ExtraData: extraData,
  178. })
  179. }
  180. // 调用repository层的批量插入方法
  181. return s.wafLogRepository.BatchAddWafLog(ctx, wafLogs)
  182. }
  183. func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
  184. payload := &req
  185. // Serialize the message
  186. msgBody, err := json.Marshal(payload)
  187. if err != nil {
  188. s.Logger.Error("序列化 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  189. return
  190. }
  191. // Get task configuration
  192. taskCfg, ok := s.mq.GetTaskConfig("waf_log")
  193. if !ok {
  194. s.Logger.Error("无法获取“waf_Log”任务配置")
  195. return
  196. }
  197. // Construct the routing key dynamically based on the action
  198. routingKey := fmt.Sprintf("wafLog.%s", "add")
  199. // Construct the amqp.Publishing message
  200. publishingMsg := amqp.Publishing{
  201. ContentType: "application/json",
  202. Body: msgBody,
  203. DeliveryMode: amqp.Persistent, // Persistent message
  204. }
  205. // Publish the message
  206. err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
  207. if err != nil {
  208. s.Logger.Error("发布 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  209. } else {
  210. s.Logger.Info("已成功发布 WafLog 任务消息", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  211. }
  212. }
  213. func (s *wafLogService) ExPortWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]adminApi.ExportWafLogRes, error) {
  214. // 获取原始数据
  215. data, err := s.wafLogRepository.ExportWafLog(ctx, req)
  216. if err != nil {
  217. return nil, err
  218. }
  219. // 使用优化后的转换方法,避免N+1查询
  220. return s.convertRawDataToExportResults(ctx, data)
  221. }
  222. // SmartExportWafLog 智能导出WAF日志为Excel
  223. func (s *wafLogService) SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error {
  224. // 1. 先获取总数量用于智能选择传输方式
  225. count, err := s.wafLogRepository.GetWafLogExportCount(ctx, req)
  226. if err != nil {
  227. return fmt.Errorf("获取导出数据总数失败: %w", err)
  228. }
  229. // 2. 智能选择导出方式
  230. // 估算每行数据大小约200字节(包含用户名、IP、API名称、域名等字段)
  231. exportType := excel.SmartExport(count, 200)
  232. // 3. 设置Excel表头映射
  233. headers := []string{"name", "request_ip", "host_id", "api_name", "addr_backend_list", "domain", "comment", "custom_host", "expose_addr", "created_at"}
  234. headerMap := map[string]string{
  235. "name": "用户名",
  236. "request_ip": "请求IP",
  237. "host_id": "主机ID",
  238. "api_name": "API名称",
  239. "addr_backend_list": "后端地址",
  240. "domain": "域名",
  241. "comment": "备注",
  242. "custom_host": "回源地址",
  243. "expose_addr": "暴露地址",
  244. "created_at": "创建时间",
  245. }
  246. // 4. 创建Excel生成器
  247. generator := excel.NewExcelGenerator("WAF日志", headers, headerMap)
  248. if err := generator.WriteHeaders(); err != nil {
  249. return fmt.Errorf("写入Excel表头失败: %w", err)
  250. }
  251. // 5. 根据导出类型选择不同的处理方式
  252. switch exportType {
  253. case excel.ExportTypeNormal:
  254. return s.normalExportWafLog(ctx, req, generator, w)
  255. case excel.ExportTypeStream:
  256. return s.streamExportWafLog(ctx, req, generator, w)
  257. case excel.ExportTypeChunk:
  258. return s.chunkExportWafLog(ctx, req, w, count)
  259. default:
  260. return s.normalExportWafLog(ctx, req, generator, w)
  261. }
  262. }
  263. // normalExportWafLog 普通导出(小文件)
  264. func (s *wafLogService) normalExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
  265. // 获取所有数据(已经优化了批量查询)
  266. exportData, err := s.ExPortWafLog(ctx, req)
  267. if err != nil {
  268. return fmt.Errorf("获取导出数据失败: %w", err)
  269. }
  270. // 转换数据格式
  271. data := make([]map[string]interface{}, 0, len(exportData))
  272. for _, item := range exportData {
  273. row := map[string]interface{}{
  274. "name": item.Name,
  275. "request_ip": item.RequestIp,
  276. "host_id": item.HostId,
  277. "api_name": item.ApiName,
  278. "addr_backend_list": s.formatBackendList(item.AddrBackendList),
  279. "domain": item.Domain,
  280. "comment": item.Comment,
  281. "custom_host": item.CustomHost,
  282. "expose_addr": s.formatExposeAddr(item.ExposeAddr),
  283. "created_at": item.CreatedAt,
  284. }
  285. data = append(data, row)
  286. }
  287. // 写入数据
  288. if err := generator.WriteRows(data); err != nil {
  289. return fmt.Errorf("写入Excel数据失败: %w", err)
  290. }
  291. // 普通导出
  292. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  293. return excel.NormalExport(generator, w, excel.TransferOption{
  294. FileName: fileName,
  295. ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  296. })
  297. }
  298. // streamExportWafLog 流式导出(大文件)
  299. func (s *wafLogService) streamExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
  300. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  301. // 设置响应头
  302. w.Header().Set("Content-Type", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
  303. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", fileName))
  304. w.Header().Set("Transfer-Encoding", "chunked")
  305. // 分批处理数据,每批1000条
  306. pageSize := 1000
  307. page := 1
  308. for {
  309. // 使用分页导出方法
  310. exportData, err := s.wafLogRepository.ExportWafLogWithPagination(ctx, req, page, pageSize)
  311. if err != nil {
  312. return fmt.Errorf("获取第%d页数据失败: %w", page, err)
  313. }
  314. // 转换为导出格式(复用原有的ExPortWafLog逻辑)
  315. exportResults, err := s.convertRawDataToExportResults(ctx, exportData)
  316. if err != nil {
  317. return fmt.Errorf("转换导出数据失败: %w", err)
  318. }
  319. if len(exportResults) == 0 {
  320. break // 没有更多数据
  321. }
  322. // 转换并写入当前批次数据
  323. for _, item := range exportResults {
  324. row := map[string]interface{}{
  325. "name": item.Name,
  326. "request_ip": item.RequestIp,
  327. "host_id": item.HostId,
  328. "api_name": item.ApiName,
  329. "addr_backend_list": s.formatBackendList(item.AddrBackendList),
  330. "domain": item.Domain,
  331. "comment": item.Comment,
  332. "custom_host": item.CustomHost,
  333. "expose_addr": s.formatExposeAddr(item.ExposeAddr),
  334. "created_at": item.CreatedAt,
  335. }
  336. if err := generator.WriteRow(row); err != nil {
  337. return fmt.Errorf("写入第%d页数据失败: %w", page, err)
  338. }
  339. }
  340. // 如果当前批次数据少于页大小,说明已经是最后一页
  341. if len(exportResults) < pageSize {
  342. break
  343. }
  344. page++
  345. }
  346. // 流式导出
  347. return excel.StreamExport(generator, w, excel.TransferOption{
  348. FileName: fileName,
  349. ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  350. })
  351. }
  352. // chunkExportWafLog 分块导出(超大文件)
  353. func (s *wafLogService) chunkExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter, totalRecords int) error {
  354. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  355. pageSize := 5000 // 每个分块5000条记录
  356. // 分块导出需要前端配合实现
  357. excel.ChunkExport(w, excel.TransferOption{
  358. FileName: fileName,
  359. ContentType: "application/json", // 返回分块信息
  360. }, totalRecords, pageSize)
  361. return nil
  362. }
  363. // formatBackendList 格式化后端地址列表
  364. func (s *wafLogService) formatBackendList(backendList interface{}) string {
  365. if backendList == nil {
  366. return ""
  367. }
  368. switch v := backendList.(type) {
  369. case string:
  370. return v
  371. case []string:
  372. return strings.Join(v, ", ")
  373. default:
  374. return fmt.Sprintf("%v", v)
  375. }
  376. }
  377. // formatExposeAddr 格式化暴露地址
  378. func (s *wafLogService) formatExposeAddr(exposeAddr []string) string {
  379. if len(exposeAddr) == 0 {
  380. return ""
  381. }
  382. return strings.Join(exposeAddr, ", ")
  383. }
  384. // convertRawDataToExportResults 将原始数据转换为导出结果(复用原有的ExPortWafLog逻辑)
  385. func (s *wafLogService) convertRawDataToExportResults(ctx context.Context, rawData []model.WafLog) ([]adminApi.ExportWafLogRes, error) {
  386. if len(rawData) == 0 {
  387. return []adminApi.ExportWafLogRes{}, nil
  388. }
  389. // 收集所有需要查询的hostId和uid,用于批量获取网关组
  390. hostIds := make([]int64, 0, len(rawData))
  391. uids := make([]int64, 0, len(rawData))
  392. maxCreatedAt := time.Time{}
  393. for _, v := range rawData {
  394. hostIds = append(hostIds, int64(v.HostId))
  395. uids = append(uids, int64(v.Uid))
  396. if v.CreatedAt.After(maxCreatedAt) {
  397. maxCreatedAt = v.CreatedAt
  398. }
  399. }
  400. // 批量获取网关组数据
  401. gatewayMap, err := s.wafLogRepository.BatchGetWafLogGateWayIps(ctx, hostIds, uids, maxCreatedAt)
  402. if err != nil {
  403. s.Logger.Warn("批量获取网关组失败,降级为单个查询", zap.Error(err))
  404. gatewayMap = make(map[string]model.WafLog) // 空map,后续会降级处理
  405. }
  406. var res []adminApi.ExportWafLogRes
  407. for _, v := range rawData {
  408. var AddrBackendList interface{}
  409. var customHost string
  410. var port string
  411. var domain string
  412. var comment string
  413. var mapData map[string]interface{}
  414. err := json.Unmarshal(v.ExtraData, &mapData)
  415. if err != nil {
  416. // 尝试解析为数组格式
  417. var arrayData []interface{}
  418. if arrayErr := json.Unmarshal(v.ExtraData, &arrayData); arrayErr != nil {
  419. // 如果不是符合的JSON格式,直接把原始值作为字符串处理
  420. s.Logger.Warn("额外数据不是有效JSON格式,使用原始值", zap.Error(err), zap.Int("id", v.Id),
  421. zap.String("extra_data", string(v.ExtraData)))
  422. mapData = map[string]interface{}{
  423. "raw_data": string(v.ExtraData),
  424. }
  425. } else {
  426. // 如果是数组格式,将数组作为值存储
  427. s.Logger.Warn("额外数据为数组格式,保存为数组值", zap.Int("id", v.Id))
  428. mapData = map[string]interface{}{
  429. "array_data": arrayData,
  430. }
  431. }
  432. }
  433. if strings.Contains(v.ApiName, "tcp") || strings.Contains(v.ApiName, "udp") || strings.Contains(v.ApiName, "web") {
  434. // 安全地获取extraData
  435. var extraData map[string]interface{}
  436. if mapData["data"] != nil {
  437. if data, ok := mapData["data"].(map[string]interface{}); ok {
  438. extraData = data
  439. }
  440. }
  441. if extraData != nil {
  442. if extraData["port"] != nil {
  443. if portStr, ok := extraData["port"].(string); ok {
  444. port = portStr
  445. }
  446. }
  447. if extraData["domain"] != nil {
  448. if domainStr, ok := extraData["domain"].(string); ok {
  449. domain = domainStr
  450. }
  451. }
  452. if extraData["backendList"] != nil {
  453. if strings.Contains(v.ApiName, "web") {
  454. if backendListStr, ok := extraData["backendList"].(string); ok {
  455. var backendList []map[string]interface{}
  456. err := json.Unmarshal([]byte(backendListStr), &backendList)
  457. if err != nil {
  458. s.Logger.Error("解析后端列表失败", zap.Error(err))
  459. continue
  460. }
  461. for _, backend := range backendList {
  462. if backend["addr"] != nil {
  463. AddrBackendList = backend["addr"]
  464. }
  465. if backend["customHost"] != nil {
  466. if customHostStr, ok := backend["customHost"].(string); ok {
  467. customHost = customHostStr
  468. }
  469. }
  470. }
  471. }
  472. } else {
  473. AddrBackendList = extraData["backendList"]
  474. }
  475. }
  476. }
  477. }
  478. if mapData["comment"] != nil {
  479. if commentStr, ok := mapData["comment"].(string); ok {
  480. comment = commentStr
  481. }
  482. }
  483. // 优化:从批量获取的网关组数据中查找
  484. var exposeAddr []string
  485. key := fmt.Sprintf("%d_%d", v.HostId, v.Uid)
  486. if gatewayModel, exists := gatewayMap[key]; exists {
  487. var gateWayIps []string
  488. if err := json.Unmarshal(gatewayModel.ExtraData, &gateWayIps); err == nil {
  489. if len(gateWayIps) > 0 && port != "" {
  490. for _, ip := range gateWayIps {
  491. exposeAddr = append(exposeAddr, ip+":"+port)
  492. }
  493. }
  494. }
  495. } else {
  496. // 降级:单个查询
  497. gateWayIpModel, err := s.wafLogRepository.GetWafLogGateWayIp(ctx, int64(v.HostId), int64(v.Uid), v.CreatedAt)
  498. if err == nil {
  499. var gateWayIps []string
  500. if err := json.Unmarshal(gateWayIpModel.ExtraData, &gateWayIps); err == nil {
  501. if len(gateWayIps) > 0 && port != "" {
  502. for _, ip := range gateWayIps {
  503. exposeAddr = append(exposeAddr, ip+":"+port)
  504. }
  505. }
  506. }
  507. }
  508. }
  509. res = append(res, adminApi.ExportWafLogRes{
  510. Name: v.Name,
  511. RequestIp: v.RequestIp,
  512. HostId: v.HostId,
  513. ApiName: v.ApiName,
  514. AddrBackendList: AddrBackendList,
  515. Domain: domain,
  516. Comment: comment,
  517. CustomHost: customHost,
  518. ExposeAddr: exposeAddr,
  519. CreatedAt: v.CreatedAt,
  520. })
  521. }
  522. return res, nil
  523. }
  524. // GetApiDescriptions 获取API描述映射
  525. func (s *wafLogService) GetApiDescriptions(ctx context.Context) map[string]string {
  526. return ApiDescriptionMap
  527. }