waflog.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. package admin
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "strings"
  8. "time"
  9. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  10. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  12. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  13. "gorm.io/gorm"
  14. )
  15. type WafLogRepository interface {
  16. GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
  17. GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
  18. AddWafLog(ctx context.Context, log *model.WafLog) error
  19. BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error
  20. ExportWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]model.WafLogWithGatewayIP, error)
  21. ExportWafLogWithPagination(ctx context.Context, req adminApi.ExportWafLog, page, pageSize int) ([]model.WafLogWithGatewayIP, error)
  22. GetWafLogExportCount(ctx context.Context, req adminApi.ExportWafLog) (int, error)
  23. }
  24. func NewWafLogRepository(
  25. repository *repository.Repository,
  26. ) WafLogRepository {
  27. return &wafLogRepository{
  28. Repository: repository,
  29. }
  30. }
  31. type wafLogRepository struct {
  32. *repository.Repository
  33. }
  34. // buildExportQuery 是一个辅助函数,用于构建导出日志的公共查询条件(支持分表)
  35. func (r *wafLogRepository) buildExportQuery(ctx context.Context, req adminApi.ExportWafLog, tableName string) *gorm.DB {
  36. // 使用传入的表名,给表起一个别名,方便子查询中引用
  37. query := r.DBWithName(ctx, "admin").Model(&model.WafLog{}).Table(tableName + " as wl")
  38. if req.RequestIp != "" {
  39. query = query.Where("wl.request_ip = ?", strings.TrimSpace(req.RequestIp))
  40. }
  41. if req.Uid != 0 {
  42. query = query.Where("wl.uid = ?", req.Uid)
  43. }
  44. if req.Api != "" {
  45. query = query.Where("wl.api = ?", strings.TrimSpace(req.Api))
  46. }
  47. if req.Name != "" {
  48. query = query.Where("wl.name = ?", strings.TrimSpace(req.Name))
  49. }
  50. if req.RuleId != 0 {
  51. query = query.Where("wl.rule_id = ?", req.RuleId)
  52. }
  53. if len(req.HostIds) > 0 {
  54. query = query.Where("wl.host_id IN ?", req.HostIds)
  55. }
  56. if req.UserAgent != "" {
  57. query = query.Where("wl.user_agent = ?", strings.TrimSpace(req.UserAgent))
  58. }
  59. if len(req.ApiNames) > 0 {
  60. query = query.Where("wl.api_name IN ?", req.ApiNames)
  61. }
  62. if len(req.ApiTypes) > 0 {
  63. query = query.Where("wl.api_type IN ?", req.ApiTypes)
  64. }
  65. if req.StartTime != "" {
  66. query = query.Where("wl.created_at > ?", strings.TrimSpace(req.StartTime))
  67. }
  68. if req.EndTime != "" {
  69. query = query.Where("wl.created_at < ?", strings.TrimSpace(req.EndTime))
  70. }
  71. return query
  72. }
  73. // getExportTimeRange 根据请求参数解析时间范围,用于确定需要查询的分表
  74. func (r *wafLogRepository) getExportTimeRange(req adminApi.ExportWafLog) (*time.Time, *time.Time) {
  75. var startTime, endTime *time.Time
  76. if req.StartTime != "" {
  77. if t, err := time.Parse("2006-01-02 15:04:05", strings.TrimSpace(req.StartTime)); err == nil {
  78. startTime = &t
  79. } else if t, err := time.Parse("2006-01-02", strings.TrimSpace(req.StartTime)); err == nil {
  80. startTime = &t
  81. }
  82. }
  83. if req.EndTime != "" {
  84. if t, err := time.Parse("2006-01-02 15:04:05", strings.TrimSpace(req.EndTime)); err == nil {
  85. endTime = &t
  86. } else if t, err := time.Parse("2006-01-02", strings.TrimSpace(req.EndTime)); err == nil {
  87. endTime = &t
  88. }
  89. }
  90. return startTime, endTime
  91. }
  92. // buildSubQueryForTable 为指定的表构建子查询
  93. func (r *wafLogRepository) buildSubQueryForTable(ctx context.Context, tableName string) *gorm.DB {
  94. // 需要在所有可能包含"分配网关组"数据的表中查找
  95. // 这里简化处理,先在当前表中查找,如果需要跨表查找可以进一步优化
  96. return r.DBWithName(ctx, "admin").Table(tableName).
  97. Select("extra_data").
  98. Where("api_name = ?", "分配网关组").
  99. Where("host_id = wl.host_id").
  100. Where("uid = wl.uid").
  101. Where("created_at <= wl.created_at").
  102. Order("created_at DESC").
  103. Limit(1)
  104. }
  105. func (r *wafLogRepository) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
  106. var res model.WafLog
  107. // 获取存在的分表
  108. existingTables := r.Manager.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", nil, nil)
  109. // 在各个分表中查找
  110. for _, tableName := range existingTables {
  111. err := r.DBWithName(ctx, "admin").Table(tableName).Where("id = ?", id).First(&res).Error
  112. if err == nil {
  113. res.SetTableName(tableName)
  114. return &res, nil
  115. }
  116. }
  117. return nil, fmt.Errorf("未找到ID为 %d 的WAF日志记录", id)
  118. }
  119. func (r *wafLogRepository) GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
  120. //// 解析时间范围(如果有的话)
  121. //var startTime, endTime *time.Time
  122. //// TODO: 这里可以根据req中的时间字段来确定查询范围
  123. //// 暂时查询最近3个月的数据
  124. // 获取需要查询的表
  125. existingTables := r.Manager.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", nil, nil)
  126. if len(existingTables) == 0 {
  127. // 没有分表,返回空结果
  128. return &v1.PaginatedResponse[model.WafLog]{
  129. Records: []model.WafLog{},
  130. Page: 1,
  131. PageSize: 10,
  132. Total: 0,
  133. TotalPages: 0,
  134. }, nil
  135. }
  136. if len(existingTables) == 1 {
  137. // 只有一个表,直接查询
  138. return r.queryWafLogFromSingleTable(ctx, req, existingTables[0])
  139. }
  140. // 跨表分页查询
  141. return r.queryWafLogFromMultipleTables(ctx, req, existingTables)
  142. }
  143. // queryWafLogFromSingleTable 单表查询
  144. func (r *wafLogRepository) queryWafLogFromSingleTable(ctx context.Context, req adminApi.SearchWafLogParams, tableName string) (*v1.PaginatedResponse[model.WafLog], error) {
  145. var res []model.WafLog
  146. var total int64
  147. query := r.DBWithName(ctx, "admin").Table(tableName)
  148. query = r.applyWafLogFilters(query, req)
  149. if err := query.Count(&total).Error; err != nil {
  150. return nil, err
  151. }
  152. page := req.Current
  153. pageSize := req.PageSize
  154. if page <= 0 {
  155. page = 1
  156. }
  157. if pageSize <= 0 {
  158. pageSize = 10
  159. } else if pageSize > 100 {
  160. pageSize = 100
  161. }
  162. offset := (page - 1) * pageSize
  163. if req.Column != "" {
  164. query = query.Order(req.Column + " " + req.Order)
  165. }
  166. result := query.Offset(offset).Limit(pageSize).Find(&res)
  167. if result.Error != nil {
  168. return nil, result.Error
  169. }
  170. return &v1.PaginatedResponse[model.WafLog]{
  171. Records: res,
  172. Page: page,
  173. PageSize: pageSize,
  174. Total: total,
  175. TotalPages: int(math.Ceil(float64(total) / float64(pageSize))),
  176. }, nil
  177. }
  178. // queryWafLogFromMultipleTables 多表联合查询
  179. func (r *wafLogRepository) queryWafLogFromMultipleTables(ctx context.Context, req adminApi.SearchWafLogParams, tableNames []string) (*v1.PaginatedResponse[model.WafLog], error) {
  180. var allResults []model.WafLog
  181. var totalCount int64
  182. // 先计算总数
  183. for _, tableName := range tableNames {
  184. var count int64
  185. query := r.DBWithName(ctx, "admin").Table(tableName)
  186. query = r.applyWafLogFilters(query, req)
  187. if err := query.Count(&count).Error; err != nil {
  188. return nil, err
  189. }
  190. totalCount += count
  191. }
  192. page := req.Current
  193. pageSize := req.PageSize
  194. if page <= 0 {
  195. page = 1
  196. }
  197. if pageSize <= 0 {
  198. pageSize = 10
  199. } else if pageSize > 100 {
  200. pageSize = 100
  201. }
  202. // 计算需要跳过的记录数
  203. offset := (page - 1) * pageSize
  204. limit := pageSize
  205. currentOffset := 0
  206. // 逐表查询直到获取足够的记录
  207. for _, tableName := range tableNames {
  208. if limit <= 0 {
  209. break
  210. }
  211. var tableCount int64
  212. countQuery := r.DBWithName(ctx, "admin").Table(tableName)
  213. countQuery = r.applyWafLogFilters(countQuery, req)
  214. if err := countQuery.Count(&tableCount).Error; err != nil {
  215. return nil, err
  216. }
  217. // 如果当前表的记录数不足以满足offset要求,跳过这个表
  218. if currentOffset+int(tableCount) <= offset {
  219. currentOffset += int(tableCount)
  220. continue
  221. }
  222. // 计算在当前表中的offset
  223. tableOffset := offset - currentOffset
  224. if tableOffset < 0 {
  225. tableOffset = 0
  226. }
  227. var tableResults []model.WafLog
  228. query := r.DBWithName(ctx, "admin").Table(tableName)
  229. query = r.applyWafLogFilters(query, req)
  230. if req.Column != "" {
  231. query = query.Order(req.Column + " " + req.Order)
  232. }
  233. err := query.Offset(tableOffset).Limit(limit).Find(&tableResults).Error
  234. if err != nil {
  235. return nil, err
  236. }
  237. // 设置表名
  238. for i := range tableResults {
  239. tableResults[i].SetTableName(tableName)
  240. }
  241. allResults = append(allResults, tableResults...)
  242. limit -= len(tableResults)
  243. currentOffset += int(tableCount)
  244. }
  245. return &v1.PaginatedResponse[model.WafLog]{
  246. Records: allResults,
  247. Page: page,
  248. PageSize: pageSize,
  249. Total: totalCount,
  250. TotalPages: int(math.Ceil(float64(totalCount) / float64(pageSize))),
  251. }, nil
  252. }
  253. func (r *wafLogRepository) AddWafLog(ctx context.Context, log *model.WafLog) error {
  254. // 设置创建时间
  255. if log.CreatedAt.IsZero() {
  256. log.CreatedAt = time.Now()
  257. }
  258. // 获取最优的写入表(自动根据表名获取阈值)
  259. tableName, err := r.Manager.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log)
  260. if err != nil {
  261. return fmt.Errorf("获取写入表失败: %v", err)
  262. }
  263. log.SetTableName(tableName)
  264. // 确保表存在
  265. err = r.Manager.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
  266. if err != nil {
  267. return err
  268. }
  269. // 写入数据
  270. return r.DBWithName(ctx, "admin").Table(tableName).Create(log).Error
  271. }
  272. func (r *wafLogRepository) BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error {
  273. if len(logs) == 0 {
  274. return nil
  275. }
  276. // 按表名分组
  277. tableGroups := make(map[string][]*model.WafLog)
  278. for _, log := range logs {
  279. // 设置创建时间
  280. if log.CreatedAt.IsZero() {
  281. log.CreatedAt = time.Now()
  282. }
  283. // 获取最优的写入表(自动根据表名获取阈值)
  284. tableName, err := r.Manager.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log)
  285. if err != nil {
  286. return fmt.Errorf("获取写入表失败: %v", err)
  287. }
  288. log.SetTableName(tableName)
  289. // 按表名分组
  290. tableGroups[tableName] = append(tableGroups[tableName], log)
  291. }
  292. // 为每个表批量插入
  293. for tableName, tableLogs := range tableGroups {
  294. // 确保表存在
  295. err := r.Manager.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
  296. if err != nil {
  297. return err
  298. }
  299. // 批量插入
  300. err = r.DBWithName(ctx, "admin").Table(tableName).CreateInBatches(tableLogs, len(tableLogs)).Error
  301. if err != nil {
  302. return err
  303. }
  304. }
  305. return nil
  306. }
  307. func (r *wafLogRepository) ExportWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]model.WafLogWithGatewayIP, error) {
  308. return r.ExportWafLogWithPagination(ctx, req, 0, 0)
  309. }
  310. // ExportWafLogWithPagination 获取导出日志的数据,并使用子查询获取每条日志在当时时间点的正确网关组IP(支持分表)
  311. func (r *wafLogRepository) ExportWafLogWithPagination(ctx context.Context, req adminApi.ExportWafLog, page, pageSize int) ([]model.WafLogWithGatewayIP, error) {
  312. // 1. 解析时间范围
  313. startTime, endTime := r.getExportTimeRange(req)
  314. // 2. 获取需要查询的分表
  315. existingTables := r.Manager.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", startTime, endTime)
  316. if len(existingTables) == 0 {
  317. return []model.WafLogWithGatewayIP{}, nil
  318. }
  319. if len(existingTables) == 1 {
  320. // 单表查询
  321. return r.exportFromSingleTable(ctx, req, existingTables[0], page, pageSize)
  322. }
  323. // 多表查询(不分页,获取所有数据)
  324. if page > 0 && pageSize > 0 {
  325. return r.exportFromMultipleTablesWithPagination(ctx, req, existingTables, page, pageSize)
  326. } else {
  327. return r.exportFromMultipleTables(ctx, req, existingTables)
  328. }
  329. }
  330. // exportFromSingleTable 从单个表导出数据
  331. func (r *wafLogRepository) exportFromSingleTable(ctx context.Context, req adminApi.ExportWafLog, tableName string, page, pageSize int) ([]model.WafLogWithGatewayIP, error) {
  332. var res []model.WafLogWithGatewayIP
  333. // 1. 构建基础查询
  334. query := r.buildExportQuery(ctx, req, tableName)
  335. // 2. 构建子查询
  336. subQuery := r.buildSubQueryForTable(ctx, tableName)
  337. // 3. 添加 Select
  338. query = query.Select("wl.*, (?) as gateway_ip_data", subQuery)
  339. // 4. 添加分页
  340. if page > 0 && pageSize > 0 {
  341. offset := (page - 1) * pageSize
  342. query = query.Offset(offset).Limit(pageSize)
  343. }
  344. // 5. 执行查询
  345. if err := query.Find(&res).Error; err != nil {
  346. if errors.Is(err, gorm.ErrRecordNotFound) {
  347. return []model.WafLogWithGatewayIP{}, nil
  348. }
  349. return nil, err
  350. }
  351. return res, nil
  352. }
  353. // exportFromMultipleTables 从多个表导出所有数据(不分页)
  354. func (r *wafLogRepository) exportFromMultipleTables(ctx context.Context, req adminApi.ExportWafLog, tableNames []string) ([]model.WafLogWithGatewayIP, error) {
  355. var allResults []model.WafLogWithGatewayIP
  356. for _, tableName := range tableNames {
  357. tableResults, err := r.exportFromSingleTable(ctx, req, tableName, 0, 0)
  358. if err != nil {
  359. return nil, fmt.Errorf("查询表 %s 失败: %v", tableName, err)
  360. }
  361. allResults = append(allResults, tableResults...)
  362. }
  363. return allResults, nil
  364. }
  365. // exportFromMultipleTablesWithPagination 从多个表导出数据(支持分页)
  366. func (r *wafLogRepository) exportFromMultipleTablesWithPagination(ctx context.Context, req adminApi.ExportWafLog, tableNames []string, page, pageSize int) ([]model.WafLogWithGatewayIP, error) {
  367. var allResults []model.WafLogWithGatewayIP
  368. currentOffset := (page - 1) * pageSize
  369. remaining := pageSize
  370. for _, tableName := range tableNames {
  371. if remaining <= 0 {
  372. break
  373. }
  374. // 先获取表的总记录数
  375. countQuery := r.buildExportQuery(ctx, req, tableName)
  376. var tableCount int64
  377. if err := countQuery.Count(&tableCount).Error; err != nil {
  378. return nil, fmt.Errorf("统计表 %s 记录数失败: %v", tableName, err)
  379. }
  380. // 如果当前偏移量大于等于表记录数,跳过这个表
  381. if currentOffset >= int(tableCount) {
  382. currentOffset -= int(tableCount)
  383. continue
  384. }
  385. // 计算在当前表中需要查询的数据
  386. tablePageSize := remaining
  387. if int(tableCount) - currentOffset < tablePageSize {
  388. tablePageSize = int(tableCount) - currentOffset
  389. }
  390. tablePage := (currentOffset / pageSize) + 1
  391. tableOffset := currentOffset % pageSize
  392. // 查询当前表数据
  393. tableResults, err := r.exportFromSingleTable(ctx, req, tableName, tablePage, tablePageSize)
  394. if err != nil {
  395. return nil, fmt.Errorf("查询表 %s 失败: %v", tableName, err)
  396. }
  397. // 如果有偏移量,跳过前面的记录
  398. if tableOffset > 0 && len(tableResults) > tableOffset {
  399. tableResults = tableResults[tableOffset:]
  400. }
  401. allResults = append(allResults, tableResults...)
  402. remaining -= len(tableResults)
  403. currentOffset = 0 // 后续表从0开始
  404. }
  405. return allResults, nil
  406. }
  407. // GetWafLogExportCount 获取导出数据总数(支持分表)
  408. func (r *wafLogRepository) GetWafLogExportCount(ctx context.Context, req adminApi.ExportWafLog) (int, error) {
  409. // 1. 解析时间范围
  410. startTime, endTime := r.getExportTimeRange(req)
  411. // 2. 获取需要查询的分表
  412. existingTables := r.Manager.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", startTime, endTime)
  413. if len(existingTables) == 0 {
  414. return 0, nil
  415. }
  416. var totalCount int64
  417. // 3. 逐表统计
  418. for _, tableName := range existingTables {
  419. var tableCount int64
  420. // 使用更新后的 buildExportQuery 方法
  421. query := r.buildExportQuery(ctx, req, tableName)
  422. if err := query.Count(&tableCount).Error; err != nil {
  423. return 0, fmt.Errorf("统计表 %s 记录数失败: %v", tableName, err)
  424. }
  425. totalCount += tableCount
  426. }
  427. return int(totalCount), nil
  428. }
  429. // applyWafLogFilters 应用WafLog查询过滤条件
  430. func (r *wafLogRepository) applyWafLogFilters(query *gorm.DB, req adminApi.SearchWafLogParams) *gorm.DB {
  431. if req.RequestIp != "" {
  432. trimmedName := strings.TrimSpace(req.RequestIp)
  433. query = query.Where("request_ip LIKE CONCAT('%', ?, '%')", trimmedName)
  434. }
  435. if req.Uid != 0 {
  436. query = query.Where("uid = ?", req.Uid)
  437. }
  438. if req.Api != "" {
  439. trimmedName := strings.TrimSpace(req.Api)
  440. query = query.Where("api LIKE CONCAT('%', ?, '%')", trimmedName)
  441. }
  442. if req.Name != "" {
  443. trimmedName := strings.TrimSpace(req.Name)
  444. query = query.Where("name LIKE CONCAT('%', ?, '%')", trimmedName)
  445. }
  446. if req.RuleId != 0 {
  447. query = query.Where("rule_id = ?", req.RuleId)
  448. }
  449. if req.HostId != 0 {
  450. query = query.Where("host_id = ?", req.HostId)
  451. }
  452. if req.UserAgent != "" {
  453. trimmedName := strings.TrimSpace(req.UserAgent)
  454. query = query.Where("user_agent LIKE CONCAT('%', ?, '%')", trimmedName)
  455. }
  456. if req.ApiName != "" {
  457. trimmedName := strings.TrimSpace(req.ApiName)
  458. query = query.Where("api_name LIKE CONCAT('%', ?, '%')", trimmedName)
  459. }
  460. if req.ApiType != "" {
  461. query = query.Where("api_type = ?", req.ApiType)
  462. }
  463. return query
  464. }