Skip to content

Commit

Permalink
refactor(generic): refactor existing generic to have new ServiceInfo …
Browse files Browse the repository at this point in the history
…which has the generic's reader and writer info directly (cloudwego#1365)
  • Loading branch information
Marina-Sakai committed Jun 24, 2024
1 parent 8203ebb commit 8abf2c5
Show file tree
Hide file tree
Showing 17 changed files with 2,115 additions and 0 deletions.
148 changes: 148 additions & 0 deletions pkg/generic/grpcjson_test/generic_init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test

//import (
// "context"
// "fmt"
// "io"
// "net"
// "strings"
// "sync"
// "time"
//
// "github.com/cloudwego/kitex/client"
// "github.com/cloudwego/kitex/client/genericclient"
// "github.com/cloudwego/kitex/internal/mocks/proto/kitex_gen/pbapi/mock"
// "github.com/cloudwego/kitex/pkg/generic"
// "github.com/cloudwego/kitex/server"
// "github.com/cloudwego/kitex/transport"
//)
//
//func newGenericClient(g generic.Generic, targetIPPort string) genericclient.Client {
// cli, err := genericclient.NewStreamingClient("destService", g,
// client.WithTransportProtocol(transport.GRPC),
// client.WithHostPorts(targetIPPort),
// )
// if err != nil {
// panic(err)
// }
// return cli
//}
//
//func newMockTestServer(handler mock.Mock, addr net.Addr, opts ...server.Option) server.Server {
// opts = append(opts, server.WithServiceAddr(addr))
// svr := mock.NewServer(handler, opts...)
// go func() {
// err := svr.Run()
// if err != nil {
// panic(err)
// }
// }()
// return svr
//}
//
//var _ mock.Mock = &StreamingTestImpl{}
//
//type StreamingTestImpl struct{}
//
//func (s *StreamingTestImpl) UnaryTest(ctx context.Context, req *mock.MockReq) (resp *mock.MockResp, err error) {
// fmt.Println("UnaryTest called")
// resp = &mock.MockResp{}
// resp.Message = "hello " + req.Message
// return
//}
//
//func (s *StreamingTestImpl) ClientStreamingTest(stream mock.Mock_ClientStreamingTestServer) (err error) {
// fmt.Println("ClientStreamingTest called")
// var msgs []string
// for {
// req, err := stream.Recv()
// if err != nil {
// if err == io.EOF {
// break
// }
// return err
// }
// fmt.Printf("Recv: %s\n", req.Message)
// msgs = append(msgs, req.Message)
// time.Sleep(time.Second)
// }
// return stream.SendAndClose(&mock.MockResp{Message: "all message: " + strings.Join(msgs, ", ")})
//}
//
//func (s *StreamingTestImpl) ServerStreamingTest(req *mock.MockReq, stream mock.Mock_ServerStreamingTestServer) (err error) {
// fmt.Println("ServerStreamingTest called")
// resp := &mock.MockResp{}
// for i := 0; i < 3; i++ {
// resp.Message = fmt.Sprintf("%v -> %dth response", req.Message, i)
// err := stream.Send(resp)
// if err != nil {
// return err
// }
// time.Sleep(time.Second)
// }
// return
//}
//
//func (s *StreamingTestImpl) BidirectionalStreamingTest(stream mock.Mock_BidirectionalStreamingTestServer) (err error) {
// fmt.Println("BidirectionalStreamingTest called")
// wg := &sync.WaitGroup{}
// wg.Add(2)
//
// go func() {
// defer func() {
// if p := recover(); p != nil {
// err = fmt.Errorf("panic: %v", p)
// }
// wg.Done()
// }()
// defer stream.Close()
// for {
// msg, recvErr := stream.Recv()
// if recvErr == io.EOF {
// return
// } else if recvErr != nil {
// err = recvErr
// return
// }
// fmt.Printf("BidirectionaStreamingTest: received message = %s\n", msg.Message)
// time.Sleep(time.Second)
// }
// }()
//
// go func() {
// defer func() {
// if p := recover(); p != nil {
// err = fmt.Errorf("panic: %v", p)
// }
// wg.Done()
// }()
// resp := &mock.MockResp{}
// for i := 0; i < 3; i++ {
// resp.Message = fmt.Sprintf("%dth response", i)
// if sendErr := stream.Send(resp); sendErr != nil {
// err = sendErr
// return
// }
// fmt.Printf("BidirectionaStreamingTest: sent message = %s\n", resp)
// time.Sleep(time.Second)
// }
// }()
// wg.Wait()
// return
//}
163 changes: 163 additions & 0 deletions pkg/generic/grpcjson_test/generic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test

//import (
// "context"
// "fmt"
// "io"
// "net"
// "reflect"
// "strings"
// "sync"
// "testing"
// "time"
//
// dproto "github.com/cloudwego/dynamicgo/proto"
// "github.com/tidwall/gjson"
//
// "github.com/cloudwego/kitex/client/genericclient"
// "github.com/cloudwego/kitex/internal/mocks/proto/kitex_gen/pbapi/mock"
// "github.com/cloudwego/kitex/internal/test"
// "github.com/cloudwego/kitex/pkg/generic"
// "github.com/cloudwego/kitex/server"
//)
//
//func TestClientStreaming(t *testing.T) {
// ctx := context.Background()
// addr := test.GetLocalAddress()
//
// svr := initMockTestServer(new(StreamingTestImpl), addr)
// defer svr.Stop()
//
// cli := initStreamingClient(t, ctx, addr, "./idl/pbapi.proto")
// streamCli, err := genericclient.NewClientStreaming(ctx, cli, "ClientStreamingTest")
// test.Assert(t, err == nil, err)
// for i := 0; i < 3; i++ {
// req := fmt.Sprintf(`{"message": "grpc client streaming generic %dth request"}`, i)
// err = streamCli.Send(req)
// test.Assert(t, err == nil)
// time.Sleep(time.Second)
// }
// resp, err := streamCli.CloseAndRecv()
// test.Assert(t, err == nil)
// strResp, ok := resp.(string)
// test.Assert(t, ok)
// fmt.Printf("clientStreaming message received: %v\n", strResp)
// test.Assert(t, reflect.DeepEqual(gjson.Get(strResp, "message").String(),
// "all message: grpc client streaming generic 0th request, grpc client streaming generic 1th request, grpc client streaming generic 2th request"))
//}
//
//func TestServerStreaming(t *testing.T) {
// ctx := context.Background()
// addr := test.GetLocalAddress()
//
// svr := initMockTestServer(new(StreamingTestImpl), addr)
// defer svr.Stop()
//
// cli := initStreamingClient(t, ctx, addr, "./idl/pbapi.proto")
// streamCli, err := genericclient.NewServerStreaming(ctx, cli, "ServerStreamingTest", `{"message": "grpc server streaming generic request"}`)
// test.Assert(t, err == nil, err)
// for {
// resp, err := streamCli.Recv()
// if err != nil {
// test.Assert(t, err == io.EOF)
// fmt.Println("serverStreaming message receive done")
// break
// } else {
// strResp, ok := resp.(string)
// test.Assert(t, ok)
// fmt.Printf("serverStreaming message received: %s\n", strResp)
// test.Assert(t, strings.Contains(strResp, "grpc server streaming generic request ->"))
// }
// time.Sleep(time.Second)
// }
//}
//
//func TestBidirectionalStreaming(t *testing.T) {
// ctx := context.Background()
// addr := test.GetLocalAddress()
//
// svr := initMockTestServer(new(StreamingTestImpl), addr)
// defer svr.Stop()
//
// cli := initStreamingClient(t, ctx, addr, "./idl/pbapi.proto")
// streamCli, err := genericclient.NewBidirectionalStreaming(ctx, cli, "BidirectionalStreamingTest")
// test.Assert(t, err == nil)
//
// wg := &sync.WaitGroup{}
// wg.Add(2)
//
// go func() {
// defer wg.Done()
// defer streamCli.Close()
// for i := 0; i < 3; i++ {
// req := fmt.Sprintf(`{"message": "grpc bidirectional streaming generic %dth request"}`, i)
// err = streamCli.Send(req)
// test.Assert(t, err == nil)
// fmt.Printf("BidirectionalStreamingTest send: req = %s\n", req)
// }
// }()
//
// go func() {
// defer wg.Done()
// for {
// resp, err := streamCli.Recv()
// if err != nil {
// test.Assert(t, err == io.EOF)
// fmt.Println("bidirectionalStreaming message receive done")
// break
// } else {
// strResp, ok := resp.(string)
// test.Assert(t, ok)
// fmt.Printf("bidirectionalStreaming message received: %s\n", strResp)
// test.Assert(t, strings.Contains(strResp, "th response"))
// }
// time.Sleep(time.Second)
// }
// }()
// wg.Wait()
//}
//
//func TestUnary(t *testing.T) {
// ctx := context.Background()
// addr := test.GetLocalAddress()
//
// svr := initMockTestServer(new(StreamingTestImpl), addr)
// defer svr.Stop()
//
// cli := initStreamingClient(t, ctx, addr, "./idl/pbapi.proto")
// resp, err := cli.GenericCall(ctx, "UnaryTest", `{"message": "unary request"}`)
// test.Assert(t, err == nil)
// strResp, ok := resp.(string)
// test.Assert(t, ok)
// test.Assert(t, reflect.DeepEqual(gjson.Get(strResp, "message").String(), "hello unary request"))
//}
//
//func initStreamingClient(t *testing.T, ctx context.Context, addr, idl string) genericclient.Client {
// dOpts := dproto.Options{}
// p, err := generic.NewPbFileProviderWithDynamicGo(idl, ctx, dOpts)
// test.Assert(t, err == nil)
// g, err := generic.JSONPbGeneric(p)
// test.Assert(t, err == nil)
// return newGenericClient(g, addr)
//}
//
//func initMockTestServer(handler mock.Mock, address string) server.Server {
// addr, _ := net.ResolveTCPAddr("tcp", address)
// return newMockTestServer(handler, addr)
//}
40 changes: 40 additions & 0 deletions tool/cmd/kitex/demo.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

option go_package = "multi/service";

package multiservice;

service CombineService {
rpc ChatA (RequestA) returns (Reply) {}
}

service ServiceB {
rpc ChatB (RequestB) returns (Reply) {}
}

message RequestA {
string name = 1;
}

message RequestB {
string name = 1;
}

message Reply {
string message = 1;
}
Loading

0 comments on commit 8abf2c5

Please sign in to comment.