From 25124124d155d8dad85a56a536e7372eb362836d Mon Sep 17 00:00:00 2001 From: Marina-Sakai Date: Thu, 30 May 2024 15:20:26 +0800 Subject: [PATCH] fix: lint --- client/stream.go | 2 ++ pkg/generic/descriptor/descriptor.go | 3 +++ pkg/generic/generic.go | 1 - pkg/generic/httpthrift_codec_test.go | 2 +- pkg/generic/mapthrift_codec.go | 4 ++-- pkg/generic/thrift/parse.go | 22 ++++++++++++++++++++++ 6 files changed, 30 insertions(+), 4 deletions(-) diff --git a/client/stream.go b/client/stream.go index a184ac7ecc..5f0154e604 100644 --- a/client/stream.go +++ b/client/stream.go @@ -49,6 +49,7 @@ func (kc *kClient) Stream(ctx context.Context, method string, request, response panic("ctx is nil") } var ri rpcinfo.RPCInfo + // TODO: generic & streamingMode ctx, ri, _ = kc.initRPCInfo(ctx, method, 0, nil) rpcinfo.AsMutableRPCConfig(ri.Config()).SetInteractionMode(rpcinfo.Streaming) @@ -98,6 +99,7 @@ func (kc *kClient) invokeStreamingEndpoint() (endpoint.Endpoint, error) { } func (kc *kClient) getStreamingMode(ri rpcinfo.RPCInfo) serviceinfo.StreamingMode { + // TODO: ここでキーがわかれば良い methodInfo := kc.svcInfo.MethodInfo(ri.Invocation().MethodName()) if methodInfo == nil { return serviceinfo.StreamingNone diff --git a/pkg/generic/descriptor/descriptor.go b/pkg/generic/descriptor/descriptor.go index 6431f214e8..8f742836b0 100644 --- a/pkg/generic/descriptor/descriptor.go +++ b/pkg/generic/descriptor/descriptor.go @@ -22,6 +22,8 @@ import ( "os" dthrift "github.com/cloudwego/dynamicgo/thrift" + + "github.com/cloudwego/kitex/pkg/serviceinfo" ) var isGoTagAliasDisabled = os.Getenv("KITEX_GENERIC_GOTAG_ALIAS_DISABLED") == "True" @@ -91,6 +93,7 @@ type FunctionDescriptor struct { Request *TypeDescriptor Response *TypeDescriptor HasRequestBase bool + StreamingMode serviceinfo.StreamingMode } // ServiceDescriptor idl service descriptor diff --git a/pkg/generic/generic.go b/pkg/generic/generic.go index 1c3a161abb..717524a302 100644 --- a/pkg/generic/generic.go +++ b/pkg/generic/generic.go @@ -265,7 +265,6 @@ func (g *jsonThriftGeneric) PayloadCodecType() serviceinfo.PayloadCodec { func (g *jsonThriftGeneric) PayloadCodec() remote.PayloadCodec { return nil - //return g.codec } func (g *jsonThriftGeneric) GetMethod(req interface{}, method string) (*Method, error) { diff --git a/pkg/generic/httpthrift_codec_test.go b/pkg/generic/httpthrift_codec_test.go index 15ebbb0f23..3226bef410 100644 --- a/pkg/generic/httpthrift_codec_test.go +++ b/pkg/generic/httpthrift_codec_test.go @@ -66,7 +66,7 @@ func TestHttpThriftCodec(t *testing.T) { test.Assert(t, htc.GetIDLServiceName() == "ExampleService") rw := htc.GetMessageReaderWriter() - err, ok := rw.(error) + _, ok := rw.(error) test.Assert(t, !ok) htc.SetMethod(method.Name) diff --git a/pkg/generic/mapthrift_codec.go b/pkg/generic/mapthrift_codec.go index 461507f36b..873bf43bbf 100644 --- a/pkg/generic/mapthrift_codec.go +++ b/pkg/generic/mapthrift_codec.go @@ -17,6 +17,7 @@ package generic import ( + "github.com/pkg/errors" "sync/atomic" "github.com/cloudwego/kitex/pkg/generic/descriptor" @@ -74,8 +75,7 @@ func (c *mapThriftCodec) update() { func (c *mapThriftCodec) GetMessageReaderWriter() interface{} { svcDsc, ok := c.svcDsc.Load().(*descriptor.ServiceDescriptor) if !ok { - return nil - //return nil, fmt.Errorf("get parser ServiceDescriptor failed") + return errors.New("get parser ServiceDescriptor failed") } var rw *thrift.StructReaderWriter var err error diff --git a/pkg/generic/thrift/parse.go b/pkg/generic/thrift/parse.go index 03a80b9b3b..e5f926eca6 100644 --- a/pkg/generic/thrift/parse.go +++ b/pkg/generic/thrift/parse.go @@ -22,12 +22,14 @@ import ( "fmt" "runtime/debug" + "github.com/cloudwego/thriftgo/generator/golang/streaming" "github.com/cloudwego/thriftgo/parser" "github.com/cloudwego/thriftgo/semantic" "github.com/cloudwego/kitex/pkg/generic/descriptor" "github.com/cloudwego/kitex/pkg/gofunc" "github.com/cloudwego/kitex/pkg/klog" + "github.com/cloudwego/kitex/pkg/serviceinfo" ) const ( @@ -140,6 +142,11 @@ func addFunction(fn *parser.Function, tree *parser.Thrift, sDsc *descriptor.Serv if len(fn.Arguments) == 0 { return fmt.Errorf("empty arguments in function: %s", fn.Name) } + st, err := streaming.ParseStreaming(fn) + if err != nil { + return err + } + mode := streamingMode(st) // only support single argument field := fn.Arguments[0] req := &descriptor.TypeDescriptor{ @@ -213,6 +220,7 @@ func addFunction(fn *parser.Function, tree *parser.Thrift, sDsc *descriptor.Serv Request: req, Response: resp, HasRequestBase: hasRequestBase, + StreamingMode: mode, } defer func() { if ret := recover(); ret != nil { @@ -234,6 +242,20 @@ func addFunction(fn *parser.Function, tree *parser.Thrift, sDsc *descriptor.Serv return nil } +func streamingMode(st *streaming.Streaming) serviceinfo.StreamingMode { + if st.BidirectionalStreaming { + return serviceinfo.StreamingBidirectional + } + if st.ClientStreaming { + return serviceinfo.StreamingClient + } + if st.ServerStreaming { + return serviceinfo.StreamingServer + } + return serviceinfo.StreamingNone + // TODO: unary +} + // reuse builtin types var builtinTypes = map[string]*descriptor.TypeDescriptor{ "void": {Name: "void", Type: descriptor.VOID, Struct: new(descriptor.StructDescriptor)},