| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- package register
- import (
- "context"
- "fmt"
- "math/rand"
- "time"
- clientv3 "go.etcd.io/etcd/client/v3"
- "github.com/go-kratos/kratos/v2/registry"
- )
- var (
- _ registry.Registrar = (*Registry)(nil)
- _ registry.Discovery = (*Registry)(nil)
- )
- // Option is etcd registry option.
- type Option func(o *options)
- type options struct {
- ctx context.Context
- namespace string
- ttl time.Duration
- maxRetry int
- }
- // Context with registry context.
- func Context(ctx context.Context) Option {
- return func(o *options) { o.ctx = ctx }
- }
- // Namespace with registry namespace.
- func Namespace(ns string) Option {
- return func(o *options) { o.namespace = ns }
- }
- // RegisterTTL with register ttl.
- func RegisterTTL(ttl time.Duration) Option {
- return func(o *options) { o.ttl = ttl }
- }
- func MaxRetry(num int) Option {
- return func(o *options) { o.maxRetry = num }
- }
- // Registry is etcd registry.
- type Registry struct {
- opts *options
- client *clientv3.Client
- kv clientv3.KV
- lease clientv3.Lease
- /*
- ctxMap is used to store the context cancel function of each service instance.
- When the service instance is deregistered, the corresponding context cancel function is called to stop the heartbeat.
- */
- ctxMap map[*registry.ServiceInstance]context.CancelFunc
- }
- // New creates etcd registry
- func New(client *clientv3.Client, opts ...Option) (r *Registry) {
- op := &options{
- ctx: context.Background(),
- namespace: "/microservices",
- ttl: time.Second * 15,
- maxRetry: 5,
- }
- for _, o := range opts {
- o(op)
- }
- return &Registry{
- opts: op,
- client: client,
- kv: clientv3.NewKV(client),
- ctxMap: make(map[*registry.ServiceInstance]context.CancelFunc),
- }
- }
- // Register the registration.
- func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
- key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
- value, err := marshal(service)
- if err != nil {
- return err
- }
- if r.lease != nil {
- r.lease.Close()
- }
- r.lease = clientv3.NewLease(r.client)
- leaseID, err := r.registerWithKV(ctx, key, value)
- if err != nil {
- return err
- }
- hctx, cancel := context.WithCancel(r.opts.ctx)
- r.ctxMap[service] = cancel
- go r.heartBeat(hctx, leaseID, key, value)
- return nil
- }
- // Deregister the registration.
- func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
- defer func() {
- if r.lease != nil {
- r.lease.Close()
- }
- }()
- // cancel heartbeat
- if cancel, ok := r.ctxMap[service]; ok {
- cancel()
- delete(r.ctxMap, service)
- }
- key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
- _, err := r.client.Delete(ctx, key)
- return err
- }
- // GetService return the service instances in memory according to the service name.
- func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {
- key := fmt.Sprintf("%s/%s", r.opts.namespace, name)
- resp, err := r.kv.Get(ctx, key, clientv3.WithPrefix())
- if err != nil {
- return nil, err
- }
- items := make([]*registry.ServiceInstance, 0, len(resp.Kvs))
- for _, kv := range resp.Kvs {
- si, err := unmarshal(kv.Value)
- if err != nil {
- return nil, err
- }
- if si.Name != name {
- continue
- }
- items = append(items, si)
- }
- return items, nil
- }
- // Watch creates a watcher according to the service name.
- func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
- key := fmt.Sprintf("%s/%s", r.opts.namespace, name)
- return newWatcher(ctx, key, name, r.client)
- }
- // registerWithKV create a new lease, return current leaseID
- func (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) {
- grant, err := r.lease.Grant(ctx, int64(r.opts.ttl.Seconds()))
- if err != nil {
- return 0, err
- }
- _, err = r.client.Put(ctx, key, value, clientv3.WithLease(grant.ID))
- if err != nil {
- return 0, err
- }
- return grant.ID, nil
- }
- func (r *Registry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key string, value string) {
- curLeaseID := leaseID
- kac, err := r.client.KeepAlive(ctx, leaseID)
- if err != nil {
- curLeaseID = 0
- }
- randSource := rand.New(rand.NewSource(time.Now().Unix()))
- for {
- if curLeaseID == 0 {
- // try to registerWithKV
- var retreat []int
- for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ {
- if ctx.Err() != nil {
- return
- }
- // prevent infinite blocking
- idChan := make(chan clientv3.LeaseID, 1)
- errChan := make(chan error, 1)
- cancelCtx, cancel := context.WithCancel(ctx)
- go func() {
- defer cancel()
- id, registerErr := r.registerWithKV(cancelCtx, key, value)
- if registerErr != nil {
- errChan <- registerErr
- } else {
- idChan <- id
- }
- }()
- select {
- case <-time.After(3 * time.Second):
- cancel()
- continue
- case <-errChan:
- continue
- case curLeaseID = <-idChan:
- }
- kac, err = r.client.KeepAlive(ctx, curLeaseID)
- if err == nil {
- break
- }
- retreat = append(retreat, 1<<retryCnt)
- time.Sleep(time.Duration(retreat[randSource.Intn(len(retreat))]) * time.Second)
- }
- if _, ok := <-kac; !ok {
- // retry failed
- return
- }
- }
- select {
- case _, ok := <-kac:
- if !ok {
- if ctx.Err() != nil {
- // channel closed due to context cancel
- return
- }
- // need to retry registration
- curLeaseID = 0
- continue
- }
- case <-r.opts.ctx.Done():
- return
- }
- }
- }
|