storage: commit to some ordering between partitions in UPSERT #26965
Comments
Noting it here too. This is a bug: there should be no discrepancies in the produced collection no matter how many restart points you insert. We need to store the mz timestamp in the upsert state and use that too when comparing updates.
@petrosagg for keys whose partitions are only-increasing, we ARE definite; handling keys whose partitions can go down and handling resumptions requires that we hold the partition of each message in upsert state, which can't be done (currently) unless the partition is part of the output relation, because during hydration we only have the output shard.
Indeed. I didn't claim there aren't cases where we are definite. We're also definite when we process an empty topic /s. But in seriousness, given that there is a way to fix this, we should.
I don't think so. The timestamp associated with each key in upsert state should be the tuple |
A key changing partitions (right now, specifically Kafka partitions) is considered unsupported by UPSERT sources, for good reason: there is no defined order between partitions, so the order in which updates occur is undefined.
Previous behavior
Before #24663, if you DID create an UPSERT source where a key's partition changed, we had the following behavior:
This is (afaiui) a non-definite collection; depending on how the exact same upstream data is reclocked, we could end up with collections that accumulate differently. However, once the data is reclocked, things work out fine, and we are able to resume an UPSERT ingestion correctly.
Current behavior
After #24663, we now always choose the update from the larger partition, with one caveat: if we are resuming an ingestion, updates after the resumption frontier are chosen over anything.
This is definite in the steady state, but resumption can cause arbitrary orders, depending on the exact moment of resumption.
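To make the rule above concrete, here is a minimal sketch of the comparison as I understand it. It is not the actual storage code: `Order`, `Entry`, and `apply` are hypothetical names, breaking ties within a partition by offset is an assumption, and `order: None` is just one way to model state rehydrated from the output shard, where the original partition is not available.

```rust
use std::collections::HashMap;

/// Hypothetical order key for an update. The derived `Ord` compares
/// `partition` first, then `offset` (the offset tie-break is an assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Order {
    partition: i32,
    offset: i64,
}

/// Hypothetical upsert state entry for one key.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Entry {
    /// `None` models a value rehydrated from the output shard on resumption,
    /// where the partition of the original message is unknown.
    order: Option<Order>,
    /// `None` models a tombstone (delete).
    value: Option<String>,
}

/// Apply one update to the state, keeping whichever update "wins".
fn apply(state: &mut HashMap<String, Entry>, key: &str, order: Order, value: Option<&str>) {
    let wins = match state.get(key) {
        // No prior value: the update always applies.
        None => true,
        // Rehydrated value with unknown order: anything past the
        // resumption frontier is chosen over it.
        Some(Entry { order: None, .. }) => true,
        // Steady state: the update from the larger partition wins.
        Some(Entry { order: Some(existing), .. }) => order > *existing,
    };
    if wins {
        state.insert(
            key.to_string(),
            Entry { order: Some(order), value: value.map(str::to_string) },
        );
    }
}
```

In this model, an update from partition 0 loses to an existing value from partition 1 in the steady state, but wins if the same existing value was rehydrated after a restart. That is the resumption-dependent outcome described above.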
Committing to a decision
I think we should commit to the current behavior; while it is not strictly correct, for Kafka sources (or Kafka-like sources) where a key's partition CAN change, but only ever increase, it is the ideal behavior.