waflog.go 14 KB


  1. package admin
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  7. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  8. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  9. adminRep "github.com/go-nunu/nunu-layout-advanced/internal/repository/admin"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  12. "github.com/go-nunu/nunu-layout-advanced/pkg/excel"
  13. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  14. amqp "github.com/rabbitmq/amqp091-go"
  15. "go.uber.org/zap"
  16. "net/http"
  17. "strings"
  18. "time"
  19. )
  20. // ApiDescriptionMap API描述映射
  21. var ApiDescriptionMap = map[string]string{
  22. "/webForward/add": "添加web",
  23. "/webForward/edit": "修改web",
  24. "/webForward/delete": "删除web",
  25. "/tcpForward/add": "添加tcp",
  26. "/tcpForward/edit": "修改tcp",
  27. "/tcpForward/delete": "删除tcp",
  28. "/udpForward/add": "添加udp",
  29. "/udpForward/edit": "修改udp",
  30. "/udpForward/delete": "删除udp",
  31. "/globalLimit/add": "添加实例",
  32. "/globalLimit/edit": "修改实例",
  33. "/globalLimit/delete": "删除实例",
  34. "/allowAndDeny/add": "添加黑白名单",
  35. "/allowAndDeny/edit": "修改黑白名单",
  36. "/allowAndDeny/delete": "删除黑白名单",
  37. "/cc/editState": "删除CC黑名单",
  38. "/ccIpList/add": "添加CC白名单",
  39. "/ccIpList/edit": "修改CC白名单",
  40. "/ccIpList/delete": "删除CC白名单",
  41. "分配网关组": "分配网关组",
  42. }
  43. type WafLogService interface {
  44. GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
  45. GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
  46. AddWafLog(ctx context.Context, req adminApi.WafLog) error
  47. BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error
  48. PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
  49. SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error
  50. GetApiDescriptions(ctx context.Context) map[string]string
  51. }
  52. func NewWafLogService(
  53. service *service.Service,
  54. wafLogRepository adminRep.WafLogRepository,
  55. globalLimitRepository waf.GlobalLimitRepository,
  56. mq *rabbitmq.RabbitMQ,
  57. wafLogDataCleanService WafLogDataCleanService,
  58. ) WafLogService {
  59. return &wafLogService{
  60. Service: service,
  61. wafLogRepository: wafLogRepository,
  62. globalLimitRepository: globalLimitRepository,
  63. mq : mq,
  64. wafLogDataCleanService: wafLogDataCleanService,
  65. }
  66. }
  67. type wafLogService struct {
  68. *service.Service
  69. wafLogRepository adminRep.WafLogRepository
  70. globalLimitRepository waf.GlobalLimitRepository
  71. mq *rabbitmq.RabbitMQ
  72. wafLogDataCleanService WafLogDataCleanService
  73. }
  74. func (s *wafLogService) getFirstPathSegment(path string) (segment []string, ok bool) {
  75. trimmedPath := strings.TrimPrefix(path, "/")
  76. if trimmedPath == "" {
  77. return nil, false
  78. }
  79. parts := strings.Split(trimmedPath, "/")
  80. if len(parts) > 0 {
  81. return parts, true
  82. }
  83. return nil, false
  84. }
  85. func (s *wafLogService) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
  86. return s.wafLogRepository.GetWafLog(ctx, id)
  87. }
  88. func (s *wafLogService) GetWafLogList(ctx context.Context,req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
  89. return s.wafLogRepository.GetWafLogList(ctx, req)
  90. }
  91. func (s *wafLogService) AddWafLog(ctx context.Context, req adminApi.WafLog) error {
  92. if req.Api != "" {
  93. api := strings.TrimPrefix(req.Api, "/v1")
  94. if _, ok := ApiDescriptionMap[api]; ok {
  95. req.ApiName = ApiDescriptionMap[api]
  96. }
  97. apiType, ok := s.getFirstPathSegment(req.Api)
  98. if ok {
  99. req.ApiType = apiType[len(apiType)-1]
  100. }
  101. }
  102. userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
  103. if err != nil {
  104. return err
  105. }
  106. req.Name = userInfo.Username
  107. extraData, err := json.Marshal(req.ExtraData)
  108. if err != nil {
  109. return err
  110. }
  111. req.RequestIp = userInfo.LastLoginIp
  112. return s.wafLogRepository.AddWafLog(ctx, &model.WafLog{
  113. Uid: req.Uid,
  114. Name: req.Name,
  115. RequestIp: req.RequestIp,
  116. RuleId: req.RuleId,
  117. HostId: req.HostId,
  118. UserAgent: req.UserAgent,
  119. Api: req.Api,
  120. ApiType: req.ApiType,
  121. ApiName: req.ApiName,
  122. ExtraData: extraData,
  123. })
  124. }
  125. func (s *wafLogService) BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error {
  126. if len(reqs) == 0 {
  127. return nil
  128. }
  129. wafLogs := make([]*model.WafLog, 0, len(reqs))
  130. for _, req := range reqs {
  131. if req.Api != "" {
  132. api := strings.TrimPrefix(req.Api, "/v1")
  133. if _, ok := ApiDescriptionMap[api]; ok {
  134. req.ApiName = ApiDescriptionMap[api]
  135. }
  136. apiType, ok := s.getFirstPathSegment(req.Api)
  137. if ok {
  138. req.ApiType = apiType[len(apiType)-1]
  139. }
  140. }
  141. userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
  142. if err != nil {
  143. s.Logger.Error("获取用户信息失败", zap.Error(err), zap.Int("uid", req.Uid))
  144. continue
  145. }
  146. req.Name = userInfo.Username
  147. req.RequestIp = userInfo.LastLoginIp
  148. extraData, err := json.Marshal(req.ExtraData)
  149. if err != nil {
  150. s.Logger.Error("序列化额外数据失败", zap.Error(err))
  151. continue
  152. }
  153. wafLogs = append(wafLogs, &model.WafLog{
  154. Uid: req.Uid,
  155. Name: req.Name,
  156. RequestIp: req.RequestIp,
  157. RuleId: req.RuleId,
  158. HostId: req.HostId,
  159. UserAgent: req.UserAgent,
  160. Api: req.Api,
  161. ApiType: req.ApiType,
  162. ApiName: req.ApiName,
  163. ExtraData: extraData,
  164. })
  165. }
  166. return s.wafLogRepository.BatchAddWafLog(ctx, wafLogs)
  167. }
  168. func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
  169. payload := &req
  170. msgBody, err := json.Marshal(payload)
  171. if err != nil {
  172. s.Logger.Error("序列化 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  173. return
  174. }
  175. taskCfg, ok := s.mq.GetTaskConfig("waf_log")
  176. if !ok {
  177. s.Logger.Error("无法获取“waf_Log”任务配置")
  178. return
  179. }
  180. routingKey := fmt.Sprintf("wafLog.%s", "add")
  181. publishingMsg := amqp.Publishing{
  182. ContentType: "application/json",
  183. Body: msgBody,
  184. DeliveryMode: amqp.Persistent,
  185. }
  186. err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
  187. if err != nil {
  188. s.Logger.Error("发布 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  189. } else {
  190. s.Logger.Info("已成功发布 WafLog 任务消息", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  191. }
  192. }
  193. func (s *wafLogService) ExPortWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]adminApi.ExportWafLogRes, error) {
  194. data, err := s.wafLogRepository.ExportWafLog(ctx, req)
  195. if err != nil {
  196. return nil, err
  197. }
  198. return s.convertRawDataToExportResults(ctx, data)
  199. }
  200. func (s *wafLogService) SmartExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter) error {
  201. count, err := s.wafLogRepository.GetWafLogExportCount(ctx, req)
  202. if err != nil {
  203. return fmt.Errorf("获取导出数据总数失败: %w", err)
  204. }
  205. exportType := excel.SmartExport(count, 200)
  206. headers := []string{"name", "request_ip", "host_id", "rule_id", "api_name", "addr_backend_list", "allow_and_deny_ips", "domain", "custom_host", "expose_addr", "comment", "created_at"}
  207. headerMap := map[string]string{
  208. "name": "用户名",
  209. "request_ip": "请求IP",
  210. "host_id": "实例ID",
  211. "rule_id": "规则ID",
  212. "api_name": "操作名称",
  213. "addr_backend_list": "后端地址",
  214. "allow_and_deny_ips": "黑白名单",
  215. "domain": "域名",
  216. "custom_host": "回源地址",
  217. "expose_addr": "暴露地址",
  218. "comment": "备注",
  219. "created_at": "操作时间",
  220. }
  221. generator := excel.NewExcelGenerator("WAF日志", headers, headerMap)
  222. if err := generator.WriteHeaders(); err != nil {
  223. return fmt.Errorf("写入Excel表头失败: %w", err)
  224. }
  225. switch exportType {
  226. case excel.ExportTypeNormal:
  227. return s.normalExportWafLog(ctx, req, generator, w)
  228. case excel.ExportTypeStream:
  229. return s.streamExportWafLog(ctx, req, generator, w)
  230. case excel.ExportTypeChunk:
  231. return s.chunkExportWafLog(ctx, req, w, count)
  232. default:
  233. return s.normalExportWafLog(ctx, req, generator, w)
  234. }
  235. }
  236. func (s *wafLogService) normalExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
  237. exportData, err := s.ExPortWafLog(ctx, req)
  238. if err != nil {
  239. return fmt.Errorf("获取导出数据失败: %w", err)
  240. }
  241. data := make([]map[string]interface{}, 0, len(exportData))
  242. for _, item := range exportData {
  243. row := map[string]interface{}{
  244. "name": item.Name,
  245. "request_ip": item.RequestIp,
  246. "host_id": item.HostId,
  247. "rule_id": s.wafLogDataCleanService.FormatBackendList(item.RuleId),
  248. "api_name": item.ApiName,
  249. "addr_backend_list": s.wafLogDataCleanService.FormatBackendList(item.AddrBackendList),
  250. "allow_and_deny_ips": item.AllowAndDenyIps,
  251. "domain": item.Domain,
  252. "comment": item.Comment,
  253. "custom_host": s.wafLogDataCleanService.FormatBackendList(item.CustomHost),
  254. "expose_addr": s.wafLogDataCleanService.FormatBackendList(item.ExposeAddr),
  255. "created_at": item.CreatedAt,
  256. }
  257. data = append(data, row)
  258. }
  259. if err := generator.WriteRows(data); err != nil {
  260. return fmt.Errorf("写入Excel数据失败: %w", err)
  261. }
  262. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  263. return excel.NormalExport(generator, w, excel.TransferOption{
  264. FileName: fileName,
  265. ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  266. })
  267. }
  268. func (s *wafLogService) streamExportWafLog(ctx context.Context, req adminApi.ExportWafLog, generator *excel.ExcelGenerator, w http.ResponseWriter) error {
  269. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  270. w.Header().Set("Content-Type", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
  271. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", fileName))
  272. w.Header().Set("Transfer-Encoding", "chunked")
  273. pageSize := 1000
  274. page := 1
  275. for {
  276. exportData, err := s.wafLogRepository.ExportWafLogWithPagination(ctx, req, page, pageSize)
  277. if err != nil {
  278. return fmt.Errorf("获取第%d页数据失败: %w", page, err)
  279. }
  280. exportResults, err := s.convertRawDataToExportResults(ctx, exportData)
  281. if err != nil {
  282. return fmt.Errorf("转换导出数据失败: %w", err)
  283. }
  284. if len(exportResults) == 0 {
  285. break
  286. }
  287. for _, item := range exportResults {
  288. row := map[string]interface{}{
  289. "name": item.Name,
  290. "request_ip": item.RequestIp,
  291. "host_id": item.HostId,
  292. "rule_id": s.wafLogDataCleanService.FormatBackendList(item.RuleId),
  293. "api_name": item.ApiName,
  294. "addr_backend_list": s.wafLogDataCleanService.FormatBackendList(item.AddrBackendList),
  295. "allow_and_deny_ips": item.AllowAndDenyIps,
  296. "domain": item.Domain,
  297. "comment": item.Comment,
  298. "custom_host": s.wafLogDataCleanService.FormatBackendList(item.CustomHost),
  299. "expose_addr": s.wafLogDataCleanService.FormatBackendList(item.ExposeAddr),
  300. "created_at": item.CreatedAt,
  301. }
  302. if err := generator.WriteRow(row); err != nil {
  303. return fmt.Errorf("写入第%d页数据失败: %w", page, err)
  304. }
  305. }
  306. if len(exportResults) < pageSize {
  307. break
  308. }
  309. page++
  310. }
  311. return excel.StreamExport(generator, w, excel.TransferOption{
  312. FileName: fileName,
  313. ContentType: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  314. })
  315. }
  316. func (s *wafLogService) chunkExportWafLog(ctx context.Context, req adminApi.ExportWafLog, w http.ResponseWriter, totalRecords int) error {
  317. fileName := fmt.Sprintf("waf_logs_%s.xlsx", time.Now().Format("20060102_150405"))
  318. pageSize := 5000
  319. excel.ChunkExport(w, excel.TransferOption{
  320. FileName: fileName,
  321. ContentType: "application/json",
  322. }, totalRecords, pageSize)
  323. return nil
  324. }
  325. // convertRawDataToExportResults 将从数据库获取的带有关联IP的数据转换为最终导出格式
  326. func (s *wafLogService) convertRawDataToExportResults(ctx context.Context, rawData []model.WafLogWithGatewayIP) ([]adminApi.ExportWafLogRes, error) {
  327. if len(rawData) == 0 {
  328. return []adminApi.ExportWafLogRes{}, nil
  329. }
  330. var res []adminApi.ExportWafLogRes
  331. for _, v := range rawData {
  332. // --- 数据清洗 ---
  333. cleanedData := s.wafLogDataCleanService.ParseWafLogExtraData(v.ExtraData, v.ApiName)
  334. // --- 网关 IP 处理 ---
  335. var exposeAddr []string
  336. // 直接使用查询结果中附带的 gateway_ip_data
  337. if len(v.GatewayIpData) > 0 && string(v.GatewayIpData) != "null" {
  338. var gateWayIps []string
  339. // 解析从数据库子查询得到的JSON数据
  340. if err := json.Unmarshal(v.GatewayIpData, &gateWayIps); err == nil && len(gateWayIps) > 0 && cleanedData.Port != "" {
  341. for _, ip := range gateWayIps {
  342. exposeAddr = append(exposeAddr, ip+":"+cleanedData.Port)
  343. }
  344. }
  345. }
  346. var ruleIds []int64
  347. if len(cleanedData.RuleID) > 0 {
  348. ruleIds = cleanedData.RuleID
  349. } else {
  350. ruleIds = []int64{int64(v.RuleId)}
  351. }
  352. if cleanedData.IsHttps == 1 {
  353. for i := range exposeAddr {
  354. exposeAddr[i] = "https://" + exposeAddr[i]
  355. }
  356. }
  357. // --- 构造结果 ---
  358. res = append(res, adminApi.ExportWafLogRes{
  359. Name: v.Name,
  360. RequestIp: v.RequestIp,
  361. HostId: v.HostId,
  362. RuleId: ruleIds,
  363. ApiName: v.ApiName,
  364. AddrBackendList: cleanedData.AddrBackendList,
  365. AllowAndDenyIps: cleanedData.AllowAndDenyIps,
  366. Domain: cleanedData.Domain,
  367. Comment: cleanedData.Comment,
  368. CustomHost: cleanedData.CustomHost,
  369. ExposeAddr: exposeAddr,
  370. CreatedAt: v.CreatedAt,
  371. })
  372. }
  373. return res, nil
  374. }
  375. // GetApiDescriptions 获取API描述映射
  376. func (s *wafLogService) GetApiDescriptions(ctx context.Context) map[string]string {
  377. return ApiDescriptionMap
  378. }