Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for custom response exchange #104

Merged
merged 2 commits into from
Apr 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 33 additions & 16 deletions pjrpc/server/integration/aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,33 @@ class Executor:
`aio_pika <https://aio-pika.readthedocs.io/en/latest/>`_ based JSON-RPC server.

:param broker_url: broker connection url
:param queue_name: requests queue name
:param rx_queue_name: requests queue name
:param tx_exchange_name: response exchange name
:param tx_routing_key: response routing key
:param prefetch_count: worker prefetch count
:param kwargs: dispatcher additional arguments
"""

def __init__(self, broker_url: URL, queue_name: str, prefetch_count: int = 0, **kwargs: Any):
def __init__(
self,
broker_url: URL,
rx_queue_name: str,
tx_exchange_name: str = None,
tx_routing_key: str = None,
prefetch_count: int = 0,
**kwargs: Any
):
self._broker_url = broker_url
self._queue_name = queue_name
self._rx_queue_name = rx_queue_name
self._tx_exchange_name = tx_exchange_name
self._tx_routing_key = tx_routing_key
self._prefetch_count = prefetch_count

self._connection = aio_pika.connection.Connection(broker_url)
self._channel: Optional[aio_pika.abc.AbstractChannel] = None

self._queue: Optional[aio_pika.abc.AbstractQueue] = None
self._exchange: Optional[aio_pika.abc.AbstractExchange] = None
self._consumer_tag: Optional[str] = None

self._dispatcher = pjrpc.server.AsyncDispatcher(**kwargs)
Expand All @@ -40,17 +53,20 @@ def dispatcher(self) -> pjrpc.server.AsyncDispatcher:

return self._dispatcher

async def start(self, queue_args: Optional[Dict[str, Any]] = None) -> None:
async def start(self, queue_args: Optional[Dict[str, Any]] = None, exchange_args: Optional[Dict[str, Any]] = None) -> None:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is too long. Please split it to be less than 120 characters.

"""
Starts executor.

:param queue_args: queue arguments
:param exchange_args: exchange arguments
"""

await self._connection.connect()
self._channel = channel = await self._connection.channel()

self._queue = queue = await channel.declare_queue(self._queue_name, **(queue_args or {}))
self._queue = queue = await channel.declare_queue(self._rx_queue_name, **(queue_args or {}))
if self._tx_exchange_name:
self._exchange = await channel.declare_exchange(self._tx_exchange_name, **(exchange_args or {}))
await channel.set_qos(prefetch_count=self._prefetch_count)
self._consumer_tag = await queue.consume(self._rpc_handle)

Expand Down Expand Up @@ -80,17 +96,18 @@ async def _rpc_handle(self, message: aio_pika.abc.AbstractIncomingMessage) -> No
if response_text is not None:
if reply_to is None:
logger.warning("property 'reply_to' is missing")
else:
async with self._connection.channel() as channel:
await channel.default_exchange.publish(
aio_pika.Message(
body=response_text.encode(),
reply_to=reply_to,
correlation_id=message.correlation_id,
content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
),
routing_key=reply_to,
)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, remove extra whitespace

async with self._connection.channel() as channel:
exchange = self._exchange if self._exchange else channel.default_exchange
await exchange.publish(
aio_pika.Message(
body=response_text.encode(),
reply_to=reply_to,
correlation_id=message.correlation_id,
content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
),
routing_key=self._tx_routing_key if self._tx_routing_key else reply_to,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be necessary to check that self._tx_routing_key is not None or reply_to is not None otherwise routing_key will be None which is forbidden.

Copy link
Author

@jgrasboeck jgrasboeck Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, thx! Fixed it by using default routing key "" in case neither reply_to or tx_routing_key is given. This could be usesfull on cases where a fanout exchange is used as target.

)

await message.ack()

Expand Down
Loading