Эх сурвалжийг харах

feat: Add the demo of task and job

chris 7 сар өмнө
parent
commit
1eee7fee67

+ 3 - 3
cmd/migration/wire/wire.go

@@ -19,15 +19,15 @@ var repositorySet = wire.NewSet(
 	repository.NewUserRepository,
 )
 var serverSet = wire.NewSet(
-	server.NewMigrate,
+	server.NewMigrateServer,
 )
 
 // build App
 func newApp(
-	migrate *server.Migrate,
+	migrateServer *server.MigrateServer,
 ) *app.App {
 	return app.NewApp(
-		app.WithServer(migrate),
+		app.WithServer(migrateServer),
 		app.WithName("demo-migrate"),
 	)
 }

+ 5 - 5
cmd/migration/wire/wire_gen.go

@@ -19,8 +19,8 @@ import (
 
 func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), error) {
 	db := repository.NewDB(viperViper, logger)
-	migrate := server.NewMigrate(db, logger)
-	appApp := newApp(migrate)
+	migrateServer := server.NewMigrateServer(db, logger)
+	appApp := newApp(migrateServer)
 	return appApp, func() {
 	}, nil
 }
@@ -29,11 +29,11 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 
 var repositorySet = wire.NewSet(repository.NewDB, repository.NewRepository, repository.NewUserRepository)
 
-var serverSet = wire.NewSet(server.NewMigrate)
+var serverSet = wire.NewSet(server.NewMigrateServer)
 
 // build App
 func newApp(
-	migrate *server.Migrate,
+	migrateServer *server.MigrateServer,
 ) *app.App {
-	return app.NewApp(app.WithServer(migrate), app.WithName("demo-migrate"))
+	return app.NewApp(app.WithServer(migrateServer), app.WithName("demo-migrate"))
 }

+ 9 - 3
cmd/server/wire/wire.go

@@ -5,6 +5,7 @@ package wire
 
 import (
 	"github.com/go-nunu/nunu-layout-advanced/internal/handler"
+	"github.com/go-nunu/nunu-layout-advanced/internal/job"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service"
@@ -35,19 +36,23 @@ var handlerSet = wire.NewSet(
 	handler.NewUserHandler,
 )
 
+var jobSet = wire.NewSet(
+	job.NewJob,
+	job.NewUserJob,
+)
 var serverSet = wire.NewSet(
 	server.NewHTTPServer,
-	server.NewJob,
+	server.NewJobServer,
 )
 
 // build App
 func newApp(
 	httpServer *http.Server,
-	job *server.Job,
+	jobServer *server.JobServer,
 	// task *server.Task,
 ) *app.App {
 	return app.NewApp(
-		app.WithServer(httpServer, job),
+		app.WithServer(httpServer, jobServer),
 		app.WithName("demo-server"),
 	)
 }
@@ -57,6 +62,7 @@ func NewWire(*viper.Viper, *log.Logger) (*app.App, func(), error) {
 		repositorySet,
 		serviceSet,
 		handlerSet,
+		jobSet,
 		serverSet,
 		sid.NewSid,
 		jwt.NewJwt,

+ 10 - 5
cmd/server/wire/wire_gen.go

@@ -8,6 +8,7 @@ package wire
 
 import (
 	"github.com/go-nunu/nunu-layout-advanced/internal/handler"
+	"github.com/go-nunu/nunu-layout-advanced/internal/job"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service"
@@ -34,8 +35,10 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 	userService := service.NewUserService(serviceService, userRepository)
 	userHandler := handler.NewUserHandler(handlerHandler, userService)
 	httpServer := server.NewHTTPServer(logger, viperViper, jwtJWT, userHandler)
-	job := server.NewJob(logger)
-	appApp := newApp(httpServer, job)
+	jobJob := job.NewJob(transaction, logger, sidSid)
+	userJob := job.NewUserJob(jobJob, userRepository)
+	jobServer := server.NewJobServer(logger, userJob)
+	appApp := newApp(httpServer, jobServer)
 	return appApp, func() {
 	}, nil
 }
@@ -48,13 +51,15 @@ var serviceSet = wire.NewSet(service.NewService, service.NewUserService)
 
 var handlerSet = wire.NewSet(handler.NewHandler, handler.NewUserHandler)
 
-var serverSet = wire.NewSet(server.NewHTTPServer, server.NewJob)
+var jobSet = wire.NewSet(job.NewJob, job.NewUserJob)
+
+var serverSet = wire.NewSet(server.NewHTTPServer, server.NewJobServer)
 
 // build App
 func newApp(
 	httpServer *http.Server,
-	job *server.Job,
+	jobServer *server.JobServer,
 
 ) *app.App {
-	return app.NewApp(app.WithServer(httpServer, job), app.WithName("demo-server"))
+	return app.NewApp(app.WithServer(httpServer, jobServer), app.WithName("demo-server"))
 }

+ 20 - 2
cmd/task/wire/wire.go

@@ -4,20 +4,35 @@
 package wire
 
 import (
+	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
+	"github.com/go-nunu/nunu-layout-advanced/internal/task"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/app"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/sid"
 	"github.com/google/wire"
 	"github.com/spf13/viper"
 )
 
+var repositorySet = wire.NewSet(
+	repository.NewDB,
+	//repository.NewRedis,
+	repository.NewRepository,
+	repository.NewTransaction,
+	repository.NewUserRepository,
+)
+
+var taskSet = wire.NewSet(
+	task.NewTask,
+	task.NewUserTask,
+)
 var serverSet = wire.NewSet(
-	server.NewTask,
+	server.NewTaskServer,
 )
 
 // build App
 func newApp(
-	task *server.Task,
+	task *server.TaskServer,
 ) *app.App {
 	return app.NewApp(
 		app.WithServer(task),
@@ -27,7 +42,10 @@ func newApp(
 
 func NewWire(*viper.Viper, *log.Logger) (*app.App, func(), error) {
 	panic(wire.Build(
+		repositorySet,
+		taskSet,
 		serverSet,
 		newApp,
+		sid.NewSid,
 	))
 }

+ 19 - 6
cmd/task/wire/wire_gen.go

@@ -7,9 +7,12 @@
 package wire
 
 import (
+	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
+	"github.com/go-nunu/nunu-layout-advanced/internal/task"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/app"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/sid"
 	"github.com/google/wire"
 	"github.com/spf13/viper"
 )
@@ -17,19 +20,29 @@ import (
 // Injectors from wire.go:
 
 func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), error) {
-	task := server.NewTask(logger)
-	appApp := newApp(task)
+	db := repository.NewDB(viperViper, logger)
+	repositoryRepository := repository.NewRepository(logger, db)
+	transaction := repository.NewTransaction(repositoryRepository)
+	sidSid := sid.NewSid()
+	taskTask := task.NewTask(transaction, logger, sidSid)
+	userRepository := repository.NewUserRepository(repositoryRepository)
+	userTask := task.NewUserTask(taskTask, userRepository)
+	taskServer := server.NewTaskServer(logger, userTask)
+	appApp := newApp(taskServer)
 	return appApp, func() {
 	}, nil
 }
 
 // wire.go:
 
-var serverSet = wire.NewSet(server.NewTask)
+var repositorySet = wire.NewSet(repository.NewDB, repository.NewRepository, repository.NewTransaction, repository.NewUserRepository)
+
+var taskSet = wire.NewSet(task.NewTask, task.NewUserTask)
+
+var serverSet = wire.NewSet(server.NewTaskServer)
 
 // build App
-func newApp(
-	task *server.Task,
+func newApp(task2 *server.TaskServer,
 ) *app.App {
-	return app.NewApp(app.WithServer(task), app.WithName("demo-task"))
+	return app.NewApp(app.WithServer(task2), app.WithName("demo-task"))
 }

+ 27 - 0
internal/job/job.go

@@ -0,0 +1,27 @@
+package job
+
+import (
+	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/jwt"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/sid"
+)
+
+type Job struct {
+	logger *log.Logger
+	sid    *sid.Sid
+	jwt    *jwt.JWT
+	tm     repository.Transaction
+}
+
+func NewJob(
+	tm repository.Transaction,
+	logger *log.Logger,
+	sid *sid.Sid,
+) *Job {
+	return &Job{
+		logger: logger,
+		sid:    sid,
+		tm:     tm,
+	}
+}

+ 34 - 0
internal/job/user.go

@@ -0,0 +1,34 @@
+package job
+
+import (
+	"context"
+	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
+	"time"
+)
+
+type UserJob interface {
+	KafkaConsumer(ctx context.Context) error
+}
+
+func NewUserJob(
+	job *Job,
+	userRepo repository.UserRepository,
+) UserJob {
+	return &userJob{
+		userRepo: userRepo,
+		Job:      job,
+	}
+}
+
+type userJob struct {
+	userRepo repository.UserRepository
+	*Job
+}
+
+func (t userJob) KafkaConsumer(ctx context.Context) error {
+	// do something
+	for {
+		t.logger.Info("KafkaConsumer")
+		time.Sleep(time.Second * 5)
+	}
+}

+ 17 - 9
internal/server/job.go

@@ -2,24 +2,32 @@ package server
 
 import (
 	"context"
+	"github.com/go-nunu/nunu-layout-advanced/internal/job"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
 )
 
-type Job struct {
-	log *log.Logger
+type JobServer struct {
+	log     *log.Logger
+	userJob job.UserJob
 }
 
-func NewJob(
+func NewJobServer(
 	log *log.Logger,
-) *Job {
-	return &Job{
-		log: log,
+	userJob job.UserJob,
+) *JobServer {
+	return &JobServer{
+		log:     log,
+		userJob: userJob,
 	}
 }
-func (j *Job) Start(ctx context.Context) error {
+
+func (j *JobServer) Start(ctx context.Context) error {
+	// Tips: If you want job to start as a separate process, just refer to the task implementation and adjust the code accordingly.
+
 	// eg: kafka consumer
-	return nil
+	err := j.userJob.KafkaConsumer(ctx)
+	return err
 }
-func (j *Job) Stop(ctx context.Context) error {
+func (j *JobServer) Stop(ctx context.Context) error {
 	return nil
 }

+ 8 - 6
internal/server/migration.go

@@ -9,19 +9,21 @@ import (
 	"os"
 )
 
-type Migrate struct {
+type MigrateServer struct {
 	db  *gorm.DB
 	log *log.Logger
 }
 
-func NewMigrate(db *gorm.DB, log *log.Logger) *Migrate {
-	return &Migrate{
+func NewMigrateServer(db *gorm.DB, log *log.Logger) *MigrateServer {
+	return &MigrateServer{
 		db:  db,
 		log: log,
 	}
 }
-func (m *Migrate) Start(ctx context.Context) error {
-	if err := m.db.AutoMigrate(&model.User{}); err != nil {
+func (m *MigrateServer) Start(ctx context.Context) error {
+	if err := m.db.AutoMigrate(
+		&model.User{},
+	); err != nil {
 		m.log.Error("user migrate error", zap.Error(err))
 		return err
 	}
@@ -29,7 +31,7 @@ func (m *Migrate) Start(ctx context.Context) error {
 	os.Exit(0)
 	return nil
 }
-func (m *Migrate) Stop(ctx context.Context) error {
+func (m *MigrateServer) Stop(ctx context.Context) error {
 	m.log.Info("AutoMigrate stop")
 	return nil
 }

+ 20 - 17
internal/server/task.go

@@ -3,24 +3,30 @@ package server
 import (
 	"context"
 	"github.com/go-co-op/gocron"
+	"github.com/go-nunu/nunu-layout-advanced/internal/task"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
 	"go.uber.org/zap"
 	"time"
 )
 
-type Task struct {
+type TaskServer struct {
 	log       *log.Logger
 	scheduler *gocron.Scheduler
+	userTask  task.UserTask
 }
 
-func NewTask(log *log.Logger) *Task {
-	return &Task{
-		log: log,
+func NewTaskServer(
+	log *log.Logger,
+	userTask task.UserTask,
+) *TaskServer {
+	return &TaskServer{
+		log:      log,
+		userTask: userTask,
 	}
 }
-func (t *Task) Start(ctx context.Context) error {
+func (t *TaskServer) Start(ctx context.Context) error {
 	gocron.SetPanicHandler(func(jobName string, recoverData interface{}) {
-		t.log.Error("Task Panic", zap.String("job", jobName), zap.Any("recover", recoverData))
+		t.log.Error("TaskServer Panic", zap.String("job", jobName), zap.Any("recover", recoverData))
 	})
 
 	// eg: crontab task
@@ -28,25 +34,22 @@ func (t *Task) Start(ctx context.Context) error {
 	// if you are in China, you will need to change the time zone as follows
 	// t.scheduler = gocron.NewScheduler(time.FixedZone("PRC", 8*60*60))
 
+	//_, err := t.scheduler.Every("3s").Do(func()
 	_, err := t.scheduler.CronWithSeconds("0/3 * * * * *").Do(func() {
-		t.log.Info("I'm a Task1.")
+		err := t.userTask.CheckUser(ctx)
+		if err != nil {
+			t.log.Error("CheckUser error", zap.Error(err))
+		}
 	})
 	if err != nil {
-		t.log.Error("Task1 error", zap.Error(err))
-	}
-
-	_, err = t.scheduler.Every("3s").Do(func() {
-		t.log.Info("I'm a Task2.")
-	})
-	if err != nil {
-		t.log.Error("Task2 error", zap.Error(err))
+		t.log.Error("CheckUser error", zap.Error(err))
 	}
 
 	t.scheduler.StartBlocking()
 	return nil
 }
-func (t *Task) Stop(ctx context.Context) error {
+func (t *TaskServer) Stop(ctx context.Context) error {
 	t.scheduler.Stop()
-	t.log.Info("Task stop...")
+	t.log.Info("TaskServer stop...")
 	return nil
 }

+ 27 - 0
internal/task/task.go

@@ -0,0 +1,27 @@
+package task
+
+import (
+	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/jwt"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/sid"
+)
+
+type Task struct {
+	logger *log.Logger
+	sid    *sid.Sid
+	jwt    *jwt.JWT
+	tm     repository.Transaction
+}
+
+func NewTask(
+	tm repository.Transaction,
+	logger *log.Logger,
+	sid *sid.Sid,
+) *Task {
+	return &Task{
+		logger: logger,
+		sid:    sid,
+		tm:     tm,
+	}
+}

+ 31 - 0
internal/task/user.go

@@ -0,0 +1,31 @@
+package task
+
+import (
+	"context"
+	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
+)
+
+type UserTask interface {
+	CheckUser(ctx context.Context) error
+}
+
+func NewUserTask(
+	task *Task,
+	userRepo repository.UserRepository,
+) UserTask {
+	return &userTask{
+		userRepo: userRepo,
+		Task:     task,
+	}
+}
+
+type userTask struct {
+	userRepo repository.UserRepository
+	*Task
+}
+
+func (t userTask) CheckUser(ctx context.Context) error {
+	// do something
+	t.logger.Info("CheckUser")
+	return nil
+}