
Automatic offset tracking for stream queues #661

Open · viktorerlingsson wants to merge 63 commits into main from streams_automatic_offset_tracking

Conversation

@viktorerlingsson (Member) commented Apr 17, 2024

WHAT is this pull request doing?

Adds broker-side tracking of consumer offsets in stream queues when no x-stream-offset argument is provided by the consumer. Offsets are not tracked if the consumer tag is generated by the broker.
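As a rough illustration of the condition described above (a hypothetical helper, not code from this PR; the broker-generated tag prefix is an assumption), the decision to track could look like:

```crystal
# Sketch only: track an offset for this consumer only if the client
# chose its own consumer tag and didn't pass an explicit x-stream-offset.
def track_offset?(consumer_tag : String, args : Hash(String, AMQ::Protocol::Field)) : Bool
  # "amq.ctag-" is the conventional prefix for broker-generated tags (assumed here)
  return false if consumer_tag.starts_with?("amq.ctag-")
  !args.has_key?("x-stream-offset")
end
```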

✅ When to run cleanup_consumer_offsets?
✅ IndexError when trying to clean up if msg_size >= segment_size

HOW can this pull request be tested?

Run specs

@viktorerlingsson viktorerlingsson force-pushed the streams_automatic_offset_tracking branch 2 times, most recently from 2ecc8d3 to e00b4e5 Compare April 23, 2024 13:56
@viktorerlingsson viktorerlingsson linked an issue May 14, 2024 that may be closed by this pull request
@viktorerlingsson viktorerlingsson force-pushed the streams_automatic_offset_tracking branch from 194926e to e58ad9b Compare May 23, 2024 14:38
@viktorerlingsson viktorerlingsson marked this pull request as ready for review June 3, 2024 08:09
@carlhoerberg (Member) left a comment

What happens when consumer_offset_capacity is reached? MFile doesn't auto-expand.

@viktorerlingsson (Member, Author) commented Jun 17, 2024

> What happens when consumer_offset_capacity is reached? MFile doesn't auto-expand.

Fixed here

```crystal
  return {@last_offset, seg, pos} if @size.zero?
  mfile = @segments[seg]
  msg = BytesMessage.from_bytes(mfile.to_slice + pos)
  offset = offset_from_headers(msg.properties.headers)
  {offset, seg, pos}
rescue ex : IndexError # first segment can be empty if message size >= segment size
  return offset_at(seg + 1, pos, true) unless retried
```
Member:

What if many segments have been deleted? This only tries one more segment? And should pos be reused? Shouldn't it be the beginning of the next available segment?

Suggested change:
```diff
-  return offset_at(seg + 1, pos, true) unless retried
+  return offset_at(@segments.first_key, 4, true) unless retried
```

Member Author:

offset_at() is always called with @segments.first_key (or @segments.last_key), so seg + 1 will always be the second segment (unless we're looking in the last segment, but since it's the last segment, we will never run into the issue of a message spilling over to the next segment in that case).

And I don't think we will ever have two empty segments in a row? Any time a message spills over, it ends up in the next segment, and the way we expire segments for streams shouldn't allow for any gaps. But maybe getting the next key in the hash is safer?

pos should be set to 4 though 👍
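Putting the thread above together, a minimal sketch of the agreed fix (the method shape and names such as offset_at, @segments, and BytesMessage come from the diff excerpt; the rest is illustrative, not the PR's final code):

```crystal
# Sketch only: on IndexError, retry once from the first existing segment,
# at position 4 (past the 4-byte segment header), instead of seg + 1 with
# the same pos.
private def offset_at(seg : UInt32, pos : UInt32, retried = false) : Tuple(Int64, UInt32, UInt32)
  return {@last_offset, seg, pos} if @size.zero?
  mfile = @segments[seg]
  msg = BytesMessage.from_bytes(mfile.to_slice + pos)
  offset = offset_from_headers(msg.properties.headers)
  {offset, seg, pos}
rescue ex : IndexError # first segment can be empty if message size >= segment size
  return offset_at(@segments.first_key, 4_u32, true) unless retried
  raise ex # give up after one retry
end
```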

@viktorerlingsson viktorerlingsson force-pushed the streams_automatic_offset_tracking branch from d52b6f5 to a74d4a6 Compare June 19, 2024 13:39
@carlhoerberg (Member) left a comment

Need to think about replication of the consumer offset file.


```crystal
def update_consumer_offset(consumer_tag : String, new_offset : Int64)
  if pos = @consumer_offset_positions[consumer_tag]?
    IO::ByteFormat::SystemEndian.encode(new_offset, @consumer_offsets.to_slice(pos, 8, false))
```
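For context, a hypothetical read-side counterpart of the snippet above (sketch only, not from this PR; it assumes the same @consumer_offset_positions hash and @consumer_offsets MFile):

```crystal
# Sketch: decode a consumer's stored offset from the same 8-byte slot
# in the memory-mapped offsets file, or nil if the tag isn't tracked.
def consumer_offset(consumer_tag : String) : Int64?
  if pos = @consumer_offset_positions[consumer_tag]?
    IO::ByteFormat::SystemEndian.decode(Int64, @consumer_offsets.to_slice(pos, 8, false))
  end
end
```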
Member:

This information is not replicated to followers? I guess Replicator doesn't have support for modifying files, so that needs to be built out.

Or rethink: should we do append-only and GC the file instead? Either by assigning each ctag a number and persisting that information too, or by using lz4 compression?

Member Author:

> This information is not replicated to followers? I guess Replicator doesn't have support for modifying files, so that needs to be built out.

Yeah, I haven't really thought about replication, but we should obviously do that!

> Or rethink: should we do append-only and GC the file instead? Either by assigning each ctag a number and persisting that information too, or by using lz4 compression?

Hmm, I feel like append-only could cause some issues where the data we need is scattered among potentially large amounts of stale data. But maybe we can handle that by GC'ing (when the file is full? or at regular intervals), keeping only the latest offset for each ctag and replacing the file with a new, compacted version (and expanding the file if it's full and there's nothing to GC). We already keep a hash (@consumer_offset_positions) of all tracked ctags and their respective positions in the file, so knowing what to keep when compacting should be pretty straightforward.

I don't think I understand what you mean by "Either by assigning each ctag a number and persisting that information too, or by using lz4 compression" though. I guess lz4 would be to use less space on disk in trade-off for a little extra CPU usage?

And I'm not sure that's better than building out Replicator to support modifying files? Maybe there are other use cases for supporting file modification in the future (like log compaction in streams).
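The compaction idea discussed above could be sketched roughly like this (hypothetical record format and names, not code from this PR; it assumes an in-memory map of the newest offset per consumer tag):

```crystal
# Sketch: rewrite an append-only consumer-offset file keeping only the
# latest offset per ctag. Each record is a length-prefixed tag followed
# by an 8-byte offset; the rename swaps in the compacted file.
def compact_consumer_offsets(path : String, latest : Hash(String, Int64))
  tmp = path + ".tmp"
  File.open(tmp, "w") do |f|
    latest.each do |ctag, offset|
      f.write_bytes(ctag.bytesize.to_u32, IO::ByteFormat::LittleEndian) # tag length
      f.write(ctag.to_slice)                                            # tag bytes
      f.write_bytes(offset, IO::ByteFormat::LittleEndian)               # latest offset
    end
  end
  File.rename(tmp, path) # replace the old file atomically
end
```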

Successfully merging this pull request may close these issues.

Stream queues: Automatic offset tracking