Idempotent drop_generation #13072
Conversation
Force-pushed from 9ef5ef9 to bd6ba87
Force-pushed from 8991d9f to 7a310c3
@@ -27,25 +27,6 @@ opts() ->
    %%
t_idempotent_store_batch(_Config) ->
Well, this doesn't hold, since writing batches to the old generations breaks all kinds of assumptions about the replay.

@keynslug Is this property important for e.g. the replication layer? Can it replay `add_generation` and `store_batch` out of order?
We had a discussion about that behavior and settled on the approach of "preserve idempotency as much as possible", basically. Initially the approach was "drop batches targeting outdated generations on the floor".

> Can it replay `add_generation` and `store_batch` out of order?

It sort of depends on what "out-of-order" means here. The class of situations this was introduced to cover more or less boils down to the following (sketched below):

1. Replica applied `store_batch`, `add_generation`, `store_batch` to the storage state `SS0`.
2. Storage state becomes `SS1`.
3. Something happened, e.g. `SS1` was transferred as a snapshot to another replica.
4. The replica applies the same sequence (e.g. from the Raft log) to the storage state `SS1`.
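For illustration only, a tiny self-contained sketch of that sequence (hypothetical names and state shape, not the actual replication-layer code):

```erlang
-module(replay_sketch).
-export([demo/0]).

%% Toy storage state: {CurrentGenId, #{GenId => [Batch]}}.
apply_op({store_batch, Batch}, {Gen, Gens}) ->
    {Gen, maps:update_with(Gen, fun(Bs) -> [Batch | Bs] end, [Batch], Gens)};
apply_op(add_generation, {Gen, Gens}) ->
    {Gen + 1, Gens#{Gen + 1 => []}}.

demo() ->
    Log = [{store_batch, b1}, add_generation, {store_batch, b2}],
    SS0 = {1, #{1 => []}},
    %% First application: b1 lands in generation 1, b2 in generation 2.
    SS1 = lists:foldl(fun apply_op/2, SS0, Log),
    %% Re-applying the same entries on top of SS1 (e.g. after a snapshot
    %% transfer) writes b1 into generation 2 and b2 into generation 3,
    %% which is exactly the replay assumption being broken.
    SS2 = lists:foldl(fun apply_op/2, SS1, Log),
    {SS1, SS2}.
```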
Well, `commit_batch` is still idempotent, since `prepare` adds the generation id into the cooked batch. I think splitting the operation should ultimately help with this. There are two obstacles, assuming we run `prepare` in the aux server on the leader:

- ra doesn't provide an API to call the aux server with a timeout (can be worked around);
- there's no API to get the leader from a node that is not part of the shard cluster.
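A rough sketch of the prepare/commit split being described, with hypothetical names standing in for the real storage-layer calls:

```erlang
-module(cooked_batch_sketch).
-export([prepare/2, commit/2]).

%% prepare/2 records which generation the batch was cooked against, so
%% commit/2 always writes into that specific generation. Replaying the
%% same cooked batch (even after an add_generation in between) therefore
%% targets the same generation both times; idempotency of the write
%% itself (e.g. key-value overwrite semantics) is assumed here.
prepare(Shard, Messages) ->
    GenId = generation_current(Shard),
    #{generation => GenId, cooked => Messages}.

commit(Shard, #{generation := GenId, cooked := Cooked}) ->
    case generation_exists(Shard, GenId) of
        true  -> do_commit(Shard, GenId, Cooked);
        false -> ok  %% generation already dropped: nothing left to write to
    end.

%% Stubs standing in for the real storage-layer calls.
generation_current(_Shard) -> 1.
generation_exists(_Shard, GenId) -> GenId >= 1.
do_commit(_Shard, _GenId, Cooked) -> {ok, length(Cooked)}.
```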
Force-pushed from be9617c to dd852d4
Force-pushed from 06fc3df to d70b3f3
@@ -179,8 +179,7 @@ make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
    | {ok, end_of_stream}
    | {error, _}.
delete_next(Node, DB, Shard, Iter, Selector, BatchSize) ->
-    emqx_rpc:call(
-        Shard,
+    erpc:call(
Using janky gen_rpc is only justified when forwarding large binaries; that is not the case here.
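For reference, `erpc:call/5` is plain OTP and takes an explicit timeout; the module, function, and arguments below are placeholders, not the actual MFA from this diff:

```erlang
%% erpc:call(Node, Module, Function, Args, Timeout) runs the call on Node
%% and raises an exception on failure, rather than returning {badrpc, _}
%% tuples the way the gen_rpc-based wrappers do.
Result = erpc:call(Node, some_module, some_function, [Arg1, Arg2], 5000).
```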
Force-pushed from 1be35fc to 50e04a0
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl (outdated review thread, resolved)
@@ -546,13 +561,15 @@ handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
            LastWrittenTs = 0
    end,
    case Latch of
-        false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) ->
+        false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) + 1 ->
Q: why is the `+ 1` needed here?
It delays event emission by one epoch to add a little safety margin.
P.S. Added a comment.
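A small sketch of the arithmetic, assuming `?EPOCH(State, Ts)` boils down to integer division of the timestamp by the epoch length (hypothetical values):

```erlang
-module(epoch_margin_sketch).
-export([should_emit/3]).

%% epoch/2 stands in for ?EPOCH/2: which epoch a timestamp falls into.
epoch(EpochLen, Ts) -> Ts div EpochLen.

%% Without "+ 1" the guard fires as soon as Time crosses the first epoch
%% boundary after LastWrittenTs; with it, emission is delayed by one
%% extra epoch. E.g. for EpochLen = 100 and LastWrittenTs = 199 it fires
%% at Time >= 300 rather than Time >= 200.
should_emit(EpochLen, Time, LastWrittenTs) ->
    epoch(EpochLen, Time) > epoch(EpochLen, LastWrittenTs) + 1.
```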
    %% NOTE
    %% We assume that batches do not span generations. Callers should enforce this.
    ?tp(emqx_ds_storage_layer_prepare_batch, #{
        shard => Shard, messages => Messages, options => Options
    }),
-    {GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
+    GenId = generation_current(Shard),
I suspect this could introduce message duplication under snapshot transfer + log replay scenarios. I.e. imagine the following sequence of entries in the log:

{T1 = 42, Msg1}
{T2 = 43, Msg2}
{T3 = 44, add_generation}
{T4 = 45, Msg3}

If they are reapplied on top of a storage state that already had them applied, couldn't `Msg1` and `Msg2` end up in both the previous and the current generation?
Yes, it will happen. But running `prepare` in the main state machine code is something that should be removed: `prepare` should run on the leader (in the aux callback) and add a cooked batch to the Raft log. Committing a cooked batch to the storage is idempotent.
            %% any more messages here. The iterator reached the end:
            %% the stream has been fully replayed.
            {ok, end_of_stream};
    IsCurrent = GenId =:= generation_current(Shard),
I wonder if this loss of write idempotency could be alleviated by picking the "current" generation according to `Now` here, instead of the most recent storage state? I have a feeling this also relaxes the requirement to read from leaders, but am not 100% sure.

The risk here, I guess, is having `Now` point to a dropped generation (from the storage state perspective), but picking the earliest alive generation in that case should work the same. A minimal sketch of this is below.
Force-pushed from 4a4df95 to 4358077
Reduce volume of logs and crash reports from DS
Make sure storage events originating from generation X are handled in the context of the same generation.
Force-pushed from 4358077 to 59a09fb
Fixes EMQX-12402
Release version: v/e5.7
Summary

Various small, but important fixes.

- `drop_generation` operation can be replayed multiple times by the replication layer, but it's not idempotent. This PR adds a workaround that avoids a crash when `drop_generation` doesn't succeed. In the future, however, we want to make `drop_generation` idempotent in a nicer way.
- `storage_layer:generation_at` function, which has been removed for good.
- `format_status` callback added for several workers to avoid dumping the contents of the entire message buffer to the console.
- `end_of_stream` detection has been moved to the layout CBM. Previously the storage layer used a heuristic: old generations that return an empty batch won't produce more data. This is, obviously, incorrect: for example, the bitfield-LTS layout MAY return an empty batch while waiting for the safe cutoff time.
- `reference` layout has been enabled in the prod build. It could be useful for integration testing.
- `bitfield_lts:handle_event` callback: fixed a bug that led to missed safe cutoff time updates and, effectively, subscribers being unable to fetch messages until a fresh batch was published.

PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:

- `changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md` files

Checklist for CI (.github/workflows) changes

- `changes/` dir for user-facing artifacts update