persist-txn: unblock_read may incorrectly write to unregistered shards #27088

Open
jkosh44 opened this issue May 14, 2024 · 1 comment
Labels
C-bug Category: something is broken

jkosh44 commented May 14, 2024

What version of Materialize are you using?

v0.99.0.dev

What is the issue?

DataSnapshot::unblock_read logically takes three inputs:

  • Some timestamp, as_of.
  • The latest possibly unapplied write that is less than or equal to as_of, latest_write (None if there is no such write).
  • An upper bound on the times known to be empty of unapplied writes, empty_to (note that empty_to > as_of).

These are encapsulated in the DataSnapshot struct.

```rust
/// A token exchangeable for a data shard snapshot.
///
/// - Invariant: `latest_write <= as_of < empty_to`
/// - Invariant: `(latest_write, empty_to)` and `(as_of, empty_to)` have no
///   unapplied writes (which means we can do an empty CaA of those times if we
///   like).
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct DataSnapshot<T> {
    /// The id of the data shard this snapshot is for.
    pub(crate) data_id: ShardId,
    /// The latest possibly unapplied write <= as_of. None if there are no
    /// writes via txns or if they're all known to be applied.
    pub(crate) latest_write: Option<T>,
    /// The as_of asked for.
    pub(crate) as_of: T,
    /// An upper bound on the times known to be empty of unapplied writes via txns.
    pub(crate) empty_to: T,
}
```
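To make the first invariant concrete, here is a minimal sketch of a checked constructor. It assumes u64 timestamps and drops data_id; the real DataSnapshot is generic over T, and new here is a hypothetical helper, not part of the persist-txn API.

```rust
/// Simplified stand-in for `DataSnapshot`, assuming `u64` timestamps
/// (the real struct is generic over `T` and also carries a `ShardId`).
#[derive(Debug, PartialEq)]
pub struct DataSnapshot {
    pub latest_write: Option<u64>,
    pub as_of: u64,
    pub empty_to: u64,
}

impl DataSnapshot {
    /// Hypothetical checked constructor: returns `None` unless
    /// `latest_write <= as_of < empty_to` holds.
    pub fn new(latest_write: Option<u64>, as_of: u64, empty_to: u64) -> Option<Self> {
        // A missing latest_write trivially satisfies `latest_write <= as_of`.
        let write_ok = latest_write.map_or(true, |w| w <= as_of);
        if write_ok && as_of < empty_to {
            Some(DataSnapshot { latest_write, as_of, empty_to })
        } else {
            None
        }
    }
}
```

For example, DataSnapshot::new(Some(3), 4, 5) is accepted, while DataSnapshot::new(Some(3), 4, 4) is rejected because as_of must be strictly below empty_to.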

unblock_read allows a reader to read the data shard directly at as_of by taking the following steps:

  1. Wait for the upper of the data shard to pass latest_write.
  2. Compare-and-append an empty batch that advances the data shard's upper to empty_to.

```rust
/// Unblocks reading a snapshot at `self.as_of` by waiting for the latest
/// write before that time and then running an empty CaA if necessary.
///
/// Returns a frontier that is greater than the as_of and less_equal the
/// physical upper of the data shard. This is suitable for use as an initial
/// input to `TxnsCache::data_listen_next` (after reading up to it, of
/// course).
#[instrument(level = "debug", fields(shard = %self.data_id, ts = ?self.as_of))]
pub(crate) async fn unblock_read<K, V, D>(
    &self,
    mut data_write: WriteHandle<K, V, T, D>,
) -> Antichain<T>
where
    K: Debug + Codec,
    V: Debug + Codec,
    D: Semigroup + Codec64 + Send + Sync,
{
    debug!(
        "unblock_read latest_write={:?} as_of={:?} for {:.9}",
        self.latest_write,
        self.as_of,
        self.data_id.to_string()
    );
    // First block until the latest write has been applied.
    if let Some(latest_write) = self.latest_write.as_ref() {
        let () = data_write
            .wait_for_upper_past(&Antichain::from_elem(latest_write.clone()))
            .await;
    }
    // Now fill `(latest_write,as_of]` with empty updates, so we can read
    // the shard at as_of normally. In practice, because CaA takes an
    // exclusive upper, we actually fill `(latest_write, empty_to)`.
    //
    // It's quite counter-intuitive for reads to involve writes, but I think
    // this is fine. In particular, because writing empty updates to a
    // persist shard is a metadata-only operation. It might result in things
    // like GC maintenance or a CRDB write, but this is also true for
    // registering a reader. On the balance, I think this is a _much_ better
    // set of tradeoffs than the original plan of trying to translate read
    // timestamps to the most recent write and reading that.
    let Some(mut data_upper) = data_write.shared_upper().into_option() else {
        // If the upper is the empty antichain, then we've unblocked all
        // possible reads. Return early.
        debug!(
            "CaA data snapshot {:.9} shard finalized",
            self.data_id.to_string(),
        );
        return Antichain::new();
    };
    while data_upper < self.empty_to {
        // It would be very bad if we accidentally filled any times <=
        // latest_write with empty updates, so defensively assert on each
        // iteration through the loop.
        if let Some(latest_write) = self.latest_write.as_ref() {
            assert!(latest_write < &data_upper);
        }
        assert!(self.as_of < self.empty_to);
        let res = crate::small_caa(
            || format!("data {:.9} unblock reads", self.data_id.to_string()),
            &mut data_write,
            &[],
            data_upper.clone(),
            self.empty_to.clone(),
        )
        .await;
        match res {
            Ok(()) => {
                // Persist registers writers on the first write, so politely
                // expire the writer we just created, but (as a performance
                // optimization) only if we actually wrote something.
                data_write.expire().await;
                break;
            }
            Err(new_data_upper) => {
                data_upper = new_data_upper;
                continue;
            }
        }
    }
    Antichain::from_elem(self.empty_to.clone())
}
```
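The control flow above can be modeled synchronously against an in-memory shard. This is a sketch under stated assumptions: MockShard, caa_empty, and racing_write are hypothetical stand-ins for a persist WriteHandle and small_caa, the upper is a plain u64 (None models the empty antichain), waiting for latest_write is reduced to an assertion, and returning None on a concurrently finalized shard is a modeling choice.

```rust
/// Hypothetical in-memory stand-in for a persist shard. `upper == None`
/// models the empty antichain (a finalized shard).
pub struct MockShard {
    pub upper: Option<u64>,
    /// If set, a concurrent writer advances the upper to this value just
    /// before our next compare-and-append, forcing one retry.
    pub racing_write: Option<u64>,
}

impl MockShard {
    /// Empty compare-and-append: succeeds only if `expected` is still the
    /// upper; otherwise returns the actual upper, like a CaA mismatch.
    fn caa_empty(&mut self, expected: u64, new_upper: u64) -> Result<(), Option<u64>> {
        if let Some(raced) = self.racing_write.take() {
            self.upper = Some(raced);
        }
        if self.upper == Some(expected) {
            self.upper = Some(new_upper);
            Ok(())
        } else {
            Err(self.upper)
        }
    }
}

/// Synchronous model of `unblock_read`. Returns the frontier, with `None`
/// standing for the empty antichain.
pub fn unblock_read(
    shard: &mut MockShard,
    latest_write: Option<u64>,
    as_of: u64,
    empty_to: u64,
) -> Option<u64> {
    let Some(mut data_upper) = shard.upper else {
        return None; // finalized: every read is already unblocked
    };
    while data_upper < empty_to {
        // Never fill times <= latest_write with empty updates.
        if let Some(w) = latest_write {
            assert!(w < data_upper, "latest_write has not been applied yet");
        }
        assert!(as_of < empty_to);
        match shard.caa_empty(data_upper, empty_to) {
            Ok(()) => break,
            Err(Some(new_upper)) => data_upper = new_upper, // retry from the new upper
            Err(None) => return None,                       // finalized concurrently
        }
    }
    Some(empty_to)
}
```

The racing_write case shows the behavior this issue is about: after another writer moves the upper from 3 to 4, the model retries and advances the upper to empty_to anyway, without asking whether the shard is registered.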

The problem is that unblock_read never checks whether the data shard is registered over the range [latest_write, empty_to). If the shard is unregistered, unblock_read should not write to it: some other process is likely writing to the shard directly and may be tracking its upper internally. That process will then be surprised (or even panic) when the upper moves out from under it.
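One way to express the missing check is as an interval-coverage test over the shard's registration history. This is a hypothetical sketch: the txns shard does not necessarily expose registrations as a list of half-open (from, until) pairs, and registered_over is an illustrative name, not a persist-txn function.

```rust
/// Returns true if every time in `[lo, hi)` falls inside some registration.
/// Each registration is a half-open interval `[from, until)`, where
/// `until == None` means the shard is still registered (hypothetical model).
pub fn registered_over(regs: &[(u64, Option<u64>)], lo: u64, hi: u64) -> bool {
    let mut sorted = regs.to_vec();
    sorted.sort_by_key(|&(from, _)| from);
    // Invariant: every time in `[lo, covered_to)` is registered.
    let mut covered_to = lo;
    for (from, until) in sorted {
        if covered_to >= hi {
            break;
        }
        if from > covered_to {
            break; // gap: `covered_to` itself is unregistered
        }
        match until {
            None => return true, // registered from `from` onward
            Some(u) => covered_to = covered_to.max(u),
        }
    }
    covered_to >= hi
}
```

Under this model, a fixed unblock_read would only run its empty CaA when registered_over(regs, latest_write, empty_to) holds, and would otherwise leave the shard's upper alone. Adjacent registrations such as [0, 5) and [5, 9) are treated as one continuous span.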

jkosh44 added the C-bug (Category: something is broken) label on May 14, 2024.
jkosh44 self-assigned this issue on May 14, 2024.
jkosh44 added a commit to jkosh44/materialize that referenced this issue on May 23, 2024.

jkosh44 commented May 23, 2024

I have a PoC of a fix for this (https://github.com/MaterializeInc/materialize/pull/27268/files), but there's a potential problem. Currently we have an invariant that calling apply_le(ts) is sufficient to unblock a read at time ts. In fact, this invariant is tested in multiple places:

```rust
// During code review, we discussed an alternate implementation of
// empty_to that was an Option: None when we knew about a write > the
// as_of, and Some when we didn't. The None case would mean that we
// don't need to CaA empty updates in. This is quite appealing, but
// would cause an issue with the guarantee that `apply_le(as_of)` is
// sufficient to unblock a read. Specifically:
//
// - Write at 3, but don't apply.
// - Write at 5, but don't apply.
// - Catch the cache up past the write at 5.
// - Run apply_le(4) to unblock a read at 4.
// - Run a snapshot at 4.
// - If nothing else applies the write at 5, the snapshot would
//   deadlock.
```

```rust
// Run all of the following in a timeout to make hangs easier to debug.
tokio::time::timeout(Duration::from_secs(30), async {
    info!("finished with max_ts of {}", max_ts);
    txns.apply_le(&max_ts).await;
    for data_id in data_ids {
        info!("reading data shard {}", data_id);
        log.assert_snapshot(data_id, max_ts)
            .instrument(info_span!("read_data", data_id = format!("{:.9}", data_id)))
            .await;
    }
    info!("now waiting for reads {}", max_ts);
    for (tx, data_id, as_of, subscribe) in reads {
        let _ = tx.send(max_ts + 1);
        let output = subscribe.await.unwrap();
        log.assert_eq(data_id, as_of, max_ts + 1, output);
    }
})
.await
.unwrap();
```

However, fixing this issue removes that guarantee for unregistered shards: calling apply_le(ts) is no longer sufficient to unblock a read at time ts. The reader also has to sit around and wait for someone else to actually write to that shard and advance its upper past ts. In my branch, the stress_correctness test times out waiting on reads of unregistered shards.
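The lost guarantee can be illustrated with the numbers from the code-review comment above: a write at 3 that apply_le(4) applies, an unapplied write at 5, and a read at 4. This is a toy model, not persist-txn code, and it assumes that applying a txn write at time t advances the data shard's upper to t + 1 (so the upper is 4 after apply_le(4)) and that empty_to is 5.

```rust
/// A read at `ts` can proceed once the data shard's upper is past `ts`.
pub fn read_unblocked(upper: u64, ts: u64) -> bool {
    upper > ts
}

/// Hypothetical model of the data shard's upper after `apply_le(as_of)`
/// followed by `unblock_read`. A registered shard gets an empty CaA up to
/// `empty_to`; with the proposed fix, an unregistered shard is left alone.
pub fn upper_after_unblock(upper_after_apply: u64, empty_to: u64, registered: bool) -> u64 {
    if registered && upper_after_apply < empty_to {
        empty_to
    } else {
        upper_after_apply
    }
}
```

With these numbers, a registered shard ends with upper 5 and the read at 4 proceeds, while an unregistered shard stays at upper 4 until some other writer advances it.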

Perhaps the right approach is to make it part of the contract of unblock_read that the caller is allowed to write to the shard directly at empty_to. This would be hard to enforce in practice, because unblock_read is often called indirectly and the caller usually never looks at empty_to.
