watcher.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package register
  2. import (
  3. "context"
  4. "time"
  5. clientv3 "go.etcd.io/etcd/client/v3"
  6. "github.com/go-kratos/kratos/v2/registry"
  7. )
  8. var _ registry.Watcher = (*watcher)(nil)
  9. type watcher struct {
  10. key string
  11. ctx context.Context
  12. cancel context.CancelFunc
  13. client *clientv3.Client
  14. watchChan clientv3.WatchChan
  15. watcher clientv3.Watcher
  16. kv clientv3.KV
  17. first bool
  18. serviceName string
  19. }
  20. func newWatcher(ctx context.Context, key, name string, client *clientv3.Client) (*watcher, error) {
  21. w := &watcher{
  22. key: key,
  23. client: client,
  24. watcher: clientv3.NewWatcher(client),
  25. kv: clientv3.NewKV(client),
  26. first: true,
  27. serviceName: name,
  28. }
  29. w.ctx, w.cancel = context.WithCancel(ctx)
  30. w.watchChan = w.watcher.Watch(w.ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly())
  31. err := w.watcher.RequestProgress(w.ctx)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return w, nil
  36. }
  37. func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
  38. if w.first {
  39. item, err := w.getInstance()
  40. w.first = false
  41. return item, err
  42. }
  43. select {
  44. case <-w.ctx.Done():
  45. return nil, w.ctx.Err()
  46. case watchResp, ok := <-w.watchChan:
  47. if !ok || watchResp.Err() != nil {
  48. time.Sleep(time.Second)
  49. err := w.reWatch()
  50. if err != nil {
  51. return nil, err
  52. }
  53. }
  54. return w.getInstance()
  55. }
  56. }
  57. func (w *watcher) Stop() error {
  58. w.cancel()
  59. return w.watcher.Close()
  60. }
  61. func (w *watcher) getInstance() ([]*registry.ServiceInstance, error) {
  62. resp, err := w.kv.Get(w.ctx, w.key, clientv3.WithPrefix())
  63. if err != nil {
  64. return nil, err
  65. }
  66. items := make([]*registry.ServiceInstance, 0, len(resp.Kvs))
  67. for _, kv := range resp.Kvs {
  68. si, err := unmarshal(kv.Value)
  69. if err != nil {
  70. return nil, err
  71. }
  72. if si.Name != w.serviceName {
  73. continue
  74. }
  75. items = append(items, si)
  76. }
  77. return items, nil
  78. }
  79. func (w *watcher) reWatch() error {
  80. w.watcher.Close()
  81. w.watcher = clientv3.NewWatcher(w.client)
  82. w.watchChan = w.watcher.Watch(w.ctx, w.key, clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly())
  83. return w.watcher.RequestProgress(w.ctx)
  84. }