registry.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package registry
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "time"
  7. clientv3 "go.etcd.io/etcd/client/v3"
  8. "github.com/go-kratos/kratos/v2/registry"
  9. )
  10. var (
  11. _ registry.Registrar = (*Registry)(nil)
  12. _ registry.Discovery = (*Registry)(nil)
  13. )
  // Option configures an etcd Registry. Options are applied in order by New,
  // so a later option overrides an earlier one that sets the same field.
  type Option func(o *options)

  // options holds the tunable settings of a Registry.
  type options struct {
  	// ctx is the parent context of every heartbeat goroutine started by
  	// Register; cancelling it stops all of them.
  	ctx context.Context
  	// namespace is the key prefix under which service instances are stored.
  	namespace string
  	// ttl is the lease TTL for registered keys; it is truncated to whole
  	// seconds when the lease is granted.
  	ttl time.Duration
  	// maxRetry bounds how many re-registration attempts a heartbeat makes
  	// after its lease is lost.
  	maxRetry int
  }

  // Context sets the parent context of the registry's heartbeat goroutines.
  // Cancelling it stops every heartbeat. The default is context.Background().
  func Context(ctx context.Context) Option {
  	return func(o *options) { o.ctx = ctx }
  }

  // Namespace sets the key prefix under which service instances are stored
  // and looked up. The default is "/microservices".
  func Namespace(ns string) Option {
  	return func(o *options) { o.namespace = ns }
  }

  // RegisterTTL sets the TTL of the etcd lease each instance is registered
  // under. The value is truncated to whole seconds. The default is 15s.
  func RegisterTTL(ttl time.Duration) Option {
  	return func(o *options) { o.ttl = ttl }
  }

  // MaxRetry sets how many times a heartbeat tries to re-register an instance
  // after its lease is lost before giving up. A value <= 0 disables
  // re-registration. The default is 5.
  func MaxRetry(num int) Option {
  	return func(o *options) { o.maxRetry = num }
  }
  37. // Registry is etcd registry.
  38. type Registry struct {
  39. opts *options
  40. client *clientv3.Client
  41. kv clientv3.KV
  42. lease clientv3.Lease
  43. /*
  44. ctxMap is used to store the context cancel function of each service instance.
  45. When the service instance is deregistered, the corresponding context cancel function is called to stop the heartbeat.
  46. */
  47. ctxMap map[*registry.ServiceInstance]context.CancelFunc
  48. }
  49. // New creates etcd registry
  50. func New(client *clientv3.Client, opts ...Option) (r *Registry) {
  51. op := &options{
  52. ctx: context.Background(),
  53. namespace: "/microservices",
  54. ttl: time.Second * 15,
  55. maxRetry: 5,
  56. }
  57. for _, o := range opts {
  58. o(op)
  59. }
  60. return &Registry{
  61. opts: op,
  62. client: client,
  63. kv: clientv3.NewKV(client),
  64. ctxMap: make(map[*registry.ServiceInstance]context.CancelFunc),
  65. }
  66. }
  67. // Register the registration.
  68. func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
  69. key := fmt.Sprintf("/%s/naming/%s/%s", r.opts.namespace, service.Name, service.ID)
  70. value, err := marshal(service)
  71. if err != nil {
  72. return err
  73. }
  74. if r.lease != nil {
  75. r.lease.Close()
  76. }
  77. r.lease = clientv3.NewLease(r.client)
  78. leaseID, err := r.registerWithKV(ctx, key, value)
  79. if err != nil {
  80. return err
  81. }
  82. hctx, cancel := context.WithCancel(r.opts.ctx)
  83. r.ctxMap[service] = cancel
  84. go r.heartBeat(hctx, leaseID, key, value)
  85. return nil
  86. }
  87. // Deregister the registration.
  88. func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
  89. defer func() {
  90. if r.lease != nil {
  91. r.lease.Close()
  92. }
  93. }()
  94. // cancel heartbeat
  95. if cancel, ok := r.ctxMap[service]; ok {
  96. cancel()
  97. delete(r.ctxMap, service)
  98. }
  99. key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
  100. _, err := r.client.Delete(ctx, key)
  101. return err
  102. }
  103. // GetService return the service instances in memory according to the service name.
  104. func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {
  105. key := fmt.Sprintf("%s/%s", r.opts.namespace, name)
  106. resp, err := r.kv.Get(ctx, key, clientv3.WithPrefix())
  107. if err != nil {
  108. return nil, err
  109. }
  110. items := make([]*registry.ServiceInstance, 0, len(resp.Kvs))
  111. for _, kv := range resp.Kvs {
  112. si, err := unmarshal(kv.Value)
  113. if err != nil {
  114. return nil, err
  115. }
  116. if si.Name != name {
  117. continue
  118. }
  119. items = append(items, si)
  120. }
  121. return items, nil
  122. }
  123. // Watch creates a watcher according to the service name.
  124. func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
  125. key := fmt.Sprintf("%s/%s", r.opts.namespace, name)
  126. return newWatcher(ctx, key, name, r.client)
  127. }
  128. // registerWithKV create a new lease, return current leaseID
  129. func (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) {
  130. grant, err := r.lease.Grant(ctx, int64(r.opts.ttl.Seconds()))
  131. if err != nil {
  132. return 0, err
  133. }
  134. _, err = r.client.Put(ctx, key, value, clientv3.WithLease(grant.ID))
  135. if err != nil {
  136. return 0, err
  137. }
  138. return grant.ID, nil
  139. }
  140. func (r *Registry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key string, value string) {
  141. curLeaseID := leaseID
  142. kac, err := r.client.KeepAlive(ctx, leaseID)
  143. if err != nil {
  144. curLeaseID = 0
  145. }
  146. randSource := rand.New(rand.NewSource(time.Now().Unix()))
  147. for {
  148. if curLeaseID == 0 {
  149. // try to registerWithKV
  150. var retreat []int
  151. for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ {
  152. if ctx.Err() != nil {
  153. return
  154. }
  155. // prevent infinite blocking
  156. idChan := make(chan clientv3.LeaseID, 1)
  157. errChan := make(chan error, 1)
  158. cancelCtx, cancel := context.WithCancel(ctx)
  159. go func() {
  160. defer cancel()
  161. id, registerErr := r.registerWithKV(cancelCtx, key, value)
  162. if registerErr != nil {
  163. errChan <- registerErr
  164. } else {
  165. idChan <- id
  166. }
  167. }()
  168. select {
  169. case <-time.After(3 * time.Second):
  170. cancel()
  171. continue
  172. case <-errChan:
  173. continue
  174. case curLeaseID = <-idChan:
  175. }
  176. kac, err = r.client.KeepAlive(ctx, curLeaseID)
  177. if err == nil {
  178. break
  179. }
  180. retreat = append(retreat, 1<<retryCnt)
  181. time.Sleep(time.Duration(retreat[randSource.Intn(len(retreat))]) * time.Second)
  182. }
  183. if _, ok := <-kac; !ok {
  184. // retry failed
  185. return
  186. }
  187. }
  188. select {
  189. case _, ok := <-kac:
  190. if !ok {
  191. if ctx.Err() != nil {
  192. // channel closed due to context cancel
  193. return
  194. }
  195. // need to retry registration
  196. curLeaseID = 0
  197. continue
  198. }
  199. case <-r.opts.ctx.Done():
  200. return
  201. }
  202. }
  203. }