Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leveraging RabbitMQ Delayed Message Plugin #102

Open
allenhartwig opened this issue Jul 10, 2017 · 9 comments
Open

Leveraging RabbitMQ Delayed Message Plugin #102

allenhartwig opened this issue Jul 10, 2017 · 9 comments

Comments

@allenhartwig
Copy link
Contributor

I can't seem to figure out how to pass it the proper options to get the Delayed Message Plugin to work properly when using servicebus.

My current attempt is as follows:

Consumer:

  const options = {
      ack: true,
      queueOptions: {
        exchangeOptions: {
          type: 'x-delayed-message',
          'x-delayed-type': 'direct'
        }
      }
    };
    bus.listen(eventName, options, handlerFunc);

Producer:

const options = {
    ack: true,
    headers: {
      'x-delay': 1000
    }
  }
bus.send(eventName, message, options);

The message sends and is received by the listener immediately (~100ms).

I'm wondering if I am not providing the information correctly on the headers on the send or with exchangeOptions on the listen. Or perhaps servicebus just doesn't support propagating these settings into the RabbitMQ queue/message properly.

Any help is appreciated.

@mateodelnorte
Copy link
Owner

That looks like it should work. servicebus should be passing any of those options on to amqp.node which allows for a number of properties on the message, including a headers object: http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish.

I've got a unit test for exactly this: https://github.com/mateodelnorte/servicebus/blob/master/test/properties.js#L23-L33

@allenhartwig
Copy link
Contributor Author

Yeah, I had a feeling the headers were ok, as thats pretty well documented in your tests.

Applying this was less clear:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

It looks like exchangeOptions were passed in to the queue (at least on subscribe, it was less clear on listen when tracing your code), but I wasn't sure how to best set the args specified here for the exchange option.

@mateodelnorte
Copy link
Owner

can you not just pass those options into the bus() function?

const bus = require('servicebus').bus(options)

@allenhartwig
Copy link
Contributor Author

I'm not seeing how exchangeOptions is leveraged here: https://github.com/mateodelnorte/servicebus/blob/master/bus/rabbitmq/bus.js#L21

The options object is never added as a whole to this for future reference, nor is the exchangeOptions prop directly referenced.

@allenhartwig
Copy link
Contributor Author

I tested it though and still did not get a delay applied:
const bus = servicebus.bus({url: rabbitmqUrl, exchangeOptions: {type: 'x-delayed-message'}});

@allenhartwig
Copy link
Contributor Author

allenhartwig commented Jul 11, 2017

I found this gist on how to setup ampqlib appropriately to utilize the plugin:
https://gist.github.com/mfressdorf/f46fdf266f35d8c525aea16719f837ac

I'm not sure that servicebus will propagate all of these values to ch.assertExchange

@allenhartwig
Copy link
Contributor Author

Never mind. It looks like its addressed here:

this.sendChannel.assertExchange(this.exchangeName, this.exchangeOptions.type || 'topic', this.exchangeOptions);

As long as exchangeOptions is available it should work. This leads me back to my origin concern of: #102 (comment)

@allenhartwig
Copy link
Contributor Author

This PR ensures that the exchangeOptions and exchangeName are applied when the channel is setup: #103

I still am having issues getting the delay to work properly, however. I have been able to get it to work properly with amqplib directly, and after analyzing the RabbitMQ setup, it appears the issue revolves around queue bindings. When using servicebus, there are no bindings created.

@mateodelnorte
Copy link
Owner

yep. you're right. must have been an oversight of switching from amqplib to node-amqp (done because amqplib opens a channel for every consumer, whereas I wanted one for send and one for receive).

Care to make a PR?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants