watcher.go 857 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package etcd
  2. import (
  3. "context"
  4. clientv3 "go.etcd.io/etcd/client/v3"
  5. "github.com/go-kratos/kratos/v2/config"
  6. )
  7. type watcher struct {
  8. source *source
  9. ch clientv3.WatchChan
  10. ctx context.Context
  11. cancel context.CancelFunc
  12. }
  13. func newWatcher(s *source) *watcher {
  14. ctx, cancel := context.WithCancel(context.Background())
  15. w := &watcher{
  16. source: s,
  17. ctx: ctx,
  18. cancel: cancel,
  19. }
  20. var opts []clientv3.OpOption
  21. if s.options.prefix {
  22. opts = append(opts, clientv3.WithPrefix())
  23. }
  24. w.ch = s.client.Watch(s.options.ctx, s.options.path, opts...)
  25. return w
  26. }
  27. func (w *watcher) Next() ([]*config.KeyValue, error) {
  28. select {
  29. case resp := <-w.ch:
  30. if resp.Err() != nil {
  31. return nil, resp.Err()
  32. }
  33. return w.source.Load()
  34. case <-w.ctx.Done():
  35. return nil, w.ctx.Err()
  36. }
  37. }
  38. func (w *watcher) Stop() error {
  39. w.cancel()
  40. return nil
  41. }