Skip to content

Commit

Permalink
amqp为空时获取任务的bug
Browse files Browse the repository at this point in the history
  • Loading branch information
gojuukaze committed Oct 22, 2020
1 parent e37a75a commit a427d76
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
24 changes: 21 additions & 3 deletions v2/brokers/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/gojuukaze/YTask/v2/drive"
"github.com/gojuukaze/YTask/v2/message"
"github.com/gojuukaze/YTask/v2/util/yjson"
"github.com/gojuukaze/YTask/v2/yerrors"
"time"
)

type RabbitMqBroker struct {
Expand Down Expand Up @@ -42,10 +44,26 @@ func (r *RabbitMqBroker) GetPoolSize() int {

func (r *RabbitMqBroker) Next(queueName string) (message.Message, error) {
var msg message.Message
value, err := r.client.Get(queueName)
if err != nil {
return msg, err
var value string
var err error
// amqp没找到类似redis的blpop方法,只能手动循环
for i := 0; i < 10; i++ {
value, err = r.client.Get(queueName)
if err == nil {
break
}
if err == drive.AMQPNil {
time.Sleep(100*time.Millisecond)
continue
}else {
return msg, err
}
}

if err == drive.AMQPNil {
return msg, yerrors.ErrEmptyQuery{}
}

err = yjson.YJson.UnmarshalFromString(value, &msg)
return msg, err
}
Expand Down
16 changes: 14 additions & 2 deletions v2/drive/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import (
"github.com/streadway/amqp"
)



type amqpErr string

func (e amqpErr) Error() string { return string(e) }

const AMQPNil = amqpErr("amqp: nil")

type RabbitMqClient struct {
rabbitMqConn *amqp.Connection
rabbitMqChan *amqp.Channel
Expand Down Expand Up @@ -50,10 +58,14 @@ func (c *RabbitMqClient) Get(queueName string) (string, error) {
return "", err
}
msg, ok, err := c.rabbitMqChan.Get(queueName, true)
if ok && err == nil {
if err!=nil{
return "", err
}
if ok {
return string(msg.Body), nil
}else {
return "", AMQPNil
}
return "", err
}

func (c *RabbitMqClient) Publish(queueName string, value interface{}, Priority uint8) error {
Expand Down
8 changes: 7 additions & 1 deletion v2/test/rabbitmqBroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/gojuukaze/YTask/v2/brokers"
"github.com/gojuukaze/YTask/v2/controller"
"github.com/gojuukaze/YTask/v2/message"
"github.com/gojuukaze/YTask/v2/yerrors"
"testing"
)

Expand All @@ -14,7 +15,12 @@ func TestRabbitmqBroker(t *testing.T) {
msg := message.NewMessage(controller.NewTaskCtl())
msg2 := message.NewMessage(controller.NewTaskCtl())

err := broker.Send("test_amqp", msg)
_, err := broker.Next("test_amqp")
if !yerrors.IsEqual(err, yerrors.ErrTypeEmptyQuery) {
t.Fatal(err)
}

err = broker.Send("test_amqp", msg)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit a427d76

Please sign in to comment.