
Design for Postgres event transport #11

Open
adamcharnock opened this issue Nov 14, 2019 · 6 comments

adamcharnock (Owner) commented Nov 14, 2019

Needed commands (mirroring the Redis event transport):

  • xadd
  • xread_group
  • xpending
  • xclaim
  • xrevrange

This requires:

  • Messages
  • Groups
  • Consumers

We will need to be able to:

  • Determine which message each group is up to
  • Determine which consumers are currently processing a message
  • At most one consumer from each group can be processing a given message
  • Each consumer can be processing multiple messages at any one time

Potential schema outline:

  • messages
    • id SERIAL PRIMARY KEY
    • created_at TIMESTAMPTZ
    • stream VARCHAR(255)
    • content JSONB
  • groups
    • id SERIAL PRIMARY KEY
    • name VARCHAR(255)
    • stream VARCHAR(255)
    • latest_id INT
  • pending_messages
    • message_id INT
    • consumer_name VARCHAR(255)
    • claimed_at TIMESTAMPTZ

Potentially move each stream into its own table.
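As a sketch, the outline above could be expressed as DDL; the constraints, defaults, and foreign keys beyond the outline are assumptions:

```sql
-- Sketch of the schema outline above; constraints and defaults are assumptions.
CREATE TABLE messages (
    id         SERIAL PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    stream     VARCHAR(255) NOT NULL,
    content    JSONB NOT NULL
);

CREATE TABLE groups (
    id        SERIAL PRIMARY KEY,
    name      VARCHAR(255) NOT NULL,
    stream    VARCHAR(255) NOT NULL,
    latest_id INT REFERENCES messages (id),  -- the message this group is "up to"
    UNIQUE (name, stream)
);

CREATE TABLE pending_messages (
    message_id    INT NOT NULL REFERENCES messages (id),
    consumer_name VARCHAR(255) NOT NULL,
    claimed_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- A group_id column would likely be needed to make claims per-group.
    PRIMARY KEY (message_id, consumer_name)
);
```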


Note: The old transaction event transport was deleted in 899e9b5

adamcharnock (Owner, Author) commented:

Note that a Postgres-based event transport will not achieve application/bus atomicity. For that we would still need a Postgres-based layer residing in the same database being written to, because Postgres does not support cross-database transactions.

apollo13 (Contributor) commented Jan 3, 2020

Hi there, I just found Lightbus, and Postgres support looks interesting. Thinking about it, I wonder whether it would help to investigate this issue from a PostgreSQL point of view. For example, Postgres allows you to select items from a queue using SELECT … FOR UPDATE SKIP LOCKED, or to use advisory locks; both can help when implementing a task queue.
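For illustration, a minimal SKIP LOCKED dequeue against a hypothetical jobs table (not part of the proposal above) might look like:

```sql
-- Hypothetical "jobs" table, purely for illustration.
BEGIN;

-- Claim one job; rows locked by other workers are skipped rather than waited on.
SELECT id, payload
FROM jobs
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- ...process the job in the application, then remove it...
DELETE FROM jobs WHERE id = 1;  -- the id selected above

COMMIT;
```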

So instead of thinking in terms of Redis streams, maybe think in terms of the semantics you are trying to achieve. For events this could mean "an event needs to be created and then consumed at least once", and then think about an efficient way to achieve that in Postgres. You kind of have this in "We will need to be able to:", but I am not sure, for instance, whether you actually need to "Determine which consumers are currently processing a message" (I don't know Lightbus well enough yet to tell where this requirement comes from).

Please note that I am not saying your current approach/thoughts are bad or wrong in any way; I am merely trying to provide another perspective/approach to the problem.

adamcharnock (Owner, Author) commented:

Hey @apollo13, you make good points, and input is always very welcome. Thank you for taking the time.

I definitely agree with you about coming at this from a Postgres point of view. To give some history: at the back of my mind I was assuming this could simply be implemented as a typical Postgres queue. Events would be inserted into a table, and they would be selected out and processed (using one of the techniques you point out above).

However, I realised this wouldn't work. This provides a job queue (one message goes in, one comes out), but Lightbus events use a fan-out design. In Lightbus there may be zero or more consumers[1] for an event, and each consumer will receive each event at least once.

At this point I decided to make some notes in this ticket, so what you see above is how far I got in thinking about it at the time. I think I was essentially trying to get some notes down around "we need to implement consumer groups in Postgres". [2]

Perhaps I could therefore crystallise the core requirement as: An event needs to be received by each listener at least once.

If it helps, off the top of my head I can see a couple of potential avenues to explore:

  • There are incoming (id, timestamp, message) and outgoing (id, timestamp, message, consumer_group, consumer_name) tables. Messages are written into incoming, and some mechanism copies them into outgoing (one copy per consumer group). They can then be pulled from there by consumers with SELECT … WHERE consumer_group = … FOR UPDATE SKIP LOCKED or similar.
  • Use a single table with an ARRAY column which keeps track of which consumers have received the message and which have acknowledged it. I think row-level locking would prevent multiple consumers from processing the same event simultaneously, though (I haven't looked into advisory locks).

These are only rough ideas, and probably need some refining. I could knock out some pseudocode if that would be of interest.
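As a very rough sketch of the first avenue (all table and column names here are illustrative, including the latest_copied_id bookkeeping column):

```sql
-- 1. Fan out: copy newly arrived messages into "outgoing", once per consumer group.
INSERT INTO outgoing (message_id, ts, message, consumer_group)
SELECT i.id, i.ts, i.message, g.name
FROM incoming i
CROSS JOIN consumer_groups g
WHERE i.id > g.latest_copied_id;  -- illustrative bookkeeping of what was copied

-- 2. A consumer in group 'reporting' claims one message without blocking peers.
SELECT id, message
FROM outgoing
WHERE consumer_group = 'reporting'
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;
```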

I'd certainly be happy to support you if you were interested in working on this. Even if not, thank you for spurring some more thought on this 👍

[1] The Lightbus API calls consumers 'listeners'

[2] I suspect this is somewhat what I was getting at with "Determine which consumers are currently processing a message". I.e. we should be able to reclaim messages from dead consumers within our group.

apollo13 (Contributor) commented Jan 3, 2020

> In Lightbus there may be zero or more consumers[1] for an event, and each consumer will receive each event at least once.

Ah right, this is probably what I was missing. And since the client creating the events doesn't know the consumers, there is no way to insert the event multiple times for different consumers either.

> Perhaps I could therefore crystallise the core requirement as: An event needs to be received by each listener at least once.

I very much like this as a problem description. I'll think about it and let you know if I can come up with something :) Thank you for taking the time to go into the details.

adamcharnock (Owner, Author) commented:

No problem at all. I'd love to hear what you come up with!


Also, to expand on this:

> Ah right, this is probably what I was missing. And since the client creating the events doesn't know the consumers, there is no way to insert the event multiple times for different consumers either.

I would like to explicitly add: it is possible that a future consumer group will be created which wishes to consume past events (i.e. event sourcing). So it is definitely true that one cannot know all the potential destinations for an event at the time it is fired.

adamcharnock (Owner, Author) commented:

Adding some notes here regarding some discussion I've had with @apollo13 in the discourse channel:

  • Advisory locks may be useful, with one lock per consumer group
  • Use LISTEN/NOTIFY to notify consumers, probably with one channel per API/event (perhaps determined by a stream_use parameter, as with the Redis transport)
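These two ideas could be sketched as follows; the lock-key derivation via hashtext() and the channel naming are assumptions, not settled design:

```sql
-- One advisory lock per consumer group; hashtext() maps the name to a lock key.
SELECT pg_try_advisory_lock(hashtext('my_group')::bigint);

-- Producer notifies a per-API/event channel after inserting an event;
-- the payload could carry the new message id.
NOTIFY my_api_my_event, '42';

-- Consumers subscribe with:
LISTEN my_api_my_event;
```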
