waflog.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. package admin
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  7. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  9. adminRep "github.com/go-nunu/nunu-layout-advanced/internal/repository/admin"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  12. "github.com/go-nunu/nunu-layout-advanced/pkg/excel"
  13. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  14. amqp "github.com/rabbitmq/amqp091-go"
  15. "go.uber.org/zap"
  16. "net/http"
  17. "strings"
  18. "time"
  19. )
  20. // ApiDescriptionMap API描述映射
  21. var ApiDescriptionMap = map[string]string{
  22. "/webForward/get": "获取web详情",
  23. "/webForward/getList": "获取web列表",
  24. "/webForward/add": "添加web",
  25. "/webForward/edit": "修改web",
  26. "/webForward/delete": "删除web",
  27. "/tcpForward/add": "添加tcp",
  28. "/tcpForward/edit": "修改tcp",
  29. "/tcpForward/delete": "删除tcp",
  30. "/tcpForward/getList": "获取tcp列表",
  31. "/tcpForward/get": "获取tcp详情",
  32. "/udpForward/add": "添加udp",
  33. "/udpForward/edit": "修改udp",
  34. "/udpForward/delete": "删除udp",
  35. "/udpForward/getList": "获取udp列表",
  36. "/udpForward/get": "获取udp详情",
  37. "/globalLimit/add": "添加实例",
  38. "/globalLimit/edit": "修改实例",
  39. "/globalLimit/delete": "删除实例",
  40. "/allowAndDeny/get": "获取黑白名单详情",
  41. "/allowAndDeny/getList": "获取黑白名单列表",
  42. "/allowAndDeny/add": "添加黑白名单",
  43. "/allowAndDeny/edit": "修改黑白名单",
  44. "/allowAndDeny/delete": "删除黑白名单",
  45. "/cc/getList": "获取CC列表",
  46. "/cc/editState": "删除CC黑名单",
  47. "/ccIpList/getList": "获取CC白名单列表",
  48. "/ccIpList/add": "添加CC白名单",
  49. "/ccIpList/edit": "修改CC白名单",
  50. "/ccIpList/delete": "删除CC白名单",
  51. "分配网关组": "分配网关组",
  52. }
  53. type WafLogService interface {
  54. GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
  55. GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
  56. AddWafLog(ctx context.Context, req adminApi.WafLog) error
  57. BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error
  58. PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
  59. SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error
  60. GetApiDescriptions(ctx context.Context) map[string]string
  61. }
  62. func NewWafLogService(
  63. service *service.Service,
  64. wafLogRepository adminRep.WafLogRepository,
  65. globalLimitRepository waf.GlobalLimitRepository,
  66. mq *rabbitmq.RabbitMQ,
  67. wafLogDataCleanService WafLogDataCleanService,
  68. ) WafLogService {
  69. return &wafLogService{
  70. Service: service,
  71. wafLogRepository: wafLogRepository,
  72. globalLimitRepository: globalLimitRepository,
  73. mq : mq,
  74. wafLogDataCleanService: wafLogDataCleanService,
  75. }
  76. }
  77. type wafLogService struct {
  78. *service.Service
  79. wafLogRepository adminRep.WafLogRepository
  80. globalLimitRepository waf.GlobalLimitRepository
  81. mq *rabbitmq.RabbitMQ
  82. wafLogDataCleanService WafLogDataCleanService
  83. }
  84. func (s *wafLogService) getFirstPathSegment(path string) (segment []string, ok bool) {
  85. // 1. 为了统一处理,先去掉路径最前面的 "/"
  86. // 这样 "/v1/admin" 会变成 "v1/admin",而 "v1/admin" 保持不变
  87. trimmedPath := strings.TrimPrefix(path, "/")
  88. // 如果去掉 "/" 后字符串为空(比如原路径是 "/" 或 ""),则无法提取
  89. if trimmedPath == "" {
  90. return nil, false
  91. }
  92. // 2. 使用 "/" 作为分隔符来切割字符串
  93. // "v1/admin/menus" 会被切割成一个字符串切片 (slice): ["v1", "admin", "menus"]
  94. parts := strings.Split(trimmedPath, "/")
  95. // 3. 只要切片不为空,第一个元素就是我们需要的值
  96. // len(parts) > 0 这个检查可以保证程序不会因为空切片而出错
  97. if len(parts) > 0 {
  98. return parts, true
  99. }
  100. return nil, false
  101. }
  102. func (s *wafLogService) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
  103. return s.wafLogRepository.GetWafLog(ctx, id)
  104. }
  105. func (s *wafLogService) GetWafLogList(ctx context.Context,req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
  106. return s.wafLogRepository.GetWafLogList(ctx, req)
  107. }
  108. func (s *wafLogService) AddWafLog(ctx context.Context, req adminApi.WafLog) error {
  109. if req.Api != "" {
  110. api := strings.TrimPrefix(req.Api, "/v1")
  111. if _, ok := ApiDescriptionMap[api]; ok {
  112. req.ApiName = ApiDescriptionMap[api]
  113. }
  114. apiType, ok := s.getFirstPathSegment(req.Api)
  115. if ok {
  116. req.ApiType = apiType[len(apiType)-1]
  117. }
  118. }
  119. userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
  120. if err != nil {
  121. return err
  122. }
  123. req.Name = userInfo.Username
  124. extraData, err := json.Marshal(req.ExtraData)
  125. if err != nil {
  126. return err
  127. }
  128. req.RequestIp = userInfo.LastLoginIp
  129. return s.wafLogRepository.AddWafLog(ctx, &model.WafLog{
  130. Uid: req.Uid,
  131. Name: req.Name,
  132. RequestIp: req.RequestIp,
  133. RuleId: req.RuleId,
  134. HostId: req.HostId,
  135. UserAgent: req.UserAgent,
  136. Api: req.Api,
  137. ApiType: req.ApiType,
  138. ApiName: req.ApiName,
  139. ExtraData: extraData,
  140. })
  141. }
  142. func (s *wafLogService) BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error {
  143. if len(reqs) == 0 {
  144. return nil
  145. }
  146. wafLogs := make([]*model.WafLog, 0, len(reqs))
  147. for _, req := range reqs {
  148. if req.Api != "" {
  149. api := strings.TrimPrefix(req.Api, "/v1")
  150. if _, ok := ApiDescriptionMap[api]; ok {
  151. req.ApiName = ApiDescriptionMap[api]
  152. }
  153. apiType, ok := s.getFirstPathSegment(req.Api)
  154. if ok {
  155. req.ApiType = apiType[len(apiType)-1]
  156. }
  157. }
  158. userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
  159. if err != nil {
  160. s.Logger.Error("获取用户信息失败", zap.Error(err), zap.Int("uid", req.Uid))
  161. continue
  162. }
  163. req.Name = userInfo.Username
  164. req.RequestIp = userInfo.LastLoginIp
  165. extraData, err := json.Marshal(req.ExtraData)
  166. if err != nil {
  167. s.Logger.Error("序列化额外数据失败", zap.Error(err))
  168. continue
  169. }
  170. wafLogs = append(wafLogs, &model.WafLog{
  171. Uid: req.Uid,
  172. Name: req.Name,
  173. RequestIp: req.RequestIp,
  174. RuleId: req.RuleId,
  175. HostId: req.HostId,
  176. UserAgent: req.UserAgent,
  177. Api: req.Api,
  178. ApiType: req.ApiType,
  179. ApiName: req.ApiName,
  180. ExtraData: extraData,
  181. })
  182. }
  183. // 调用repository层的批量插入方法
  184. return s.wafLogRepository.BatchAddWafLog(ctx, wafLogs)
  185. }
  186. func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
  187. payload := &req
  188. // Serialize the message
  189. msgBody, err := json.Marshal(payload)
  190. if err != nil {
  191. s.Logger.Error("序列化 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  192. return
  193. }
  194. // Get task configuration
  195. taskCfg, ok := s.mq.GetTaskConfig("waf_log")
  196. if !ok {
  197. s.Logger.Error("无法获取“waf_Log”任务配置")
  198. return
  199. }
  200. // Construct the routing key dynamically based on the action
  201. routingKey := fmt.Sprintf("wafLog.%s", "add")
  202. // Construct the amqp.Publishing message
  203. publishingMsg := amqp.Publishing{
  204. ContentType: "application/json",
  205. Body: msgBody,
  206. DeliveryMode: amqp.Persistent, // Persistent message
  207. }
  208. // Publish the message
  209. err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
  210. if err != nil {
  211. s.Logger.Error("发布 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  212. } else {
  213. s.Logger.Info("已成功发布 WafLog 任务消息", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  214. }
  215. }
  216. func (s *wafLogService) ExPortWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]adminApi.ExportWafLogRes, error) {
  217. // 获取原始数据
  218. data, err := s.wafLogRepository.ExportWafLog(ctx, req)
  219. if err != nil {
  220. return nil, err
  221. }
  222. // 使用优化后的转换方法,避免N+1查询
  223. return s.convertRawDataToExportResults(ctx, data)
  224. }
  225. // SmartExportWafLog 智能导出WAF日志为Excel
  226. func (s *wafLogService) SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error {
  227. // 1. 先获取总数量用于智能选择传输方式
  228. count, err := s.wafLogRepository.GetWafLogExportCount(ctx, req)
  229. if err != nil {
  230. return fmt.Errorf("获取导出数据总数失败: %w", err)
  231. }
  232. // 2. 智能选择导出方式
  233. // 估算每行数据大小约200字节(包含用户名、IP、API名称、域名等字段)
  234. exportType := excel.SmartExport(count, 200)
  235. // 3. 设置Excel表头映射
  236. headers := []string{"name", "request_ip", "host_id", "rule_id", "api_name", "addr_backend_list", "domain", "comment", "custom_host", "expose_addr", "created_at"}
  237. headerMap := map[string]string{
  238. "name": "用户名",
  239. "request_ip": "请求IP",
  240. "host_id": "主机ID",
  241. "rule_id": "规则ID",
  242. "api_name": "API名称",
  243. "addr_backend_list": "后端地址",
  244. "domain": "域名",
  245. "comment": "备注",
  246. "custom_host": "回源地址",
  247. "expose_addr": "暴露地址",
  248. "created_at": "创建时间",
  249. }
  250. // 4. 创建Excel生成器
  251. generator := excel.NewExcelGenerator("WAF日志", headers, headerMap)
  252. if err := generator.WriteHeaders(); err != nil {
  253. return fmt.Errorf("写入Excel表头失败: %w", err)
  254. }
  255. // 5. 根据导出类型选择不同的处理方式
  256. switch exportType {
  257. case excel.ExportTypeNormal:
  258. return s.normalExportWafLog(ctx, req, generator, w)
  259. case excel.ExportTypeStream:
  260. return s.streamExportWafLog(ctx, req, generator, w)
  261. case excel.ExportTypeChunk:
  262. return s.chunkExportWafLog(ctx, req, w, count)
  263. default:
  264. return s.normalExportWafLog(ctx, req, generator, w)
  265. }
  266. }
  267. // normalExportWafLog 普通导出(小文件)
  268. func (s *wafLogService) normalExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
  269. // 获取所有数据(已经优化了批量查询)
  270. exportData, err := s.ExPortWafLog(ctx, req)
  271. if err != nil {
  272. return fmt.Errorf("获取导出数据失败: %w", err)
  273. }
  274. // 转换数据格式
  275. data := make([]map[string]interface{}, 0, len(exportData))
  276. for _, item := range exportData {
  277. row := map[string]interface{}{
  278. "name": item.Name,
  279. "request_ip": item.RequestIp,
  280. "host_id": item.HostId,
  281. "rule_id": item.RuleId,
  282. "api_name": item.ApiName,
  283. "addr_backend_list": s.formatBackendList(item.AddrBackendList),
  284. "domain": item.Domain,
  285. "comment": item.Comment,
  286. "custom_host": s.formatExposeAddr(item.CustomHost),
  287. "expose_addr": s.formatExposeAddr(item.ExposeAddr),
  288. "created_at": item.CreatedAt,
  289. }
  290. data = append(data, row)
  291. }
  292. // 写入数据
  293. if err := generator.WriteRows(data); err != nil {
  294. return fmt.Errorf("写入Excel数据失败: %w", err)
  295. }
  296. // 普通导出
  297. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  298. return excel.NormalExport(generator, w, excel.TransferOption{
  299. FileName: fileName,
  300. ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  301. })
  302. }
  303. // streamExportWafLog 流式导出(大文件)
  304. func (s *wafLogService) streamExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
  305. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  306. // 设置响应头
  307. w.Header().Set("Content-Type", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
  308. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", fileName))
  309. w.Header().Set("Transfer-Encoding", "chunked")
  310. // 分批处理数据,每批1000条
  311. pageSize := 1000
  312. page := 1
  313. for {
  314. // 使用分页导出方法
  315. exportData, err := s.wafLogRepository.ExportWafLogWithPagination(ctx, req, page, pageSize)
  316. if err != nil {
  317. return fmt.Errorf("获取第%d页数据失败: %w", page, err)
  318. }
  319. // 转换为导出格式(复用原有的ExPortWafLog逻辑)
  320. exportResults, err := s.convertRawDataToExportResults(ctx, exportData)
  321. if err != nil {
  322. return fmt.Errorf("转换导出数据失败: %w", err)
  323. }
  324. if len(exportResults) == 0 {
  325. break // 没有更多数据
  326. }
  327. // 转换并写入当前批次数据
  328. for _, item := range exportResults {
  329. row := map[string]interface{}{
  330. "name": item.Name,
  331. "request_ip": item.RequestIp,
  332. "host_id": item.HostId,
  333. "api_name": item.ApiName,
  334. "addr_backend_list": s.formatBackendList(item.AddrBackendList),
  335. "domain": item.Domain,
  336. "comment": item.Comment,
  337. "custom_host": s.formatExposeAddr(item.CustomHost),
  338. "expose_addr": s.formatExposeAddr(item.ExposeAddr),
  339. "created_at": item.CreatedAt,
  340. }
  341. if err := generator.WriteRow(row); err != nil {
  342. return fmt.Errorf("写入第%d页数据失败: %w", page, err)
  343. }
  344. }
  345. // 如果当前批次数据少于页大小,说明已经是最后一页
  346. if len(exportResults) < pageSize {
  347. break
  348. }
  349. page++
  350. }
  351. // 流式导出
  352. return excel.StreamExport(generator, w, excel.TransferOption{
  353. FileName: fileName,
  354. ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  355. })
  356. }
  357. // chunkExportWafLog 分块导出(超大文件)
  358. func (s *wafLogService) chunkExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter, totalRecords int) error {
  359. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  360. pageSize := 5000 // 每个分块5000条记录
  361. // 分块导出需要前端配合实现
  362. excel.ChunkExport(w, excel.TransferOption{
  363. FileName: fileName,
  364. ContentType: "application/json", // 返回分块信息
  365. }, totalRecords, pageSize)
  366. return nil
  367. }
  368. // formatBackendList 格式化后端地址列表
  369. func (s *wafLogService) formatBackendList(backendList interface{}) string {
  370. if backendList == nil {
  371. return ""
  372. }
  373. switch v := backendList.(type) {
  374. case string:
  375. return v
  376. case []string:
  377. return strings.Join(v, ", ")
  378. default:
  379. // 对于其他类型,先转换为字符串再处理
  380. str := fmt.Sprintf("%v", v)
  381. if strings.Contains(str, " ") && !strings.Contains(str, "\n") {
  382. parts := strings.Fields(str)
  383. if len(parts) > 1 {
  384. return strings.Join(parts, ", ")
  385. }
  386. }
  387. return str
  388. }
  389. }
  390. // formatExposeAddr 格式化暴露地址
  391. func (s *wafLogService) formatExposeAddr(exposeAddr []string) string {
  392. if len(exposeAddr) == 0 {
  393. return ""
  394. }
  395. return strings.Join(exposeAddr, ", ")
  396. }
  397. // convertRawDataToExportResults 将原始数据转换为导出结果(复用原有的ExPortWafLog逻辑)
  398. func (s *wafLogService) convertRawDataToExportResults(ctx context.Context, rawData []model.WafLog) ([]adminApi.ExportWafLogRes, error) {
  399. if len(rawData) == 0 {
  400. return []adminApi.ExportWafLogRes{}, nil
  401. }
  402. // 批量准备逻辑保持不变...
  403. hostIds := make([]int64, 0, len(rawData))
  404. uids := make([]int64, 0, len(rawData))
  405. maxCreatedAt := time.Time{}
  406. for _, v := range rawData {
  407. hostIds = append(hostIds, int64(v.HostId))
  408. uids = append(uids, int64(v.Uid))
  409. if v.CreatedAt.After(maxCreatedAt) {
  410. maxCreatedAt = v.CreatedAt
  411. }
  412. }
  413. gatewayMap, err := s.wafLogRepository.BatchGetWafLogGateWayIps(ctx, hostIds, uids, maxCreatedAt)
  414. if err != nil {
  415. s.Logger.Warn("批量获取网关组失败,降级为单个查询", zap.Error(err))
  416. gatewayMap = make(map[string]model.WafLog)
  417. }
  418. var res []adminApi.ExportWafLogRes
  419. for _, v := range rawData {
  420. // --- 核心改动:一行代码完成所有数据清洗 ---
  421. cleanedData := s.wafLogDataCleanService.ParseWafLogExtraData(v.ExtraData, v.ApiName)
  422. // 网关 IP 获取逻辑保持不变,但使用清洗后的 port
  423. var exposeAddr []string
  424. key := fmt.Sprintf("%d_%d", v.HostId, v.Uid)
  425. if gatewayModel, exists := gatewayMap[key]; exists {
  426. var gateWayIps []string
  427. if err := json.Unmarshal(gatewayModel.ExtraData, &gateWayIps); err == nil && len(gateWayIps) > 0 && cleanedData.Port != "" {
  428. for _, ip := range gateWayIps {
  429. exposeAddr = append(exposeAddr, ip+":"+cleanedData.Port)
  430. }
  431. }
  432. } else {
  433. // 降级查询逻辑...
  434. gateWayIpModel, err := s.wafLogRepository.GetWafLogGateWayIp(ctx, int64(v.HostId), int64(v.Uid), v.CreatedAt)
  435. if err == nil {
  436. var gateWayIps []string
  437. if err := json.Unmarshal(gateWayIpModel.ExtraData, &gateWayIps); err == nil && len(gateWayIps) > 0 && cleanedData.Port != "" {
  438. for _, ip := range gateWayIps {
  439. exposeAddr = append(exposeAddr, ip+":"+cleanedData.Port)
  440. }
  441. }
  442. }
  443. }
  444. // 构造结果,代码更清晰
  445. res = append(res, adminApi.ExportWafLogRes{
  446. Name: v.Name,
  447. RequestIp: v.RequestIp,
  448. HostId: v.HostId,
  449. RuleId: v.RuleId,
  450. ApiName: v.ApiName,
  451. AddrBackendList: cleanedData.AddrBackendList,
  452. Domain: cleanedData.Domain,
  453. Comment: cleanedData.Comment,
  454. CustomHost: cleanedData.CustomHost,
  455. ExposeAddr: exposeAddr,
  456. CreatedAt: v.CreatedAt,
  457. })
  458. }
  459. return res, nil
  460. }
  461. // GetApiDescriptions 获取API描述映射
  462. func (s *wafLogService) GetApiDescriptions(ctx context.Context) map[string]string {
  463. return ApiDescriptionMap
  464. }