repository.go 11 KB


  1. package repository
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/casbin/casbin/v2"
  6. "github.com/casbin/casbin/v2/model"
  7. gormadapter "github.com/casbin/gorm-adapter/v3"
  8. "github.com/glebarez/sqlite"
  9. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  10. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  11. "github.com/go-nunu/nunu-layout-advanced/pkg/zapgorm2"
  12. "github.com/qiniu/qmgo"
  13. "github.com/redis/go-redis/v9"
  14. "github.com/spf13/viper"
  15. "gorm.io/driver/mysql"
  16. "gorm.io/driver/postgres"
  17. "gorm.io/gorm"
  18. gormlogger "gorm.io/gorm/logger"
  19. "gorm.io/plugin/dbresolver"
  20. "time"
  21. )
  22. const ctxTxKey = "TxKey"
  23. type Repository struct {
  24. db *gorm.DB
  25. rdb *redis.Client
  26. mongoClient *qmgo.Client
  27. mongoDB *qmgo.Database
  28. mq *rabbitmq.RabbitMQ
  29. logger *log.Logger
  30. e *casbin.SyncedEnforcer
  31. }
  32. func NewRepository(
  33. logger *log.Logger,
  34. db *gorm.DB,
  35. rdb *redis.Client,
  36. mongoClient *qmgo.Client,
  37. mongoDB *qmgo.Database,
  38. mq *rabbitmq.RabbitMQ,
  39. e *casbin.SyncedEnforcer,
  40. ) *Repository {
  41. return &Repository{
  42. db: db,
  43. rdb: rdb,
  44. mongoClient: mongoClient,
  45. mongoDB: mongoDB,
  46. mq: mq,
  47. logger: logger,
  48. e: e,
  49. }
  50. }
  51. type Transaction interface {
  52. Transaction(ctx context.Context, fn func(ctx context.Context) error) error
  53. // 在特定数据库上执行事务
  54. TransactionWithDB(ctx context.Context, dbName string, fn func(ctx context.Context) error) error
  55. }
  56. func NewTransaction(r *Repository) Transaction {
  57. return r
  58. }
  59. // DB return tx
  60. // If you need to create a Transaction, you must call DB(ctx) and Transaction(ctx,fn)
  61. func (r *Repository) DB(ctx context.Context) *gorm.DB {
  62. v := ctx.Value(ctxTxKey)
  63. if v != nil {
  64. if tx, ok := v.(*gorm.DB); ok {
  65. return tx
  66. }
  67. }
  68. return r.db.WithContext(ctx)
  69. }
  70. // DBWithName 使用特定名称的数据库连接
  71. func (r *Repository) DBWithName(ctx context.Context, dbName string) *gorm.DB {
  72. // 先检查上下文中是否已存在事务
  73. v := ctx.Value(ctxTxKey)
  74. if v != nil {
  75. if tx, ok := v.(*gorm.DB); ok {
  76. // 如果事务中已经指定了数据库,则直接返回
  77. return tx
  78. }
  79. }
  80. // 使用指定名称的数据库连接
  81. if dbName != "" {
  82. return r.db.Clauses(dbresolver.Use(dbName)).WithContext(ctx)
  83. }
  84. return r.db.WithContext(ctx)
  85. }
  86. func (r *Repository) Transaction(ctx context.Context, fn func(ctx context.Context) error) error {
  87. return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
  88. ctxWithTx := context.WithValue(ctx, ctxTxKey, tx)
  89. return fn(ctxWithTx)
  90. })
  91. }
  92. // TransactionWithDB 在特定数据库上执行事务
  93. func (r *Repository) TransactionWithDB(ctx context.Context, dbName string, fn func(ctx context.Context) error) error {
  94. // 使用特定的数据库连接
  95. db := r.db
  96. if dbName != "" {
  97. db = db.Clauses(dbresolver.Use(dbName))
  98. }
  99. return db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
  100. // tx已经是针对特定数据库的事务句柄,无需再次指定数据库
  101. ctxWithTx := context.WithValue(ctx, ctxTxKey, tx)
  102. return fn(ctxWithTx)
  103. })
  104. }
  105. func NewDB(conf *viper.Viper, l *log.Logger) *gorm.DB {
  106. var (
  107. db *gorm.DB
  108. err error
  109. )
  110. // 获取主数据库键名
  111. primaryDBKey := conf.GetString("data.primary_db_key")
  112. if primaryDBKey == "" {
  113. // 默认使用user作为主数据库键名(向后兼容)
  114. primaryDBKey = "user"
  115. }
  116. // 从配置中获取主数据库配置
  117. driver := conf.GetString(fmt.Sprintf("data.db.%s.driver", primaryDBKey))
  118. if driver == "" {
  119. panic("主数据库驱动配置不能为空")
  120. }
  121. dsn := conf.GetString(fmt.Sprintf("data.db.%s.dsn", primaryDBKey))
  122. if dsn == "" {
  123. panic("主数据库连接字符串不能为空")
  124. }
  125. // 读取日志级别配置
  126. logLevelStr := conf.GetString(fmt.Sprintf("data.db.%s.logLevel", primaryDBKey))
  127. var logLevel gormlogger.LogLevel
  128. switch logLevelStr {
  129. case "silent":
  130. logLevel = gormlogger.Silent
  131. case "error":
  132. logLevel = gormlogger.Error
  133. case "warn":
  134. logLevel = gormlogger.Warn
  135. case "info":
  136. logLevel = gormlogger.Info
  137. default:
  138. // MySQL 默认只记录警告和错误
  139. if driver == "mysql" {
  140. logLevel = gormlogger.Warn
  141. } else {
  142. logLevel = gormlogger.Info
  143. }
  144. }
  145. logger := zapgorm2.New(l.Logger).LogMode(logLevel)
  146. // 连接主数据库
  147. switch driver {
  148. case "mysql":
  149. db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
  150. Logger: logger,
  151. })
  152. case "postgres":
  153. db, err = gorm.Open(postgres.New(postgres.Config{
  154. DSN: dsn,
  155. PreferSimpleProtocol: true,
  156. }), &gorm.Config{
  157. Logger: logger,
  158. })
  159. case "sqlite":
  160. db, err = gorm.Open(sqlite.Open(dsn), &gorm.Config{
  161. Logger: logger,
  162. })
  163. default:
  164. panic("不支持的数据库驱动类型: " + driver)
  165. }
  166. if err != nil {
  167. panic(fmt.Sprintf("连接主数据库失败: %s", err.Error()))
  168. }
  169. // 创建 dbresolver 实例
  170. resolver := dbresolver.Register(dbresolver.Config{})
  171. // 获取所有配置的数据库列表
  172. databases := conf.GetStringMap("data.db")
  173. // 遍历所有数据库配置(跳过主数据库,因为已经连接)
  174. for dbKey, _ := range databases {
  175. // 跳过主数据库(已经直接连接了)
  176. if dbKey == primaryDBKey {
  177. continue
  178. }
  179. // 检查该键是否确实是一个数据库配置对象
  180. dbDriver := conf.GetString(fmt.Sprintf("data.db.%s.driver", dbKey))
  181. dbDSN := conf.GetString(fmt.Sprintf("data.db.%s.dsn", dbKey))
  182. if dbDriver != "" && dbDSN != "" {
  183. // 构建数据库连接器
  184. var dialector gorm.Dialector
  185. switch dbDriver {
  186. case "mysql":
  187. dialector = mysql.Open(dbDSN)
  188. case "postgres":
  189. dialector = postgres.New(postgres.Config{
  190. DSN: dbDSN,
  191. PreferSimpleProtocol: true,
  192. })
  193. case "sqlite":
  194. dialector = sqlite.Open(dbDSN)
  195. default:
  196. l.Warn(fmt.Sprintf("跳过不支持的数据库驱动类型: %s (dbKey: %s)", dbDriver, dbKey))
  197. continue
  198. }
  199. // 注册到resolver
  200. resolver.Register(dbresolver.Config{
  201. Sources: []gorm.Dialector{dialector},
  202. Replicas: []gorm.Dialector{dialector},
  203. Policy: dbresolver.RandomPolicy{},
  204. }, dbKey) // 使用配置键作为数据库名称
  205. l.Info(fmt.Sprintf("成功配置数据库连接: %s", dbKey))
  206. }
  207. }
  208. // 设置连接池参数
  209. resolver.SetConnMaxIdleTime(time.Hour).
  210. SetConnMaxLifetime(24 * time.Hour).
  211. SetMaxIdleConns(10).
  212. SetMaxOpenConns(100)
  213. // 应用配置好的 dbresolver 到 db
  214. err = db.Use(resolver)
  215. if err != nil {
  216. panic(fmt.Sprintf("应用数据库连接配置失败: %s", err.Error()))
  217. }
  218. // 主数据库连接池配置
  219. sqlDB, err := db.DB()
  220. if err != nil {
  221. panic(err)
  222. }
  223. sqlDB.SetMaxIdleConns(10)
  224. sqlDB.SetMaxOpenConns(100)
  225. sqlDB.SetConnMaxLifetime(time.Hour)
  226. return db
  227. }
  228. func NewRedis(conf *viper.Viper) *redis.Client {
  229. rdb := redis.NewClient(&redis.Options{
  230. Addr: conf.GetString("data.redis.addr"),
  231. Password: conf.GetString("data.redis.password"),
  232. DB: conf.GetInt("data.redis.db"),
  233. })
  234. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  235. defer cancel()
  236. _, err := rdb.Ping(ctx).Result()
  237. if err != nil {
  238. panic(fmt.Sprintf("redis error: %s", err.Error()))
  239. }
  240. return rdb
  241. }
  242. func NewMongoClient(conf *viper.Viper) *qmgo.Client {
  243. timeout := conf.GetDuration("data.mongodb.timeout")
  244. if timeout == 0 {
  245. timeout = 10 * time.Second
  246. }
  247. maxPoolSize := conf.GetUint64("data.mongodb.max_pool_size")
  248. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  249. defer cancel()
  250. // 创建连接配置
  251. clientOpts := &qmgo.Config{
  252. Uri: conf.GetString("data.mongodb.uri"),
  253. MaxPoolSize: &maxPoolSize,
  254. }
  255. // 连接到MongoDB
  256. client, err := qmgo.NewClient(ctx, clientOpts)
  257. if err != nil {
  258. panic(fmt.Sprintf("连接MongoDB失败: %s", err.Error()))
  259. }
  260. return client
  261. }
  262. func NewMongoDB(client *qmgo.Client, conf *viper.Viper) *qmgo.Database {
  263. databaseName := conf.GetString("data.mongodb.database")
  264. if databaseName == "" {
  265. panic("MongoDB数据库名不能为空")
  266. }
  267. return client.Database(databaseName)
  268. }
  269. func NewRabbitMQ(conf *viper.Viper, logger *log.Logger) (*rabbitmq.RabbitMQ, func()) {
  270. var cfg rabbitmq.Config
  271. if err := conf.UnmarshalKey("rabbitmq", &cfg); err != nil {
  272. panic(fmt.Sprintf("unmarshal rabbitmq config error: %s", err.Error()))
  273. }
  274. mq, err := rabbitmq.New(cfg, logger)
  275. if err != nil {
  276. panic(fmt.Sprintf("init rabbitmq error: %s", err.Error()))
  277. }
  278. // Setup task queue
  279. if err := mq.SetupAllTaskQueues(); err != nil {
  280. panic(fmt.Sprintf("failed to setup rabbitmq task queues: %v", err))
  281. }
  282. cleanup := func() {
  283. logger.Info("Closing RabbitMQ connection")
  284. _ = mq.Close()
  285. }
  286. return mq, cleanup
  287. }
  288. func NewCasbinEnforcer(conf *viper.Viper, l *log.Logger, db *gorm.DB) *casbin.SyncedEnforcer {
  289. var (
  290. adapter *gormadapter.Adapter
  291. err error
  292. casbinDb *gorm.DB = db // 默认使用主数据库连接
  293. )
  294. // 创建一个专门给Enforcer使用的、日志级别为Warn的日志记录器,以屏蔽轮询日志。
  295. // 这不会影响数据库连接的全局日志配置。
  296. enforcerLogger := zapgorm2.New(l.Logger).LogMode(gormlogger.Warn)
  297. // 扫描配置,查找为Casbin指定的数据库
  298. dbSettings := conf.GetStringMap("data.db")
  299. foundSpecialDb := false
  300. for dbKey := range dbSettings {
  301. casbinFlagPath := fmt.Sprintf("data.db.%s.casbin", dbKey)
  302. if conf.GetBool(casbinFlagPath) {
  303. l.Info(fmt.Sprintf("检测到Casbin专用数据库配置: '%s'。Enforcer将使用此数据库连接。", dbKey))
  304. // 从全局连接池中获取指定的数据库连接
  305. casbinDb = db.Clauses(dbresolver.Use(dbKey))
  306. foundSpecialDb = true
  307. break
  308. }
  309. }
  310. if !foundSpecialDb {
  311. l.Warn("未找到Casbin的专用数据库配置 (缺少 'casbin: true' 标志),Enforcer将回退使用主数据库连接。")
  312. }
  313. // 为Enforcer创建一个带有“安静”日志记录器的GORM会话。
  314. // 这样可以确保只有Enforcer自身的操作是安静的,
  315. // 而通过DBWithName()进行的直接数据库操作仍将使用原始的、更详细的日志记录器。
  316. dbForEnforcer := casbinDb.Session(&gorm.Session{Logger: enforcerLogger})
  317. // 使用带有“安静”日志记录器的会话来创建适配器
  318. adapter, err = gormadapter.NewAdapterByDB(dbForEnforcer)
  319. if err != nil {
  320. panic(fmt.Sprintf("创建Casbin gorm-adapter失败: %s", err))
  321. }
  322. m, err := model.NewModelFromString(`
  323. [request_definition]
  324. r = sub, obj, act
  325. [policy_definition]
  326. p = sub, obj, act
  327. [role_definition]
  328. g = _, _
  329. [policy_effect]
  330. e = some(where (p.eft == allow))
  331. [matchers]
  332. m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
  333. `)
  334. if err != nil {
  335. panic(fmt.Sprintf("创建Casbin模型失败: %s", err))
  336. }
  337. e, err := casbin.NewSyncedEnforcer(m, adapter)
  338. if err != nil {
  339. panic(fmt.Sprintf("创建Casbin Enforcer失败: %s", err))
  340. }
  341. // 每10秒自动加载策略
  342. e.StartAutoLoadPolicy(10 * time.Second)
  343. // 自动保存策略
  344. e.EnableAutoSave(true)
  345. return e
  346. }