repository.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package repository
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/glebarez/sqlite"
  6. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  7. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  8. "github.com/go-nunu/nunu-layout-advanced/pkg/zapgorm2"
  9. "github.com/qiniu/qmgo"
  10. "github.com/redis/go-redis/v9"
  11. "github.com/spf13/viper"
  12. "gorm.io/driver/mysql"
  13. "gorm.io/driver/postgres"
  14. "gorm.io/gorm"
  15. gormlogger "gorm.io/gorm/logger"
  16. "gorm.io/plugin/dbresolver"
  17. "time"
  18. )
  19. const ctxTxKey = "TxKey"
  20. type Repository struct {
  21. db *gorm.DB
  22. mongoClient *qmgo.Client
  23. mongoDB *qmgo.Database
  24. mq *rabbitmq.RabbitMQ
  25. logger *log.Logger
  26. }
  27. func NewRepository(
  28. logger *log.Logger,
  29. db *gorm.DB,
  30. mongoClient *qmgo.Client,
  31. mongoDB *qmgo.Database,
  32. mq *rabbitmq.RabbitMQ,
  33. ) *Repository {
  34. return &Repository{
  35. db: db,
  36. mongoClient: mongoClient,
  37. mongoDB: mongoDB,
  38. mq: mq,
  39. logger: logger,
  40. }
  41. }
  42. type Transaction interface {
  43. Transaction(ctx context.Context, fn func(ctx context.Context) error) error
  44. // 在特定数据库上执行事务
  45. TransactionWithDB(ctx context.Context, dbName string, fn func(ctx context.Context) error) error
  46. }
  47. func NewTransaction(r *Repository) Transaction {
  48. return r
  49. }
  50. // DB return tx
  51. // If you need to create a Transaction, you must call DB(ctx) and Transaction(ctx,fn)
  52. func (r *Repository) DB(ctx context.Context) *gorm.DB {
  53. v := ctx.Value(ctxTxKey)
  54. if v != nil {
  55. if tx, ok := v.(*gorm.DB); ok {
  56. return tx
  57. }
  58. }
  59. return r.db.WithContext(ctx)
  60. }
  61. // DBWithName 使用特定名称的数据库连接
  62. func (r *Repository) DBWithName(ctx context.Context, dbName string) *gorm.DB {
  63. // 先检查上下文中是否已存在事务
  64. v := ctx.Value(ctxTxKey)
  65. if v != nil {
  66. if tx, ok := v.(*gorm.DB); ok {
  67. // 如果事务中已经指定了数据库,则直接返回
  68. return tx
  69. }
  70. }
  71. // 使用指定名称的数据库连接
  72. if dbName != "" {
  73. return r.db.Clauses(dbresolver.Use(dbName)).WithContext(ctx)
  74. }
  75. return r.db.WithContext(ctx)
  76. }
  77. func (r *Repository) Transaction(ctx context.Context, fn func(ctx context.Context) error) error {
  78. return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
  79. ctxWithTx := context.WithValue(ctx, ctxTxKey, tx)
  80. return fn(ctxWithTx)
  81. })
  82. }
  83. // TransactionWithDB 在特定数据库上执行事务
  84. func (r *Repository) TransactionWithDB(ctx context.Context, dbName string, fn func(ctx context.Context) error) error {
  85. // 使用特定的数据库连接
  86. db := r.db
  87. if dbName != "" {
  88. db = db.Clauses(dbresolver.Use(dbName))
  89. }
  90. return db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
  91. // tx已经是针对特定数据库的事务句柄,无需再次指定数据库
  92. ctxWithTx := context.WithValue(ctx, ctxTxKey, tx)
  93. return fn(ctxWithTx)
  94. })
  95. }
  96. func NewDB(conf *viper.Viper, l *log.Logger) *gorm.DB {
  97. var (
  98. db *gorm.DB
  99. err error
  100. )
  101. // 获取主数据库键名
  102. primaryDBKey := conf.GetString("data.primary_db_key")
  103. if primaryDBKey == "" {
  104. // 默认使用user作为主数据库键名(向后兼容)
  105. primaryDBKey = "user"
  106. }
  107. // 从配置中获取主数据库配置
  108. driver := conf.GetString(fmt.Sprintf("data.db.%s.driver", primaryDBKey))
  109. if driver == "" {
  110. panic("主数据库驱动配置不能为空")
  111. }
  112. dsn := conf.GetString(fmt.Sprintf("data.db.%s.dsn", primaryDBKey))
  113. if dsn == "" {
  114. panic("主数据库连接字符串不能为空")
  115. }
  116. // 读取日志级别配置
  117. logLevelStr := conf.GetString(fmt.Sprintf("data.db.%s.logLevel", primaryDBKey))
  118. var logLevel gormlogger.LogLevel
  119. switch logLevelStr {
  120. case "silent":
  121. logLevel = gormlogger.Silent
  122. case "error":
  123. logLevel = gormlogger.Error
  124. case "warn":
  125. logLevel = gormlogger.Warn
  126. case "info":
  127. logLevel = gormlogger.Info
  128. default:
  129. // MySQL 默认只记录警告和错误
  130. if driver == "mysql" {
  131. logLevel = gormlogger.Warn
  132. } else {
  133. logLevel = gormlogger.Info
  134. }
  135. }
  136. logger := zapgorm2.New(l.Logger).LogMode(logLevel)
  137. // 连接主数据库
  138. switch driver {
  139. case "mysql":
  140. db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
  141. Logger: logger,
  142. })
  143. case "postgres":
  144. db, err = gorm.Open(postgres.New(postgres.Config{
  145. DSN: dsn,
  146. PreferSimpleProtocol: true,
  147. }), &gorm.Config{
  148. Logger: logger,
  149. })
  150. case "sqlite":
  151. db, err = gorm.Open(sqlite.Open(dsn), &gorm.Config{
  152. Logger: logger,
  153. })
  154. default:
  155. panic("不支持的数据库驱动类型: " + driver)
  156. }
  157. if err != nil {
  158. panic(fmt.Sprintf("连接主数据库失败: %s", err.Error()))
  159. }
  160. // 创建 dbresolver 实例
  161. resolver := dbresolver.Register(dbresolver.Config{})
  162. // 获取所有配置的数据库列表
  163. databases := conf.GetStringMap("data.db")
  164. // 遍历所有数据库配置(跳过主数据库,因为已经连接)
  165. for dbKey, _ := range databases {
  166. // 跳过主数据库(已经直接连接了)
  167. if dbKey == primaryDBKey {
  168. continue
  169. }
  170. // 检查该键是否确实是一个数据库配置对象
  171. dbDriver := conf.GetString(fmt.Sprintf("data.db.%s.driver", dbKey))
  172. dbDSN := conf.GetString(fmt.Sprintf("data.db.%s.dsn", dbKey))
  173. if dbDriver != "" && dbDSN != "" {
  174. // 构建数据库连接器
  175. var dialector gorm.Dialector
  176. switch dbDriver {
  177. case "mysql":
  178. dialector = mysql.Open(dbDSN)
  179. case "postgres":
  180. dialector = postgres.New(postgres.Config{
  181. DSN: dbDSN,
  182. PreferSimpleProtocol: true,
  183. })
  184. case "sqlite":
  185. dialector = sqlite.Open(dbDSN)
  186. default:
  187. l.Warn(fmt.Sprintf("跳过不支持的数据库驱动类型: %s (dbKey: %s)", dbDriver, dbKey))
  188. continue
  189. }
  190. // 注册到resolver
  191. resolver.Register(dbresolver.Config{
  192. Sources: []gorm.Dialector{dialector},
  193. Replicas: []gorm.Dialector{dialector},
  194. Policy: dbresolver.RandomPolicy{},
  195. }, dbKey) // 使用配置键作为数据库名称
  196. l.Info(fmt.Sprintf("成功配置数据库连接: %s", dbKey))
  197. }
  198. }
  199. // 设置连接池参数
  200. resolver.SetConnMaxIdleTime(time.Hour).
  201. SetConnMaxLifetime(24 * time.Hour).
  202. SetMaxIdleConns(10).
  203. SetMaxOpenConns(100)
  204. // 应用配置好的 dbresolver 到 db
  205. err = db.Use(resolver)
  206. if err != nil {
  207. panic(fmt.Sprintf("应用数据库连接配置失败: %s", err.Error()))
  208. }
  209. // 主数据库连接池配置
  210. sqlDB, err := db.DB()
  211. if err != nil {
  212. panic(err)
  213. }
  214. sqlDB.SetMaxIdleConns(10)
  215. sqlDB.SetMaxOpenConns(100)
  216. sqlDB.SetConnMaxLifetime(time.Hour)
  217. return db
  218. }
  219. func NewRedis(conf *viper.Viper) *redis.Client {
  220. rdb := redis.NewClient(&redis.Options{
  221. Addr: conf.GetString("data.redis.addr"),
  222. Password: conf.GetString("data.redis.password"),
  223. DB: conf.GetInt("data.redis.db"),
  224. })
  225. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  226. defer cancel()
  227. _, err := rdb.Ping(ctx).Result()
  228. if err != nil {
  229. panic(fmt.Sprintf("redis error: %s", err.Error()))
  230. }
  231. return rdb
  232. }
  233. func NewMongoClient(conf *viper.Viper) *qmgo.Client {
  234. timeout := conf.GetDuration("data.mongodb.timeout")
  235. if timeout == 0 {
  236. timeout = 10 * time.Second
  237. }
  238. maxPoolSize := conf.GetUint64("data.mongodb.max_pool_size")
  239. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  240. defer cancel()
  241. // 创建连接配置
  242. clientOpts := &qmgo.Config{
  243. Uri: conf.GetString("data.mongodb.uri"),
  244. MaxPoolSize: &maxPoolSize,
  245. }
  246. // 连接到MongoDB
  247. client, err := qmgo.NewClient(ctx, clientOpts)
  248. if err != nil {
  249. panic(fmt.Sprintf("连接MongoDB失败: %s", err.Error()))
  250. }
  251. return client
  252. }
  253. func NewMongoDB(client *qmgo.Client, conf *viper.Viper) *qmgo.Database {
  254. databaseName := conf.GetString("data.mongodb.database")
  255. if databaseName == "" {
  256. panic("MongoDB数据库名不能为空")
  257. }
  258. return client.Database(databaseName)
  259. }
  260. func NewRabbitMQ(conf *viper.Viper, logger *log.Logger) (*rabbitmq.RabbitMQ, func()) {
  261. var cfg rabbitmq.Config
  262. if err := conf.UnmarshalKey("rabbitmq", &cfg); err != nil {
  263. panic(fmt.Sprintf("unmarshal rabbitmq config error: %s", err.Error()))
  264. }
  265. mq, err := rabbitmq.New(cfg, logger)
  266. if err != nil {
  267. panic(fmt.Sprintf("init rabbitmq error: %s", err.Error()))
  268. }
  269. // Setup task queue
  270. if err := mq.SetupAllTaskQueues(); err != nil {
  271. panic(fmt.Sprintf("failed to setup rabbitmq task queues: %v", err))
  272. }
  273. cleanup := func() {
  274. logger.Info("Closing RabbitMQ connection")
  275. _ = mq.Close()
  276. }
  277. return mq, cleanup
  278. }