
[feature] Add AMQP (RabbitMQ) support in async consumers API #761

Open
deefdragon opened this issue Jan 10, 2024 · 7 comments
Comments

@deefdragon

Is your feature request related to a problem? Please describe.

This is a re-opening of #373 (and a few others) to add RabbitMQ as an async consumer input, now that several asynchronous consumers have been added in v5.2.0.

I created this as a new ticket specifically because I am looking into writing a PR for it, and want an open ticket to collect my questions in.

Describe the solution you'd like

Addition of RabbitMQ

Design Questions

I have been looking at the code for the existing consumers, and it appears that the consumer has to do 3 things.

  1. Be added as a consumer that can be set up in consuming.go
  2. Implement the Service Interface
  3. Be able to listen for data and call Dispatch

Everything else appears to be consumer-specific. Am I missing anything?
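
As a sanity check on my reading, here's a minimal sketch of the shape I have in mind (all names here are my own placeholders, not Centrifugo's actual types, and the real Service/Dispatch signatures may differ):

```go
package amqp

import "context"

// Dispatcher is my placeholder for whatever Dispatch is called on;
// the real signature in Centrifugo may differ.
type Dispatcher interface {
	Dispatch(ctx context.Context, method string, data []byte) error
}

// Consumer is a hypothetical AMQP consumer covering the three points:
// constructed from consuming.go, implements the Service interface
// (a blocking Run), and calls Dispatch for every message received.
type Consumer struct {
	dispatcher Dispatcher
	deliveries <-chan []byte // stand-in for the real AMQP delivery channel
}

// Run blocks until ctx is cancelled, handing each message to Dispatch.
func (c *Consumer) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case body := <-c.deliveries:
			// Method/payload parsing elided; a real consumer would decode
			// the body and ack/nack the delivery based on the result.
			if err := c.dispatcher.Dispatch(ctx, "publish", body); err != nil {
				continue
			}
		}
	}
}
```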

Consumer Multiplexing

Is a new consumer created for each specified input? I.e., if I am reading data from 2 tables in Postgres (or 2 queues in RabbitMQ), would I be calling the setup function twice, or is the consumer client set up once and expected to handle the multiple inputs itself?

Multiple instances of Centrifugo

When Centrifugo is scaled out horizontally, it appears that the PostgreSQL consumer divides the work into partitions to make sure each piece of data is handled by only one instance. Does the Dispatch call properly send data between the instances so that all clients are properly informed?

If so, RabbitMQ would be able to have each Centrifugo instance connect to the same queue, and the data would be processed in a partitioned manner automatically. I just wish to make sure that is acceptable.

Updates Mid-Execution

Is the expectation that the user restarts Centrifugo to apply updates to async consumers, or must consumers be able to adjust on the fly to changes in config?

Testing expectations

I will be able to write some unit tests for the RabbitMQ consumer of course, but I also saw some things that make me think there are integration tests. Is that the case, and if so, what are the expectations around integration tests (and where might I look for more information/examples on those)?

@FZambia
Member

FZambia commented Jan 10, 2024

Hello @deefdragon

I knew that adding the async consumers feature might quickly result in requests for other native integrations :) I don't want to add integrations with every possible queueing system to Centrifugo core, so I need to think a bit about a good strategy here. My first thought is that it should be based somehow on the popularity of the system we integrate with - I think RabbitMQ is popular enough to be one such system. And one more criterion - I think we need a real use case before writing a new async consumer. Do you have a production system where you want to use Centrifugo with async consuming from Rabbit? If yes - could you describe the use case? If not - I'd say we'd better leave this issue open for some time first, discuss the questions you asked, and then wait for feedback from the community.

Now to the questions - yep, I think you understood correctly how the code is structured now: an async consumer is just a Service which consumes some queue and calls Dispatch. In the message from the queue system we expect an API command to execute - method and payload.
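
To illustrate (field names here are only a sketch - see the existing Kafka consumer source for the exact struct):

```go
package consumer

import "encoding/json"

// queuedCommand illustrates the expected shape of each queue message;
// the exact struct Centrifugo uses lives in the Kafka consumer code.
type queuedCommand struct {
	Method  string          `json:"method"`  // API method, e.g. "publish"
	Payload json.RawMessage `json:"payload"` // parameters of that method
}

// An example body a producer would put on the queue:
//   {"method": "publish", "payload": {"channel": "news", "data": {"text": "hi"}}}
```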

The important property of the current consumers (PostgreSQL table and Kafka topics) is that both can maintain message ordering in channels if properly partitioned. RabbitMQ can't do this in a basic setup as far as I know. It seems possible to achieve with https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/ - and that article additionally describes how to achieve partitioning with it for increased throughput.

> Is a new consumer created for each specified input? I.e., if I am reading data from 2 tables in Postgres (or 2 queues in RabbitMQ), would I be calling the setup function twice, or is the consumer client set up once and expected to handle the multiple inputs itself?

I'd say consuming from many queues is nice if possible; for Kafka it's possible to consume from many topics, but that was almost free to implement, and I'm not sure how it may be in the RabbitMQ case. If it's simple to use many queues - why not, and users can decide whether to configure one or many.

> If so, RabbitMQ would be able to have each Centrifugo instance connect to the same queue, and the data would be processed in a partitioned manner automatically. I just wish to make sure that is acceptable.

Yep, just a note about ordering of processing - I believe most Centrifugo users want to keep the order of messages in channels. Of course there could be cases where order is not important. It would be nice to provide an option to keep ordering right from the start. But in general, this is why a real use case is needed here - to have some ground for decisions.

> Is the expectation that the user restarts Centrifugo to apply updates to async consumers, or must consumers be able to adjust on the fly to changes in config?

I think applying changes upon restart is sufficient for now. Applying changes on the fly is nice to have, but I'm afraid it could require non-trivial changes in configuration - I don't have enough understanding at this point of how difficult this could be.

> I will be able to write some unit tests for the RabbitMQ consumer of course, but I also saw some things that make me think there are integration tests. Is that the case, and if so, what are the expectations around integration tests (and where might I look for more information/examples on those)?

Yep, currently we test consumers in CI, using docker-compose.yaml to start external services. Locally, just `docker compose up` and then `go test ./... -tags integration -v`.

Overall, I'd approach this carefully - it would be nice to know it could be really useful for some Centrifugo user with a production app.

@deefdragon
Author

Production Use Case

To answer the question around current production use with one of a few examples: I have a system that authenticates with Twitch for user login (among others). When a user changes their password on Twitch, Twitch sends out a de-auth message to all 3rd-party integrations. On getting this message, I de-auth any active keys to prevent them from refreshing, but before the JWT expires naturally (<5m), I want to be able to inform any active clients to log the user out. (Out of the front end specifically, so they can not send new HTTP requests - not out of Centrifugo. I am aware PRO does support de-authing tokens.)

My intent was that users would be able to listen on an appropriate channel on the front end, and when I send a message to the RabbitMQ queue that gets these messages, Centrifugo would pass that message on to the appropriate user. (My very initial planning so far has options to substitute headers into the channel, so you can target a given channel in a namespace or a given user.)

In general, that could be handled by an HTTP call, but that also applies to most cases with async consumers, I think. As a general case, RabbitMQ is automatically integrated into all of my services due to how I initialize my infra (I have a common framework during startup), and the ability to not have to deploy another service to get messaging directly to users would be ideal.

Integration

I'm wondering if it would be possible to use Go's plugin system to separate out the different supported async consumers. There are some very specific considerations when it comes to version matching that may complicate that, however. Something to consider researching later, I think. It would be easy enough to migrate any existing implementation to a plugin system if that is the path taken.
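
For illustration, the mechanism I mean is the stdlib plugin package - a rough sketch only, with a hypothetical NewConsumer symbol:

```go
package main

import (
	"log"
	"plugin"
)

func main() {
	// A plugin would be built separately with: go build -buildmode=plugin
	p, err := plugin.Open("amqp_consumer.so")
	if err != nil {
		log.Fatal(err)
	}
	// Look up a hypothetical exported constructor and assert its type.
	sym, err := p.Lookup("NewConsumer")
	if err != nil {
		log.Fatal(err)
	}
	_ = sym // e.g. sym.(func(config []byte) (Consumer, error))
	// The version-matching caveat: plugin and host must be built with
	// identical toolchain and dependency versions.
}
```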

As is, I'm going to start with mimicking the existing code to get a handle on how well RabbitMQ can integrate, to begin with.

Ordering

To me, the most important part of Centrifugo is how it centralizes sending users events on demand, and the time between events on the same channel for all of my use cases is normally on the order of several seconds at the smallest, and several minutes under normal circumstances.

As such, I missed the importance of tight ordering for other users and considered it more of an optional feature. Useful, but not required.

I will do what I can to take ordering into consideration when working on this.

@deefdragon
Author

I have a basic AMQP consumer written up (I always forget that AMQP is the generic variant of RabbitMQ queues; I've updated the title), though I've not gone through and done super rigorous testing as of yet.

I now have a better understanding of the complexity in the way the consumers are currently written, and of what you mean about strategy. I misinterpreted "In the message from the queue system we expect an API command to execute - method and payload" to mean that the existing standard has the required data located somewhere in the message as a whole (i.e., it could be parsed from the headers of the message), not that all required data would be encoded in the body of the message itself.

The version I currently have written follows the pattern of the Kafka consumer and encodes both the payload and the method into a single JSON message.

To me, however, the ideal end form for an async AMQP consumer would be to only have data/b64data be part of the body, and have all the other data pulled from the headers/config.

My reasoning for this is to allow setting up the AMQP consumer to listen to the same, already existing messages I'm sending.

[diagram omitted]

So in this example, I already have Service 1 and Service 2 parsing data from RabbitMQ queues. I'd like to be able to create a queue that attaches to the existing exchange, and have Centrifugo get/build the channel and other options from the headers and config. That way, the only changes needed to pass any events to the front end would be to the Centrifugo config and the RabbitMQ queues, requiring no code changes.
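
Purely hypothetical, but I'm imagining it slotting into the existing consumers config array something like this (the amqp type and every field under it are invented for illustration - none of this exists yet):

```json
{
  "consumers": [
    {
      "name": "twitch_deauth",
      "type": "amqp",
      "amqp": {
        "url": "amqp://guest:guest@localhost:5672/",
        "queues": ["centrifugo.events"],
        "channel_template": "user:{{.user_id}}"
      }
    }
  ]
}
```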

The code to do this, however, is going to be quite a bit more complicated due to the list of options that need to be supported. As I have an active use case for this, I'm going to split off what I currently have and try to come up with a proof of concept for the more complex formatting use cases. I will circle around with my findings if I think it can be done reasonably and stably.

(Apologies if I'm getting pushy here. The use case I've described is somewhat high priority given it's security-related, but I also have some other stuff undergoing an active migration that this would simplify by leaps and bounds if I can get it functioning.)

@deefdragon deefdragon changed the title [feature] Add RabbitMQ to async consumers API [feature] Add AMQP (RabbitMQ) support in async consumers API Jan 12, 2024
@FZambia
Member

FZambia commented Jan 12, 2024

Thanks for sharing more details!

> I'm wondering if it would be possible to use Go's plugin system to separate out the different supported async consumers. There are some very specific considerations when it comes to version matching that may complicate that, however.

Currently the answer to this is that we are not ready to include Go plugin support in Centrifugo. If you look at the Go ecosystem, most systems that added Go plugin support consider it experimental, and the fact that a plugin requires exact versions of libraries makes this quite unmanageable.

> As such, I missed the importance of tight ordering for other users and considered it more of an optional feature. Useful, but not required.

If ordering is not important for your use case, I suppose it's OK not to have it, but let's think about possible solutions. We can emphasize this in the docs if ordering is hard to achieve; it would just be nice to understand whether it's hard or impossible.

> ...to mean that the existing standard has the required data located somewhere in the message as a whole (i.e., it could be parsed from the headers of the message), not that all required data would be encoded in the body of the message itself.

I think this is the most difficult question here. Yes - consumers in Centrifugo expect messages to be Centrifugo API commands. If you want rules for how to extract data from RabbitMQ messages, then I suppose it's out of scope for Centrifugo at the moment. Also, I think it would be quite strange/non-obvious if all your messages travelling to various destinations contained a Centrifugo-specific payload in the headers. I think the correct solution here is to write a service which transforms RabbitMQ messages to the Centrifugo format - in that case you can simply use the HTTP API, I suppose.

@deefdragon
Author

Implementations

Here is my initial implementation, which reads data in from AMQP and dispatches via the existing method used by Kafka (I actually used literally the same struct to parse the data).

I also have a more advanced variant that allows substituting information in as needed for the channel, method, and the other parameters (this is optional, as it defaults to the simple variant). It is somewhat complex to get this method to work, however, as it uses Go templates to accomplish the substitution, and some really messy JSON encoding/decoding to build the payload structure.

I'm going to be testing the more advanced version in my production environment to see if I encounter any particular issues/complexities with it, but it's been stable so far. If I do get heavy use out of the complex form, I'll see if I can clean it up and optimize it to not require converting to/from JSON half a dozen times, so it's actually reasonable to submit a PR for. I'd also like to see if I can make it usable on more than just the AMQP consumer, so it's somewhat standardized.
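
The substitution core is roughly this - heavily simplified from what I'm actually running, with my own placeholder names:

```go
package consumer

import (
	"bytes"
	"text/template"
)

// renderChannel expands a configured Go template against the AMQP
// message headers, e.g. the template "user:{{.user_id}}" with header
// user_id=42 yields "user:42". Simplified from my actual code.
func renderChannel(tmplSrc string, headers map[string]interface{}) (string, error) {
	tmpl, err := template.New("channel").Parse(tmplSrc)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, headers); err != nil {
		return "", err
	}
	return buf.String(), nil
}
```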

Extra headers

> I think it would be quite strange/non-obvious if all your messages travelling to various destinations contained a Centrifugo-specific payload in the headers

As is, I pass different headers for different destinations, so in several cases I'm already passing extraneous data from the perspective of one program vs. another. I also do tracing that is informed by the headers I include in the messages, so in most cases I would arguably not be including any extra data. This is a fair comment, however, as that will not necessarily be the case for most users.

Overall I think I would prefer having the extra headers as opposed to having to send messages into two separate queues, but that's part of why I'm going to do the testing: to verify whether this is the case and actually useful.

Ordering

As a side note, I am still researching and testing whether ordering is possible. To my understanding, the method RabbitMQ uses to build the single-consumer streams means that it SHOULD just be a drop-in replacement: you just point the consumer at the appropriate queue. I don't know whether the user needs to be able to set any headers for the connection, however.
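
For the classic-queue case, single active consumer is just a queue argument at declare time - a sketch using the rabbitmq/amqp091-go client (the stream variant discussed in the blog post uses different declare arguments):

```go
package consumer

import (
	amqp "github.com/rabbitmq/amqp091-go"
)

// declareSACQueue declares a durable queue with single-active-consumer
// enabled, so that among all Centrifugo instances consuming from it,
// RabbitMQ delivers to only one at a time, preserving ordering.
func declareSACQueue(ch *amqp.Channel, name string) (amqp.Queue, error) {
	return ch.QueueDeclare(
		name,
		true,  // durable
		false, // auto-delete
		false, // exclusive
		false, // no-wait
		amqp.Table{"x-single-active-consumer": true},
	)
}
```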

@FZambia
Member

FZambia commented Jan 17, 2024

Hello, took a quick look - still thinking that templating to extract Centrifugo-specific data is a very controversial approach. One of the goals of Centrifugo is to translate good practices to users. Having one large message for all systems seems like an antipattern for most use cases. I understand it reduces the work and the number of components, but it may become a limitation or a security concern in the future.

@FZambia
Member

FZambia commented Jan 18, 2024

Looks like RabbitMQ Super Streams provide exactly the semantics we want in Centrifugo regarding throughput scalability and ordering: https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams/
