| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 | package registryimport (	"context"	"fmt"	"math/rand"	"time"	clientv3 "go.etcd.io/etcd/client/v3"	"github.com/go-kratos/kratos/v2/registry")var (	_ registry.Registrar = (*Registry)(nil)	_ registry.Discovery = (*Registry)(nil))// Option is etcd registry option.type Option func(o *options)type options struct {	ctx       context.Context	namespace string	ttl       time.Duration	maxRetry  int}// Context with registry context.func Context(ctx context.Context) Option {	return func(o *options) { o.ctx = ctx }}// Namespace with registry namespace.func Namespace(ns string) Option {	return func(o *options) { o.namespace = ns }}// RegisterTTL with register ttl.func RegisterTTL(ttl time.Duration) Option {	return func(o *options) { o.ttl = ttl }}func MaxRetry(num int) Option {	return func(o *options) { o.maxRetry = num }}// Registry is etcd registry.type Registry struct {	opts   *options	client *clientv3.Client	kv     clientv3.KV	lease  clientv3.Lease	/*		ctxMap is used to store the context cancel function of each service instance.		When the service instance is deregistered, the corresponding context cancel function is called to stop the heartbeat.	*/	ctxMap map[*registry.ServiceInstance]context.CancelFunc}// New creates etcd registryfunc New(client *clientv3.Client, opts ...Option) (r *Registry) {	op := &options{		ctx:       context.Background(),		namespace: "/microservices",		ttl:       time.Second * 15,		maxRetry:  5,	}	for _, o := range opts {		o(op)	}	return &Registry{		opts:   op,		client: client,		kv:     clientv3.NewKV(client),		ctxMap: make(map[*registry.ServiceInstance]context.CancelFunc),	}}// Register the registration.func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {	key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)	value, err := marshal(service)	if err != nil {		return err	}	if r.lease != nil {		r.lease.Close()	}	r.lease = clientv3.NewLease(r.client)	leaseID, err := r.registerWithKV(ctx, key, value)	if err != nil {		return err	}	hctx, cancel := context.WithCancel(r.opts.ctx)	r.ctxMap[service] = cancel	go r.heartBeat(hctx, leaseID, key, value)	return nil}// Deregister the registration.func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {	defer func() {		if r.lease != nil {			r.lease.Close()		}	}()	// cancel heartbeat	if cancel, ok := r.ctxMap[service]; ok {		cancel()		delete(r.ctxMap, service)	}	key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)	_, err := r.client.Delete(ctx, key)	return err}// GetService return the service instances in memory according to the service name.func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {	key := fmt.Sprintf("%s/%s", r.opts.namespace, name)	resp, err := r.kv.Get(ctx, key, clientv3.WithPrefix())	if err != nil {		return nil, err	}	items := make([]*registry.ServiceInstance, 0, len(resp.Kvs))	for _, kv := range resp.Kvs {		si, err := unmarshal(kv.Value)		if err != nil {			return nil, err		}		if si.Name != name {			continue		}		items = append(items, si)	}	return items, nil}// Watch creates a watcher according to the service name.func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {	key := fmt.Sprintf("%s/%s", r.opts.namespace, name)	return newWatcher(ctx, key, name, r.client)}// registerWithKV create a new lease, return current leaseIDfunc (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) {	grant, err := r.lease.Grant(ctx, int64(r.opts.ttl.Seconds()))	if err != nil {		return 0, err	}	_, err = r.client.Put(ctx, key, value, clientv3.WithLease(grant.ID))	if err != nil {		return 0, err	}	return grant.ID, nil}func (r *Registry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key string, value string) {	curLeaseID := leaseID	kac, err := r.client.KeepAlive(ctx, leaseID)	if err != nil {		curLeaseID = 0	}	randSource := rand.New(rand.NewSource(time.Now().Unix()))	for {		if curLeaseID == 0 {			// try to registerWithKV			var retreat []int			for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ {				if ctx.Err() != nil {					return				}				// prevent infinite blocking				idChan := make(chan clientv3.LeaseID, 1)				errChan := make(chan error, 1)				cancelCtx, cancel := context.WithCancel(ctx)				go func() {					defer cancel()					id, registerErr := r.registerWithKV(cancelCtx, key, value)					if registerErr != nil {						errChan <- registerErr					} else {						idChan <- id					}				}()				select {				case <-time.After(3 * time.Second):					cancel()					continue				case <-errChan:					continue				case curLeaseID = <-idChan:				}				kac, err = r.client.KeepAlive(ctx, curLeaseID)				if err == nil {					break				}				retreat = append(retreat, 1<<retryCnt)				time.Sleep(time.Duration(retreat[randSource.Intn(len(retreat))]) * time.Second)			}			if _, ok := <-kac; !ok {				// retry failed				return			}		}		select {		case _, ok := <-kac:			if !ok {				if ctx.Err() != nil {					// channel closed due to context cancel					return				}				// need to retry registration				curLeaseID = 0				continue			}		case <-r.opts.ctx.Done():			return		}	}}
 |