Browse Source

refactor(service): 并发执行 API 调用以提高效率

- 使用 sourcegraph/conc库实现并发执行多个 API调用
- 重构 GatewayGroupService 和 GlobalLimitService 接口的实现
- 优化 AddGatewayGroup 和 AddGlobalLimit 方法的执行效率
- 添加必要的错误处理和同步机制
fusu 2 months ago
parent
commit
21b53d9839
4 changed files with 101 additions and 40 deletions
  1. 1 0
      go.mod
  2. 2 0
      go.sum
  3. 10 13
      internal/service/gatewaygroup.go
  4. 88 27
      internal/service/globallimit.go

+ 1 - 0
go.mod

@@ -20,6 +20,7 @@ require (
 	github.com/mcuadros/go-defaults v1.2.0
 	github.com/redis/go-redis/v9 v9.0.5
 	github.com/sony/sonyflake v1.1.0
+	github.com/sourcegraph/conc v0.3.0
 	github.com/spf13/cast v1.5.1
 	github.com/spf13/viper v1.8.1
 	github.com/stretchr/testify v1.8.4

+ 2 - 0
go.sum

@@ -400,6 +400,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1
 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/sony/sonyflake v1.1.0 h1:wnrEcL3aOkWmPlhScLEGAXKkLAIslnBteNUq4Bw6MM4=
 github.com/sony/sonyflake v1.1.0/go.mod h1:LORtCywH/cq10ZbyfhKrHYgAUGH7mOBa76enV9txy/Y=
+github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
+github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
 github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
 github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM=
 github.com/spf13/afero v1.9.5/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb/UhQ=

+ 10 - 13
internal/service/gatewaygroup.go

@@ -1,7 +1,7 @@
 package service
 
 import (
-	"context"
+    "context"
 	"fmt"
 	v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
 	"github.com/go-nunu/nunu-layout-advanced/internal/model"
@@ -13,26 +13,24 @@ type GatewayGroupService interface {
 	GetGatewayGroup(ctx context.Context, id int64) (*model.GatewayGroup, error)
 	AddGatewayGroup(ctx context.Context, req v1.AddGateWayGroupRequest) (int, error)
 }
-
-func NewGatewayGroupService
-	service *Service,
-	gatewayGroupRepository repository.GatewayGroupRepository,
+func NewGatewayGroupService(
+    service *Service,
     gatewayGroupRepository repository.GatewayGroupRepository,
 	required RequiredService,
 	parser ParserService,
 ) GatewayGroupService {
-		Service:                service,
+	return &gatewayGroupService{
 		Service:        service,
-		required:               required,
-		parser:                 parser,
+		gatewayGroupRepository: gatewayGroupRepository,
+		required: required,
 		parser: parser,
 	}
 }
 
 type gatewayGroupService struct {
 	*Service
-	required               RequiredService
-	parser                 ParserService
+	gatewayGroupRepository repository.GatewayGroupRepository
+	required RequiredService
 	parser ParserService
 }
 
@@ -41,7 +39,7 @@ func (s *gatewayGroupService) GetGatewayGroup(ctx context.Context, id int64) (*m
 }
 
 func (s *gatewayGroupService) AddGatewayGroup(ctx context.Context, req v1.AddGateWayGroupRequest) (int, error) {
-		"name":    req.Name,
+	formData := map[string]interface{}{
 		"name": req.Name,
 		"comment": req.Comment,
 	}
@@ -61,9 +59,8 @@ func (s *gatewayGroupService) AddGatewayGroup(ctx context.Context, req v1.AddGat
 		return 0, fmt.Errorf(res)
 	}
 	gateWayGroupId, err := cast.ToIntE(gateWayGroupIdBase)
-		return 0, err
+	if err != nil {
 		return 0,  err
 	}
 	return gateWayGroupId, nil
-r
 }

+ 88 - 27
internal/service/globallimit.go

@@ -9,6 +9,9 @@ import (
 	"github.com/spf13/cast"
 	"github.com/spf13/viper"
 	"strconv"
+	"sync"
+
+	"github.com/sourcegraph/conc"
 )
 
 type GlobalLimitService interface {
@@ -119,37 +122,95 @@ func (s *globalLimitService) AddGlobalLimit(ctx context.Context, req v1.GlobalLi
 	if err != nil {
 		return err
 	}
-	tcpLimitRuleId, err := s.tcpLimit.AddTcpLimit(ctx, &v1.GeneralLimitRequireRequest{
-		Tag:    require.GlobalLimitName,
-		HostId: req.HostId,
-		RuleId: ruleId,
-		Uid:    req.Uid,
-	})
-	if err != nil {
-		return err
+	// 使用conc库并发执行API调用
+	var tcpLimitRuleId, udpLimitRuleId, webLimitRuleId, gateWayGroupId int
+	var mu sync.Mutex // 用于保护共享变量
+
+	// 为每个并发调用创建独立的请求参数(深拷贝)
+	// 避免共享同一个指针可能导致的数据竞争
+
+	// 创建网关组请求参数
+	gateWayReq := v1.AddGateWayGroupRequest{
+		Name:    require.GlobalLimitName,
+		Comment: req.Comment,
 	}
-	udpLimitRuleId, err := s.udpLimit.AddUdpLimit(ctx, &v1.GeneralLimitRequireRequest{
-		Tag:    require.GlobalLimitName,
-		HostId: req.HostId,
-		RuleId: ruleId,
-		Uid:    req.Uid,
+
+	// 创建一个WaitGroup来协调多个并发任务
+	wg := conc.NewWaitGroup()
+
+	// 启动tcpLimit调用 - 使用独立的请求参数副本
+	wg.Go(func() {
+		// 为该goroutine创建独立的请求参数副本
+		tcpLimitReq := &v1.GeneralLimitRequireRequest{
+			Tag:    require.GlobalLimitName,
+			HostId: req.HostId,
+			RuleId: ruleId,
+			Uid:    req.Uid,
+		}
+		result, e := s.tcpLimit.AddTcpLimit(ctx, tcpLimitReq)
+		mu.Lock()
+		if e != nil {
+			err = e
+		} else {
+			tcpLimitRuleId = result
+		}
+		mu.Unlock()
 	})
-	if err != nil {
-		return err
-	}
-	webLimitRuleId, err := s.webLimit.AddWebLimit(ctx, &v1.GeneralLimitRequireRequest{
-		Tag:    require.GlobalLimitName,
-		HostId: req.HostId,
-		RuleId: ruleId,
-		Uid:    req.Uid,
+
+	// 启动udpLimit调用 - 使用独立的请求参数副本
+	wg.Go(func() {
+		// 为该goroutine创建独立的请求参数副本
+		udpLimitReq := &v1.GeneralLimitRequireRequest{
+			Tag:    require.GlobalLimitName,
+			HostId: req.HostId,
+			RuleId: ruleId,
+			Uid:    req.Uid,
+		}
+		result, e := s.udpLimit.AddUdpLimit(ctx, udpLimitReq)
+		mu.Lock()
+		if e != nil {
+			err = e
+		} else {
+			udpLimitRuleId = result
+		}
+		mu.Unlock()
 	})
-	if err != nil {
-		return err
-	}
-	gateWayGroupId, err := s.gateWayGroup.AddGatewayGroup(ctx, v1.AddGateWayGroupRequest{
-		Name:    require.GlobalLimitName,
-		Comment: req.Comment,
+
+	// 启动webLimit调用 - 使用独立的请求参数副本
+	wg.Go(func() {
+		// 为该goroutine创建独立的请求参数副本
+		webLimitReq := &v1.GeneralLimitRequireRequest{
+			Tag:    require.GlobalLimitName,
+			HostId: req.HostId,
+			RuleId: ruleId,
+			Uid:    req.Uid,
+		}
+		result, e := s.webLimit.AddWebLimit(ctx, webLimitReq)
+		mu.Lock()
+		if e != nil {
+			err = e
+		} else {
+			webLimitRuleId = result
+		}
+		mu.Unlock()
 	})
+
+	// 启动gatewayGroup调用
+	wg.Go(func() {
+		result, e := s.gateWayGroup.AddGatewayGroup(ctx, gateWayReq)
+		mu.Lock()
+		if e != nil {
+			err = e
+		} else {
+			gateWayGroupId = result
+		}
+		mu.Unlock()
+	})
+
+	// 等待所有调用完成
+	wg.Wait()
+
+	// 检查是否有错误发生
 	if err != nil {
 		return err
 	}