lihf 8 месяцев назад
Сommit
d561a672d2
11 измененных файлов с 1035 добавлено и 0 удалено
  1. 44 0
      .gitignore
  2. 46 0
      config/README.md
  3. 96 0
      config/config.go
  4. 169 0
      config/config_test.go
  5. 51 0
      config/watcher.go
  6. 31 0
      go.mod
  7. 103 0
      go.sum
  8. 226 0
      register/registry.go
  9. 154 0
      register/registry_test.go
  10. 20 0
      register/service.go
  11. 95 0
      register/watcher.go

+ 44 - 0
.gitignore

@@ -0,0 +1,44 @@
+# Reference https://github.com/github/gitignore/blob/master/Go.gitignore
+# Binaries for programs and plugins
+*.exe
+*.exe~
+*.dll
+*.so
+*.dylib
+
+# Test binary, built with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Dependency directories (remove the comment below to include it)
+vendor/
+
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# OS General
+Thumbs.db
+.DS_Store
+
+# project
+*.cert
+*.key
+*.log
+bin/
+
+# Develop tools
+.vscode/
+.idea/
+*.swp
+
+cmd/server/server
+cmd/server/server.exe
+#build.sh
+#start.sh
+js
+go.work
+go.work.sum

+ 46 - 0
config/README.md

@@ -0,0 +1,46 @@
+# Etcd Config
+
+```go
+import (
+	"log"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"google.golang.org/grpc"
+
+	cfg "github.com/go-kratos/kratos/contrib/config/etcd/v2"
+	"github.com/go-kratos/kratos/v2/config"
+)
+
+// create an etcd client
+client, err := clientv3.New(clientv3.Config{
+    Endpoints:   []string{"127.0.0.1:2379"},
+    DialTimeout: time.Second,
+    DialOptions: []grpc.DialOption{grpc.WithBlock()},
+})
+if err != nil {
+    log.Fatal(err)
+}
+
+// configure the source, "path" is required
+source, err := cfg.New(client, cfg.WithPath("/app-config"), cfg.WithPrefix(true))
+if err != nil {
+    log.Fatalln(err)
+}
+
+// create a config instance with source
+c := config.New(config.WithSource(source))
+defer c.Close()
+
+// load sources before get
+if err := c.Load(); err != nil {
+    log.Fatalln(err)
+}
+
+// acquire config value
+foo, err := c.Value("/app-config").String()
+if err != nil {
+    log.Fatalln(err)
+}
+
+log.Println(foo)
+```

+ 96 - 0
config/config.go

@@ -0,0 +1,96 @@
+package etcd
+
+import (
+	"context"
+	"errors"
+	"path/filepath"
+	"strings"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	"github.com/go-kratos/kratos/v2/config"
+)
+
+// Option is etcd config option.
+type Option func(o *options)
+
+type options struct {
+	ctx    context.Context
+	path   string
+	prefix bool
+}
+
+// WithContext with registry context.
+func WithContext(ctx context.Context) Option {
+	return func(o *options) {
+		o.ctx = ctx
+	}
+}
+
+// WithPath is config path
+func WithPath(p string) Option {
+	return func(o *options) {
+		o.path = p
+	}
+}
+
+// WithPrefix is config prefix
+func WithPrefix(prefix bool) Option {
+	return func(o *options) {
+		o.prefix = prefix
+	}
+}
+
+type source struct {
+	client  *clientv3.Client
+	options *options
+}
+
+func New(client *clientv3.Client, opts ...Option) (config.Source, error) {
+	options := &options{
+		ctx:    context.Background(),
+		path:   "",
+		prefix: false,
+	}
+
+	for _, opt := range opts {
+		opt(options)
+	}
+
+	if options.path == "" {
+		return nil, errors.New("path invalid")
+	}
+
+	return &source{
+		client:  client,
+		options: options,
+	}, nil
+}
+
+// Load return the config values
+func (s *source) Load() ([]*config.KeyValue, error) {
+	var opts []clientv3.OpOption
+	if s.options.prefix {
+		opts = append(opts, clientv3.WithPrefix())
+	}
+
+	rsp, err := s.client.Get(s.options.ctx, s.options.path, opts...)
+	if err != nil {
+		return nil, err
+	}
+	kvs := make([]*config.KeyValue, 0, len(rsp.Kvs))
+	for _, item := range rsp.Kvs {
+		k := string(item.Key)
+		kvs = append(kvs, &config.KeyValue{
+			Key:    k,
+			Value:  item.Value,
+			Format: strings.TrimPrefix(filepath.Ext(k), "."),
+		})
+	}
+	return kvs, nil
+}
+
+// Watch return the watcher
+func (s *source) Watch() (config.Watcher, error) {
+	return newWatcher(s), nil
+}

+ 169 - 0
config/config_test.go

@@ -0,0 +1,169 @@
+package etcd
+
+import (
+	"context"
+	"reflect"
+	"testing"
+	"time"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"google.golang.org/grpc"
+)
+
+const testKey = "/kratos/test/config"
+
+func TestConfig(t *testing.T) {
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   []string{"127.0.0.1:2379"},
+		DialTimeout: time.Second,
+		DialOptions: []grpc.DialOption{grpc.WithBlock()},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		_ = client.Close()
+	}()
+	if _, err = client.Put(context.Background(), testKey, "test config"); err != nil {
+		t.Fatal(err)
+	}
+
+	source, err := New(client, WithPath(testKey))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	kvs, err := source.Load()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(kvs) != 1 || kvs[0].Key != testKey || string(kvs[0].Value) != "test config" {
+		t.Fatal("config error")
+	}
+
+	w, err := source.Watch()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		_ = w.Stop()
+	}()
+
+	if _, err = client.Put(context.Background(), testKey, "new config"); err != nil {
+		t.Error(err)
+	}
+
+	if kvs, err = w.Next(); err != nil {
+		t.Fatal(err)
+	}
+
+	if len(kvs) != 1 || kvs[0].Key != testKey || string(kvs[0].Value) != "new config" {
+		t.Fatal("config error")
+	}
+
+	if _, err := client.Delete(context.Background(), testKey); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestExtToFormat(t *testing.T) {
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   []string{"127.0.0.1:2379"},
+		DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		_ = client.Close()
+	}()
+
+	tp := "/kratos/test/ext"
+	tn := "a.bird.json"
+	tk := tp + "/" + tn
+	tc := `{"a":1}`
+	if _, err = client.Put(context.Background(), tk, tc); err != nil {
+		t.Fatal(err)
+	}
+
+	source, err := New(client, WithPath(tp), WithPrefix(true))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	kvs, err := source.Load()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !reflect.DeepEqual(len(kvs), 1) {
+		t.Errorf("len(kvs) = %d", len(kvs))
+	}
+	if !reflect.DeepEqual(tk, kvs[0].Key) {
+		t.Errorf("kvs[0].Key is %s", kvs[0].Key)
+	}
+	if !reflect.DeepEqual(tc, string(kvs[0].Value)) {
+		t.Errorf("kvs[0].Value is %s", kvs[0].Value)
+	}
+	if !reflect.DeepEqual("json", kvs[0].Format) {
+		t.Errorf("kvs[0].Format is %s", kvs[0].Format)
+	}
+}
+
+func TestEtcdWithPath(t *testing.T) {
+	tests := []struct {
+		name   string
+		fields string
+		want   string
+	}{
+		{
+			name:   "default",
+			fields: testKey,
+			want:   testKey,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			options := &options{
+				ctx: context.Background(),
+			}
+
+			got := WithPath(tt.fields)
+			got(options)
+
+			if options.path != tt.want {
+				t.Errorf("WithPath(tt.fields) = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestEtcdWithPrefix(t *testing.T) {
+	tests := []struct {
+		name   string
+		fields bool
+		want   bool
+	}{
+		{
+			name:   "default",
+			fields: false,
+			want:   false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			options := &options{
+				ctx: context.Background(),
+			}
+
+			got := WithPrefix(tt.fields)
+			got(options)
+
+			if options.prefix != tt.want {
+				t.Errorf("WithPrefix(tt.fields) = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

+ 51 - 0
config/watcher.go

@@ -0,0 +1,51 @@
+package etcd
+
+import (
+	"context"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	"github.com/go-kratos/kratos/v2/config"
+)
+
+type watcher struct {
+	source *source
+	ch     clientv3.WatchChan
+
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func newWatcher(s *source) *watcher {
+	ctx, cancel := context.WithCancel(context.Background())
+	w := &watcher{
+		source: s,
+		ctx:    ctx,
+		cancel: cancel,
+	}
+
+	var opts []clientv3.OpOption
+	if s.options.prefix {
+		opts = append(opts, clientv3.WithPrefix())
+	}
+	w.ch = s.client.Watch(s.options.ctx, s.options.path, opts...)
+
+	return w
+}
+
+func (w *watcher) Next() ([]*config.KeyValue, error) {
+	select {
+	case resp := <-w.ch:
+		if resp.Err() != nil {
+			return nil, resp.Err()
+		}
+		return w.source.Load()
+	case <-w.ctx.Done():
+		return nil, w.ctx.Err()
+	}
+}
+
+func (w *watcher) Stop() error {
+	w.cancel()
+	return nil
+}

+ 31 - 0
go.mod

@@ -0,0 +1,31 @@
+module git.ikuban.com/server/kratos-etcd
+
+go 1.23.0
+
+require (
+	github.com/go-kratos/kratos/v2 v2.8.3
+	go.etcd.io/etcd/client/v3 v3.5.18
+	google.golang.org/grpc v1.61.1
+)
+
+require (
+	dario.cat/mergo v1.0.0 // indirect
+	github.com/coreos/go-semver v0.3.0 // indirect
+	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
+	github.com/gogo/protobuf v1.3.2 // indirect
+	github.com/golang/protobuf v1.5.4 // indirect
+	github.com/kr/text v0.2.0 // indirect
+	go.etcd.io/etcd/api/v3 v3.5.18 // indirect
+	go.etcd.io/etcd/client/pkg/v3 v3.5.18 // indirect
+	go.uber.org/atomic v1.7.0 // indirect
+	go.uber.org/multierr v1.6.0 // indirect
+	go.uber.org/zap v1.17.0 // indirect
+	golang.org/x/net v0.34.0 // indirect
+	golang.org/x/sys v0.29.0 // indirect
+	golang.org/x/text v0.21.0 // indirect
+	google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
+	google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
+	google.golang.org/protobuf v1.33.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)

+ 103 - 0
go.sum

@@ -0,0 +1,103 @@
+dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
+dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
+github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
+github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
+github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
+github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/go-kratos/kratos/v2 v2.8.3 h1:kkNBq0gvdX+b8cbaN+p6Sdh95DgMhx7GimefXb4o7Ss=
+github.com/go-kratos/kratos/v2 v2.8.3/go.mod h1:+Vfe3FzF0d+BfMdajA11jT0rAyJWublRE/seZQNZVxE=
+github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
+github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
+github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+go.etcd.io/etcd/api/v3 v3.5.18 h1:Q4oDAKnmwqTo5lafvB+afbgCDF7E35E4EYV2g+FNGhs=
+go.etcd.io/etcd/api/v3 v3.5.18/go.mod h1:uY03Ob2H50077J7Qq0DeehjM/A9S8PhVfbQ1mSaMopU=
+go.etcd.io/etcd/client/pkg/v3 v3.5.18 h1:mZPOYw4h8rTk7TeJ5+3udUkfVGBqc+GCjOJYd68QgNM=
+go.etcd.io/etcd/client/pkg/v3 v3.5.18/go.mod h1:BxVf2o5wXG9ZJV+/Cu7QNUiJYk4A29sAhoI5tIRsCu4=
+go.etcd.io/etcd/client/v3 v3.5.18 h1:nvvYmNHGumkDjZhTHgVU36A9pykGa2K4lAJ0yY7hcXA=
+go.etcd.io/etcd/client/v3 v3.5.18/go.mod h1:kmemwOsPU9broExyhYsBxX4spCTDX3yLgPMWtpBXG6E=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
+go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
+go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
+golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
+golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
+golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
+golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos=
+google.golang.org/genproto v0.0.0-20231212172506-995d672761c0/go.mod h1:l/k7rMz0vFTBPy+tFSGvXEd3z+BcoG1k7EHbqm+YBsY=
+google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 h1:rcS6EyEaoCO52hQDupoSfrxI3R6C2Tq741is7X8OvnM=
+google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917/go.mod h1:CmlNWB9lSezaYELKS5Ym1r44VrrbPUa7JTvw+6MbpJ0=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 h1:6G8oQ016D88m1xAKljMlBOOGWDZkes4kMhgGFlf8WcQ=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917/go.mod h1:xtjpI3tXFPP051KaWnhvxkiubL/6dJ18vLVf7q2pTOU=
+google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY=
+google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
+google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
+google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 226 - 0
register/registry.go

@@ -0,0 +1,226 @@
+package register
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"time"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	"github.com/go-kratos/kratos/v2/registry"
+)
+
+var (
+	_ registry.Registrar = (*Registry)(nil)
+	_ registry.Discovery = (*Registry)(nil)
+)
+
+// Option is etcd registry option.
+type Option func(o *options)
+
+type options struct {
+	ctx       context.Context
+	namespace string
+	ttl       time.Duration
+	maxRetry  int
+}
+
+// Context with registry context.
+func Context(ctx context.Context) Option {
+	return func(o *options) { o.ctx = ctx }
+}
+
+// Namespace with registry namespace.
+func Namespace(ns string) Option {
+	return func(o *options) { o.namespace = ns }
+}
+
+// RegisterTTL with register ttl.
+func RegisterTTL(ttl time.Duration) Option {
+	return func(o *options) { o.ttl = ttl }
+}
+
+func MaxRetry(num int) Option {
+	return func(o *options) { o.maxRetry = num }
+}
+
+// Registry is etcd registry.
+type Registry struct {
+	opts   *options
+	client *clientv3.Client
+	kv     clientv3.KV
+	lease  clientv3.Lease
+	/*
+		ctxMap is used to store the context cancel function of each service instance.
+		When the service instance is deregistered, the corresponding context cancel function is called to stop the heartbeat.
+	*/
+	ctxMap map[*registry.ServiceInstance]context.CancelFunc
+}
+
+// New creates etcd registry
+func New(client *clientv3.Client, opts ...Option) (r *Registry) {
+	op := &options{
+		ctx:       context.Background(),
+		namespace: "/microservices",
+		ttl:       time.Second * 15,
+		maxRetry:  5,
+	}
+	for _, o := range opts {
+		o(op)
+	}
+	return &Registry{
+		opts:   op,
+		client: client,
+		kv:     clientv3.NewKV(client),
+		ctxMap: make(map[*registry.ServiceInstance]context.CancelFunc),
+	}
+}
+
+// Register the registration.
+func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
+	key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
+	value, err := marshal(service)
+	if err != nil {
+		return err
+	}
+	if r.lease != nil {
+		r.lease.Close()
+	}
+	r.lease = clientv3.NewLease(r.client)
+	leaseID, err := r.registerWithKV(ctx, key, value)
+	if err != nil {
+		return err
+	}
+
+	hctx, cancel := context.WithCancel(r.opts.ctx)
+	r.ctxMap[service] = cancel
+	go r.heartBeat(hctx, leaseID, key, value)
+	return nil
+}
+
+// Deregister the registration.
+func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
+	defer func() {
+		if r.lease != nil {
+			r.lease.Close()
+		}
+	}()
+	// cancel heartbeat
+	if cancel, ok := r.ctxMap[service]; ok {
+		cancel()
+		delete(r.ctxMap, service)
+	}
+	key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
+	_, err := r.client.Delete(ctx, key)
+	return err
+}
+
+// GetService return the service instances in memory according to the service name.
+func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {
+	key := fmt.Sprintf("%s/%s", r.opts.namespace, name)
+	resp, err := r.kv.Get(ctx, key, clientv3.WithPrefix())
+	if err != nil {
+		return nil, err
+	}
+	items := make([]*registry.ServiceInstance, 0, len(resp.Kvs))
+	for _, kv := range resp.Kvs {
+		si, err := unmarshal(kv.Value)
+		if err != nil {
+			return nil, err
+		}
+		if si.Name != name {
+			continue
+		}
+		items = append(items, si)
+	}
+	return items, nil
+}
+
+// Watch creates a watcher according to the service name.
+func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
+	key := fmt.Sprintf("%s/%s", r.opts.namespace, name)
+	return newWatcher(ctx, key, name, r.client)
+}
+
+// registerWithKV create a new lease, return current leaseID
+func (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) {
+	grant, err := r.lease.Grant(ctx, int64(r.opts.ttl.Seconds()))
+	if err != nil {
+		return 0, err
+	}
+	_, err = r.client.Put(ctx, key, value, clientv3.WithLease(grant.ID))
+	if err != nil {
+		return 0, err
+	}
+	return grant.ID, nil
+}
+
+func (r *Registry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key string, value string) {
+	curLeaseID := leaseID
+	kac, err := r.client.KeepAlive(ctx, leaseID)
+	if err != nil {
+		curLeaseID = 0
+	}
+	randSource := rand.New(rand.NewSource(time.Now().Unix()))
+
+	for {
+		if curLeaseID == 0 {
+			// try to registerWithKV
+			var retreat []int
+			for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ {
+				if ctx.Err() != nil {
+					return
+				}
+				// prevent infinite blocking
+				idChan := make(chan clientv3.LeaseID, 1)
+				errChan := make(chan error, 1)
+				cancelCtx, cancel := context.WithCancel(ctx)
+				go func() {
+					defer cancel()
+					id, registerErr := r.registerWithKV(cancelCtx, key, value)
+					if registerErr != nil {
+						errChan <- registerErr
+					} else {
+						idChan <- id
+					}
+				}()
+
+				select {
+				case <-time.After(3 * time.Second):
+					cancel()
+					continue
+				case <-errChan:
+					continue
+				case curLeaseID = <-idChan:
+				}
+
+				kac, err = r.client.KeepAlive(ctx, curLeaseID)
+				if err == nil {
+					break
+				}
+				retreat = append(retreat, 1<<retryCnt)
+				time.Sleep(time.Duration(retreat[randSource.Intn(len(retreat))]) * time.Second)
+			}
+			if _, ok := <-kac; !ok {
+				// retry failed
+				return
+			}
+		}
+
+		select {
+		case _, ok := <-kac:
+			if !ok {
+				if ctx.Err() != nil {
+					// channel closed due to context cancel
+					return
+				}
+				// need to retry registration
+				curLeaseID = 0
+				continue
+			}
+		case <-r.opts.ctx.Done():
+			return
+		}
+	}
+}

+ 154 - 0
register/registry_test.go

@@ -0,0 +1,154 @@
+package register
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	"github.com/go-kratos/kratos/v2/registry"
+)
+
+func TestRegistry(t *testing.T) {
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   []string{"127.0.0.1:2379"},
+		DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	ctx := context.Background()
+	s := &registry.ServiceInstance{
+		ID:   "0",
+		Name: "helloworld",
+	}
+
+	r := New(client)
+	w, err := r.Watch(ctx, s.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		_ = w.Stop()
+	}()
+	go func() {
+		for {
+			res, err1 := w.Next()
+			if err1 != nil {
+				return
+			}
+			t.Logf("watch: %d", len(res))
+			for _, r := range res {
+				t.Logf("next: %+v", r)
+			}
+		}
+	}()
+	time.Sleep(time.Second)
+
+	if err1 := r.Register(ctx, s); err1 != nil {
+		t.Fatal(err1)
+	}
+	time.Sleep(time.Second)
+
+	res, err := r.GetService(ctx, s.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(res) != 1 && res[0].Name != s.Name {
+		t.Errorf("not expected: %+v", res)
+	}
+
+	if err1 := r.Deregister(ctx, s); err1 != nil {
+		t.Fatal(err1)
+	}
+	time.Sleep(time.Second)
+
+	res, err = r.GetService(ctx, s.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(res) != 0 {
+		t.Errorf("not expected empty")
+	}
+}
+
+func TestHeartBeat(t *testing.T) {
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   []string{"127.0.0.1:2379"},
+		DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	ctx := context.Background()
+	s := &registry.ServiceInstance{
+		ID:   "0",
+		Name: "helloworld",
+	}
+
+	go func() {
+		r := New(client)
+		w, err1 := r.Watch(ctx, s.Name)
+		if err1 != nil {
+			return
+		}
+		defer func() {
+			_ = w.Stop()
+		}()
+		for {
+			res, err2 := w.Next()
+			if err2 != nil {
+				return
+			}
+			t.Logf("watch: %d", len(res))
+			for _, r := range res {
+				t.Logf("next: %+v", r)
+			}
+		}
+	}()
+	time.Sleep(time.Second)
+
+	// new a server
+	r := New(client,
+		RegisterTTL(2*time.Second),
+		MaxRetry(5),
+	)
+
+	key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, s.Name, s.ID)
+	value, _ := marshal(s)
+	r.lease = clientv3.NewLease(r.client)
+	leaseID, err := r.registerWithKV(ctx, key, value)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// wait for lease expired
+	time.Sleep(3 * time.Second)
+
+	res, err := r.GetService(ctx, s.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(res) != 0 {
+		t.Errorf("not expected empty")
+	}
+
+	go r.heartBeat(ctx, leaseID, key, value)
+
+	time.Sleep(time.Second)
+	res, err = r.GetService(ctx, s.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(res) == 0 {
+		t.Errorf("reconnect failed")
+	}
+}

+ 20 - 0
register/service.go

@@ -0,0 +1,20 @@
+package register
+
+import (
+	"encoding/json"
+
+	"github.com/go-kratos/kratos/v2/registry"
+)
+
+func marshal(si *registry.ServiceInstance) (string, error) {
+	data, err := json.Marshal(si)
+	if err != nil {
+		return "", err
+	}
+	return string(data), nil
+}
+
+func unmarshal(data []byte) (si *registry.ServiceInstance, err error) {
+	err = json.Unmarshal(data, &si)
+	return
+}

+ 95 - 0
register/watcher.go

@@ -0,0 +1,95 @@
+package register
+
+import (
+	"context"
+	"time"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	"github.com/go-kratos/kratos/v2/registry"
+)
+
+var _ registry.Watcher = (*watcher)(nil)
+
+type watcher struct {
+	key         string
+	ctx         context.Context
+	cancel      context.CancelFunc
+	client      *clientv3.Client
+	watchChan   clientv3.WatchChan
+	watcher     clientv3.Watcher
+	kv          clientv3.KV
+	first       bool
+	serviceName string
+}
+
+func newWatcher(ctx context.Context, key, name string, client *clientv3.Client) (*watcher, error) {
+	w := &watcher{
+		key:         key,
+		client:      client,
+		watcher:     clientv3.NewWatcher(client),
+		kv:          clientv3.NewKV(client),
+		first:       true,
+		serviceName: name,
+	}
+	w.ctx, w.cancel = context.WithCancel(ctx)
+	w.watchChan = w.watcher.Watch(w.ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly())
+	err := w.watcher.RequestProgress(w.ctx)
+	if err != nil {
+		return nil, err
+	}
+	return w, nil
+}
+
+func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
+	if w.first {
+		item, err := w.getInstance()
+		w.first = false
+		return item, err
+	}
+
+	select {
+	case <-w.ctx.Done():
+		return nil, w.ctx.Err()
+	case watchResp, ok := <-w.watchChan:
+		if !ok || watchResp.Err() != nil {
+			time.Sleep(time.Second)
+			err := w.reWatch()
+			if err != nil {
+				return nil, err
+			}
+		}
+		return w.getInstance()
+	}
+}
+
+func (w *watcher) Stop() error {
+	w.cancel()
+	return w.watcher.Close()
+}
+
+func (w *watcher) getInstance() ([]*registry.ServiceInstance, error) {
+	resp, err := w.kv.Get(w.ctx, w.key, clientv3.WithPrefix())
+	if err != nil {
+		return nil, err
+	}
+	items := make([]*registry.ServiceInstance, 0, len(resp.Kvs))
+	for _, kv := range resp.Kvs {
+		si, err := unmarshal(kv.Value)
+		if err != nil {
+			return nil, err
+		}
+		if si.Name != w.serviceName {
+			continue
+		}
+		items = append(items, si)
+	}
+	return items, nil
+}
+
+func (w *watcher) reWatch() error {
+	w.watcher.Close()
+	w.watcher = clientv3.NewWatcher(w.client)
+	w.watchChan = w.watcher.Watch(w.ctx, w.key, clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly())
+	return w.watcher.RequestProgress(w.ctx)
+}