Skip to content

Commit

Permalink
Merge pull request #13 from gojuukaze/dev2.2.1
Browse files Browse the repository at this point in the history
amqp bug修改
  • Loading branch information
gojuukaze committed Oct 27, 2020
2 parents 050365e + a427d76 commit eae38d6
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 30 deletions.
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190830142957-1e83adbbebd0 h1:7z820YPX9pxWR59qM7BE5+fglp4D/mKqAwCvGt11b+8=
golang.org/x/sys v0.0.0-20190830142957-1e83adbbebd0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
30 changes: 25 additions & 5 deletions v2/brokers/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/gojuukaze/YTask/v2/drive"
"github.com/gojuukaze/YTask/v2/message"
"github.com/gojuukaze/YTask/v2/util/yjson"
"github.com/gojuukaze/YTask/v2/yerrors"
"time"
)

type RabbitMqBroker struct {
Expand All @@ -12,21 +14,23 @@ type RabbitMqBroker struct {
port string
user string
password string
vhost string
//poolSize int
}

func NewRabbitMqBroker(host, port, user, password string) RabbitMqBroker {
func NewRabbitMqBroker(host, port, user, password, vhost string) RabbitMqBroker {
return RabbitMqBroker{
host: host,
port: port,
password: password,
user: user,
vhost: vhost,
//poolSize: 0,
}
}

func (r *RabbitMqBroker) Activate() {
client := drive.NewRabbitMqClient(r.host, r.port, r.user, r.password)
client := drive.NewRabbitMqClient(r.host, r.port, r.user, r.password, r.vhost)
r.client = &client
}

Expand All @@ -40,10 +44,26 @@ func (r *RabbitMqBroker) GetPoolSize() int {

func (r *RabbitMqBroker) Next(queueName string) (message.Message, error) {
var msg message.Message
value, err := r.client.Get(queueName)
if err != nil {
return msg, err
var value string
var err error
// amqp没找到类似redis的blpop方法,只能手动循环
for i := 0; i < 10; i++ {
value, err = r.client.Get(queueName)
if err == nil {
break
}
if err == drive.AMQPNil {
time.Sleep(100*time.Millisecond)
continue
}else {
return msg, err
}
}

if err == drive.AMQPNil {
return msg, yerrors.ErrEmptyQuery{}
}

err = yjson.YJson.UnmarshalFromString(value, &msg)
return msg, err
}
Expand Down
20 changes: 16 additions & 4 deletions v2/drive/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ import (
"github.com/streadway/amqp"
)



type amqpErr string

func (e amqpErr) Error() string { return string(e) }

const AMQPNil = amqpErr("amqp: nil")

type RabbitMqClient struct {
rabbitMqConn *amqp.Connection
rabbitMqChan *amqp.Channel
queueName map[string]struct{}
}

func NewRabbitMqClient(host, port, user, password string) RabbitMqClient {
client, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", user, password, host, port))
func NewRabbitMqClient(host, port, user, password, vhost string) RabbitMqClient {
client, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/%s", user, password, host, port, vhost))
if err != nil {
panic("YTask: connect rabbitMq error : " + err.Error())
}
Expand Down Expand Up @@ -50,10 +58,14 @@ func (c *RabbitMqClient) Get(queueName string) (string, error) {
return "", err
}
msg, ok, err := c.rabbitMqChan.Get(queueName, true)
if ok && err == nil {
if err!=nil{
return "", err
}
if ok {
return string(msg.Body), nil
}else {
return "", AMQPNil
}
return "", err
}

func (c *RabbitMqClient) Publish(queueName string, value interface{}, Priority uint8) error {
Expand Down
21 changes: 13 additions & 8 deletions v2/test/rabbitmqBroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@ import (
"github.com/gojuukaze/YTask/v2/brokers"
"github.com/gojuukaze/YTask/v2/controller"
"github.com/gojuukaze/YTask/v2/message"
"github.com/gojuukaze/YTask/v2/yerrors"
"testing"
)

func TestRabbitmqBroker(t *testing.T) {
broker := brokers.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest")
broker := brokers.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest", "")
broker.Activate()
msg := message.NewMessage(controller.NewTaskCtl())
msg2 := message.NewMessage(controller.NewTaskCtl())

err := broker.Send("test_amqp", msg)
_, err := broker.Next("test_amqp")
if !yerrors.IsEqual(err, yerrors.ErrTypeEmptyQuery) {
t.Fatal(err)
}

err = broker.Send("test_amqp", msg)
if err != nil {
t.Fatal(err)
}
Expand All @@ -41,14 +47,13 @@ func TestRabbitmqBroker(t *testing.T) {
}
}


func TestRabbitmqBrokerLSend(t *testing.T) {
broker := brokers.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest")
broker := brokers.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest", "")
broker.Activate()
msg := message.NewMessage(controller.NewTaskCtl())
msg.Id="1"
msg.Id = "1"
msg2 := message.NewMessage(controller.NewTaskCtl())
msg2.Id="2"
msg2.Id = "2"
err := broker.Send("test_amqp", msg)
if err != nil {
t.Fatal(err)
Expand All @@ -62,15 +67,15 @@ func TestRabbitmqBrokerLSend(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if m.Id!=msg2.Id {
if m.Id != msg2.Id {
t.Fatalf("%v != %v", m, msg2)
}

m2, err := broker.Next("test_amqp")
if err != nil {
t.Fatal(err)
}
if m2.Id!=msg.Id {
if m2.Id != msg.Id {
t.Fatalf("%v != %v", m2, msg)

}
Expand Down
10 changes: 5 additions & 5 deletions v2/ytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (i iBroker) NewRedisBroker(host string, port string, password string, db in
return brokers.NewRedisBroker(host, port, password, db, clientPoolSize)
}

func (i iBroker) NewRabbitMqBroker(host, port, user, password string) brokers.RabbitMqBroker {
return brokers.NewRabbitMqBroker(host, port, user, password)
func (i iBroker) NewRabbitMqBroker(host, port, user, password, vhost string) brokers.RabbitMqBroker {
return brokers.NewRabbitMqBroker(host, port, user, password, vhost)
}

type iConfig struct {
Expand Down Expand Up @@ -79,6 +79,6 @@ func (i iBackend) NewMemCacheBackend(host, port string, poolSize int) backends.M
return backends.NewMemCacheBackend(host, port, poolSize)
}

func (i iBackend) NewMongoBackend(host, port , user, password, db, collection string) backends.MongoBackend {
return backends.NewMongoBackend(host, port , user, password, db, collection)
}
func (i iBackend) NewMongoBackend(host, port, user, password, db, collection string) backends.MongoBackend {
return backends.NewMongoBackend(host, port, user, password, db, collection)
}

0 comments on commit eae38d6

Please sign in to comment.