Skip to content

πŸ‡ Build consistent and well-balanced Producer/Consumer pipelines

License

Notifications You must be signed in to change notification settings

hqoss/rabbit_mq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

Elixir CI Codacy Badge Hex.pm Coverage Status

πŸ‡ Elixir RabbitMQ Client

rabbit_mq helps you build consistent and well-balanced Producer/Consumer pipelines.

Table of contents

Installation and Usage

Add :rabbit_mq as a dependency to your project's mix.exs:

defp deps do
  [
    {:rabbit_mq, "~> 0.0.19"}
  ]
end

Documentation

The full documentation is published on hex.

The following modules are provided;

Sample usage

Establish routing topology

⚠️ All examples in this guide assume you've already set up your (RabbitMQ) routing topology as shown below.

source_name source_kind destination_name destination_kind routing_key arguments
customer exchange customer/customer.created queue customer.created []
customer exchange customer/customer.updated queue customer.updated []

You can run these commands against your rabbitmq instance to establish the desired routing topology.

# Declare the customer exchange
rabbitmqadmin declare exchange name=customer type=topic durable=true

# Declare and bind the customer/customer.created queue
rabbitmqadmin declare queue name=customer/customer.created durable=true
rabbitmqadmin declare binding source=customer destination=customer/customer.created routing_key=customer.created

# Declare and bind the customer/customer.updated queue
rabbitmqadmin declare queue name=customer/customer.updated durable=true
rabbitmqadmin declare binding source=customer destination=customer/customer.updated routing_key=customer.updated

ℹ️ You can also use the RabbitMQ.Topology module to quickly establish desired routing topology via your application.

This is what the result should look like in the RabbitMQ Management dashboard:

RabbitMQ Topology

Minimal configuration

First, ensure you point to a valid amqp_url by configuring :rabbit_mq in your config.exs.

ℹ️ To run RabbitMQ locally, see our docker-compose.yaml for a sample Docker Compose set up.

config :rabbit_mq, :amqp_url, "amqp://guest:guest@localhost:5672"

For advanced configuration options, consult the Configuration section.

Producers

Let's define our CustomerProducer first. We will use this module to publish messages onto the "customer" exchange.

defmodule RabbitSample.CustomerProducer do
  @moduledoc """
  Publishes pre-configured events onto the "customer" exchange.
  """

  use RabbitMQ.Producer, exchange: "customer", worker_count: 3

  @doc """
  Publishes an event routed via "customer.created".
  """
  def customer_created(customer_id) when is_binary(customer_id) do
    opts = [
      content_type: "application/json",
      correlation_id: UUID.uuid4(),
      mandatory: true
    ]

    data = Jason.encode!(%{v: "1.0.0", customer_id: customer_id})

    publish("customer.created", data, opts)
  end

  @doc """
  Publishes an event routed via "customer.updated".
  """
  def customer_updated(updated_customer) when is_map(updated_customer) do
    opts = [
      content_type: "application/json",
      correlation_id: UUID.uuid4(),
      mandatory: true
    ]

    data = Jason.encode!(%{v: "1.0.0", customer_data: updated_customer})

    publish("customer.updated", data, opts)
  end
end

Consumers

To consume messages off the respective queues, we will define 2 separate consumers.

⚠️ Please note that automatic message acknowledgement is disabled in rabbit_mq, therefore it's your responsibility to ensure messages are ack'd or nack'd.

To consume off "customer/customer.created":

defmodule RabbitSample.CustomerCreatedConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.created", worker_count: 2, prefetch_count: 3

  require Logger

  def handle_message(payload, meta, channel) do
    %{delivery_tag: delivery_tag, redelivered: redelivered} = meta

    try do
      Logger.info("Customer created. Event data: #{payload}.")
      ack(channel, delivery_tag)
    rescue
      _ -> nack(channel, delivery_tag, requeue: redelivered !== true)
    end
  end
end

To consume off "customer/customer.updated":

defmodule RabbitSample.CustomerUpdatedConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.updated", worker_count: 2, prefetch_count: 6

  require Logger

  def handle_message(payload, meta, channel) do
    %{delivery_tag: delivery_tag, redelivered: redelivered} = meta

    try do
      Logger.info("Customer updated. Event data: #{payload}.")
      ack(channel, delivery_tag)
    rescue
      _ -> nack(channel, delivery_tag, requeue: redelivered !== true)
    end
  end
end

Start under supervision tree

And finally, we will start our application.

defmodule RabbitSample.Application do
  use Application

  def start(_type, _args) do
    children = [
      RabbitSample.CustomerProducer,
      RabbitSample.CustomerCreatedConsumer,
      RabbitSample.CustomerUpdatedConsumer
    ]

    opts = [strategy: :one_for_one, name: RabbitSample.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Using iex;

iex -S mix

The resulting application topology should look like this:

Application Topology

Upon closer inspection using the RabbitMQ Management dashboard, we see that:

  • a) each of our modules maintains its dedicated connection; and
  • b) each of our modules' workers maintains its dedicated channel under the respective connection.

Connections

ℹ️ Detailed view of how individual workers have set up their channels. Note that the different prefetch counts correspond to the different configuration we provided in our Consumers, and that the Producer's 3 worker channels operate in Confirm mode.

Channels

Produce and Consume messages

⚠️ Due to the asynchronous nature of the application, the order of outputs in the console may vary.

iex(1)> RabbitSample.CustomerProducer.customer_created(UUID.uuid4())
{:ok, 1}

21:52:55.098 [debug] Received ACK of 1.

21:52:55.098 [info]  Customer created. Event data: {"customer_id":"b6712186-43be-46ce-a7b2-a4c4ab42efe7","v":"1.0.0"}.

iex(2)> RabbitSample.CustomerProducer.customer_updated(%{id: UUID.uuid4()})
{:ok, 1}

21:53:06.918 [debug] Received ACK of 1.

21:53:06.918 [info]  Customer updated. Event data: {"customer_data":{"id":"e83e92c9-1915-4e9c-85bb-bf78b056fd76"},"v":"1.0.0"}.

iex(3)>

Advanced configuration

The following options can be configured.

config :rabbit_mq,
  amqp_url: "amqp://guest:guest@localhost:5672",
  heartbeat_interval_sec: 60,
  reconnect_interval_ms: 2500,
  max_channels_per_connection: 16
  • amqp_url; required, the broker URL.
  • heartbeat_interval_sec; defines after what period of time the peer TCP connection should be considered unreachable. Defaults to 30.
  • reconnect_interval_ms; the interval before another attempt to re-connect to the broker should occur. Defaults to 2500.
  • max_channels_per_connection; maximum number of channels per connection. Also determines the maximum number of workers per Producer/Consumer module. Defaults to 8.

⚠️ Please consult the following guides to understand how to best configure :max_channels_per_connection and :heartbeat_interval_sec respectively.

Excessive logging

See original section in amqp docs.

Add the following configuration.

config :logger, handle_otp_reports: false

Lager conflicts with Elixir logger

Lager is used by rabbit_common and is not Elixir's best friend yet. You need a workaround.

⚠️ In mix.exs, you have to load :lager before :logger.

  extra_applications: [:lager, :logger]

Testing

The library itself has been rigorously tested, so you should ideally only need to test whether you've configured your modules correctly.

Additionally, you should test any side-effects driven by your Producers or Consumers.

Producers

Here is a few ideas on how you can test your Producers.

⚠️ The below snippet assumes your application starts the CustomerProducer module as shown in earlier examples.

defmodule RabbitSampleTest.CustomerProducer do
  alias AMQP.{Basic, Channel, Connection, Queue}
  alias RabbitSample.CustomerProducer

  use ExUnit.Case

  @amqp_url Application.get_env(:rabbit_mq, :amqp_url)
  @exchange "customer"

  setup_all do
    assert {:ok, connection} = Connection.open(@amqp_url)
    assert {:ok, channel} = Channel.open(connection)

    # Declare an exclusive queue and bind it to the customer exchange.
    {:ok, %{queue: queue}} = Queue.declare(channel, "", exclusive: true)
    :ok = Queue.bind(channel, queue, @exchange, routing_key: "#")

    # Clean up after all tests have ran.
    on_exit(fn ->
      # This queue would have been deleted automatically when the connection
      # gets closed, however we prefer to be explicit. Also, we ensure there
      # are no messages left hanging in the queue.
      assert {:ok, %{message_count: 0}} = Queue.delete(channel, queue)

      assert :ok = Channel.close(channel)
      assert :ok = Connection.close(connection)
    end)

    [channel: channel, queue: queue]
  end

  setup %{channel: channel, queue: queue} do
    # Each test will be notified when a message is consumed.
    assert {:ok, consumer_tag} = Basic.consume(channel, queue)

    # This will always be the first message received by the process.
    assert_receive({:basic_consume_ok, %{consumer_tag: ^consumer_tag}})

    on_exit(fn ->
      # Ensure there are no messages in the queue as the next test is about to start.
      assert true = Queue.empty?(channel, queue)
    end)

    [
      channel: channel,
      consumer_tag: consumer_tag
    ]
  end

  describe "#{__MODULE__}" do
    test "defines correctly configured child specification", %{
      channel: channel,
      consumer_tag: consumer_tag
    } do
      assert %{
               id: CustomerProducer,
               restart: :permanent,
               shutdown: :brutal_kill,
               start:
                 {RabbitMQ.Producer, :start_link,
                  [
                    %{confirm_type: :async, exchange: @exchange, worker_count: 3},
                    [name: CustomerProducer]
                  ]},
               type: :supervisor
             } = CustomerProducer.child_spec([])

      Basic.cancel(channel, consumer_tag)

      # This will always be the last message received by the process.
      assert_receive({:basic_cancel_ok, %{consumer_tag: ^consumer_tag}})

      # Ensure no further messages are received.
      refute_receive(_)
    end

    test "customer_created/1 publishes correctly configured events", %{
      channel: channel,
      consumer_tag: consumer_tag
    } do
      customer_id = UUID.uuid4()
      expected_payload = Jason.encode!(%{v: "1.0.0", customer_id: customer_id})

      assert {:ok, _seq_no} = CustomerProducer.customer_created(customer_id)

      assert_receive(
        {:basic_deliver, ^expected_payload,
         %{
           consumer_tag: ^consumer_tag,
           content_type: "application/json",
           correlation_id: correlation_id,
           delivery_tag: delivery_tag,
           routing_key: "customer.created"
         }}
      )

      # Ensure correlation_id is a valid UUID.
      assert {:ok, _} = UUID.info(correlation_id)

      # Acknowledge that the message has been received.
      Basic.ack(channel, delivery_tag)

      # Stop consuming.
      Basic.cancel(channel, consumer_tag)

      # This will always be the last message received by the process.
      assert_receive({:basic_cancel_ok, %{consumer_tag: ^consumer_tag}})

      # Ensure no further messages are received.
      refute_receive(_)
    end

    test "customer_updated/1 publishes correctly configured events", %{
      channel: channel,
      consumer_tag: consumer_tag
    } do
      customer_data = %{id: UUID.uuid4()}
      expected_payload = Jason.encode!(%{v: "1.0.0", customer_data: customer_data})

      assert {:ok, _seq_no} = CustomerProducer.customer_updated(customer_data)

      assert_receive(
        {:basic_deliver, ^expected_payload,
         %{
           consumer_tag: ^consumer_tag,
           content_type: "application/json",
           correlation_id: correlation_id,
           delivery_tag: delivery_tag,
           routing_key: "customer.updated"
         }}
      )

      # Ensure correlation_id is a valid UUID.
      assert {:ok, _} = UUID.info(correlation_id)

      # Acknowledge that the message has been received.
      Basic.ack(channel, delivery_tag)

      # Stop consuming.
      Basic.cancel(channel, consumer_tag)

      # This will always be the last message received by the process.
      assert_receive({:basic_cancel_ok, %{consumer_tag: ^consumer_tag}})

      # Ensure no further messages are received.
      refute_receive(_)
    end
  end
end

Consumers

Here is a few ideas on how you can test your Consumers.

⚠️ The below snippet assumes your application starts the CustomerCreatedConsumer module as shown in earlier examples.

defmodule RabbitSampleTest.CustomerCreatedConsumer do
  alias AMQP.{Basic, Channel, Connection, Queue}
  alias RabbitSample.CustomerCreatedConsumer

  import ExUnit.CaptureLog

  use ExUnit.Case

  @amqp_url Application.get_env(:rabbit_mq, :amqp_url)
  @exchange "customer"
  @queue "#{@exchange}/customer.created"

  setup_all do
    assert {:ok, connection} = Connection.open(@amqp_url)
    assert {:ok, channel} = Channel.open(connection)

    # Declare an exclusive queue and bind it to the customer exchange.
    {:ok, %{queue: queue}} = Queue.declare(channel, "", exclusive: true)
    :ok = Queue.bind(channel, queue, @exchange, routing_key: "#")

    # Clean up after all tests have ran.
    on_exit(fn ->
      # This queue would have been deleted automatically when the connection
      # gets closed, however we prefer to be explicit. Also, we ensure there
      # are no messages left hanging in the queue.
      assert {:ok, %{message_count: 0}} = Queue.delete(channel, queue)

      assert :ok = Channel.close(channel)
      assert :ok = Connection.close(connection)
    end)

    [channel: channel, queue: queue]
  end

  setup %{channel: channel, queue: queue} do
    # Each test will be notified when a message is consumed.
    assert {:ok, consumer_tag} = Basic.consume(channel, queue)

    # This will always be the first message received by the process.
    assert_receive({:basic_consume_ok, %{consumer_tag: ^consumer_tag}})

    on_exit(fn ->
      # Ensure there are no messages in the queue as the next test is about to start.
      assert true = Queue.empty?(channel, queue)
    end)

    [
      channel: channel,
      consumer_tag: consumer_tag
    ]
  end

  describe "#{__MODULE__}" do
    test "defines correctly configured child specification", %{
      channel: channel,
      consumer_tag: consumer_tag
    } do
      assert %{
               id: CustomerCreatedConsumer,
               restart: :permanent,
               shutdown: :brutal_kill,
               start:
                 {RabbitMQ.Consumer, :start_link,
                  [
                    %{consume_cb: _, prefetch_count: 3, queue: @queue, worker_count: 2},
                    [name: CustomerCreatedConsumer]
                  ]},
               type: :supervisor
             } = CustomerCreatedConsumer.child_spec([])

      # Stop consuming.
      Basic.cancel(channel, consumer_tag)

      # This will always be the last message received by the process.
      assert_receive({:basic_cancel_ok, %{consumer_tag: ^consumer_tag}})

      # Ensure no further messages are received.
      refute_receive(_)
    end

    test "handle_message/3 logs a message", %{
      channel: channel,
      consumer_tag: consumer_tag
    } do
      correlation_id = UUID.uuid4()
      payload = Jason.encode!(%{v: "1.0.0", customer_id: UUID.uuid4()})

      Basic.publish(channel, @exchange, "customer.created", payload,
        correlation_id: correlation_id
      )

      assert_receive(
        {:basic_deliver, payload,
         %{
           consumer_tag: ^consumer_tag,
           correlation_id: ^correlation_id,
           routing_key: "customer.created"
         } = meta}
      )

      assert capture_log(fn ->
               CustomerCreatedConsumer.handle_message(payload, meta, channel)
             end) =~ "Customer #{payload} created"

      # Stop consuming.
      Basic.cancel(channel, consumer_tag)

      # This will always be the last message received by the process.
      assert_receive({:basic_cancel_ok, %{consumer_tag: ^consumer_tag}})

      # Ensure no further messages are received.
      refute_receive(_)
    end
  end
end

⚠️ The below snippet assumes your application starts the CustomerUpdatedConsumer module as shown in earlier examples.

defmodule RabbitSampleTest.CustomerUpdatedConsumer do
  alias AMQP.{Basic, Channel, Connection, Queue}
  alias RabbitSample.CustomerUpdatedConsumer

  import ExUnit.CaptureLog

  use ExUnit.Case

  @amqp_url Application.get_env(:rabbit_mq, :amqp_url)
  @exchange "customer"
  @queue "#{@exchange}/customer.updated"

  setup_all do
    assert {:ok, connection} = Connection.open(@amqp_url)
    assert {:ok, channel} = Channel.open(connection)

    # Declare an exclusive queue and bind it to the customer exchange.
    {:ok, %{queue: queue}} = Queue.declare(channel, "", exclusive: true)
    :ok = Queue.bind(channel, queue, @exchange, routing_key: "#")

    # Clean up after all tests have ran.
    on_exit(fn ->
      # This queue would have been deleted automatically when the connection
      # gets closed, however we prefer to be explicit. Also, we ensure there
      # are no messages left hanging in the queue.
      assert {:ok, %{message_count: 0}} = Queue.delete(channel, queue)

      assert :ok = Channel.close(channel)
      assert :ok = Connection.close(connection)
    end)

    [channel: channel, queue: queue]
  end

  setup %{channel: channel, queue: queue} do
    # Each test will be notified when a message is consumed.
    assert {:ok, consumer_tag} = Basic.consume(channel, queue)

    # This will always be the first message received by the process.
    assert_receive({:basic_consume_ok, %{consumer_tag: ^consumer_tag}})

    on_exit(fn ->
      # Ensure there are no messages in the queue as the next test is about to start.
      assert true = Queue.empty?(channel, queue)
    end)

    [
      channel: channel,
      consumer_tag: consumer_tag
    ]
  end

  describe "#{__MODULE__}" do
    test "defines correctly configured child specification", %{
      channel: channel,
      consumer_tag: consumer_tag
    } do
      assert %{
               id: CustomerUpdatedConsumer,
               restart: :permanent,
               shutdown: :brutal_kill,
               start:
                 {RabbitMQ.Consumer, :start_link,
                  [
                    %{consume_cb: _, prefetch_count: 6, queue: @queue, worker_count: 2},
                    [name: CustomerUpdatedConsumer]
                  ]},
               type: :supervisor
             } = CustomerUpdatedConsumer.child_spec([])

      # Stop consuming.
      Basic.cancel(channel, consumer_tag)

      # This will always be the last message received by the process.
      assert_receive({:basic_cancel_ok, %{consumer_tag: ^consumer_tag}})

      # Ensure no further messages are received.
      refute_receive(_)
    end

    test "handle_message/3 logs a message", %{
      channel: channel,
      consumer_tag: consumer_tag
    } do
      correlation_id = UUID.uuid4()
      customer_data = %{id: UUID.uuid4()}
      payload = Jason.encode!(%{v: "1.0.0", customer_data: customer_data})

      Basic.publish(channel, @exchange, "customer.updated", payload,
        correlation_id: correlation_id
      )

      assert_receive(
        {:basic_deliver, payload,
         %{
           consumer_tag: ^consumer_tag,
           correlation_id: ^correlation_id,
           routing_key: "customer.updated"
         } = meta}
      )

      assert capture_log(fn ->
               CustomerUpdatedConsumer.handle_message(payload, meta, channel)
             end) =~ "Customer updated. Data: #{payload}."

      # Stop consuming.
      Basic.cancel(channel, consumer_tag)

      # This will always be the last message received by the process.
      assert_receive({:basic_cancel_ok, %{consumer_tag: ^consumer_tag}})

      # Ensure no further messages are received.
      refute_receive(_)
    end
  end
end

Balanced performance and reliability

The RabbitMQ modules are pre-configured with sensible defaults and follow design principles that improve and delicately balance both performance and reliability.

This has been possible through

  • a) extensive experience of working with Elixir and RabbitMQ in production; and
  • b) meticulous consultation of the below (and more) documents and guides.

⚠️ While most of the heavy-lifting is provided by the library itself, reading through the documents below before running any application in production is thoroughly recommended.

TODO

A quick and dirty tech-debt tracker, used in conjunction with Issues.

  • Add support for individual and batch publisher confirms.
  • Add support for publishing without confirm mode.
  • Increase test coverage to as close to 100% as possible.
  • Update testing guide.
  • Expose get_connection on the individual Consumer and Producer level
  • Change handle_message/3 to return :ok | {:error, :requeue} | {:error, term()}
  • Add handle_publisher_ack, make both optional
  • Add all optional callbacks (:basic_cancel, etc.) to the Consumer module