Skip to content

Commit

Permalink
Merge pull request #21 from gojuukaze/dev2.3.0
Browse files Browse the repository at this point in the history
自动创建Rocket topic
  • Loading branch information
gojuukaze committed Nov 23, 2020
2 parents bcb72cb + bc804bc commit 05a731c
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 91 deletions.
76 changes: 44 additions & 32 deletions v2/brokers/rocketmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package brokers

import (
"fmt"

"github.com/gojuukaze/YTask/v2/drive"
"github.com/gojuukaze/YTask/v2/message"
"github.com/gojuukaze/YTask/v2/util/yjson"
Expand All @@ -11,31 +12,35 @@ import (

type RocketMqBroker struct {
client *drive.RocketMqClient
host string
port string
namesrvAddr []string
brokerAddr []string
auto bool
}

func NewRocketMqBroker(host, port string) RocketMqBroker {
func NewRocketMqBroker(namesrvAddr []string,brokerAddr... []string) RocketMqBroker {
/*
1、目前不能自动创建topic (mqadmin手动创建,并设置读写队列数为1)
2、rocketmq topic名称不允许存在 ‘:’ ,
(所以在生产、消费前先做了名称转换topic RocketMqClient.topicChecker 将非法字符全部转换为 ‘_’)
3、为提供pullConsumer实现,所以添加了在worker和consumer之间添加了 RocketMqClient.MsgChan
4、consumerOffset不能同步更新,所以任务执行时间更长
FIX:1、目前不能自动创建topic (mqadmin手动创建,并设置读写队列数为1)
2、consumerOffset不能同步更新,所以任务执行时间更长
(需要将队列中多余的message消费掉才能消费到当前taskId对应的消息)
5、未支持RocketMqBroker.LSend
6、rockemq日志级别设置,设置环境变量"ROCKETMQ_GO_LOG_LEVEL"="error"||"info"||“debug”||...
3、未支持RocketMqBroker.LSend
*/
var auto bool
if len(brokerAddr)>0 {
auto=true
}
return RocketMqBroker{
host: host,
port: port,
//poolSize: 0,
namesrvAddr: namesrvAddr,
brokerAddr: brokerAddr[0],
auto: auto,
}
}
func (r *RocketMqBroker) Activate() {

client := drive.NewRocketMqClient(r.host, r.port)
client := drive.NewRocketMqClient(
drive.WithNameSrvAddr(r.namesrvAddr),
drive.WithBrokerAddr(r.brokerAddr),
drive.WithAutoCreateTopic(r.auto))
r.client = &client

}

func (r *RocketMqBroker) SetPoolSize(n int) {
Expand All @@ -58,12 +63,11 @@ func (r *RocketMqBroker) Next(topic string) (message.Message, error) {
select {
case value=<-queue:

case <-time.After(2 * time.Second):
case <-time.After(5 * time.Second):
return msg,yerrors.ErrEmptyQuery{}
}

err = yjson.YJson.UnmarshalFromString(value, &msg)

return msg, err
}

Expand Down Expand Up @@ -92,28 +96,36 @@ func (r *RocketMqBroker) LSend(queueName string, msg message.Message) error {
func (r RocketMqBroker) Clone() BrokerInterface {

return &RocketMqBroker{
host: r.host,
port: r.port,

//poolSize: 0,
namesrvAddr: r.namesrvAddr,
brokerAddr: r.brokerAddr,
auto: r.auto,
}
}
//目前不做使用
func (r RocketMqBroker)Shutdown(){
for topic,producer:=range r.client.RocketMqProducerMap{
TRY1:
err:=producer.Shutdown()
if err !=nil{
fmt.Println(topic,"producer shutdown err",err)
goto TRY1
}
TRY1:
err:=r.client.Producer.Shutdown()
if err !=nil{
fmt.Println("YTask[RocketMQ]: producer shutdown err:",err)
goto TRY1
}
for topic,consumer:=range r.client.RocketMqConsumerMap{
close(r.client.MsgChanMap[topic])
for topic,consumer:=range r.client.ConsumerMap{
err:=consumer.Unsubscribe(topic)
if err!=nil {
fmt.Println("YTask[RocketMQ]: Unsubscribe err: ",err)
}
TRY2:
err:=consumer.Shutdown()
err=consumer.Shutdown()
if err !=nil{
fmt.Println(topic,"consumer shutdown err",err)
fmt.Println(topic,"YTask[RocketMQ]: consumer shutdown err: ",err)
goto TRY2
}
close(r.client.MsgChanMap[topic])

r.client.TopicDeleter(topic)
//consumer.Shutdown()方法没法及时同步,所以在异步任务结束后删除topic
//重新开启任务时创建topic,代理点位和消费点位重置为0
//不得已为之,待改善
}
r.client.Admin.Close()
}
170 changes: 116 additions & 54 deletions v2/drive/rocketmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,137 @@ import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/producer"
"regexp"
"github.com/apache/rocketmq-client-go/v2/admin"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"regexp"
"sync"
)

type RocketMqClient struct {
addr primitive.NamesrvAddr
group string
RocketMqProducerMap map[string]rocketmq.Producer
RocketMqConsumerMap map[string]rocketmq.PushConsumer
MsgChanMap map[string]chan string
options *clientOptions
Producer rocketmq.Producer
ConsumerMap map[string]rocketmq.PushConsumer
MsgChanMap map[string]chan string
Admin admin.Admin
topicMap sync.Map

}
type clientOptions struct {
NamesrvAddr primitive.NamesrvAddr
AutoCreateTopic bool
BrokerAddr []string
auto bool
}
type ClientOption func(options *clientOptions)

func defaultAdminOptions() *clientOptions {
return &clientOptions{}
}
func WithNameSrvAddr(addr []string) ClientOption{
return func(opts *clientOptions) {
opts.NamesrvAddr=addr
}
}
func WithBrokerAddr(addr []string) ClientOption{
return func(opts *clientOptions) {
opts.BrokerAddr=addr
}
}
func WithAutoCreateTopic(auto bool) ClientOption {
return func(opts *clientOptions) {
opts.auto=auto
}
}

func NewRocketMqClient(host,port string) RocketMqClient{

func NewRocketMqClient(opts... ClientOption) RocketMqClient{
defaultOpts := defaultAdminOptions()
for _, opt := range opts {
opt(defaultOpts)
}
var adm admin.Admin
var err error
addr,err:=primitive.NewNamesrvAddr(host+":"+port)

adm, err = admin.NewAdmin(admin.WithResolver(
primitive.NewPassthroughResolver(defaultOpts.NamesrvAddr)))
if err!=nil {
panic("YTask: rocketMq error : " + err.Error())
panic("YTask: admin create error : "+err.Error())
}

input, err := rocketmq.NewProducer(
producer.WithNameServer(defaultOpts.NamesrvAddr),
//producer.WithCreateTopicKey(topic),
)
if err!=nil {
panic("YTask[RockerMQ]: Producer create error : " + err.Error())
}
err=input.Start()
if err!=nil {
panic("YTask[RockerMQ]: Producer start error : " +err.Error())
}
return RocketMqClient{
addr:addr,
RocketMqProducerMap: make(map[string]rocketmq.Producer) ,
RocketMqConsumerMap: make(map[string]rocketmq.PushConsumer),
options: defaultOpts,
Producer: input,
ConsumerMap: make(map[string]rocketmq.PushConsumer),
MsgChanMap: make(map[string]chan string),
Admin: adm,
}
}
func (c *RocketMqClient) topicCreator(topic string) {
if c.options.BrokerAddr == nil {
return
}
//create topic
for _, addr := range c.options.BrokerAddr {
err := c.Admin.CreateTopic(
context.Background(),
admin.WithTopicCreate(topic),
admin.WithBrokerAddrCreate(addr),
admin.WithReadQueueNums(1),
admin.WithWriteQueueNums(1),
admin.WithPerm(6),
)
if err != nil {
fmt.Println("YTask[RocketMQ]: create topic error:", err.Error())
}
}
}
func (c *RocketMqClient) TopicDeleter(topic string) {
//delete topic
err:=c.Admin.DeleteTopic(
context.Background(),
admin.WithTopicDelete(topic),
)
if err != nil {
fmt.Println("YTask[RocketMQ]: delete topic error:", err.Error())
}
}


func (c *RocketMqClient) topicChecker(topic string)(string) {
//rocketmq topic 只能包含%数字大小写字母及下划线和中划线
re := regexp.MustCompile("[^A-z0-9_-]")
//所以用下划线替换非法字符
return re.ReplaceAllString(topic, "_")
}
func (c *RocketMqClient) Register(topic string) (<-chan string,error){
topic=c.topicChecker(topic)


if _,ok:=c.MsgChanMap[topic];!ok{
func (c *RocketMqClient) Register(topic string)(<-chan string,error){
topic=c.topicChecker(topic)
if _,ok:=c.ConsumerMap[topic];!ok{
if _,ok:=c.topicMap.LoadOrStore(topic,1);!ok {
if c.options.auto {
c.topicCreator(topic)
}
}
c.MsgChanMap[topic]=make(chan string,0)
output,err:=rocketmq.NewPushConsumer(
consumer.WithNameServer(c.addr),
consumer.WithNameServer(c.options.NamesrvAddr),
consumer.WithGroupName(topic),
)
c.RocketMqConsumerMap[topic]=output
/*addr,_:=internal.NewNamesrv(c.addr)
options:=internal.ClientOptions{
GroupName: topic,
Namesrv: addr,
if err!=nil {
panic("YTask[RockerMQ]: Consumer create error : " + err.Error())
}
callBackChan:=make(chan interface{})
client:=internal.GetOrNewRocketMQClient(options,callBackChan)
offset:=consumer.NewRemoteOffsetStore(topic,client,addr)*/
output.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

Expand All @@ -69,40 +145,26 @@ func (c *RocketMqClient) Register(topic string) (<-chan string,error){
return consumer.ConsumeSuccess, nil
})
err=output.Start()
if err!=nil{
fmt.Println("consumer start error ",err.Error())
if err!=nil {
panic("YTask[RockerMQ]: Consumer start error : " +err.Error())
}
return c.MsgChanMap[topic],err
c.ConsumerMap[topic]=output
return c.MsgChanMap[topic],nil
}
//pull方式未实现
//ref:=reflect.ValueOf(c.rocketMqConsumer)
//method:=ref.MethodByName("Pull")
//args:=[]reflect.Value{reflect.ValueOf(context.Background()),
// reflect.ValueOf(topic),
// reflect.ValueOf(consumer.MessageSelector{}),
// reflect.ValueOf(1)}
//result:=method.Call(args)
//res,err:=result[0].Interface().((*primitive.PullResult)),result[1].Interface().(error)

return c.MsgChanMap[topic],nil
}
func (c *RocketMqClient) Publish(topic string,value interface{}, Priority uint8) error {

topic=c.topicChecker(topic)
if _,ok:=c.RocketMqProducerMap[topic];!ok{
input, err := rocketmq.NewProducer(
producer.WithNameServer(c.addr),
producer.WithCreateTopicKey(topic),
producer.WithGroupName(topic),
)
err=input.Start()
if err!=nil {
panic("YTask: rocketMq error : " + err.Error())
return err

func (c *RocketMqClient) Publish(topic string,value interface{}, Priority uint8) error {
if _,ok:=c.topicMap.LoadOrStore(topic,1);!ok {
if c.options.auto {
c.topicCreator(topic)
}
c.RocketMqProducerMap[topic]=input
}
fmt.Println("product:",string(value.([]byte)))
_, err := c.RocketMqProducerMap[topic].SendSync(context.Background(),
topic=c.topicChecker(topic)
fmt.Println("produce:",string(value.([]byte)))
_, err := c.Producer.SendSync(context.Background(),
primitive.NewMessage(topic,value.([]byte)))
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/gojuukaze/YTask/v2
go 1.12

require (
github.com/apache/rocketmq-client-go/v2 v2.0.0
github.com/apache/rocketmq-client-go/v2 v2.1.0-rc5.0.20201102074636-e1d9be806c18
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/go-redis/redis/v7 v7.4.0
github.com/golang/protobuf v1.3.3 // indirect
Expand Down
9 changes: 7 additions & 2 deletions v2/test/rockemqBroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (

func TestRocketMqBroker(t *testing.T) {

broker := brokers.NewRocketMqBroker("127.0.0.1", "9876")

broker := brokers.NewRocketMqBroker([]string{"127.0.0.1:9876"},[]string{"127.0.0.1:10911"})

broker.Activate()
//broker.Shutdown()主要是为了关闭consumer,同步offset到broker
//BUG:会出现同步失败
Expand Down Expand Up @@ -47,7 +49,10 @@ func TestRocketMqBroker(t *testing.T) {
}

func TestRocketMqBrokerLSend(t *testing.T) {
broker := brokers.NewRocketMqBroker("127.0.0.1", "9876")
broker := brokers.NewRocketMqBroker(
[]string{"127.0.0.1:9876"},
[]string{"127.0.0.1:10911"})

broker.Activate()
defer broker.Shutdown()
msg := message.NewMessage(controller.NewTaskCtl())
Expand Down
1 change: 1 addition & 0 deletions v2/test/ytask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func testWorker1(ser server.Server, t *testing.T) {
if !result.IsSuccess() {
t.Fatal("result is not success")
}

}

func testWorker2(ser server.Server, t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions v2/ytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (i iBroker) NewRabbitMqBroker(host, port, user, password, vhost string) bro
return brokers.NewRabbitMqBroker(host, port, user, password, vhost)
}

func (i iBroker) NewRocketMqBroker(host, port string) brokers.RocketMqBroker {
return brokers.NewRocketMqBroker(host, port)
func (i iBroker) NewRocketMqBroker(namesrvAddr []string,brokerAddr... []string) brokers.RocketMqBroker {
return brokers.NewRocketMqBroker(namesrvAddr,brokerAddr...)
}

type iConfig struct {
Expand Down

0 comments on commit 05a731c

Please sign in to comment.