Parcourir la source

feat(repository): 重构过期套餐处理逻辑

- 新增 PlanListType 类型,用于区分不同的套餐列表
- 重新设计 ExpiredRepository 接口,简化方法数量和逻辑
- 使用 Redis Set存储已关闭套餐,Sorted Set 存储即将过期套餐
fusu il y a 3 semaines
Parent
commit
42bca18da7

+ 2 - 2
cmd/task/wire/wire_gen.go

@@ -62,13 +62,13 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 	cdnService := service.NewCdnService(serviceService, viperViper, requestService, cdnRepository)
 	globalLimitRepository := repository.NewGlobalLimitRepository(repositoryRepository)
 	expiredRepository := repository.NewExpiredRepository(repositoryRepository)
-	wafTask := task.NewWafTask(webForwardingRepository, tcpforwardingRepository, udpForWardingRepository, cdnService, hostRepository, globalLimitRepository, expiredRepository, taskTask)
+	gateWayGroupIpRepository := repository.NewGateWayGroupIpRepository(repositoryRepository)
+	wafTask := task.NewWafTask(webForwardingRepository, tcpforwardingRepository, udpForWardingRepository, cdnService, hostRepository, globalLimitRepository, expiredRepository, taskTask, gateWayGroupIpRepository)
 	taskServer := server.NewTaskServer(logger, userTask, gameShieldTask, wafTask)
 	jobJob := job.NewJob(transaction, logger, sidSid, rabbitMQ)
 	userJob := job.NewUserJob(jobJob, userRepository)
 	aoDunService := service.NewAoDunService(serviceService, viperViper)
 	gatewayGroupRepository := repository.NewGatewayGroupRepository(repositoryRepository)
-	gateWayGroupIpRepository := repository.NewGateWayGroupIpRepository(repositoryRepository)
 	wafFormatterService := service.NewWafFormatterService(serviceService, globalLimitRepository, hostRepository, requiredService, parserService, tcpforwardingRepository, udpForWardingRepository, webForwardingRepository, rabbitMQ, hostService, gatewayGroupRepository, gateWayGroupIpRepository, cdnService)
 	whitelistJob := job.NewWhitelistJob(jobJob, aoDunService, wafFormatterService)
 	jobServer := server.NewJobServer(logger, userJob, whitelistJob)

+ 91 - 110
internal/repository/expired.go

@@ -2,31 +2,32 @@ package repository
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
+	"github.com/redis/go-redis/v9"
 	"strconv"
-	"strings"
 	"time"
 )
 
-// ExpiredInfo 包含了关闭套餐的详细信息
-type ExpiredInfo struct {
-	HostID int64     `json:"host_id"` // hostid, a.k.a. planid
-	Expiry time.Time `json:"expiry"` // 过期时间
-}
+// PlanListType 定义了要操作的套餐列表类型
+type PlanListType string
+
+const (
+	// ClosedPlansList 代表已关闭的套餐列表 (使用 Redis Set)
+	ClosedPlansList PlanListType = "closed"
+	// ExpiringSoonPlansList 代表即将过期的套餐列表 (使用 Redis Sorted Set)
+	ExpiringSoonPlansList PlanListType = "expiring_soon"
+)
 
 // ExpiredRepository 定义了与过期套餐相关的操作接口
 type ExpiredRepository interface {
-	// AddClosePlans 批量添加要关闭的套餐信息,并设置过期时间
-	AddClosePlans(ctx context.Context, infos ...ExpiredInfo) error
-	// GetClosePlanInfo 获取单个已关闭套餐的详细信息
-	GetClosePlanInfo(ctx context.Context, planId int64) (*ExpiredInfo, error)
-	// RemoveClosePlanIds 批量移除已关闭的套餐
-	RemoveClosePlanIds(ctx context.Context, planIds ...int64) error
-	// GetAllClosePlanIds 获取所有当前套餐ID
-	GetAllClosePlanIds(ctx context.Context) ([]int64, error)
-	// IsPlanClosed 检查一个套餐是否被标记为关闭
-	IsPlanClosed(ctx context.Context, planId int64) (bool, error)
+	// AddPlans 将一个或多个套餐ID添加到指定的列表中
+	AddPlans(ctx context.Context, listType PlanListType, planIds ...int64) error
+	// RemovePlans 从指定的列表中移除一个或多个套餐ID
+	RemovePlans(ctx context.Context, listType PlanListType, planIds ...int64) error
+	// GetAllPlanIds 获取指定列表中的所有套餐ID
+	GetAllPlanIds(ctx context.Context, listType PlanListType) ([]int64, error)
+	// IsPlanInList 检查一个套餐ID是否存在于指定的列表中
+	IsPlanInList(ctx context.Context, listType PlanListType, planId int64) (bool, error)
 }
 
 func NewExpiredRepository(
@@ -42,127 +43,107 @@ type expiredRepository struct {
 }
 
 // Key的前缀,用于标识所有已关闭套餐的Key
-const closePlanIdKeyPrefix = "waf:closed_plan:"
+// closePlansKey 用于存储所有已关闭套餐ID的Set
+const closePlansKey = "waf:closed_plans"
 
-// 辅助函数:根据 planId 生成对应的 Redis Key
-func (r *expiredRepository) getPlanKey(planId int64) string {
-	return fmt.Sprintf("%s%d", closePlanIdKeyPrefix, planId)
-}
+// expiringSoonPlansKey 用于存储7天后过期套餐ID的Sorted Set
+// Score: 过期时间戳, Value: planId
+const expiringSoonPlansKey = "waf:expiring_soon_plans"
 
-// AddClosePlans 为每个套餐创建一个独立的 key,并将详细信息作为 value 存储
-func (r *expiredRepository) AddClosePlans(ctx context.Context, infos ...ExpiredInfo) error {
-	if len(infos) == 0 {
+// AddPlans 将一个或多个套餐ID添加到指定的列表中
+func (r *expiredRepository) AddPlans(ctx context.Context, listType PlanListType, planIds ...int64) error {
+	if len(planIds) == 0 {
 		return nil
 	}
 
-	pipe := r.rdb.Pipeline()
-	for _, info := range infos {
-		key := r.getPlanKey(info.HostID)
-		
-		// 将结构体序列化为 JSON 字符串
-		value, err := json.Marshal(info)
-		if err != nil {
-			// 在实际应用中,这里应该记录错误日志
-			// log.Printf("Error marshalling ExpiredInfo for plan %d: %v", info.HostID, err)
-			continue // 跳过这个错误的数据
+	switch listType {
+	case ClosedPlansList:
+		members := make([]interface{}, len(planIds))
+		for i, id := range planIds {
+			members[i] = id
 		}
+		return r.rdb.SAdd(ctx, closePlansKey, members...).Err()
+
+	case ExpiringSoonPlansList:
+		// Score 代表套餐的实际过期时间戳,用于后续查询
+		expirationTimestamp := float64(time.Now().Add(7 * 24 * time.Hour).Unix())
+		members := make([]redis.Z, len(planIds))
+		for i, id := range planIds {
+			members[i] = redis.Z{
+				Score:  expirationTimestamp,
+				Member: id,
+			}
+		}
+		return r.rdb.ZAdd(ctx, expiringSoonPlansKey, members...).Err()
 
-		// 设置一个固定的7天过期时间,用于记录关闭状态
-		// 这样可以确保在7天内,该套餐的状态是“已关闭”,可以被恢复
-		// 7天后,该key会自动过期
-		const sevenDays = 7 * 24 * time.Hour
-		pipe.Set(ctx, key, value, sevenDays)
-	}
-
-	_, err := pipe.Exec(ctx)
-	return err
-}
-
-// GetClosePlanInfo 获取并解析单个套餐的信息
-func (r *expiredRepository) GetClosePlanInfo(ctx context.Context, planId int64) (*ExpiredInfo, error) {
-	key := r.getPlanKey(planId)
-	value, err := r.rdb.Get(ctx, key).Result()
-	if err != nil {
-		return nil, err // 包括 redis.Nil 的情况
-	}
-
-	var info ExpiredInfo
-	if err := json.Unmarshal([]byte(value), &info); err != nil {
-		return nil, fmt.Errorf("failed to unmarshal plan info for key %s: %w", key, err)
+	default:
+		return fmt.Errorf("未知的列表类型: %s", listType)
 	}
-
-	return &info, nil
 }
 
-// RemoveClosePlanIds 删除每个 planId 对应的 key
-func (r *expiredRepository) RemoveClosePlanIds(ctx context.Context, planIds ...int64) error {
+// RemovePlans 从指定的列表中移除一个或多个套餐ID
+func (r *expiredRepository) RemovePlans(ctx context.Context, listType PlanListType, planIds ...int64) error {
 	if len(planIds) == 0 {
 		return nil
 	}
 
-	// 生成所有需要删除的 key
-	keys := make([]string, len(planIds))
+	members := make([]interface{}, len(planIds))
 	for i, id := range planIds {
-		keys[i] = r.getPlanKey(id)
+		members[i] = id
 	}
 
-	// DEL 命令可以一次性删除多个 key
-	return r.rdb.Del(ctx, keys...).Err()
+	switch listType {
+	case ClosedPlansList:
+		return r.rdb.SRem(ctx, closePlansKey, members...).Err()
+	case ExpiringSoonPlansList:
+		return r.rdb.ZRem(ctx, expiringSoonPlansKey, members...).Err()
+	default:
+		return fmt.Errorf("未知的列表类型: %s", listType)
+	}
 }
 
-// GetAllClosePlanIds 使用 SCAN 遍历所有匹配的 key 并解析出 planId
-func (r *expiredRepository) GetAllClosePlanIds(ctx context.Context) ([]int64, error) {
-	var cursor uint64
-	var allKeys []string
-
-	// 使用 SCAN 命令来安全地遍历大量的 key,避免阻塞 Redis
-	// KEYS 命令会导致性能问题,在生产环境中严禁使用
-	scanPattern := closePlanIdKeyPrefix + "*"
-
-	for {
-		var keys []string
-		var err error
-		keys, cursor, err = r.rdb.Scan(ctx, cursor, scanPattern, 100).Result() // 每次扫描100个
-		if err != nil {
-			return nil, err
-		}
-
-		allKeys = append(allKeys, keys...)
+// GetAllPlanIds 获取指定列表中的所有套餐ID
+func (r *expiredRepository) GetAllPlanIds(ctx context.Context, listType PlanListType) ([]int64, error) {
+	var members []string
+	var err error
+
+	switch listType {
+	case ClosedPlansList:
+		members, err = r.rdb.SMembers(ctx, closePlansKey).Result()
+	case ExpiringSoonPlansList:
+		members, err = r.rdb.ZRange(ctx, expiringSoonPlansKey, 0, -1).Result()
+	default:
+		return nil, fmt.Errorf("未知的列表类型: %s", listType)
+	}
 
-		// 如果 cursor 回到 0,表示遍历完成
-		if cursor == 0 {
-			break
-		}
+	if err != nil {
+		return nil, err
 	}
 
-	// 从 key 的字符串中解析出 planId
-	planIds := make([]int64, 0, len(allKeys))
-	for _, key := range allKeys {
-		// key 的格式是 "waf:closed_plan:12345"
-		// 我们需要移除前缀 "waf:closed_plan:" 来获取ID部分
-		idStr := strings.TrimPrefix(key, closePlanIdKeyPrefix)
-		id, err := strconv.ParseInt(idStr, 10, 64)
+	planIds := make([]int64, len(members))
+	for i, memberStr := range members {
+		id, err := strconv.ParseInt(memberStr, 10, 64)
 		if err != nil {
-			// 如果有无法解析的key,最好记录日志并跳过
-			// log.Printf("Warning: could not parse planId from key '%s': %v", key, err)
-			continue
+			return nil, fmt.Errorf("无法解析套餐ID '%s': %w", memberStr, err)
 		}
-		planIds = append(planIds, id)
+		planIds[i] = id
 	}
 
 	return planIds, nil
 }
 
-// IsPlanClosed 检查 planId 对应的 key 是否存在
-func (r *expiredRepository) IsPlanClosed(ctx context.Context, planId int64) (bool, error) {
-	key := r.getPlanKey(planId)
-
-	// EXISTS 命令是 O(1) 的高效操作,返回存在的key的数量
-	count, err := r.rdb.Exists(ctx, key).Result()
-	if err != nil {
-		return false, err
+// IsPlanInList 检查一个套餐ID是否存在于指定的列表中
+func (r *expiredRepository) IsPlanInList(ctx context.Context, listType PlanListType, planId int64) (bool, error) {
+	switch listType {
+	case ClosedPlansList:
+		return r.rdb.SIsMember(ctx, closePlansKey, planId).Result()
+	case ExpiringSoonPlansList:
+		_, err := r.rdb.ZScore(ctx, expiringSoonPlansKey, strconv.FormatInt(planId, 10)).Result()
+		if err == redis.Nil {
+			return false, nil
+		}
+		return err == nil, err
+	default:
+		return false, fmt.Errorf("未知的列表类型: %s", listType)
 	}
-
-	// 如果 count > 0,说明key存在,即套餐已关闭
-	return count > 0, nil
 }

+ 1 - 1
internal/repository/gatewaygroupip.go

@@ -153,4 +153,4 @@ func (r *gateWayGroupIpRepository) GetGatewayGroupIpList(ctx context.Context,req
 		TotalPages: int(math.Ceil(float64(total) / float64(pageSize))),
 
 	}, nil
-}
+}

+ 30 - 30
internal/server/task.go

@@ -76,36 +76,36 @@ func (t *TaskServer) Start(ctx context.Context) error {
 
 
 
-	_, err := t.scheduler.Cron("* * * * *").Do(func() {
-		err := t.wafTask.SynchronizationTime(ctx)
-		if err != nil {
-			t.log.Error("同步到期时间失败", zap.Error(err))
-		}
-	})
-	if err != nil {
-		t.log.Error("同步到期时间注册任务失败", zap.Error(err))
-	}
-
-	_, err = t.scheduler.Cron("* * * * *").Do(func() {
-		err := t.wafTask.StopPlan(ctx)
-		if err != nil {
-			t.log.Error("停止套餐失败", zap.Error(err))
-		}
-	})
-	if err != nil {
-		t.log.Error("停止套餐注册任务失败", zap.Error(err))
-	}
-
-
-	_, err = t.scheduler.Cron("* * * * *").Do(func() {
-		err := t.wafTask.RecoverStopPlan(ctx)
-		if err != nil {
-			t.log.Error("续费失败", zap.Error(err))
-		}
-	})
-	if err != nil {
-		t.log.Error("续费注册任务失败", zap.Error(err))
-	}
+	//_, err := t.scheduler.Cron("* * * * *").Do(func() {
+	//	err := t.wafTask.SynchronizationTime(ctx)
+	//	if err != nil {
+	//		t.log.Error("同步到期时间失败", zap.Error(err))
+	//	}
+	//})
+	//if err != nil {
+	//	t.log.Error("同步到期时间注册任务失败", zap.Error(err))
+	//}
+	//
+	//_, err = t.scheduler.Cron("* * * * *").Do(func() {
+	//	err := t.wafTask.StopPlan(ctx)
+	//	if err != nil {
+	//		t.log.Error("停止套餐失败", zap.Error(err))
+	//	}
+	//})
+	//if err != nil {
+	//	t.log.Error("停止套餐注册任务失败", zap.Error(err))
+	//}
+	//
+	//
+	//_, err = t.scheduler.Cron("* * * * *").Do(func() {
+	//	err := t.wafTask.RecoverStopPlan(ctx)
+	//	if err != nil {
+	//		t.log.Error("续费失败", zap.Error(err))
+	//	}
+	//})
+	//if err != nil {
+	//	t.log.Error("续费注册任务失败", zap.Error(err))
+	//}
 
 
 

+ 233 - 298
internal/task/waf.go

@@ -13,14 +13,21 @@ import (
 	"time"
 )
 
+// WafTask 定义了WAF相关的五个独立定时任务接口
 type WafTask interface {
-	//获取到期时间小于3天的同步时间
+	// 1. 同步即将到期(1天内)的套餐时间
 	SynchronizationTime(ctx context.Context) error
+	// 2. 停止所有已到期的套餐
 	StopPlan(ctx context.Context) error
-	RecoverStopPlan(ctx context.Context) error
+	// 3. 恢复7天内续费的套餐
+	RecoverRecentPlan(ctx context.Context) error
+	// 4. 清理过期超过7天且仍未续费的记录
+	CleanUpStaleRecords(ctx context.Context) error
+	// 5. 恢复超过7天后才续费的套餐
+	RecoverStalePlan(ctx context.Context) error
 }
 
-func NewWafTask (
+func NewWafTask(
 	webForWardingRep repository.WebForwardingRepository,
 	tcpforwardingRep repository.TcpforwardingRepository,
 	udpForWardingRep repository.UdpForWardingRepository,
@@ -29,230 +36,123 @@ func NewWafTask (
 	globalLimitRep repository.GlobalLimitRepository,
 	expiredRep repository.ExpiredRepository,
 	task *Task,
-	) WafTask{
+	gatewayGroupIpRep repository.GateWayGroupIpRepository,
+) WafTask {
 	return &wafTask{
-		Task: task,
+		Task:             task,
 		webForWardingRep: webForWardingRep,
 		tcpforwardingRep: tcpforwardingRep,
 		udpForWardingRep: udpForWardingRep,
-		cdn: cdn,
-		hostRep: hostRep,
-		globalLimitRep: globalLimitRep,
-		expiredRep: expiredRep,
+		cdn:              cdn,
+		hostRep:          hostRep,
+		globalLimitRep:   globalLimitRep,
+		expiredRep:       expiredRep,
+		gatewayGroupIpRep: gatewayGroupIpRep,
 	}
 }
+
 type wafTask struct {
 	*Task
 	webForWardingRep repository.WebForwardingRepository
 	tcpforwardingRep repository.TcpforwardingRepository
 	udpForWardingRep repository.UdpForWardingRepository
-	cdn service.CdnService
-	hostRep repository.HostRepository
-	globalLimitRep repository.GlobalLimitRepository
-	expiredRep repository.ExpiredRepository
+	cdn              service.CdnService
+	hostRep          repository.HostRepository
+	globalLimitRep   repository.GlobalLimitRepository
+	expiredRep       repository.ExpiredRepository
+	gatewayGroupIpRep repository.GateWayGroupIpRepository
 }
 
-
 const (
-	// 1天秒数
+	// 1天对应的秒数
 	OneDaysInSeconds = 1 * 24 * 60 * 60
-	// 7天秒数
-	SevenDaysInSeconds = 7 * 24 * 60 * 60 * -1
+	// 7天对应的秒数
+	SevenDaysInSeconds = 7 * 24 * 60 * 60
 )
-// 获取cdn web id
-func (t wafTask) GetCdnWebId(ctx context.Context,hostId []int) ([]int, error) {
-	tcpIds, err := t.tcpforwardingRep.GetTcpAll(ctx, hostId)
-	if err != nil {
-		return nil, err
-	}
-	udpIds, err := t.udpForWardingRep.GetUdpAll(ctx, hostId)
-	if err != nil {
-		return nil, err
-	}
-	webIds, err := t.webForWardingRep.GetWebAll(ctx, hostId)
-	if err != nil {
-		return nil, err
-	}
-	var ids []int
-	ids = append(ids, tcpIds...)
-	ids = append(ids, udpIds...)
-	ids = append(ids, webIds...)
-	return ids, nil
+
+// RenewalRequest 续费操作请求结构体
+type RenewalRequest struct {
+	HostId    int
+	PlanId    int
+	ExpiredAt int64
 }
 
-// 启用/禁用 网站
+// =================================================================
+// =================== 原始辅助函数 (Helpers) =====================
+// =================================================================
+
+// BanServer 启用/禁用 网站 (并发执行)
 func (t wafTask) BanServer(ctx context.Context, ids []int, isBan bool) error {
+	if len(ids) == 0 { return nil }
 	var wg sync.WaitGroup
 	errChan := make(chan error, len(ids))
-
-	// 修正1:为每个 goroutine 增加 WaitGroup 的计数
 	wg.Add(len(ids))
-
 	for _, id := range ids {
 		go func(id int) {
-			// 修正2:确保每个 goroutine 在退出时都调用 Done()
 			defer wg.Done()
-
-			err := t.cdn.EditWebIsOn(ctx, int64(id), isBan)
-			if err != nil {
+			if err := t.cdn.EditWebIsOn(ctx, int64(id), isBan); err != nil {
 				errChan <- err
-				// 这里不需要 return,因为 defer wg.Done() 会在函数退出时执行
 			}
 		}(id)
 	}
-
-	// 现在 wg.Wait() 会正确地阻塞,直到所有 goroutine 都调用了 Done()
 	wg.Wait()
-
-	// 在所有 goroutine 都结束后,安全地关闭 channel
 	close(errChan)
-
 	var result error
 	for err := range errChan {
-		result = multierror.Append(result, err)  // 将多个 error 对象合并成一个单一的 error 对象
+		result = multierror.Append(result, err)
 	}
-
-	// 修正3:返回收集到的错误,而不是 nil
 	return result
 }
 
-
-
-// 获取指定到期时间
-func (t wafTask) GetAlmostExpiring(ctx context.Context,hostIds []int,addTime int64) ([]v1.GetAlmostExpireHostResponse,error) {
-	// 3 天
-	res, err := t.hostRep.GetAlmostExpired(ctx, hostIds, addTime)
-	if err != nil {
-		return nil,err
+// EditExpired 统一的续费操作入口
+func (t wafTask) EditExpired(ctx context.Context, reqs []RenewalRequest) error {
+	if len(reqs) == 0 { return nil }
+	var globalLimitUpdates []struct { hostId int; expiredAt int64 }
+	var planRenewals []struct { planId int; expiredAt int64 }
+	for _, req := range reqs {
+		globalLimitUpdates = append(globalLimitUpdates, struct{ hostId int; expiredAt int64 }{req.HostId, req.ExpiredAt})
+		planRenewals = append(planRenewals, struct{ planId int; expiredAt int64 }{req.PlanId, req.ExpiredAt})
 	}
-
-	return res, nil
-}
-
-
-// 获取waf全局到期时间
-func (t wafTask) GetGlobalAlmostExpiring(ctx context.Context,addTime int64) ([]model.GlobalLimit,error) {
-	res, err := t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, addTime)
-	if err != nil {
-		return nil, err
+	var result *multierror.Error
+	if err := t.editGlobalLimitState(ctx, globalLimitUpdates, true); err != nil {
+		result = multierror.Append(result, err)
 	}
-	return res, nil
-}
-
-
-
-// 修改全局续费
-func (t wafTask) EditGlobalExpired(ctx context.Context, req []struct{
-	hostId int
-	expiredAt int64
-}, state bool) error {
-	var result *multierror.Error // 使用 multierror
-
-	for _, v := range req {
-		err := t.globalLimitRep.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{
-			HostId:    v.hostId,
-			ExpiredAt: v.expiredAt,
-			State:     state,
-		})
-		if err != nil {
-			// 收集错误,而不是直接返回
-			result = multierror.Append(result, err)
-		}
+	if err := t.renewCdnPlan(ctx, planRenewals); err != nil {
+		result = multierror.Append(result, err)
 	}
-
-	// 返回所有收集到的错误
 	return result.ErrorOrNil()
 }
 
-
-
-// 续费套餐
-func (t wafTask) EnablePlan(ctx context.Context, req []struct{
-	planId int
-	expiredAt int64
-}) error {
+// editGlobalLimitState 内部函数,用于更新数据库中的状态和时间
+func (t wafTask) editGlobalLimitState(ctx context.Context, req []struct { hostId int; expiredAt int64 }, state bool) error {
 	var result *multierror.Error
-
 	for _, v := range req {
-		err := t.cdn.RenewPlan(ctx, v1.RenewalPlan{
-			UserPlanId:  int64(v.planId),
-			IsFree:      true,
-			DayTo:       time.Unix(v.expiredAt, 0).Format("2006-01-02"),
-			Period:      "monthly",
-			CountPeriod: 1,
-			PeriodDayTo: time.Unix(v.expiredAt, 0).Format("2006-01-02"),
-		})
-		if err != nil {
-			result = multierror.Append(result, err)
-		}
+		err := t.globalLimitRep.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{HostId: v.hostId, ExpiredAt: v.expiredAt, State: state})
+		if err != nil { result = multierror.Append(result, err) }
 	}
-
 	return result.ErrorOrNil()
 }
 
-
-
-// 续费操作
-type RenewalRequest struct {
-	HostId    int
-	PlanId    int
-	ExpiredAt int64
-}
-
-// 续费操作
-func (t wafTask) EditExpired(ctx context.Context, reqs []RenewalRequest) error {
-	// 如果请求为空,直接返回
-	if len(reqs) == 0 {
-		return nil
-	}
-
-	// 1. 准备用于更新 GlobalLimit 的数据
-	var globalLimitUpdates []struct {
-		hostId    int
-		expiredAt int64
-	}
-	for _, req := range reqs {
-		globalLimitUpdates = append(globalLimitUpdates, struct {
-			hostId    int
-			expiredAt int64
-		}{req.HostId, req.ExpiredAt})
-	}
-
-	// 2. 准备用于续费套餐的数据
-	var planRenewals []struct {
-		planId    int
-		expiredAt int64
-	}
-	for _, req := range reqs {
-		planRenewals = append(planRenewals, struct {
-			planId    int
-			expiredAt int64
-		}{req.PlanId, req.ExpiredAt})
-	}
-
+// renewCdnPlan 内部函数,用于调用CDN服务进行续费
+func (t wafTask) renewCdnPlan(ctx context.Context, req []struct { planId int; expiredAt int64 }) error {
 	var result *multierror.Error
-
-	// 3. 执行更新,并收集错误
-	if err := t.EditGlobalExpired(ctx, globalLimitUpdates, true); err != nil {
-		result = multierror.Append(result, err)
-	}
-
-	if err := t.EnablePlan(ctx, planRenewals); err != nil {
-		result = multierror.Append(result, err)
+	for _, v := range req {
+		err := t.cdn.RenewPlan(ctx, v1.RenewalPlan{
+			UserPlanId: int64(v.planId), IsFree: true, DayTo: time.Unix(v.expiredAt, 0).Format("2006-01-02"),
+			Period: "monthly", CountPeriod: 1, PeriodDayTo: time.Unix(v.expiredAt, 0).Format("2006-01-02"),
+		})
+		if err != nil { result = multierror.Append(result, err) }
 	}
-
 	return result.ErrorOrNil()
 }
 
+// =================================================================
+// =================== 1. 数据查找层 (Finders) =====================
+// =================================================================
 
-
-// findMismatchedExpirations 检查 WAF 和 Host 的到期时间差异,并返回需要同步的请求。
+// findMismatchedExpirations 检查 WAF 和 Host 的到期时间差异。这是决策的核心。
 func (t *wafTask) findMismatchedExpirations(ctx context.Context, wafLimits []model.GlobalLimit) ([]RenewalRequest, error) {
-	if len(wafLimits) == 0 {
-		return nil, nil
-	}
-
-	// 2. 将 WAF 数据组织成 Map
+	if len(wafLimits) == 0 { return nil, nil }
 	wafExpiredMap := make(map[int]int64, len(wafLimits))
 	wafPlanMap := make(map[int]int, len(wafLimits))
 	var hostIds []int
@@ -261,99 +161,136 @@ func (t *wafTask) findMismatchedExpirations(ctx context.Context, wafLimits []mod
 		wafExpiredMap[limit.HostId] = limit.ExpiredAt
 		wafPlanMap[limit.HostId] = limit.RuleId
 	}
-
-	// 3. 获取对应 Host 的到期时间
 	hostExpirations, err := t.hostRep.GetExpireTimeByHostId(ctx, hostIds)
-	if err != nil {
-		return nil, fmt.Errorf("获取主机到期时间失败: %w", err)
-	}
+	if err != nil { return nil, fmt.Errorf("获取主机到期时间失败: %w", err) }
 	hostExpiredMap := make(map[int]int64, len(hostExpirations))
-	for _, h := range hostExpirations {
-		hostExpiredMap[h.HostId] = h.ExpiredAt
-	}
-
-	// 4. 找出时间不一致的记录
+	for _, h := range hostExpirations { hostExpiredMap[h.HostId] = h.ExpiredAt }
 	var renewalRequests []RenewalRequest
 	for hostId, wafExpiredTime := range wafExpiredMap {
 		hostTime, ok := hostExpiredMap[hostId]
-
-		// 如果 Host 时间与 WAF 时间不一致,则需要同步
 		if !ok || hostTime != wafExpiredTime {
 			planId, planOk := wafPlanMap[hostId]
 			if !planOk {
 				t.logger.Warn("数据不一致:在waf_limits中找不到hostId对应的套餐ID", zap.Int("hostId", hostId))
 				continue
 			}
-			renewalRequests = append(renewalRequests, RenewalRequest{
-				HostId:    hostId,
-				ExpiredAt: hostTime, // 以 host 表的时间为准
-				PlanId:    planId,
-			})
+			renewalRequests = append(renewalRequests, RenewalRequest{HostId: hostId, ExpiredAt: hostTime, PlanId: planId})
 		}
 	}
-
 	return renewalRequests, nil
 }
 
+// findAllCurrentlyExpiredPlans 查找所有当前时间点已经到期的WAF记录。
+func (t *wafTask) findAllCurrentlyExpiredPlans(ctx context.Context) ([]model.GlobalLimit, error) {
+	return t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, 0)
+}
 
-//获取同步到期时间小于1天的套餐
+// findRecentlyExpiredPlans (精确查找) 查找在过去7天内到期的WAF记录。
+func (t *wafTask) findRecentlyExpiredPlans(ctx context.Context) ([]model.GlobalLimit, error) {
+	sevenDaysAgo := time.Now().Add(-7 * 24 * time.Hour).Unix()
+	now := time.Now().Unix()
+	return t.globalLimitRep.GetGlobalLimitsByExpirationRange(ctx, sevenDaysAgo, now)
+}
 
-func (t *wafTask) SynchronizationTime(ctx context.Context) error {
-	// 1. 获取 WAF 全局配置中即将到期(小于3天)的数据
-	wafLimits, err := t.GetGlobalAlmostExpiring(ctx, OneDaysInSeconds)
-	if err != nil {
-		return fmt.Errorf("获取全局到期配置失败: %w", err)
-	}
+// findStaleExpiredPlans (精确查找) 查找7天前或更早就已到期的WAF记录。
+func (t *wafTask) findStaleExpiredPlans(ctx context.Context) ([]model.GlobalLimit, error) {
+	sevenDaysAgoOffset := int64(-1 * SevenDaysInSeconds)
+	return t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, sevenDaysAgoOffset)
+}
 
-	// 2. 找出需要同步的数据
-	renewalRequests, err := t.findMismatchedExpirations(ctx, wafLimits)
+// =================================================================
+// =================== 2. 业务决策层 (Filters) =====================
+// =================================================================
+
+// filterCleanablePlans (精确决策) 从长期过期的列表中,筛选出确认未续费且需要被清理的记录。
+func (t *wafTask) filterCleanablePlans(ctx context.Context, staleLimits []model.GlobalLimit) ([]model.GlobalLimit, error) {
+	renewedStalePlans, err := t.findMismatchedExpirations(ctx, staleLimits)
 	if err != nil {
-		return err // 错误已在辅助函数中包装
+		return nil, fmt.Errorf("决策[清理]: 检查续费状态失败: %w", err)
 	}
-
-	// 3. 如果有需要同步的数据,执行续费操作
-	if len(renewalRequests) > 0 {
-		t.logger.Info("发现记录需要同步到期时间。", zap.Int("数量", len(renewalRequests)))
-		return t.EditExpired(ctx, renewalRequests)
+	renewedHostIds := make(map[int]struct{}, len(renewedStalePlans))
+	for _, req := range renewedStalePlans {
+		renewedHostIds[req.HostId] = struct{}{}
 	}
-
-	return nil
+	var plansToClean []model.GlobalLimit
+	for _, limit := range staleLimits {
+		if _, found := renewedHostIds[limit.HostId]; !found {
+			plansToClean = append(plansToClean, limit)
+		}
+	}
+	return plansToClean, nil
 }
 
 
+// =================================================================
+// ============== 3. 业务执行层 (Executors & Public API) =============
+// =================================================================
 
-// 获取到期的进行关闭套餐操作
-func (t *wafTask) StopPlan(ctx context.Context) error {
-	// 1. 获取 WAF 全局配置中已经到期的数据
-	// 使用 time.Now().Unix() 表示获取所有 expired_at <= 当前时间的记录
-	wafLimits, err := t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, 0)
-	if err != nil {
-		return fmt.Errorf("获取全局到期配置失败: %w", err)
+// executePlanRecovery (可重用) 负责恢复套餐的所有步骤。
+func (t *wafTask) executePlanRecovery(ctx context.Context, renewalRequests []RenewalRequest, taskName string,key repository.PlanListType) error {
+	t.logger.Info(fmt.Sprintf("开始执行[%s]套餐恢复流程", taskName), zap.Int("数量", len(renewalRequests)))
+	var hostIds []int
+	for _, req := range renewalRequests {
+		hostIds = append(hostIds, req.HostId)
 	}
-	if len(wafLimits) == 0 {
-		return nil // 没有到期的,任务完成
+
+	if err := t.BanServer(ctx, hostIds, true); err != nil {
+		return fmt.Errorf("执行[%s]-启用服务失败: %w", taskName, err)
 	}
 
-	// 2. (可选,但推荐)先同步任何时间不一致的数据,确保状态准确
-	renewalRequests, err := t.findMismatchedExpirations(ctx, wafLimits)
-	if err != nil {
-		t.logger.Error("在关闭套餐前,同步时间失败", zap.Error(err))
-		// 根据业务决定是否要继续,这里我们选择继续,但记录错误
+	var allErrors *multierror.Error
+	if err := t.EditExpired(ctx, renewalRequests); err != nil {
+		allErrors = multierror.Append(allErrors, fmt.Errorf("执行[%s]-同步续费信息失败: %w", taskName, err))
+	}
+	planIdsToRecover := make([]int64, len(hostIds))
+	for i, id := range hostIds { planIdsToRecover[i] = int64(id) }
+	if err := t.expiredRep.RemovePlans(ctx, key, planIdsToRecover...); err != nil {
+		allErrors = multierror.Append(allErrors, fmt.Errorf("执行[%s]-移除Redis关闭标记失败: %w", taskName, err))
 	}
+	return allErrors.ErrorOrNil()
+}
+
+// 1. SynchronizationTime 同步即将到期(1天内)的套餐时间
+func (t *wafTask) SynchronizationTime(ctx context.Context) error {
+	wafLimits, err := t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, OneDaysInSeconds)
+	if err != nil { return fmt.Errorf("执行[同步]-查找失败: %w", err) }
+
+	renewalRequests, err := t.findMismatchedExpirations(ctx, wafLimits)
+	if err != nil { return fmt.Errorf("执行[同步]-决策失败: %w", err) }
+
 	if len(renewalRequests) > 0 {
-		t.logger.Info("关闭套餐前,发现并同步不一致的时间记录", zap.Int("数量", len(renewalRequests)))
-		if err := t.EditExpired(ctx, renewalRequests); err != nil {
-			t.logger.Error("同步不一致的时间记录失败", zap.Error(err))
-		}
+		t.logger.Info("发现记录需要同步到期时间。", zap.Int("数量", len(renewalRequests)))
+		return t.EditExpired(ctx, renewalRequests)
 	}
+	return nil
+}
 
-	// 3. 筛选出尚未被关闭的套餐
+// 2. StopPlan (已优化) 停止所有已到期的套餐
+func (t *wafTask) StopPlan(ctx context.Context) error {
+	// 1. 查找所有理论上已到期的记录
+	expiredLimits, err := t.findAllCurrentlyExpiredPlans(ctx)
+	if err != nil { return fmt.Errorf("执行[停止]-查找失败: %w", err) }
+	if len(expiredLimits) == 0 { return nil }
+
+	// 2. 决策 - 第1步:检查这些记录中是否已有续费但未同步的
+	renewalRequests, err := t.findMismatchedExpirations(ctx, expiredLimits)
+	if err != nil { return fmt.Errorf("执行[停止]-决策检查续费失败: %w", err) }
+	renewedHostIds := make(map[int]struct{}, len(renewalRequests))
+	for _, req := range renewalRequests {
+		renewedHostIds[req.HostId] = struct{}{}
+	}
+
+	// 2. 决策 - 第2步:筛选出真正需要停止的记录
 	var plansToClose []model.GlobalLimit
-	for _, limit := range wafLimits {
-		isClosed, err := t.expiredRep.IsPlanClosed(ctx, int64(limit.HostId))
+	for _, limit := range expiredLimits {
+		if _, found := renewedHostIds[limit.HostId]; found {
+			t.logger.Info("发现已到期但刚续费的套餐,跳过停止操作", zap.Int("hostId", limit.HostId))
+			continue
+		}
+		isClosed, err := t.expiredRep.IsPlanInList(ctx, repository.ClosedPlansList, int64(limit.HostId))
 		if err != nil {
-			t.logger.Error("检查Redis中套餐关闭状态失败", zap.Int("hostId", limit.HostId), zap.Error(err))
-			continue // 跳过这个,处理下一个
+			t.logger.Error("决策[停止]:检查套餐是否已关闭失败", zap.Int("hostId", limit.HostId), zap.Error(err))
+			continue
 		}
 		if !isClosed {
 			plansToClose = append(plansToClose, limit)
@@ -361,97 +298,95 @@ func (t *wafTask) StopPlan(ctx context.Context) error {
 	}
 
 	if len(plansToClose) == 0 {
-		t.logger.Info("没有新的到期套餐需要关闭")
+		t.logger.Info("没有需要停止的套餐(可能均已续费或已关闭)")
 		return nil
 	}
 
-	// 4. 对筛选出的套餐执行关闭操作
-	t.logger.Info("开始关闭新的到期WAF服务", zap.Int("数量", len(plansToClose)))
-	var allErrors *multierror.Error
-
-	var webIds []int
+	// 3. 执行停止操作
+	t.logger.Info("开始关闭到期的WAF服务", zap.Int("数量", len(plansToClose)))
+	var hostIds []int
 	for _, limit := range plansToClose {
-		webIds = append(webIds, limit.HostId)
+		hostIds = append(hostIds, limit.HostId)
+	}
+	if err := t.BanServer(ctx, hostIds, false); err != nil {
+		return fmt.Errorf("执行[停止]-禁用服务失败: %w", err)
 	}
+	closedPlanIds := make([]int64, len(hostIds))
+	for i, id := range hostIds { closedPlanIds[i] = int64(id) }
+	if err := t.expiredRep.AddPlans(ctx, repository.ClosedPlansList, closedPlanIds...); err != nil {
+		return fmt.Errorf("执行[停止]-标记为已关闭失败: %w", err)
+	}
+	return nil
+}
 
-	if err := t.BanServer(ctx, webIds, false); err != nil {
-		allErrors = multierror.Append(allErrors, fmt.Errorf("关闭hostId %v 的服务失败: %w", webIds, err))
-	} else {
-		// 服务关闭成功后,将这些套餐信息添加到 Redis
-		var expiredInfos []repository.ExpiredInfo
-		for _, limit := range plansToClose {
-			expiredInfos = append(expiredInfos, repository.ExpiredInfo{
-				HostID: int64(limit.HostId),
-				Expiry: time.Unix(limit.ExpiredAt, 0),
-			})
-		}
+// 3. RecoverRecentPlan 恢复7天内续费的套餐
+func (t *wafTask) RecoverRecentPlan(ctx context.Context) error {
+	recentlyExpiredLimits, err := t.findRecentlyExpiredPlans(ctx)
+	if err != nil { return fmt.Errorf("执行[近期恢复]-查找失败: %w", err) }
+	if len(recentlyExpiredLimits) == 0 { return nil }
 
-		if len(expiredInfos) > 0 {
-			if err := t.expiredRep.AddClosePlans(ctx, expiredInfos...); err != nil {
-				allErrors = multierror.Append(allErrors, fmt.Errorf("添加已关闭套餐信息到Redis失败: %w", err))
-			}
-		}
+	renewalRequests, err := t.findMismatchedExpirations(ctx, recentlyExpiredLimits)
+	if err != nil { return fmt.Errorf("执行[近期恢复]-决策失败: %w", err) }
+	if len(renewalRequests) == 0 {
+		t.logger.Info("在近期过期的套餐中,没有发现已续费的")
+		return nil
 	}
 
-	return allErrors.ErrorOrNil()
+	return t.executePlanRecovery(ctx, renewalRequests, "近期恢复",repository.ClosedPlansList)
 }
-//对于到期7天内续费的产品需要进行恢复操作
 
-func (t *wafTask) RecoverStopPlan(ctx context.Context) error {
-	// 1. 获取所有已过期(expired_at < now)但状态仍为 true 的 WAF 记录
-	// StopPlan 任务会禁用这些服务,但不会改变它们的 state
-	wafLimits, err := t.globalLimitRep.GetGlobalLimitAlmostExpired(ctx, SevenDaysInSeconds) // addTime=0 表示获取所有当前时间之前到期的
-	if err != nil {
-		return fmt.Errorf("获取过期WAF配置失败: %w", err)
-	}
-	if len(wafLimits) == 0 {
-		t.logger.Info("没有已过期且需要检查恢复的服务")
+// 4. CleanUpStaleRecords 清理过期超过7天且仍未续费的记录
+func (t *wafTask) CleanUpStaleRecords(ctx context.Context) error {
+	staleLimits, err := t.findStaleExpiredPlans(ctx)
+	if err != nil { return fmt.Errorf("执行[清理]-查找失败: %w", err) }
+	if len(staleLimits) == 0 { return nil }
+
+	plansToClean, err := t.filterCleanablePlans(ctx, staleLimits)
+	if err != nil { return fmt.Errorf("执行[清理]-决策失败: %w", err) }
+	if len(plansToClean) == 0 {
+		t.logger.Info("没有长期未续费的记录需要清理")
 		return nil
 	}
 
-	// 2. 检查这些记录对应的 host 是否已续费
-	// findMismatchedExpirations 会比较 waf.expired_at 和 host.nextduedate
-	renewalRequests, err := t.findMismatchedExpirations(ctx, wafLimits)
-	if err != nil {
-		return fmt.Errorf("检查续费状态失败: %w", err)
+	t.logger.Info("开始清理长期未续费的WAF记录", zap.Int("数量", len(plansToClean)))
+	var planIdsToClean []int64
+	for _, limit := range plansToClean {
+		planIdsToClean = append(planIdsToClean, int64(limit.HostId))
 	}
 
-	if len(renewalRequests) == 0 {
-		t.logger.Info("在已过期的服务中,没有发现已续费且需要恢复的服务")
-		return nil
+	if err := t.expiredRep.RemovePlans(ctx, repository.ClosedPlansList, planIdsToClean...); err != nil {
+		return fmt.Errorf("执行[清理]-从Redis移除关闭标记失败: %w", err)
 	}
+	// 在这里可以添加从数据库删除或调用CDN API彻底删除的逻辑
+	for _, limit := range plansToClean {
+		err = t.globalLimitRep.UpdateGlobalLimitByHostId(ctx, &model.GlobalLimit{
+			HostId: limit.HostId,
+			GatewayGroupId: limit.GatewayGroupId,
+			State: true,
+		})
+		if err != nil {
+			return fmt.Errorf("执行[清理]-更新套餐状态失败: %w", err)
+		}
 
-	// 3. 对已续费的服务执行恢复操作
-	t.logger.Info("发现已续费、需要恢复的WAF服务", zap.Int("数量", len(renewalRequests)))
-	var allErrors *multierror.Error
 
-	var webIds []int
-	for _, req := range renewalRequests {
-		webIds = append(webIds, req.HostId)
-	}
 
-	if err := t.BanServer(ctx, webIds, true); err != nil {
-		allErrors = multierror.Append(allErrors, fmt.Errorf("恢复hostId %v: 启用服务失败: %w", webIds, err))
-	} else {
-		// 服务恢复成功后,从 Redis 中移除这些套餐的关闭记录
-		planIds := make([]int64, len(webIds))
-		for i, id := range webIds {
-			planIds[i] = int64(id)
-		}
-		if err := t.expiredRep.RemoveClosePlanIds(ctx, planIds...); err != nil {
-			allErrors = multierror.Append(allErrors, fmt.Errorf("从Redis移除已恢复的套餐失败: %w", err))
-		}
 	}
 
-	if len(renewalRequests) > 0 {
-		// 统一执行续费和数据库更新操作
-		if err := t.EditExpired(ctx, renewalRequests); err != nil {
-			allErrors = multierror.Append(allErrors, fmt.Errorf("批量更新已恢复服务的数据库状态失败: %w", err))
-		}
-	}
+	return nil
+}
 
+// 5. RecoverStalePlan 恢复超过7天后才续费的套餐
+func (t *wafTask) RecoverStalePlan(ctx context.Context) error {
+	staleLimits, err := t.findStaleExpiredPlans(ctx)
+	if err != nil { return fmt.Errorf("执行[长期恢复]-查找失败: %w", err) }
+	if len(staleLimits) == 0 { return nil }
 
-	return allErrors.ErrorOrNil()
-}
+	renewalRequests, err := t.findMismatchedExpirations(ctx, staleLimits)
+	if err != nil { return fmt.Errorf("执行[长期恢复]-决策失败: %w", err) }
+	if len(renewalRequests) == 0 {
+		t.logger.Info("在长期过期的套餐中,没有发现已续费的")
+		return nil
+	}
 
-//对于大于7天的药进行数据情侣操作
+	return t.executePlanRecovery(ctx, renewalRequests, "长期恢复",repository.ExpiringSoonPlansList)
+}