repository.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package repository
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/glebarez/sqlite"
  6. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  7. "github.com/go-nunu/nunu-layout-advanced/pkg/zapgorm2"
  8. "github.com/qiniu/qmgo"
  9. "github.com/redis/go-redis/v9"
  10. "github.com/spf13/viper"
  11. "gorm.io/driver/mysql"
  12. "gorm.io/driver/postgres"
  13. "gorm.io/gorm"
  14. gormlogger "gorm.io/gorm/logger"
  15. "gorm.io/plugin/dbresolver"
  16. "time"
  17. )
  18. const ctxTxKey = "TxKey"
  19. type Repository struct {
  20. db *gorm.DB
  21. //rdb *redis.Client
  22. mongoClient *qmgo.Client
  23. mongoDB *qmgo.Database
  24. logger *log.Logger
  25. }
  26. func NewRepository(
  27. logger *log.Logger,
  28. db *gorm.DB,
  29. // rdb *redis.Client,
  30. mongoClient *qmgo.Client,
  31. mongoDB *qmgo.Database,
  32. ) *Repository {
  33. return &Repository{
  34. db: db,
  35. //rdb: rdb,
  36. mongoClient: mongoClient,
  37. mongoDB: mongoDB,
  38. logger: logger,
  39. }
  40. }
  41. type Transaction interface {
  42. Transaction(ctx context.Context, fn func(ctx context.Context) error) error
  43. // 在特定数据库上执行事务
  44. TransactionWithDB(ctx context.Context, dbName string, fn func(ctx context.Context) error) error
  45. }
  46. func NewTransaction(r *Repository) Transaction {
  47. return r
  48. }
  49. // DB return tx
  50. // If you need to create a Transaction, you must call DB(ctx) and Transaction(ctx,fn)
  51. func (r *Repository) DB(ctx context.Context) *gorm.DB {
  52. v := ctx.Value(ctxTxKey)
  53. if v != nil {
  54. if tx, ok := v.(*gorm.DB); ok {
  55. return tx
  56. }
  57. }
  58. return r.db.WithContext(ctx)
  59. }
  60. // DBWithName 使用特定名称的数据库连接
  61. func (r *Repository) DBWithName(ctx context.Context, dbName string) *gorm.DB {
  62. // 先检查上下文中是否已存在事务
  63. v := ctx.Value(ctxTxKey)
  64. if v != nil {
  65. if tx, ok := v.(*gorm.DB); ok {
  66. // 如果事务中已经指定了数据库,则直接返回
  67. return tx
  68. }
  69. }
  70. // 使用指定名称的数据库连接
  71. if dbName != "" {
  72. return r.db.Clauses(dbresolver.Use(dbName)).WithContext(ctx)
  73. }
  74. return r.db.WithContext(ctx)
  75. }
  76. func (r *Repository) Transaction(ctx context.Context, fn func(ctx context.Context) error) error {
  77. return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
  78. ctxWithTx := context.WithValue(ctx, ctxTxKey, tx)
  79. return fn(ctxWithTx)
  80. })
  81. }
  82. // TransactionWithDB 在特定数据库上执行事务
  83. func (r *Repository) TransactionWithDB(ctx context.Context, dbName string, fn func(ctx context.Context) error) error {
  84. // 使用特定的数据库连接
  85. db := r.db
  86. if dbName != "" {
  87. db = db.Clauses(dbresolver.Use(dbName))
  88. }
  89. return db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
  90. // tx已经是针对特定数据库的事务句柄,无需再次指定数据库
  91. ctxWithTx := context.WithValue(ctx, ctxTxKey, tx)
  92. return fn(ctxWithTx)
  93. })
  94. }
  95. func NewDB(conf *viper.Viper, l *log.Logger) *gorm.DB {
  96. var (
  97. db *gorm.DB
  98. err error
  99. )
  100. // 获取主数据库键名
  101. primaryDBKey := conf.GetString("data.primary_db_key")
  102. if primaryDBKey == "" {
  103. // 默认使用user作为主数据库键名(向后兼容)
  104. primaryDBKey = "user"
  105. }
  106. // 从配置中获取主数据库配置
  107. driver := conf.GetString(fmt.Sprintf("data.db.%s.driver", primaryDBKey))
  108. if driver == "" {
  109. panic("主数据库驱动配置不能为空")
  110. }
  111. dsn := conf.GetString(fmt.Sprintf("data.db.%s.dsn", primaryDBKey))
  112. if dsn == "" {
  113. panic("主数据库连接字符串不能为空")
  114. }
  115. // 读取日志级别配置
  116. logLevelStr := conf.GetString(fmt.Sprintf("data.db.%s.logLevel", primaryDBKey))
  117. var logLevel gormlogger.LogLevel
  118. switch logLevelStr {
  119. case "silent":
  120. logLevel = gormlogger.Silent
  121. case "error":
  122. logLevel = gormlogger.Error
  123. case "warn":
  124. logLevel = gormlogger.Warn
  125. case "info":
  126. logLevel = gormlogger.Info
  127. default:
  128. // MySQL 默认只记录警告和错误
  129. if driver == "mysql" {
  130. logLevel = gormlogger.Warn
  131. } else {
  132. logLevel = gormlogger.Info
  133. }
  134. }
  135. logger := zapgorm2.New(l.Logger).LogMode(logLevel)
  136. // 连接主数据库
  137. switch driver {
  138. case "mysql":
  139. db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
  140. Logger: logger,
  141. })
  142. case "postgres":
  143. db, err = gorm.Open(postgres.New(postgres.Config{
  144. DSN: dsn,
  145. PreferSimpleProtocol: true,
  146. }), &gorm.Config{
  147. Logger: logger,
  148. })
  149. case "sqlite":
  150. db, err = gorm.Open(sqlite.Open(dsn), &gorm.Config{
  151. Logger: logger,
  152. })
  153. default:
  154. panic("不支持的数据库驱动类型: " + driver)
  155. }
  156. if err != nil {
  157. panic(fmt.Sprintf("连接主数据库失败: %s", err.Error()))
  158. }
  159. // 创建 dbresolver 实例
  160. resolver := dbresolver.Register(dbresolver.Config{})
  161. // 获取所有配置的数据库列表
  162. databases := conf.GetStringMap("data.db")
  163. // 遍历所有数据库配置(跳过主数据库,因为已经连接)
  164. for dbKey, _ := range databases {
  165. // 跳过主数据库(已经直接连接了)
  166. if dbKey == primaryDBKey {
  167. continue
  168. }
  169. // 检查该键是否确实是一个数据库配置对象
  170. dbDriver := conf.GetString(fmt.Sprintf("data.db.%s.driver", dbKey))
  171. dbDSN := conf.GetString(fmt.Sprintf("data.db.%s.dsn", dbKey))
  172. if dbDriver != "" && dbDSN != "" {
  173. // 构建数据库连接器
  174. var dialector gorm.Dialector
  175. switch dbDriver {
  176. case "mysql":
  177. dialector = mysql.Open(dbDSN)
  178. case "postgres":
  179. dialector = postgres.New(postgres.Config{
  180. DSN: dbDSN,
  181. PreferSimpleProtocol: true,
  182. })
  183. case "sqlite":
  184. dialector = sqlite.Open(dbDSN)
  185. default:
  186. l.Warn(fmt.Sprintf("跳过不支持的数据库驱动类型: %s (dbKey: %s)", dbDriver, dbKey))
  187. continue
  188. }
  189. // 注册到resolver
  190. resolver.Register(dbresolver.Config{
  191. Sources: []gorm.Dialector{dialector},
  192. Replicas: []gorm.Dialector{dialector},
  193. Policy: dbresolver.RandomPolicy{},
  194. }, dbKey) // 使用配置键作为数据库名称
  195. l.Info(fmt.Sprintf("成功配置数据库连接: %s", dbKey))
  196. }
  197. }
  198. // 设置连接池参数
  199. resolver.SetConnMaxIdleTime(time.Hour).
  200. SetConnMaxLifetime(24 * time.Hour).
  201. SetMaxIdleConns(10).
  202. SetMaxOpenConns(100)
  203. // 应用配置好的 dbresolver 到 db
  204. err = db.Use(resolver)
  205. if err != nil {
  206. panic(fmt.Sprintf("应用数据库连接配置失败: %s", err.Error()))
  207. }
  208. // 主数据库连接池配置
  209. sqlDB, err := db.DB()
  210. if err != nil {
  211. panic(err)
  212. }
  213. sqlDB.SetMaxIdleConns(10)
  214. sqlDB.SetMaxOpenConns(100)
  215. sqlDB.SetConnMaxLifetime(time.Hour)
  216. return db
  217. }
  218. func NewRedis(conf *viper.Viper) *redis.Client {
  219. rdb := redis.NewClient(&redis.Options{
  220. Addr: conf.GetString("data.redis.addr"),
  221. Password: conf.GetString("data.redis.password"),
  222. DB: conf.GetInt("data.redis.db"),
  223. })
  224. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  225. defer cancel()
  226. _, err := rdb.Ping(ctx).Result()
  227. if err != nil {
  228. panic(fmt.Sprintf("redis error: %s", err.Error()))
  229. }
  230. return rdb
  231. }
  232. func NewMongoClient(conf *viper.Viper) *qmgo.Client {
  233. timeout := conf.GetDuration("data.mongodb.timeout")
  234. if timeout == 0 {
  235. timeout = 10 * time.Second
  236. }
  237. maxPoolSize := conf.GetUint64("data.mongodb.max_pool_size")
  238. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  239. defer cancel()
  240. // 创建连接配置
  241. clientOpts := &qmgo.Config{
  242. Uri: conf.GetString("data.mongodb.uri"),
  243. MaxPoolSize: &maxPoolSize,
  244. }
  245. // 连接到MongoDB
  246. client, err := qmgo.NewClient(ctx, clientOpts)
  247. if err != nil {
  248. panic(fmt.Sprintf("连接MongoDB失败: %s", err.Error()))
  249. }
  250. return client
  251. }
  252. func NewMongoDB(client *qmgo.Client, conf *viper.Viper) *qmgo.Database {
  253. databaseName := conf.GetString("data.mongodb.database")
  254. if databaseName == "" {
  255. panic("MongoDB数据库名不能为空")
  256. }
  257. return client.Database(databaseName)
  258. }