diff --git a/go.sum b/go.sum index 2967170..cf8dfe0 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,15 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= -github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190830142957-1e83adbbebd0 h1:7z820YPX9pxWR59qM7BE5+fglp4D/mKqAwCvGt11b+8= -golang.org/x/sys v0.0.0-20190830142957-1e83adbbebd0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/v2/brokers/rabbitmq.go b/v2/brokers/rabbitmq.go index 541952d..098c080 100644 --- a/v2/brokers/rabbitmq.go +++ b/v2/brokers/rabbitmq.go @@ -4,6 +4,8 @@ import ( "github.com/gojuukaze/YTask/v2/drive" "github.com/gojuukaze/YTask/v2/message" "github.com/gojuukaze/YTask/v2/util/yjson" + "github.com/gojuukaze/YTask/v2/yerrors" + "time" ) type RabbitMqBroker struct { @@ -12,21 +14,23 @@ type RabbitMqBroker struct { port string user string password string + vhost string //poolSize int } -func NewRabbitMqBroker(host, port, user, password string) RabbitMqBroker { +func NewRabbitMqBroker(host, port, user, password, vhost string) RabbitMqBroker { return RabbitMqBroker{ host: host, port: port, password: password, user: user, + vhost: vhost, //poolSize: 0, } } func (r *RabbitMqBroker) Activate() { - client := drive.NewRabbitMqClient(r.host, r.port, r.user, r.password) + client := drive.NewRabbitMqClient(r.host, r.port, r.user, r.password, r.vhost) r.client = &client } @@ -40,10 +44,26 @@ func (r *RabbitMqBroker) GetPoolSize() int { func (r *RabbitMqBroker) Next(queueName string) (message.Message, error) { var msg message.Message - value, err := r.client.Get(queueName) - if err != nil { - return msg, err + var value string + var err error + // amqp没找到类似redis的blpop方法,只能手动循环 + for i := 0; i < 10; i++ { + value, err = r.client.Get(queueName) + if err == nil { + break + } + if err == drive.AMQPNil { + time.Sleep(100*time.Millisecond) + continue + }else { + return msg, err + } + } + + if err == drive.AMQPNil { + return msg, yerrors.ErrEmptyQuery{} } + err = yjson.YJson.UnmarshalFromString(value, &msg) return msg, err } diff --git a/v2/drive/rabbitmq.go b/v2/drive/rabbitmq.go index d1cb09e..d77d2c6 100644 --- a/v2/drive/rabbitmq.go +++ b/v2/drive/rabbitmq.go @@ -7,14 +7,22 @@ import ( "github.com/streadway/amqp" ) + + +type amqpErr string + +func (e amqpErr) Error() string { return string(e) } + +const AMQPNil = amqpErr("amqp: nil") + type RabbitMqClient struct { rabbitMqConn *amqp.Connection rabbitMqChan *amqp.Channel queueName map[string]struct{} } -func NewRabbitMqClient(host, port, user, password string) RabbitMqClient { - client, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", user, password, host, port)) +func NewRabbitMqClient(host, port, user, password, vhost string) RabbitMqClient { + client, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/%s", user, password, host, port, vhost)) if err != nil { panic("YTask: connect rabbitMq error : " + err.Error()) } @@ -50,10 +58,14 @@ func (c *RabbitMqClient) Get(queueName string) (string, error) { return "", err } msg, ok, err := c.rabbitMqChan.Get(queueName, true) - if ok && err == nil { + if err!=nil{ + return "", err + } + if ok { return string(msg.Body), nil + }else { + return "", AMQPNil } - return "", err } func (c *RabbitMqClient) Publish(queueName string, value interface{}, Priority uint8) error { diff --git a/v2/test/rabbitmqBroker_test.go b/v2/test/rabbitmqBroker_test.go index 49f9017..75be90e 100644 --- a/v2/test/rabbitmqBroker_test.go +++ b/v2/test/rabbitmqBroker_test.go @@ -5,16 +5,22 @@ import ( "github.com/gojuukaze/YTask/v2/brokers" "github.com/gojuukaze/YTask/v2/controller" "github.com/gojuukaze/YTask/v2/message" + "github.com/gojuukaze/YTask/v2/yerrors" "testing" ) func TestRabbitmqBroker(t *testing.T) { - broker := brokers.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest") + broker := brokers.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest", "") broker.Activate() msg := message.NewMessage(controller.NewTaskCtl()) msg2 := message.NewMessage(controller.NewTaskCtl()) - err := broker.Send("test_amqp", msg) + _, err := broker.Next("test_amqp") + if !yerrors.IsEqual(err, yerrors.ErrTypeEmptyQuery) { + t.Fatal(err) + } + + err = broker.Send("test_amqp", msg) if err != nil { t.Fatal(err) } @@ -41,14 +47,13 @@ func TestRabbitmqBroker(t *testing.T) { } } - func TestRabbitmqBrokerLSend(t *testing.T) { - broker := brokers.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest") + broker := brokers.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest", "") broker.Activate() msg := message.NewMessage(controller.NewTaskCtl()) - msg.Id="1" + msg.Id = "1" msg2 := message.NewMessage(controller.NewTaskCtl()) - msg2.Id="2" + msg2.Id = "2" err := broker.Send("test_amqp", msg) if err != nil { t.Fatal(err) @@ -62,7 +67,7 @@ func TestRabbitmqBrokerLSend(t *testing.T) { if err != nil { t.Fatal(err) } - if m.Id!=msg2.Id { + if m.Id != msg2.Id { t.Fatalf("%v != %v", m, msg2) } @@ -70,7 +75,7 @@ func TestRabbitmqBrokerLSend(t *testing.T) { if err != nil { t.Fatal(err) } - if m2.Id!=msg.Id { + if m2.Id != msg.Id { t.Fatalf("%v != %v", m2, msg) } diff --git a/v2/ytask.go b/v2/ytask.go index 1c19d3d..54639d6 100644 --- a/v2/ytask.go +++ b/v2/ytask.go @@ -33,8 +33,8 @@ func (i iBroker) NewRedisBroker(host string, port string, password string, db in return brokers.NewRedisBroker(host, port, password, db, clientPoolSize) } -func (i iBroker) NewRabbitMqBroker(host, port, user, password string) brokers.RabbitMqBroker { - return brokers.NewRabbitMqBroker(host, port, user, password) +func (i iBroker) NewRabbitMqBroker(host, port, user, password, vhost string) brokers.RabbitMqBroker { + return brokers.NewRabbitMqBroker(host, port, user, password, vhost) } type iConfig struct { @@ -79,6 +79,6 @@ func (i iBackend) NewMemCacheBackend(host, port string, poolSize int) backends.M return backends.NewMemCacheBackend(host, port, poolSize) } -func (i iBackend) NewMongoBackend(host, port , user, password, db, collection string) backends.MongoBackend { - return backends.NewMongoBackend(host, port , user, password, db, collection) -} \ No newline at end of file +func (i iBackend) NewMongoBackend(host, port, user, password, db, collection string) backends.MongoBackend { + return backends.NewMongoBackend(host, port, user, password, db, collection) +}