waflog.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. package admin
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "strings"
  7. "time"
  8. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  9. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  12. "github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
  13. "gorm.io/gorm"
  14. )
  15. type WafLogRepository interface {
  16. GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
  17. GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
  18. AddWafLog(ctx context.Context, log *model.WafLog) error
  19. BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error
  20. ExportWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]model.WafLogWithGatewayIP, error)
  21. ExportWafLogWithPagination(ctx context.Context, req adminApi.ExportWafLog, page, pageSize int) ([]model.WafLogWithGatewayIP, error)
  22. GetWafLogExportCount(ctx context.Context, req adminApi.ExportWafLog) (int, error)
  23. }
  24. func NewWafLogRepository(
  25. repository *repository.Repository,
  26. ) WafLogRepository {
  27. return &wafLogRepository{
  28. Repository: repository,
  29. }
  30. }
  31. type wafLogRepository struct {
  32. *repository.Repository
  33. }
  34. // buildExportQuery 是一个辅助函数,用于构建导出日志的公共查询条件
  35. func (r *wafLogRepository) buildExportQuery(ctx context.Context, req adminApi.ExportWafLog) *gorm.DB {
  36. // 使用 Table("waf_log as wl") 是为了给主表起一个别名,方便子查询中引用
  37. query := r.DBWithName(ctx, "admin").Model(&model.WafLog{}).Table("waf_log as wl")
  38. if req.RequestIp != "" {
  39. query = query.Where("wl.request_ip = ?", strings.TrimSpace(req.RequestIp))
  40. }
  41. if req.Uid != 0 {
  42. query = query.Where("wl.uid = ?", req.Uid)
  43. }
  44. if req.Api != "" {
  45. query = query.Where("wl.api = ?", strings.TrimSpace(req.Api))
  46. }
  47. if req.Name != "" {
  48. query = query.Where("wl.name = ?", strings.TrimSpace(req.Name))
  49. }
  50. if req.RuleId != 0 {
  51. query = query.Where("wl.rule_id = ?", req.RuleId)
  52. }
  53. if len(req.HostIds) > 0 {
  54. query = query.Where("wl.host_id IN ?", req.HostIds)
  55. }
  56. if req.UserAgent != "" {
  57. query = query.Where("wl.user_agent = ?", strings.TrimSpace(req.UserAgent))
  58. }
  59. if len(req.ApiNames) > 0 {
  60. query = query.Where("wl.api_name IN ?", req.ApiNames)
  61. }
  62. if len(req.ApiTypes) > 0 {
  63. query = query.Where("wl.api_type IN ?", req.ApiTypes)
  64. }
  65. if req.StartTime != "" {
  66. query = query.Where("wl.created_at > ?", strings.TrimSpace(req.StartTime))
  67. }
  68. if req.EndTime != "" {
  69. query = query.Where("wl.created_at < ?", strings.TrimSpace(req.EndTime))
  70. }
  71. return query
  72. }
  73. func (r *wafLogRepository) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
  74. var res model.WafLog
  75. // 获取分表管理器
  76. shardingMgr := r.getShardingManager()
  77. // 获取存在的分表
  78. existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", nil, nil)
  79. // 在各个分表中查找
  80. for _, tableName := range existingTables {
  81. err := r.DBWithName(ctx, "admin").Table(tableName).Where("id = ?", id).First(&res).Error
  82. if err == nil {
  83. res.SetTableName(tableName)
  84. return &res, nil
  85. }
  86. }
  87. return nil, fmt.Errorf("未找到ID为 %d 的WAF日志记录", id)
  88. }
  89. func (r *wafLogRepository) GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
  90. // 获取分表管理器
  91. shardingMgr := r.getShardingManager()
  92. // 解析时间范围(如果有的话)
  93. var startTime, endTime *time.Time
  94. // TODO: 这里可以根据req中的时间字段来确定查询范围
  95. // 暂时查询最近3个月的数据
  96. // 获取需要查询的表
  97. existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", startTime, endTime)
  98. if len(existingTables) == 0 {
  99. // 没有分表,返回空结果
  100. return &v1.PaginatedResponse[model.WafLog]{
  101. Records: []model.WafLog{},
  102. Page: 1,
  103. PageSize: 10,
  104. Total: 0,
  105. TotalPages: 0,
  106. }, nil
  107. }
  108. if len(existingTables) == 1 {
  109. // 只有一个表,直接查询
  110. return r.queryWafLogFromSingleTable(ctx, req, existingTables[0])
  111. }
  112. // 跨表分页查询
  113. return r.queryWafLogFromMultipleTables(ctx, req, existingTables)
  114. }
  115. // queryWafLogFromSingleTable 单表查询
  116. func (r *wafLogRepository) queryWafLogFromSingleTable(ctx context.Context, req adminApi.SearchWafLogParams, tableName string) (*v1.PaginatedResponse[model.WafLog], error) {
  117. var res []model.WafLog
  118. var total int64
  119. query := r.DBWithName(ctx, "admin").Table(tableName)
  120. query = r.applyWafLogFilters(query, req)
  121. if err := query.Count(&total).Error; err != nil {
  122. return nil, err
  123. }
  124. page := req.Current
  125. pageSize := req.PageSize
  126. if page <= 0 {
  127. page = 1
  128. }
  129. if pageSize <= 0 {
  130. pageSize = 10
  131. } else if pageSize > 100 {
  132. pageSize = 100
  133. }
  134. offset := (page - 1) * pageSize
  135. if req.Column != "" {
  136. query = query.Order(req.Column + " " + req.Order)
  137. }
  138. result := query.Offset(offset).Limit(pageSize).Find(&res)
  139. if result.Error != nil {
  140. return nil, result.Error
  141. }
  142. return &v1.PaginatedResponse[model.WafLog]{
  143. Records: res,
  144. Page: page,
  145. PageSize: pageSize,
  146. Total: total,
  147. TotalPages: int(math.Ceil(float64(total) / float64(pageSize))),
  148. }, nil
  149. }
  150. // queryWafLogFromMultipleTables 多表联合查询
  151. func (r *wafLogRepository) queryWafLogFromMultipleTables(ctx context.Context, req adminApi.SearchWafLogParams, tableNames []string) (*v1.PaginatedResponse[model.WafLog], error) {
  152. var allResults []model.WafLog
  153. var totalCount int64
  154. // 先计算总数
  155. for _, tableName := range tableNames {
  156. var count int64
  157. query := r.DBWithName(ctx, "admin").Table(tableName)
  158. query = r.applyWafLogFilters(query, req)
  159. if err := query.Count(&count).Error; err != nil {
  160. return nil, err
  161. }
  162. totalCount += count
  163. }
  164. page := req.Current
  165. pageSize := req.PageSize
  166. if page <= 0 {
  167. page = 1
  168. }
  169. if pageSize <= 0 {
  170. pageSize = 10
  171. } else if pageSize > 100 {
  172. pageSize = 100
  173. }
  174. // 计算需要跳过的记录数
  175. offset := (page - 1) * pageSize
  176. limit := pageSize
  177. currentOffset := 0
  178. // 逐表查询直到获取足够的记录
  179. for _, tableName := range tableNames {
  180. if limit <= 0 {
  181. break
  182. }
  183. var tableCount int64
  184. countQuery := r.DBWithName(ctx, "admin").Table(tableName)
  185. countQuery = r.applyWafLogFilters(countQuery, req)
  186. if err := countQuery.Count(&tableCount).Error; err != nil {
  187. return nil, err
  188. }
  189. // 如果当前表的记录数不足以满足offset要求,跳过这个表
  190. if currentOffset+int(tableCount) <= offset {
  191. currentOffset += int(tableCount)
  192. continue
  193. }
  194. // 计算在当前表中的offset
  195. tableOffset := offset - currentOffset
  196. if tableOffset < 0 {
  197. tableOffset = 0
  198. }
  199. var tableResults []model.WafLog
  200. query := r.DBWithName(ctx, "admin").Table(tableName)
  201. query = r.applyWafLogFilters(query, req)
  202. if req.Column != "" {
  203. query = query.Order(req.Column + " " + req.Order)
  204. }
  205. err := query.Offset(tableOffset).Limit(limit).Find(&tableResults).Error
  206. if err != nil {
  207. return nil, err
  208. }
  209. // 设置表名
  210. for i := range tableResults {
  211. tableResults[i].SetTableName(tableName)
  212. }
  213. allResults = append(allResults, tableResults...)
  214. limit -= len(tableResults)
  215. currentOffset += int(tableCount)
  216. }
  217. return &v1.PaginatedResponse[model.WafLog]{
  218. Records: allResults,
  219. Page: page,
  220. PageSize: pageSize,
  221. Total: totalCount,
  222. TotalPages: int(math.Ceil(float64(totalCount) / float64(pageSize))),
  223. }, nil
  224. }
  225. func (r *wafLogRepository) AddWafLog(ctx context.Context, log *model.WafLog) error {
  226. // 设置创建时间
  227. if log.CreatedAt.IsZero() {
  228. log.CreatedAt = time.Now()
  229. }
  230. // 获取分表管理器
  231. shardingMgr := r.getShardingManagerWithThreshold()
  232. // 获取最优的写入表(考虑数据量阈值)
  233. tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, r.getMaxRowsForTable("waf_log"))
  234. if err != nil {
  235. return fmt.Errorf("获取写入表失败: %v", err)
  236. }
  237. log.SetTableName(tableName)
  238. // 确保表存在
  239. err = shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
  240. if err != nil {
  241. return err
  242. }
  243. // 写入数据
  244. return r.DBWithName(ctx, "admin").Table(tableName).Create(log).Error
  245. }
  246. func (r *wafLogRepository) BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error {
  247. if len(logs) == 0 {
  248. return nil
  249. }
  250. // 获取带阈值的分表管理器
  251. shardingMgr := r.getShardingManagerWithThreshold()
  252. maxRows := r.getMaxRowsForTable("waf_log")
  253. // 按表名分组
  254. tableGroups := make(map[string][]*model.WafLog)
  255. for _, log := range logs {
  256. // 设置创建时间
  257. if log.CreatedAt.IsZero() {
  258. log.CreatedAt = time.Now()
  259. }
  260. // 获取最优的写入表(考虑数据量阈值)
  261. tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, maxRows)
  262. if err != nil {
  263. return fmt.Errorf("获取写入表失败: %v", err)
  264. }
  265. log.SetTableName(tableName)
  266. // 按表名分组
  267. tableGroups[tableName] = append(tableGroups[tableName], log)
  268. }
  269. // 为每个表批量插入
  270. for tableName, tableLogs := range tableGroups {
  271. // 确保表存在
  272. err := shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
  273. if err != nil {
  274. return err
  275. }
  276. // 批量插入
  277. err = r.DBWithName(ctx, "admin").Table(tableName).CreateInBatches(tableLogs, len(tableLogs)).Error
  278. if err != nil {
  279. return err
  280. }
  281. }
  282. return nil
  283. }
  284. func (r *wafLogRepository) ExportWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]model.WafLogWithGatewayIP, error) {
  285. return r.ExportWafLogWithPagination(ctx, req, 0, 0)
  286. }
  287. // ExportWafLogWithPagination 使用子查询获取每条日志在当时时间点的正确网关组IP
  288. func (r *wafLogRepository) ExportWafLogWithPagination(ctx context.Context, req adminApi.ExportWafLog, page, pageSize int) ([]model.WafLogWithGatewayIP, error) {
  289. var res []model.WafLogWithGatewayIP
  290. // 1. 使用辅助函数构建基础查询
  291. query := r.buildExportQuery(ctx, req)
  292. // 2. 构建子查询
  293. subQuery := r.DBWithName(ctx, "admin").Model(&model.WafLog{}).
  294. Select("extra_data").
  295. Where("api_name = ?", "分配网关组").
  296. Where("host_id = wl.host_id").
  297. Where("uid = wl.uid").
  298. Where("created_at <= wl.created_at").
  299. Order("created_at DESC").
  300. Limit(1)
  301. // 3. 添加 Select 和分页
  302. query = query.Select("wl.*, (?) as gateway_ip_data", subQuery)
  303. if page > 0 && pageSize > 0 {
  304. offset := (page - 1) * pageSize
  305. query = query.Offset(offset).Limit(pageSize)
  306. }
  307. // 4. 执行查询
  308. if err := query.Find(&res).Error; err != nil {
  309. if err == gorm.ErrRecordNotFound {
  310. return []model.WafLogWithGatewayIP{}, nil
  311. }
  312. return nil, err
  313. }
  314. return res, nil
  315. }
  316. // GetWafLogExportCount 获取导出数据总数(已优化)
  317. func (r *wafLogRepository) GetWafLogExportCount(ctx context.Context, req adminApi.ExportWafLog) (int, error) {
  318. var count int64
  319. // 直接复用 buildExportQuery 来构建查询
  320. query := r.buildExportQuery(ctx, req)
  321. if err := query.Count(&count).Error; err != nil {
  322. return 0, err
  323. }
  324. return int(count), nil
  325. }
  326. // getShardingManager 获取分表管理器
  327. func (r *wafLogRepository) getShardingManager() *sharding.ShardingManager {
  328. // 使用月度分表策略
  329. strategy := sharding.NewMonthlyShardingStrategy()
  330. return sharding.NewShardingManager(strategy, r.Logger)
  331. }
  332. // getShardingManagerWithThreshold 获取带阈值配置的分表管理器
  333. func (r *wafLogRepository) getShardingManagerWithThreshold() *sharding.ShardingManager {
  334. strategy := sharding.NewMonthlyShardingStrategy()
  335. // 阈值配置(这里可以从配置文件读取,暂时硬编码)
  336. thresholdConfig := &sharding.ThresholdConfig{
  337. Enabled: true,
  338. MaxRows: 5000000, // waf_log表默认500万条
  339. CheckInterval: time.Hour,
  340. }
  341. return sharding.NewShardingManagerWithThreshold(strategy, r.Logger, thresholdConfig)
  342. }
  343. // getMaxRowsForTable 获取指定表的最大行数配置
  344. func (r *wafLogRepository) getMaxRowsForTable(tableName string) int64 {
  345. switch tableName {
  346. case "log":
  347. return 3000000 // 300万条
  348. case "waf_log":
  349. return 5000000 // 500万条
  350. default:
  351. return 3000000 // 默认300万条
  352. }
  353. }
  354. // applyWafLogFilters 应用WafLog查询过滤条件
  355. func (r *wafLogRepository) applyWafLogFilters(query *gorm.DB, req adminApi.SearchWafLogParams) *gorm.DB {
  356. if req.RequestIp != "" {
  357. trimmedName := strings.TrimSpace(req.RequestIp)
  358. query = query.Where("request_ip LIKE CONCAT('%', ?, '%')", trimmedName)
  359. }
  360. if req.Uid != 0 {
  361. query = query.Where("uid = ?", req.Uid)
  362. }
  363. if req.Api != "" {
  364. trimmedName := strings.TrimSpace(req.Api)
  365. query = query.Where("api LIKE CONCAT('%', ?, '%')", trimmedName)
  366. }
  367. if req.Name != "" {
  368. trimmedName := strings.TrimSpace(req.Name)
  369. query = query.Where("name LIKE CONCAT('%', ?, '%')", trimmedName)
  370. }
  371. if req.RuleId != 0 {
  372. query = query.Where("rule_id = ?", req.RuleId)
  373. }
  374. if req.HostId != 0 {
  375. query = query.Where("host_id = ?", req.HostId)
  376. }
  377. if req.UserAgent != "" {
  378. trimmedName := strings.TrimSpace(req.UserAgent)
  379. query = query.Where("user_agent LIKE CONCAT('%', ?, '%')", trimmedName)
  380. }
  381. if req.ApiName != "" {
  382. trimmedName := strings.TrimSpace(req.ApiName)
  383. query = query.Where("api_name LIKE CONCAT('%', ?, '%')", trimmedName)
  384. }
  385. if req.ApiType != "" {
  386. query = query.Where("api_type = ?", req.ApiType)
  387. }
  388. return query
  389. }