server.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package swagger_api
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sort"
  10. "sync"
  11. "github.com/go-kratos/kratos/v2/api/metadata"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/codes"
  14. "google.golang.org/grpc/status"
  15. "google.golang.org/protobuf/proto"
  16. "google.golang.org/protobuf/reflect/protodesc"
  17. "google.golang.org/protobuf/reflect/protoreflect"
  18. "google.golang.org/protobuf/reflect/protoregistry"
  19. dpb "google.golang.org/protobuf/types/descriptorpb"
  20. "github.com/go-kratos/kratos/v2/log"
  21. )
  // Server is the API metadata server. It embeds
  // metadata.UnimplementedMetadataServer and answers ListServices and
  // GetServiceDesc from descriptor data that load collects on first use.
  type Server struct {
  	metadata.UnimplementedMetadataServer
  	// srv is the gRPC server whose registered services are described. When it
  	// is nil, load walks protoregistry.GlobalFiles instead.
  	srv *grpc.Server
  	// lock is held by ListServices and GetServiceDesc while they call load and
  	// read the maps below.
  	lock sync.Mutex
  	// services maps a service name to the descriptor set of its proto file and
  	// that file's dependencies.
  	services map[string]*dpb.FileDescriptorSet
  	// methods maps a service name to the names of its methods.
  	methods map[string][]string
  	// SkipError makes load skip registry files whose descriptor cannot be
  	// resolved, and suppresses the warning allDependency logs for a missing
  	// dependency.
  	SkipError bool
  }
  31. // NewServer create server instance
  32. func NewServer(srv *grpc.Server, skipError bool) *Server {
  33. return &Server{
  34. srv: srv,
  35. SkipError: skipError,
  36. services: make(map[string]*dpb.FileDescriptorSet),
  37. methods: make(map[string][]string),
  38. }
  39. }
  40. func (s *Server) load() error {
  41. if len(s.services) > 0 {
  42. return nil
  43. }
  44. if s.srv != nil {
  45. for name, info := range s.srv.GetServiceInfo() {
  46. fd, err := s.parseMetadata(info.Metadata)
  47. if err != nil {
  48. return fmt.Errorf("invalid service %s metadata err:%v", name, err)
  49. }
  50. protoSet, err := s.allDependency(fd)
  51. if err != nil {
  52. return err
  53. }
  54. s.services[name] = &dpb.FileDescriptorSet{File: protoSet}
  55. for _, method := range info.Methods {
  56. s.methods[name] = append(s.methods[name], method.Name)
  57. }
  58. }
  59. return nil
  60. }
  61. var err error
  62. protoregistry.GlobalFiles.RangeFiles(func(fd protoreflect.FileDescriptor) bool {
  63. if fd.Services() == nil {
  64. return true
  65. }
  66. for i := 0; i < fd.Services().Len(); i++ {
  67. svc := fd.Services().Get(i)
  68. fdp, e := fileDescriptorProto(fd.Path())
  69. if e != nil {
  70. if s.SkipError {
  71. continue
  72. }
  73. err = e
  74. return false
  75. }
  76. fdps, e := s.allDependency(fdp)
  77. if e != nil {
  78. if errors.Is(e, protoregistry.NotFound) {
  79. // Skip this service if one of its dependencies is not found.
  80. continue
  81. }
  82. err = e
  83. return false
  84. }
  85. s.services[string(svc.FullName())] = &dpb.FileDescriptorSet{File: fdps}
  86. if svc.Methods() == nil {
  87. continue
  88. }
  89. for j := 0; j < svc.Methods().Len(); j++ {
  90. method := svc.Methods().Get(j)
  91. s.methods[string(svc.FullName())] = append(s.methods[string(svc.FullName())], string(method.Name()))
  92. }
  93. }
  94. return true
  95. })
  96. return err
  97. }
  98. // ListServices return all services
  99. func (s *Server) ListServices(_ context.Context, _ *metadata.ListServicesRequest) (*metadata.ListServicesReply, error) {
  100. s.lock.Lock()
  101. defer s.lock.Unlock()
  102. if err := s.load(); err != nil {
  103. return nil, err
  104. }
  105. reply := &metadata.ListServicesReply{
  106. Services: make([]string, 0, len(s.services)),
  107. Methods: make([]string, 0, len(s.methods)),
  108. }
  109. for name := range s.services {
  110. reply.Services = append(reply.Services, name)
  111. }
  112. for name, methods := range s.methods {
  113. for _, method := range methods {
  114. reply.Methods = append(reply.Methods, fmt.Sprintf("/%s/%s", name, method))
  115. }
  116. }
  117. sort.Strings(reply.Services)
  118. sort.Strings(reply.Methods)
  119. return reply, nil
  120. }
  121. // GetServiceDesc return service meta by name
  122. func (s *Server) GetServiceDesc(_ context.Context, in *metadata.GetServiceDescRequest) (*metadata.GetServiceDescReply, error) {
  123. s.lock.Lock()
  124. defer s.lock.Unlock()
  125. if err := s.load(); err != nil {
  126. return nil, err
  127. }
  128. fds, ok := s.services[in.Name]
  129. if !ok {
  130. return nil, status.Errorf(codes.NotFound, "service %s not found", in.Name)
  131. }
  132. return &metadata.GetServiceDescReply{FileDescSet: fds}, nil
  133. }
  134. // parseMetadata finds the file descriptor bytes specified meta.
  135. // For SupportPackageIsVersion4, m is the name of the proto file, we
  136. // call proto.FileDescriptor to get the byte slice.
  137. // For SupportPackageIsVersion3, m is a byte slice itself.
  138. func (s *Server) parseMetadata(meta interface{}) (*dpb.FileDescriptorProto, error) {
  139. // Check if meta is the file name.
  140. if fileNameForMeta, ok := meta.(string); ok {
  141. return fileDescriptorProto(fileNameForMeta)
  142. }
  143. // Check if meta is the byte slice.
  144. if enc, ok := meta.([]byte); ok {
  145. return s.decodeFileDesc(enc)
  146. }
  147. return nil, fmt.Errorf("proto not sumpport metadata: %v", meta)
  148. }
  149. // decodeFileDesc does decompression and unmarshalling on the given
  150. // file descriptor byte slice.
  151. func (s *Server) decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) {
  152. raw, err := s.decompress(enc)
  153. if err != nil {
  154. return nil, fmt.Errorf("failed to decompress enc: %v", err)
  155. }
  156. fd := new(dpb.FileDescriptorProto)
  157. if err := proto.Unmarshal(raw, fd); err != nil {
  158. return nil, fmt.Errorf("bad descriptor: %v", err)
  159. }
  160. return fd, nil
  161. }
  162. func (s *Server) allDependency(fd *dpb.FileDescriptorProto) ([]*dpb.FileDescriptorProto, error) {
  163. var files []*dpb.FileDescriptorProto
  164. for _, dep := range fd.Dependency {
  165. fdDep, err := fileDescriptorProto(dep)
  166. if err != nil {
  167. if !s.SkipError {
  168. log.Warnf("%s", err)
  169. }
  170. continue
  171. }
  172. temp, err := s.allDependency(fdDep)
  173. if err != nil {
  174. return nil, err
  175. }
  176. files = append(files, temp...)
  177. }
  178. files = append(files, fd)
  179. return files, nil
  180. }
  181. // decompress does gzip decompression.
  182. func (s *Server) decompress(b []byte) ([]byte, error) {
  183. r, err := gzip.NewReader(bytes.NewReader(b))
  184. if err != nil {
  185. return nil, fmt.Errorf("bad gzipped descriptor: %v", err)
  186. }
  187. out, err := io.ReadAll(r)
  188. if err != nil {
  189. return nil, fmt.Errorf("bad gzipped descriptor: %v", err)
  190. }
  191. return out, nil
  192. }
  193. func fileDescriptorProto(path string) (*dpb.FileDescriptorProto, error) {
  194. fd, err := protoregistry.GlobalFiles.FindFileByPath(path)
  195. if err != nil {
  196. return nil, fmt.Errorf("find proto by path failed, path: %s, err: %s", path, err)
  197. }
  198. fdpb := protodesc.ToFileDescriptorProto(fd)
  199. return fdpb, nil
  200. }