data.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package data
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/go-kratos/kratos-layout/internal/conf"
  7. "github.com/go-kratos/kratos-layout/internal/data/models"
  8. "github.com/go-kratos/kratos/v2/log"
  9. "github.com/google/wire"
  10. "github.com/redis/go-redis/v9"
  11. "go.mongodb.org/mongo-driver/mongo"
  12. "go.mongodb.org/mongo-driver/mongo/options"
  13. "go.mongodb.org/mongo-driver/mongo/readconcern"
  14. "go.mongodb.org/mongo-driver/mongo/writeconcern"
  15. )
  16. // ProviderSet is data providers.
  17. var ProviderSet = wire.NewSet(NewRedis, NewData, NewMongo)
  18. // Data .
  19. type Data struct {
  20. log *log.Helper
  21. rdb *redis.Client
  22. *models.Mysql
  23. mongodb *mongo.Database
  24. }
  25. // NewData .
  26. func NewData(c *conf.Bootstrap, rdb *redis.Client, mongodb *mongo.Database, logger log.Logger, ctx context.Context) (*Data, func(), error) {
  27. cleanup := func() {
  28. }
  29. data := &Data{
  30. log: log.NewHelper(log.With(logger, "module", "data/data")),
  31. }
  32. var err error
  33. if c.Data.Database != nil {
  34. data.Mysql, err = models.NewMysql(c.Data.Database.Source, logger)
  35. if err != nil {
  36. return nil, cleanup, err
  37. }
  38. data.Mysql.XDB.SetMaxIdleConns(int(c.Data.Database.MaxIdleConns))
  39. data.Mysql.XDB.SetConnMaxIdleTime(time.Minute * 10)
  40. data.Mysql.XDB.SetMaxOpenConns(int(c.Data.Database.MaxOpenConns))
  41. }
  42. if rdb != nil {
  43. data.rdb = rdb
  44. }
  45. if mongodb != nil {
  46. data.mongodb = mongodb
  47. }
  48. cleanup = func() {
  49. if data.XDB != nil {
  50. data.XDB.Close()
  51. }
  52. if data.rdb != nil {
  53. data.rdb.Close()
  54. }
  55. if data.mongodb != nil {
  56. data.mongodb.Client().Disconnect(ctx)
  57. }
  58. }
  59. return data, cleanup, nil
  60. }
  61. func NewRedis(c *conf.Bootstrap) *redis.Client {
  62. if c.Data.Redis == nil {
  63. return nil
  64. }
  65. rdb := redis.NewClient(&redis.Options{
  66. Addr: c.Data.Redis.Addr,
  67. DB: int(c.Data.Redis.Db),
  68. DialTimeout: c.Data.Redis.DialTimeout.AsDuration(),
  69. WriteTimeout: c.Data.Redis.WriteTimeout.AsDuration(),
  70. ReadTimeout: c.Data.Redis.ReadTimeout.AsDuration(),
  71. PoolSize: int(c.Data.Redis.PoolSize),
  72. MinIdleConns: int(c.Data.Redis.MinIdleConns),
  73. })
  74. key := fmt.Sprintf("%s:%s", c.Server.Registrar.Group, c.Server.Registrar.Data)
  75. rdb.Set(context.Background(), fmt.Sprintf("used_by_%s", key), 1, -1)
  76. return rdb
  77. }
  78. func NewMongo(ctx context.Context, c *conf.Bootstrap) (*mongo.Database, error) {
  79. if c.Data.Mongo == nil {
  80. return nil, nil
  81. }
  82. clientOptions := options.Client().ApplyURI(c.Data.Mongo.Uri).SetSocketTimeout(time.Second * 30).
  83. SetReadConcern(readconcern.Majority()).SetWriteConcern(writeconcern.New(writeconcern.WMajority(), writeconcern.J(true)))
  84. client, err := mongo.Connect(ctx, clientOptions)
  85. if err != nil {
  86. return nil, err
  87. }
  88. database := client.Database(c.Data.Mongo.Database)
  89. return database, nil
  90. }