| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package register
- import (
- "context"
- "fmt"
- "testing"
- "time"
- "google.golang.org/grpc"
- clientv3 "go.etcd.io/etcd/client/v3"
- "github.com/go-kratos/kratos/v2/registry"
- )
- func TestRegistry(t *testing.T) {
- client, err := clientv3.New(clientv3.Config{
- Endpoints: []string{"127.0.0.1:2379"},
- DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()},
- })
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- ctx := context.Background()
- s := ®istry.ServiceInstance{
- ID: "0",
- Name: "helloworld",
- }
- r := New(client)
- w, err := r.Watch(ctx, s.Name)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- _ = w.Stop()
- }()
- go func() {
- for {
- res, err1 := w.Next()
- if err1 != nil {
- return
- }
- t.Logf("watch: %d", len(res))
- for _, r := range res {
- t.Logf("next: %+v", r)
- }
- }
- }()
- time.Sleep(time.Second)
- if err1 := r.Register(ctx, s); err1 != nil {
- t.Fatal(err1)
- }
- time.Sleep(time.Second)
- res, err := r.GetService(ctx, s.Name)
- if err != nil {
- t.Fatal(err)
- }
- if len(res) != 1 && res[0].Name != s.Name {
- t.Errorf("not expected: %+v", res)
- }
- if err1 := r.Deregister(ctx, s); err1 != nil {
- t.Fatal(err1)
- }
- time.Sleep(time.Second)
- res, err = r.GetService(ctx, s.Name)
- if err != nil {
- t.Fatal(err)
- }
- if len(res) != 0 {
- t.Errorf("not expected empty")
- }
- }
- func TestHeartBeat(t *testing.T) {
- client, err := clientv3.New(clientv3.Config{
- Endpoints: []string{"127.0.0.1:2379"},
- DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()},
- })
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- ctx := context.Background()
- s := ®istry.ServiceInstance{
- ID: "0",
- Name: "helloworld",
- }
- go func() {
- r := New(client)
- w, err1 := r.Watch(ctx, s.Name)
- if err1 != nil {
- return
- }
- defer func() {
- _ = w.Stop()
- }()
- for {
- res, err2 := w.Next()
- if err2 != nil {
- return
- }
- t.Logf("watch: %d", len(res))
- for _, r := range res {
- t.Logf("next: %+v", r)
- }
- }
- }()
- time.Sleep(time.Second)
- // new a server
- r := New(client,
- RegisterTTL(2*time.Second),
- MaxRetry(5),
- )
- key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, s.Name, s.ID)
- value, _ := marshal(s)
- r.lease = clientv3.NewLease(r.client)
- leaseID, err := r.registerWithKV(ctx, key, value)
- if err != nil {
- t.Fatal(err)
- }
- // wait for lease expired
- time.Sleep(3 * time.Second)
- res, err := r.GetService(ctx, s.Name)
- if err != nil {
- t.Fatal(err)
- }
- if len(res) != 0 {
- t.Errorf("not expected empty")
- }
- go r.heartBeat(ctx, leaseID, key, value)
- time.Sleep(time.Second)
- res, err = r.GetService(ctx, s.Name)
- if err != nil {
- t.Fatal(err)
- }
- if len(res) == 0 {
- t.Errorf("reconnect failed")
- }
- }
|