Skip to content

Commit

Permalink
Merge pull request #18 from gojuukaze/dev2.3.0
Browse files Browse the repository at this point in the history
Dev2.3.0
  • Loading branch information
gojuukaze committed Nov 19, 2020
2 parents eae38d6 + 32adcef commit bcb72cb
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 1 deletion.
119 changes: 119 additions & 0 deletions v2/brokers/rocketmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package brokers

import (
"fmt"
"github.com/gojuukaze/YTask/v2/drive"
"github.com/gojuukaze/YTask/v2/message"
"github.com/gojuukaze/YTask/v2/util/yjson"
"github.com/gojuukaze/YTask/v2/yerrors"
"time"
)

type RocketMqBroker struct {
client *drive.RocketMqClient
host string
port string
}

func NewRocketMqBroker(host, port string) RocketMqBroker {
/*
1、目前不能自动创建topic (mqadmin手动创建,并设置读写队列数为1)
2、rocketmq topic名称不允许存在 ‘:’ ,
(所以在生产、消费前先做了名称转换topic RocketMqClient.topicChecker 将非法字符全部转换为 ‘_’)
3、为提供pullConsumer实现,所以添加了在worker和consumer之间添加了 RocketMqClient.MsgChan
4、consumerOffset不能同步更新,所以任务执行时间更长
(需要将队列中多余的message消费掉才能消费到当前taskId对应的消息)
5、未支持RocketMqBroker.LSend
6、rockemq日志级别设置,设置环境变量"ROCKETMQ_GO_LOG_LEVEL"="error"||"info"||“debug”||...
*/
return RocketMqBroker{
host: host,
port: port,
//poolSize: 0,
}
}
func (r *RocketMqBroker) Activate() {

client := drive.NewRocketMqClient(r.host, r.port)
r.client = &client
}

func (r *RocketMqBroker) SetPoolSize(n int) {
//r.poolSize = n
}
func (r *RocketMqBroker) GetPoolSize() int {
//return r.poolSize
return 0
}

func (r *RocketMqBroker) Next(topic string) (message.Message, error) {
var msg message.Message
var value string
var err error

queue,err:=r.client.Register(topic)
if err!=nil{
return msg, err
}
select {
case value=<-queue:

case <-time.After(2 * time.Second):
return msg,yerrors.ErrEmptyQuery{}
}

err = yjson.YJson.UnmarshalFromString(value, &msg)

return msg, err
}

func (r *RocketMqBroker) Send(topic string, msg message.Message) error {

b, err := yjson.YJson.Marshal(msg)

if err != nil {
return err
}
err = r.client.Publish(topic, b, 0)
return err
}

func (r *RocketMqBroker) LSend(queueName string, msg message.Message) error {
// 未实现
b, err := yjson.YJson.Marshal(msg)

if err != nil {
return err
}
err = r.client.Publish(queueName, b, 5)
return err
}

func (r RocketMqBroker) Clone() BrokerInterface {

return &RocketMqBroker{
host: r.host,
port: r.port,

//poolSize: 0,
}
}
func (r RocketMqBroker)Shutdown(){
for topic,producer:=range r.client.RocketMqProducerMap{
TRY1:
err:=producer.Shutdown()
if err !=nil{
fmt.Println(topic,"producer shutdown err",err)
goto TRY1
}
}
for topic,consumer:=range r.client.RocketMqConsumerMap{
close(r.client.MsgChanMap[topic])
TRY2:
err:=consumer.Shutdown()
if err !=nil{
fmt.Println(topic,"consumer shutdown err",err)
goto TRY2
}
}
}
111 changes: 111 additions & 0 deletions v2/drive/rocketmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package drive

import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/producer"
"regexp"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)

type RocketMqClient struct {
addr primitive.NamesrvAddr
group string
RocketMqProducerMap map[string]rocketmq.Producer
RocketMqConsumerMap map[string]rocketmq.PushConsumer
MsgChanMap map[string]chan string
}

func NewRocketMqClient(host,port string) RocketMqClient{

var err error
addr,err:=primitive.NewNamesrvAddr(host+":"+port)
if err!=nil {
panic("YTask: rocketMq error : " + err.Error())
}
return RocketMqClient{
addr:addr,
RocketMqProducerMap: make(map[string]rocketmq.Producer) ,
RocketMqConsumerMap: make(map[string]rocketmq.PushConsumer),
MsgChanMap: make(map[string]chan string),
}
}


func (c *RocketMqClient) topicChecker(topic string)(string) {
//rocketmq topic 只能包含%数字大小写字母及下划线和中划线
re := regexp.MustCompile("[^A-z0-9_-]")
//所以用下划线替换非法字符
return re.ReplaceAllString(topic, "_")
}
func (c *RocketMqClient) Register(topic string) (<-chan string,error){
topic=c.topicChecker(topic)


if _,ok:=c.MsgChanMap[topic];!ok{
c.MsgChanMap[topic]=make(chan string,0)
output,err:=rocketmq.NewPushConsumer(
consumer.WithNameServer(c.addr),
consumer.WithGroupName(topic),
)
c.RocketMqConsumerMap[topic]=output
/*addr,_:=internal.NewNamesrv(c.addr)
options:=internal.ClientOptions{
GroupName: topic,
Namesrv: addr,
}
callBackChan:=make(chan interface{})
client:=internal.GetOrNewRocketMQClient(options,callBackChan)
offset:=consumer.NewRemoteOffsetStore(topic,client,addr)*/
output.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

for _,msg := range msgs {
fmt.Println("consumer:",string(msg.Body))
c.MsgChanMap[topic]<-string(msg.Body)
}
return consumer.ConsumeSuccess, nil
})
err=output.Start()
if err!=nil{
fmt.Println("consumer start error ",err.Error())
}
return c.MsgChanMap[topic],err
}
//pull方式未实现
//ref:=reflect.ValueOf(c.rocketMqConsumer)
//method:=ref.MethodByName("Pull")
//args:=[]reflect.Value{reflect.ValueOf(context.Background()),
// reflect.ValueOf(topic),
// reflect.ValueOf(consumer.MessageSelector{}),
// reflect.ValueOf(1)}
//result:=method.Call(args)
//res,err:=result[0].Interface().((*primitive.PullResult)),result[1].Interface().(error)
return c.MsgChanMap[topic],nil
}
func (c *RocketMqClient) Publish(topic string,value interface{}, Priority uint8) error {

topic=c.topicChecker(topic)
if _,ok:=c.RocketMqProducerMap[topic];!ok{
input, err := rocketmq.NewProducer(
producer.WithNameServer(c.addr),
producer.WithCreateTopicKey(topic),
producer.WithGroupName(topic),
)
err=input.Start()
if err!=nil {
panic("YTask: rocketMq error : " + err.Error())
return err
}
c.RocketMqProducerMap[topic]=input
}
fmt.Println("product:",string(value.([]byte)))
_, err := c.RocketMqProducerMap[topic].SendSync(context.Background(),
primitive.NewMessage(topic,value.([]byte)))
if err != nil {
return err
}
return nil
}
3 changes: 2 additions & 1 deletion v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ module github.com/gojuukaze/YTask/v2
go 1.12

require (
github.com/apache/rocketmq-client-go/v2 v2.0.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/go-redis/redis/v7 v7.4.0
github.com/golang/protobuf v1.3.3 // indirect
github.com/google/go-cmp v0.4.0 // indirect
github.com/google/uuid v1.1.1
github.com/json-iterator/go v1.1.7
github.com/json-iterator/go v1.1.9
github.com/sirupsen/logrus v1.4.2
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
github.com/stretchr/testify v1.4.0 // indirect
Expand Down
82 changes: 82 additions & 0 deletions v2/test/rockemqBroker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package test

import (
"fmt"
"github.com/gojuukaze/YTask/v2/brokers"
"github.com/gojuukaze/YTask/v2/controller"
"github.com/gojuukaze/YTask/v2/message"
"testing"
)

func TestRocketMqBroker(t *testing.T) {

broker := brokers.NewRocketMqBroker("127.0.0.1", "9876")
broker.Activate()
//broker.Shutdown()主要是为了关闭consumer,同步offset到broker
//BUG:会出现同步失败
defer broker.Shutdown()
msg := message.NewMessage(controller.NewTaskCtl())
msg2 := message.NewMessage(controller.NewTaskCtl())

err := broker.Send("test_rock", msg)
if err != nil {
t.Fatal(err)
}
err = broker.Send("test_rock", msg2)
if err != nil {
t.Fatal(err)
}
m, err := broker.Next("test_rock")
if err != nil {
t.Fatal(err)
}
if fmt.Sprintf("%v", m) != fmt.Sprintf("%v", msg) {
t.Fatalf("%v != %v", m, msg)
}

m2, err := broker.Next("test_rock")
if err != nil {
t.Fatal(err)
}
if fmt.Sprintf("%v", m2) != fmt.Sprintf("%v", msg2) {
t.Fatalf("%v != %v", m2, msg2)

}


}

func TestRocketMqBrokerLSend(t *testing.T) {
broker := brokers.NewRocketMqBroker("127.0.0.1", "9876")
broker.Activate()
defer broker.Shutdown()
msg := message.NewMessage(controller.NewTaskCtl())
msg.Id = "1"
msg2 := message.NewMessage(controller.NewTaskCtl())
msg2.Id = "2"
err := broker.Send("test_rock", msg)
if err != nil {
t.Fatal(err)
}
err = broker.LSend("test_rock", msg2)
if err != nil {
t.Fatal(err)
}

m, err := broker.Next("test_rock")
if err != nil {
t.Fatal(err)
}
if m.Id != msg2.Id {
t.Fatalf("%v != %v", m, msg2)
}

m2, err := broker.Next("test_rock")
if err != nil {
t.Fatal(err)
}
if m2.Id != msg.Id {
t.Fatalf("%v != %v", m2, msg)

}
}
4 changes: 4 additions & 0 deletions v2/ytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (i iBroker) NewRabbitMqBroker(host, port, user, password, vhost string) bro
return brokers.NewRabbitMqBroker(host, port, user, password, vhost)
}

func (i iBroker) NewRocketMqBroker(host, port string) brokers.RocketMqBroker {
return brokers.NewRocketMqBroker(host, port)
}

type iConfig struct {
}

Expand Down

0 comments on commit bcb72cb

Please sign in to comment.