1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- package job
- import (
- "context"
- "encoding/json"
- adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
- "github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
- "github.com/rabbitmq/amqp091-go"
- "go.uber.org/zap"
- )
- // 使用公共的 TaskHandler 定义
- // WafLogJob 定义了处理白名单相关任务的接口
- type WafLogJob interface {
- // DomainConsumer 启动消费者,处理域名白名单任务
- AddWafLogConsumer(ctx context.Context)
- }
- // NewWafLogJob 创建一个新的 WhitelistJob
- func NewWafLogJob(job *Job,
- wafLogService admin.WafLogService,
- ) WafLogJob {
- return &wafLogJob{
- Job: job,
- wafLogService: wafLogService,
- }
- }
- type wafLogJob struct {
- *Job
- wafLogService admin.WafLogService
- }
- // DomainConsumer 启动域名白名单消费者
- func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
- j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
- }
- // consume 调用公共的 Consume 方法
- func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
- j.Job.Consume(ctx, taskName, consumerName, handler)
- }
- // handleDomainMessage 是域名白名单任务的具体处理器
- func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
- var payload adminApi.WafLog
- if err := json.Unmarshal(d.Body, &payload); err != nil {
- logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body))
- return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误
- }
- logger.Info("收到添加日志任务",
- zap.Int("hostId", payload.HostId),
- zap.Int("uid", payload.Uid),
- zap.String("routing_key", d.RoutingKey),
- )
- var processingErr error
- processingErr = j.wafLogService.AddWafLog(ctx, payload)
- if processingErr != nil {
- logger.Error("处理域名白名单任务失败", zap.Error(processingErr),zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid),zap.Any("req", payload))
- } else {
- logger.Info("已成功处理域名白名单任务", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", payload))
- }
- return processingErr
- }
|