Przeglądaj źródła

feat(http): 添加 HTTP 处理器支持服务器流和 unary调用

- 新增 server_stream_handler 和 unary_handler 文件实现 HTTP处理
- 添加中间件支持和自定义选项处理
- 更新 go.mod 文件,增加相关依赖
lihf 5 miesięcy temu
rodzic
commit
864b4923c5
6 zmienionych plików z 1709 dodań i 7 usunięć
  1. 11 4
      go.mod
  2. 1411 3
      go.sum
  3. 106 0
      http/handler/route.go
  4. 112 0
      http/handler/server_stream_handler.go
  5. 48 0
      http/handler/unary_handler.go
  6. 21 0
      http/middleware/option.go

+ 11 - 4
go.mod

@@ -1,32 +1,40 @@
 module git.ikuban.com/server/kratos-utils/v2
 
-go 1.23.0
+go 1.23.2
+
+toolchain go1.23.6
 
 require (
 	git.ikuban.com/server/json v0.0.0-20210408053838-50ac5ceda83a
 	git.ikuban.com/server/kratos-etcd v0.0.0-20250225030354-ebd49a034588
+	git.ikuban.com/server/kubanapis v0.0.0-20250304064227-1bbea9af3cfe
 	github.com/dcsunny/gocrypt v0.0.0-20200828060317-4dec5212cc15
 	github.com/dcsunny/mwt v0.0.0-20210128034911-2f50006077f5
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/go-kratos/kratos/v2 v2.8.3
 	github.com/go-resty/resty/v2 v2.7.0
+	github.com/google/gnostic v0.7.0
 	github.com/google/uuid v1.4.0
+	github.com/jhump/protoreflect v1.17.0
 	github.com/lestrrat-go/jwx v1.2.25
 	github.com/vmihailenco/msgpack/v5 v5.3.0
 	google.golang.org/genproto v0.0.0-20231212172506-995d672761c0
+	google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917
 	google.golang.org/grpc v1.61.1
-	google.golang.org/protobuf v1.33.0
+	google.golang.org/protobuf v1.34.2
 )
 
 require (
+	github.com/bufbuild/protocompile v0.14.1 // indirect
 	github.com/coreos/go-semver v0.3.0 // indirect
 	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d // indirect
 	github.com/go-kratos/aegis v0.2.0 // indirect
 	github.com/go-playground/form/v4 v4.2.0 // indirect
-	github.com/goccy/go-json v0.9.7 // indirect
+	github.com/goccy/go-json v0.9.11 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
+	github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
 	github.com/gorilla/mux v1.8.1 // indirect
 	github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
 	github.com/lestrrat-go/blackmagic v1.0.0 // indirect
@@ -47,7 +55,6 @@ require (
 	golang.org/x/net v0.34.0 // indirect
 	golang.org/x/sys v0.29.0 // indirect
 	golang.org/x/text v0.21.0 // indirect
-	google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )

Plik diff jest za duży
+ 1411 - 3
go.sum


+ 106 - 0
http/handler/route.go

@@ -0,0 +1,106 @@
+package handler
+
+import (
+	"fmt"
+	"git.ikuban.com/server/kratos-utils/v2/http/middleware"
+	ku_annotations "git.ikuban.com/server/kubanapis/kuban/api/annotations"
+	"github.com/go-kratos/kratos/v2/transport/http"
+	openapi_v3 "github.com/google/gnostic/openapiv3"
+	"github.com/jhump/protoreflect/desc"
+	"github.com/jhump/protoreflect/grpcreflect"
+	"google.golang.org/genproto/googleapis/api/annotations"
+	"google.golang.org/grpc"
+	"google.golang.org/protobuf/proto"
+)
+
+func RegisterRoute(s *http.Server, srv any, svcDesc grpc.ServiceDesc) {
+	r := s.Route("/api/")
+
+	// 加载完整的服务描述符
+	fullSvcDesc, err := grpcreflect.LoadServiceDescriptor(&svcDesc)
+	if err != nil {
+		panic(fmt.Sprintf("加载服务描述符失败 err:%v", err))
+	}
+
+	// 注册一元方法handler
+	for _, md := range svcDesc.Methods {
+		// 获取方法option
+		option := GetOptionByServiceDescriptor(fullSvcDesc, svcDesc.ServiceName, md.MethodName)
+		// 跳过不需要生成http方法的
+		if !option.GenHttp {
+			continue
+		}
+
+		// 注册http接口
+		r.POST(option.Path, unaryHandler(srv, md, option))
+	}
+
+	// 注册流式方法handler
+	for _, std := range svcDesc.Streams {
+
+		// 获取方法option
+		option := GetOptionByServiceDescriptor(fullSvcDesc, svcDesc.ServiceName, std.StreamName)
+		// 跳过不需要生成http方法的
+		if !option.GenHttp {
+			continue
+		}
+
+		switch {
+		case std.ClientStreams && std.ServerStreams:
+			// 双向流
+
+		case std.ClientStreams && !std.ServerStreams:
+			// 客户端流
+
+		case !std.ClientStreams && std.ServerStreams:
+			// 服务端流
+			r.POST(option.Path, serverStreamHandler(srv, std, option))
+		}
+	}
+}
+
+func GetOptionByServiceDescriptor(sd *desc.ServiceDescriptor, serviceName, methodName string) *middleware.Option {
+	// 获取方法描述符
+	md := sd.FindMethodByName(methodName)
+	if md == nil {
+		panic(fmt.Sprintf("未找到方法描述符 service:%v, method:%v", serviceName, methodName))
+	}
+
+	// option
+	option := &middleware.Option{}
+
+	// 获取方法option
+	methodOptions := md.GetMethodOptions()
+	operation := proto.GetExtension(methodOptions, openapi_v3.E_Operation).(*openapi_v3.Operation)
+	if operation == nil {
+		// 跳过不需要生成http方法的
+		option.GenHttp = false
+		return option
+	}
+	option.GenHttp = true
+
+	// 不需要认证的
+	if len(operation.Security) == 0 {
+		option.NotAuth = true
+	}
+
+	// 解析自定义option [操作记录]
+	customOptions := proto.GetExtension(methodOptions, ku_annotations.E_Options).(*ku_annotations.Options)
+	if customOptions.GetOperationRecord().GetEnabled() {
+		option.OperationRecord = true
+	}
+
+	// 路由
+	rule := proto.GetExtension(methodOptions, annotations.E_Http).(*annotations.HttpRule)
+	if rule != nil {
+		switch httpRule := rule.GetPattern().(type) {
+		case *annotations.HttpRule_Post:
+			option.Path = httpRule.Post
+		}
+	}
+	if option.Path == "" {
+		option.Path = fmt.Sprintf("/%s/%s", serviceName, methodName)
+	}
+
+	return option
+}

+ 112 - 0
http/handler/server_stream_handler.go

@@ -0,0 +1,112 @@
+package handler
+
+import (
+	"context"
+	"encoding/json"
+	"git.ikuban.com/server/kratos-utils/v2/http/middleware"
+	"github.com/go-kratos/kratos/v2/transport/http"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+func serverStreamHandler(srv any, stream grpc.StreamDesc, option *middleware.Option) http.HandlerFunc {
+	return func(ctx http.Context) error {
+		http.SetOperation(ctx, option.Path)
+
+		dec := func(in any) error {
+			if err := ctx.Bind(&in); err != nil {
+				return err
+			}
+			if err := ctx.BindQuery(&in); err != nil {
+				return err
+			}
+			return nil
+		}
+
+		httpCtx := ctx
+		h := ctx.Middleware(func(ctx context.Context, _ interface{}) (interface{}, error) {
+			err := stream.Handler(srv, newServerStream(ctx, dec, httpCtx.Response()))
+			return nil, err
+		})
+
+		newCtx := middleware.NewOptionContext(ctx, option)
+		_, err := h(newCtx, nil)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}
+}
+
+var _ grpc.ServerStream = (*serverStream)(nil)
+
+type serverStream struct {
+	ctx      context.Context
+	dec      func(interface{}) error
+	w        http.ResponseWriter
+	metadata metadata.MD
+}
+
+func newServerStream(ctx context.Context, dec func(interface{}) error, w http.ResponseWriter) grpc.ServerStream {
+	s := &serverStream{
+		ctx: ctx,
+		dec: dec,
+		w:   w,
+	}
+
+	return s
+}
+
+// 设置header
+func (s *serverStream) SetHeader(md metadata.MD) error {
+	s.metadata = md
+	return nil
+}
+
+// 发送header (只调一次)
+func (s *serverStream) SendHeader(md metadata.MD) error {
+	for k, v := range s.metadata {
+		if len(v) == 0 {
+			continue
+		}
+		s.w.Header().Set(k, v[0])
+	}
+	return nil
+}
+
+func (s *serverStream) SetTrailer(md metadata.MD) {
+	return
+}
+
+func (s *serverStream) Context() context.Context {
+	return s.ctx
+}
+
+func (s *serverStream) SendMsg(m interface{}) error {
+
+	switch data := m.(type) {
+	case []byte:
+		_, err := s.w.Write(data)
+		if err != nil {
+			return err
+		}
+
+	default:
+		j, err := json.Marshal(data)
+		if err != nil {
+			return err
+		}
+		_, err = s.w.Write(append(j, byte('\n')))
+		if err != nil {
+			return err
+		}
+	}
+
+	s.w.(http.Flusher).Flush()
+	return nil
+}
+
+func (s *serverStream) RecvMsg(m any) error {
+	return s.dec(m)
+}

+ 48 - 0
http/handler/unary_handler.go

@@ -0,0 +1,48 @@
+package handler
+
+import (
+	"context"
+	"git.ikuban.com/server/kratos-utils/v2/http/middleware"
+	"git.ikuban.com/server/kratos-utils/v2/http/reply"
+	"github.com/go-kratos/kratos/v2/transport/http"
+	"google.golang.org/grpc"
+)
+
+func unaryHandler(srv any, method grpc.MethodDesc, option *middleware.Option) http.HandlerFunc {
+	return func(ctx http.Context) error {
+		http.SetOperation(ctx, option.Path)
+
+		dec := func(in any) error {
+			if err := ctx.Bind(&in); err != nil {
+				return err
+			}
+			if err := ctx.BindQuery(&in); err != nil {
+				return err
+			}
+			return nil
+		}
+
+		httpCtx := ctx
+		interceptor := func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
+			ctx = middleware.NewOptionContext(ctx, option)
+			h := httpCtx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
+				return handler(ctx, req)
+			})
+			return h(ctx, req)
+		}
+
+		out, err := method.Handler(srv, ctx, dec, interceptor)
+		if err != nil {
+			return err
+		}
+
+		success := &reply.SuccessReply{
+			Code: 0,
+		}
+		if out != nil {
+			success.Data = out
+		}
+
+		return ctx.Result(200, success)
+	}
+}

+ 21 - 0
http/middleware/option.go

@@ -0,0 +1,21 @@
+package middleware
+
+import "context"
+
+type Option struct {
+	GenHttp         bool
+	NotAuth         bool
+	OperationRecord bool
+	Path            string
+}
+
+type httpOption struct{}
+
+func NewOptionContext(ctx context.Context, op *Option) context.Context {
+	return context.WithValue(ctx, httpOption{}, op)
+}
+
+func FromOptionContext(ctx context.Context) (*Option, bool) {
+	op, ok := ctx.Value(httpOption{}).(*Option)
+	return op, ok
+}

Niektóre pliki nie zostały wyświetlone z powodu dużej ilości zmienionych plików