waflog.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. package admin
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "strings"
  7. "time"
  8. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  9. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  10. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  11. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  12. "gorm.io/gorm"
  13. )
  14. type WafLogRepository interface {
  15. GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
  16. GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error)
  17. AddWafLog(ctx context.Context, log *model.WafLog) error
  18. BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error
  19. ExportWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]model.WafLogWithGatewayIP, error)
  20. ExportWafLogWithPagination(ctx context.Context, req adminApi.ExportWafLog, page, pageSize int) ([]model.WafLogWithGatewayIP, error)
  21. GetWafLogExportCount(ctx context.Context, req adminApi.ExportWafLog) (int, error)
  22. }
  23. func NewWafLogRepository(
  24. repository *repository.Repository,
  25. ) WafLogRepository {
  26. return &wafLogRepository{
  27. Repository: repository,
  28. }
  29. }
  30. type wafLogRepository struct {
  31. *repository.Repository
  32. }
  33. // buildExportQuery 是一个辅助函数,用于构建导出日志的公共查询条件
  34. func (r *wafLogRepository) buildExportQuery(ctx context.Context, req adminApi.ExportWafLog) *gorm.DB {
  35. // 使用 Table("waf_log as wl") 是为了给主表起一个别名,方便子查询中引用
  36. query := r.DBWithName(ctx, "admin").Model(&model.WafLog{}).Table("waf_log as wl")
  37. if req.RequestIp != "" {
  38. query = query.Where("wl.request_ip = ?", strings.TrimSpace(req.RequestIp))
  39. }
  40. if req.Uid != 0 {
  41. query = query.Where("wl.uid = ?", req.Uid)
  42. }
  43. if req.Api != "" {
  44. query = query.Where("wl.api = ?", strings.TrimSpace(req.Api))
  45. }
  46. if req.Name != "" {
  47. query = query.Where("wl.name = ?", strings.TrimSpace(req.Name))
  48. }
  49. if req.RuleId != 0 {
  50. query = query.Where("wl.rule_id = ?", req.RuleId)
  51. }
  52. if len(req.HostIds) > 0 {
  53. query = query.Where("wl.host_id IN ?", req.HostIds)
  54. }
  55. if req.UserAgent != "" {
  56. query = query.Where("wl.user_agent = ?", strings.TrimSpace(req.UserAgent))
  57. }
  58. if len(req.ApiNames) > 0 {
  59. query = query.Where("wl.api_name IN ?", req.ApiNames)
  60. }
  61. if len(req.ApiTypes) > 0 {
  62. query = query.Where("wl.api_type IN ?", req.ApiTypes)
  63. }
  64. if req.StartTime != "" {
  65. query = query.Where("wl.created_at > ?", strings.TrimSpace(req.StartTime))
  66. }
  67. if req.EndTime != "" {
  68. query = query.Where("wl.created_at < ?", strings.TrimSpace(req.EndTime))
  69. }
  70. return query
  71. }
  72. func (r *wafLogRepository) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
  73. var res model.WafLog
  74. // 获取存在的分表
  75. existingTables := r.Manager.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", nil, nil)
  76. // 在各个分表中查找
  77. for _, tableName := range existingTables {
  78. err := r.DBWithName(ctx, "admin").Table(tableName).Where("id = ?", id).First(&res).Error
  79. if err == nil {
  80. res.SetTableName(tableName)
  81. return &res, nil
  82. }
  83. }
  84. return nil, fmt.Errorf("未找到ID为 %d 的WAF日志记录", id)
  85. }
  86. func (r *wafLogRepository) GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
  87. //// 解析时间范围(如果有的话)
  88. //var startTime, endTime *time.Time
  89. //// TODO: 这里可以根据req中的时间字段来确定查询范围
  90. //// 暂时查询最近3个月的数据
  91. // 获取需要查询的表
  92. existingTables := r.Manager.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", nil, nil)
  93. if len(existingTables) == 0 {
  94. // 没有分表,返回空结果
  95. return &v1.PaginatedResponse[model.WafLog]{
  96. Records: []model.WafLog{},
  97. Page: 1,
  98. PageSize: 10,
  99. Total: 0,
  100. TotalPages: 0,
  101. }, nil
  102. }
  103. if len(existingTables) == 1 {
  104. // 只有一个表,直接查询
  105. return r.queryWafLogFromSingleTable(ctx, req, existingTables[0])
  106. }
  107. // 跨表分页查询
  108. return r.queryWafLogFromMultipleTables(ctx, req, existingTables)
  109. }
  110. // queryWafLogFromSingleTable 单表查询
  111. func (r *wafLogRepository) queryWafLogFromSingleTable(ctx context.Context, req adminApi.SearchWafLogParams, tableName string) (*v1.PaginatedResponse[model.WafLog], error) {
  112. var res []model.WafLog
  113. var total int64
  114. query := r.DBWithName(ctx, "admin").Table(tableName)
  115. query = r.applyWafLogFilters(query, req)
  116. if err := query.Count(&total).Error; err != nil {
  117. return nil, err
  118. }
  119. page := req.Current
  120. pageSize := req.PageSize
  121. if page <= 0 {
  122. page = 1
  123. }
  124. if pageSize <= 0 {
  125. pageSize = 10
  126. } else if pageSize > 100 {
  127. pageSize = 100
  128. }
  129. offset := (page - 1) * pageSize
  130. if req.Column != "" {
  131. query = query.Order(req.Column + " " + req.Order)
  132. }
  133. result := query.Offset(offset).Limit(pageSize).Find(&res)
  134. if result.Error != nil {
  135. return nil, result.Error
  136. }
  137. return &v1.PaginatedResponse[model.WafLog]{
  138. Records: res,
  139. Page: page,
  140. PageSize: pageSize,
  141. Total: total,
  142. TotalPages: int(math.Ceil(float64(total) / float64(pageSize))),
  143. }, nil
  144. }
  145. // queryWafLogFromMultipleTables 多表联合查询
  146. func (r *wafLogRepository) queryWafLogFromMultipleTables(ctx context.Context, req adminApi.SearchWafLogParams, tableNames []string) (*v1.PaginatedResponse[model.WafLog], error) {
  147. var allResults []model.WafLog
  148. var totalCount int64
  149. // 先计算总数
  150. for _, tableName := range tableNames {
  151. var count int64
  152. query := r.DBWithName(ctx, "admin").Table(tableName)
  153. query = r.applyWafLogFilters(query, req)
  154. if err := query.Count(&count).Error; err != nil {
  155. return nil, err
  156. }
  157. totalCount += count
  158. }
  159. page := req.Current
  160. pageSize := req.PageSize
  161. if page <= 0 {
  162. page = 1
  163. }
  164. if pageSize <= 0 {
  165. pageSize = 10
  166. } else if pageSize > 100 {
  167. pageSize = 100
  168. }
  169. // 计算需要跳过的记录数
  170. offset := (page - 1) * pageSize
  171. limit := pageSize
  172. currentOffset := 0
  173. // 逐表查询直到获取足够的记录
  174. for _, tableName := range tableNames {
  175. if limit <= 0 {
  176. break
  177. }
  178. var tableCount int64
  179. countQuery := r.DBWithName(ctx, "admin").Table(tableName)
  180. countQuery = r.applyWafLogFilters(countQuery, req)
  181. if err := countQuery.Count(&tableCount).Error; err != nil {
  182. return nil, err
  183. }
  184. // 如果当前表的记录数不足以满足offset要求,跳过这个表
  185. if currentOffset+int(tableCount) <= offset {
  186. currentOffset += int(tableCount)
  187. continue
  188. }
  189. // 计算在当前表中的offset
  190. tableOffset := offset - currentOffset
  191. if tableOffset < 0 {
  192. tableOffset = 0
  193. }
  194. var tableResults []model.WafLog
  195. query := r.DBWithName(ctx, "admin").Table(tableName)
  196. query = r.applyWafLogFilters(query, req)
  197. if req.Column != "" {
  198. query = query.Order(req.Column + " " + req.Order)
  199. }
  200. err := query.Offset(tableOffset).Limit(limit).Find(&tableResults).Error
  201. if err != nil {
  202. return nil, err
  203. }
  204. // 设置表名
  205. for i := range tableResults {
  206. tableResults[i].SetTableName(tableName)
  207. }
  208. allResults = append(allResults, tableResults...)
  209. limit -= len(tableResults)
  210. currentOffset += int(tableCount)
  211. }
  212. return &v1.PaginatedResponse[model.WafLog]{
  213. Records: allResults,
  214. Page: page,
  215. PageSize: pageSize,
  216. Total: totalCount,
  217. TotalPages: int(math.Ceil(float64(totalCount) / float64(pageSize))),
  218. }, nil
  219. }
  220. func (r *wafLogRepository) AddWafLog(ctx context.Context, log *model.WafLog) error {
  221. // 设置创建时间
  222. if log.CreatedAt.IsZero() {
  223. log.CreatedAt = time.Now()
  224. }
  225. // 获取最优的写入表(自动根据表名获取阈值)
  226. tableName, err := r.Manager.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log)
  227. if err != nil {
  228. return fmt.Errorf("获取写入表失败: %v", err)
  229. }
  230. log.SetTableName(tableName)
  231. // 确保表存在
  232. err = r.Manager.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
  233. if err != nil {
  234. return err
  235. }
  236. // 写入数据
  237. return r.DBWithName(ctx, "admin").Table(tableName).Create(log).Error
  238. }
  239. func (r *wafLogRepository) BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error {
  240. if len(logs) == 0 {
  241. return nil
  242. }
  243. // 按表名分组
  244. tableGroups := make(map[string][]*model.WafLog)
  245. for _, log := range logs {
  246. // 设置创建时间
  247. if log.CreatedAt.IsZero() {
  248. log.CreatedAt = time.Now()
  249. }
  250. // 获取最优的写入表(自动根据表名获取阈值)
  251. tableName, err := r.Manager.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log)
  252. if err != nil {
  253. return fmt.Errorf("获取写入表失败: %v", err)
  254. }
  255. log.SetTableName(tableName)
  256. // 按表名分组
  257. tableGroups[tableName] = append(tableGroups[tableName], log)
  258. }
  259. // 为每个表批量插入
  260. for tableName, tableLogs := range tableGroups {
  261. // 确保表存在
  262. err := r.Manager.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
  263. if err != nil {
  264. return err
  265. }
  266. // 批量插入
  267. err = r.DBWithName(ctx, "admin").Table(tableName).CreateInBatches(tableLogs, len(tableLogs)).Error
  268. if err != nil {
  269. return err
  270. }
  271. }
  272. return nil
  273. }
  274. func (r *wafLogRepository) ExportWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]model.WafLogWithGatewayIP, error) {
  275. return r.ExportWafLogWithPagination(ctx, req, 0, 0)
  276. }
  277. // ExportWafLogWithPagination 使用子查询获取每条日志在当时时间点的正确网关组IP
  278. func (r *wafLogRepository) ExportWafLogWithPagination(ctx context.Context, req adminApi.ExportWafLog, page, pageSize int) ([]model.WafLogWithGatewayIP, error) {
  279. var res []model.WafLogWithGatewayIP
  280. // 1. 使用辅助函数构建基础查询
  281. query := r.buildExportQuery(ctx, req)
  282. // 2. 构建子查询
  283. subQuery := r.DBWithName(ctx, "admin").Model(&model.WafLog{}).
  284. Select("extra_data").
  285. Where("api_name = ?", "分配网关组").
  286. Where("host_id = wl.host_id").
  287. Where("uid = wl.uid").
  288. Where("created_at <= wl.created_at").
  289. Order("created_at DESC").
  290. Limit(1)
  291. // 3. 添加 Select 和分页
  292. query = query.Select("wl.*, (?) as gateway_ip_data", subQuery)
  293. if page > 0 && pageSize > 0 {
  294. offset := (page - 1) * pageSize
  295. query = query.Offset(offset).Limit(pageSize)
  296. }
  297. // 4. 执行查询
  298. if err := query.Find(&res).Error; err != nil {
  299. if err == gorm.ErrRecordNotFound {
  300. return []model.WafLogWithGatewayIP{}, nil
  301. }
  302. return nil, err
  303. }
  304. return res, nil
  305. }
  306. // GetWafLogExportCount 获取导出数据总数(已优化)
  307. func (r *wafLogRepository) GetWafLogExportCount(ctx context.Context, req adminApi.ExportWafLog) (int, error) {
  308. var count int64
  309. // 直接复用 buildExportQuery 来构建查询
  310. query := r.buildExportQuery(ctx, req)
  311. if err := query.Count(&count).Error; err != nil {
  312. return 0, err
  313. }
  314. return int(count), nil
  315. }
  316. // applyWafLogFilters 应用WafLog查询过滤条件
  317. func (r *wafLogRepository) applyWafLogFilters(query *gorm.DB, req adminApi.SearchWafLogParams) *gorm.DB {
  318. if req.RequestIp != "" {
  319. trimmedName := strings.TrimSpace(req.RequestIp)
  320. query = query.Where("request_ip LIKE CONCAT('%', ?, '%')", trimmedName)
  321. }
  322. if req.Uid != 0 {
  323. query = query.Where("uid = ?", req.Uid)
  324. }
  325. if req.Api != "" {
  326. trimmedName := strings.TrimSpace(req.Api)
  327. query = query.Where("api LIKE CONCAT('%', ?, '%')", trimmedName)
  328. }
  329. if req.Name != "" {
  330. trimmedName := strings.TrimSpace(req.Name)
  331. query = query.Where("name LIKE CONCAT('%', ?, '%')", trimmedName)
  332. }
  333. if req.RuleId != 0 {
  334. query = query.Where("rule_id = ?", req.RuleId)
  335. }
  336. if req.HostId != 0 {
  337. query = query.Where("host_id = ?", req.HostId)
  338. }
  339. if req.UserAgent != "" {
  340. trimmedName := strings.TrimSpace(req.UserAgent)
  341. query = query.Where("user_agent LIKE CONCAT('%', ?, '%')", trimmedName)
  342. }
  343. if req.ApiName != "" {
  344. trimmedName := strings.TrimSpace(req.ApiName)
  345. query = query.Where("api_name LIKE CONCAT('%', ?, '%')", trimmedName)
  346. }
  347. if req.ApiType != "" {
  348. query = query.Where("api_type = ?", req.ApiType)
  349. }
  350. return query
  351. }