From c5461c1372341b9407004bd9965fee9313f0c49f Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 12 Dec 2025 18:11:48 -0500 Subject: [PATCH 1/8] Remove now-redundant background heartbeat task --- src/persist-client/src/internal/machine.rs | 50 +++------------------- src/persist-client/src/lib.rs | 7 +-- src/persist-client/src/read.rs | 11 ++--- 3 files changed, 13 insertions(+), 55 deletions(-) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 33e5544ec2f39..12b01876730d7 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -1202,54 +1202,18 @@ where T: Timestamp + Lattice + Codec64 + Sync, D: Monoid + Codec64 + Send + Sync, { - #[allow(clippy::unused_async)] - pub async fn start_reader_heartbeat_tasks( + pub fn start_reader_heartbeat_task( self, reader_id: LeasedReaderId, gc: GarbageCollector, - ) -> Vec> { - let mut ret = Vec::new(); + ) -> JoinHandle<()> { let metrics = Arc::clone(&self.applier.metrics); - - // TODO: In response to a production incident, this runs the heartbeat - // task on both the in-context tokio runtime and persist's isolated - // runtime. We think we were seeing tasks (including this one) get stuck - // indefinitely in tokio while waiting for a runtime worker. This could - // happen if some other task in that runtime never yields. It's possible - // that one of the two runtimes is healthy while the other isn't (this - // was inconclusive in the incident debugging), and the heartbeat task - // is fairly lightweight, so run a copy in each in case that helps. - // - // The real fix here is to find the misbehaving task and fix it. Remove - // this duplication when that happens. let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id); - ret.push(mz_ore::task::spawn(|| name, { - let machine = self.clone(); - let reader_id = reader_id.clone(); - let gc = gc.clone(); - metrics - .tasks - .heartbeat_read - .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc)) - })); - - let isolated_runtime = Arc::clone(&self.isolated_runtime); - let name = format!( - "persist::heartbeat_read_isolated({},{})", - self.shard_id(), - reader_id - ); - ret.push( - isolated_runtime.spawn_named( - || name, - metrics - .tasks - .heartbeat_read - .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)), - ), - ); - - ret + mz_ore::task::spawn(|| name, { + metrics.tasks.heartbeat_read.instrument_task(async move { + Self::reader_heartbeat_task(self, reader_id, gc).await + }) + }) } async fn reader_heartbeat_task( diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 1ffe69be17230..d7209924ce02c 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -920,7 +920,6 @@ impl PersistClient { #[cfg(test)] mod tests { use std::future::Future; - use std::mem; use std::pin::Pin; use std::task::Context; use std::time::Duration; @@ -2007,14 +2006,12 @@ mod tests { .expect("client construction failed") .expect_open::<(), (), u64, i64>(ShardId::new()) .await; - let mut read_unexpired_state = read + let read_unexpired_state = read .unexpired_state .take() .expect("handle should have unexpired state"); read.expire().await; - for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) { - let () = read_heartbeat_task.await; - } + read_unexpired_state._heartbeat_tasks.await } /// Verify that shard finalization works with empty shards, shards that have diff --git 
a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 21ee955db815a..d3d4fc330b557 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -580,12 +580,9 @@ where leased_seqnos: BTreeMap::new(), unexpired_state: Some(UnexpiredReadHandleState { expire_fn, - _heartbeat_tasks: machine - .start_reader_heartbeat_tasks(reader_id, gc) - .await - .into_iter() - .map(JoinHandle::abort_on_drop) - .collect(), + _heartbeat_tasks: JoinHandle::abort_on_drop( + machine.start_reader_heartbeat_task(reader_id, gc), + ), }), } } @@ -894,7 +891,7 @@ where #[derive(Debug)] pub(crate) struct UnexpiredReadHandleState { expire_fn: ExpireFn, - pub(crate) _heartbeat_tasks: Vec>, + pub(crate) _heartbeat_tasks: AbortOnDropHandle<()>, } /// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor]. From ef6cfc7d45a648eda235dacf62d0abd0aec8a2a8 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 12 Dec 2025 13:20:58 -0500 Subject: [PATCH 2/8] Move the seqno-relevant data into a shared state --- src/persist-client/src/lib.rs | 2 +- src/persist-client/src/read.rs | 190 ++++++++++++++++++++++----------- 2 files changed, 128 insertions(+), 64 deletions(-) diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index d7209924ce02c..d335717368ecd 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -364,7 +364,7 @@ impl PersistClient { Arc::clone(&self.blob), reader_id, schemas, - reader_state.since, + reader_state, heartbeat_ts, ) .await; diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index d3d4fc330b557..04e174fa3ecd5 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -10,17 +10,15 @@ //! Read capabilities and handles use async_stream::stream; -use std::backtrace::Backtrace; use std::collections::BTreeMap; -use std::fmt::Debug; +use std::fmt::{Debug, Formatter}; use std::future::Future; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::Description; use futures::Stream; use futures_util::{StreamExt, stream}; use mz_dyncfg::Config; @@ -37,6 +35,7 @@ use timely::PartialOrder; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; use tokio::runtime::Handle; +use tokio::sync::Notify; use tracing::{Instrument, debug_span, warn}; use uuid::Uuid; @@ -47,7 +46,7 @@ use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_ use crate::internal::encoding::Schemas; use crate::internal::machine::{ExpireFn, Machine}; use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics}; -use crate::internal::state::{BatchPart, HollowBatch}; +use crate::internal::state::{HollowBatch, LeasedReaderState}; use crate::internal::watch::StateWatch; use crate::iter::{Consolidator, StructuredSort}; use crate::schema::SchemaCache; @@ -501,6 +500,104 @@ where } } +pub(crate) struct AwaitableState { + state: Arc>, + /// NB: we can't wrap the [Notify] in the lock since the signature of [Notify::notified] + /// doesn't allow it, but this is only accessed while holding the lock. 
+ notify: Arc, +} + +impl Debug for AwaitableState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.state.read().fmt(f) + } +} + +impl Clone for AwaitableState { + fn clone(&self) -> Self { + Self { + state: Arc::clone(&self.state), + notify: Arc::clone(&self.notify), + } + } +} + +impl AwaitableState { + pub fn new(value: T) -> Self { + Self { + state: Arc::new(RwLock::new(value)), + notify: Arc::new(Notify::new()), + } + } + + pub fn read(&self, read_fn: impl FnOnce(&T) -> A) -> A { + let guard = self.state.read().expect("not poisoned"); + let state = &*guard; + read_fn(state) + } + + pub fn modify(&self, write_fn: impl FnOnce(&mut T) -> A) -> A { + let mut guard = self.state.write().expect("not poisoned"); + let state = &mut *guard; + let result = write_fn(state); + // Notify everyone while holding the guard. This guarantees that all waiters will observe + // the just-updated state. + self.notify.notify_waiters(); + result + } + + pub async fn wait_for(&self, mut wait_fn: impl FnMut(&T) -> Option) -> A { + loop { + let notified = { + let guard = self.state.read().expect("not poisoned"); + let state = &*guard; + if let Some(result) = wait_fn(state) { + return result; + } + // Grab the notified future while holding the guard. This ensures that we will see any + // future modifications to this state, even if they happen before the first poll. + self.notify.notified() + }; + + notified.await; + } + } +} + +#[derive(Debug)] +pub(crate) struct LeaseMetadata { + recent_seqno: SeqNo, + /// The set of active leases. We hold back the seqno to the minimum lease or + /// the recent_seqno, whichever is earlier. + leases: BTreeMap, +} + +impl LeaseMetadata { + pub fn observe_seqno(&mut self, seqno: SeqNo) { + self.recent_seqno = seqno.max(self.recent_seqno); + } + + pub fn lease_seqno(&mut self) -> Lease { + let seqno = self.recent_seqno; + let lease = self + .leases + .entry(seqno) + .or_insert_with(|| Lease::new(seqno)); + lease.clone() + } + + pub fn outstanding_seqno(&mut self) -> Option { + while let Some(first) = self.leases.first_entry() { + if first.get().count() <= 1 { + first.remove(); + } else { + return Some(*first.key()); + } + } + None + } +} + /// A "capability" granting the ability to read the state of some shard at times /// greater or equal to `self.since()`. 
/// @@ -534,7 +631,7 @@ pub struct ReadHandle { since: Antichain, pub(crate) last_heartbeat: EpochMillis, - pub(crate) leased_seqnos: BTreeMap, + pub(crate) leased_seqnos: AwaitableState, pub(crate) unexpired_state: Option, } @@ -561,11 +658,15 @@ where blob: Arc, reader_id: LeasedReaderId, read_schemas: Schemas, - since: Antichain, + state: LeasedReaderState, last_heartbeat: EpochMillis, ) -> Self { let schema_cache = machine.applier.schema_cache(); let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone()); + let leased_seqnos = AwaitableState::new(LeaseMetadata { + recent_seqno: state.seqno, + leases: Default::default(), + }); ReadHandle { cfg, metrics: Arc::clone(&metrics), @@ -575,9 +676,9 @@ where reader_id: reader_id.clone(), read_schemas, schema_cache, - since, + since: state.since, last_heartbeat, - leased_seqnos: BTreeMap::new(), + leased_seqnos: leased_seqnos.clone(), unexpired_state: Some(UnexpiredReadHandleState { expire_fn, _heartbeat_tasks: JoinHandle::abort_on_drop( @@ -600,14 +701,7 @@ where } fn outstanding_seqno(&mut self) -> Option { - while let Some(first) = self.leased_seqnos.first_entry() { - if first.get().count() <= 1 { - first.remove(); - } else { - return Some(*first.key()); - } - } - None + self.leased_seqnos.modify(|s| s.outstanding_seqno()) } /// Forwards the since frontier of this handle, giving up the ability to @@ -631,27 +725,6 @@ where .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts) .await; - // Debugging for database-issues#4590. - if let Some(outstanding_seqno) = outstanding_seqno { - let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0); - // We get just over 1 seqno-per-second on average for a shard in - // prod, so this is about an hour. - const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60; - if seqnos_held >= SEQNOS_HELD_THRESHOLD { - tracing::info!( - "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}", - self.machine.shard_id(), - self.reader_id, - outstanding_seqno, - _seqno, - self.leased_seqnos.keys().take(10).collect::>(), - // The Debug impl of backtrace is less aesthetic, but will put the trace - // on a single line and play more nicely with our Honeycomb quota - Backtrace::force_capture(), - ); - } - } - self.since = current_reader_since.0; // A heartbeat is just any downgrade_since traffic, so update the // internal rate limiter here to play nicely with `maybe_heartbeat`. 
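The hunks above all route through the `AwaitableState` wrapper introduced earlier in this patch, which is essentially an async-friendly mutex-plus-condvar. A minimal standalone sketch of the same lock-plus-`Notify` pattern (hypothetical simplified names, not the crate's actual type); the key detail, called out in the patch's own comments, is that the `Notified` future is created while the lock is still held, so a `modify` that lands after the check but before the first poll still wakes the waiter:

```rust
use std::sync::{Arc, RwLock};
use tokio::sync::Notify;

#[derive(Clone)]
struct Shared<T> {
    state: Arc<RwLock<T>>,
    notify: Arc<Notify>,
}

impl<T> Shared<T> {
    fn new(value: T) -> Self {
        Self {
            state: Arc::new(RwLock::new(value)),
            notify: Arc::new(Notify::new()),
        }
    }

    fn modify<A>(&self, write_fn: impl FnOnce(&mut T) -> A) -> A {
        let mut guard = self.state.write().expect("not poisoned");
        let result = write_fn(&mut *guard);
        // Notify while still holding the write lock, so every waiter registered
        // before this point observes this update (or a later one) on its recheck.
        self.notify.notify_waiters();
        result
    }

    async fn wait_for<A>(&self, mut wait_fn: impl FnMut(&T) -> Option<A>) -> A {
        loop {
            let notified = {
                let guard = self.state.read().expect("not poisoned");
                if let Some(result) = wait_fn(&*guard) {
                    return result;
                }
                // Register for wake-ups before releasing the lock.
                self.notify.notified()
            };
            notified.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let shared = Shared::new(0u64);
    let waiter = {
        let shared = shared.clone();
        tokio::spawn(async move { shared.wait_for(|n| (*n >= 3).then_some(*n)).await })
    };
    for _ in 0..3 {
        shared.modify(|n| *n += 1);
    }
    assert_eq!(waiter.await.expect("task finished"), 3);
}
```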
@@ -745,23 +818,6 @@ where Ok(Subscribe::new(snapshot_parts, listen)) } - fn lease_batch_part( - &mut self, - desc: Description, - part: BatchPart, - filter: FetchBatchFilter, - ) -> LeasedBatchPart { - LeasedBatchPart { - metrics: Arc::clone(&self.metrics), - shard_id: self.machine.shard_id(), - filter, - desc, - part, - lease: self.lease_seqno(), - filter_pushdown_audit: false, - } - } - fn lease_batch_parts( &mut self, batch: HollowBatch, @@ -771,8 +827,17 @@ where let blob = Arc::clone(&self.blob); let metrics = Arc::clone(&self.metrics); let desc = batch.desc.clone(); + let lease = self.lease_seqno(); for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) { - yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone()) + yield LeasedBatchPart { + metrics: Arc::clone(&self.metrics), + shard_id: self.machine.shard_id(), + filter: filter.clone(), + desc: desc.clone(), + part: part.expect("leased part").into_owned(), + lease: lease.clone(), + filter_pushdown_audit: false, + } } } } @@ -781,12 +846,11 @@ where /// "leased out" to a `LeasedBatchPart`, and cannot be garbage /// collected until its lease has been returned. fn lease_seqno(&mut self) -> Lease { - let seqno = self.machine.seqno(); - let lease = self - .leased_seqnos - .entry(seqno) - .or_insert_with(|| Lease::new(seqno)); - lease.clone() + let current_seqno = self.machine.seqno(); + self.leased_seqnos.modify(|s| { + s.observe_seqno(current_seqno); + s.lease_seqno() + }) } /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the @@ -819,7 +883,7 @@ where Arc::clone(&self.blob), new_reader_id, self.read_schemas.clone(), - reader_state.since, + reader_state, heartbeat_ts, ) .await; From c37bf5b892f4e96f206eb15fb3ba95d62ef96f42 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 19 Dec 2025 16:58:14 -0500 Subject: [PATCH 3/8] Observe new seqnos in both the handle and the heartbeat thread --- src/persist-client/src/fetch.rs | 1 - src/persist-client/src/internal/machine.rs | 9 ++++-- src/persist-client/src/read.rs | 34 +++++++++++++--------- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 574ec8a7f83f0..35ef6800bc8c4 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -472,7 +472,6 @@ impl Lease { } /// Returns the inner [SeqNo] of this [Lease]. 
- #[cfg(test)] pub fn seqno(&self) -> SeqNo { *self.0 } diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 12b01876730d7..4ffada05d700c 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -54,7 +54,7 @@ use crate::internal::state::{ use crate::internal::state_versions::StateVersions; use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; use crate::internal::watch::StateWatch; -use crate::read::{LeasedReaderId, READER_LEASE_DURATION}; +use crate::read::{AwaitableState, LeaseMetadata, LeasedReaderId, READER_LEASE_DURATION}; use crate::rpc::PubSubSender; use crate::schema::CaESchema; use crate::write::WriterId; @@ -1206,12 +1206,13 @@ where self, reader_id: LeasedReaderId, gc: GarbageCollector, + leased_seqnos: AwaitableState, ) -> JoinHandle<()> { let metrics = Arc::clone(&self.applier.metrics); let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id); mz_ore::task::spawn(|| name, { metrics.tasks.heartbeat_read.instrument_task(async move { - Self::reader_heartbeat_task(self, reader_id, gc).await + Self::reader_heartbeat_task(self, reader_id, gc, leased_seqnos).await }) }) } @@ -1220,6 +1221,7 @@ where machine: Self, reader_id: LeasedReaderId, gc: GarbageCollector, + leased_seqnos: AwaitableState, ) { let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2; loop { @@ -1237,9 +1239,10 @@ where } let before_heartbeat = Instant::now(); - let (_seqno, existed, maintenance) = machine + let (seqno, existed, maintenance) = machine .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)()) .await; + leased_seqnos.modify(|s| s.observe_seqno(seqno)); maintenance.start_performing(&machine, &gc); let elapsed_since_heartbeat = before_heartbeat.elapsed(); diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 04e174fa3ecd5..f0af8737b1e47 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -227,8 +227,6 @@ pub enum ListenEvent { #[derive(Debug)] pub struct Listen { handle: ReadHandle, - watch: StateWatch, - as_of: Antichain, since: Antichain, frontier: Antichain, @@ -257,11 +255,8 @@ where // (initially as_of although the frontier is inclusive and the as_of // isn't). Be a good citizen and downgrade early. 
handle.downgrade_since(&since).await; - - let watch = handle.machine.applier.watch(); Ok(Listen { handle, - watch, since, frontier: as_of.clone(), as_of, @@ -289,7 +284,7 @@ where let min_elapsed = self.handle.heartbeat_duration(); let next_batch = self.handle.machine.next_listen_batch( &self.frontier, - &mut self.watch, + &mut self.handle.watch, Some(&self.handle.reader_id), retry, ); @@ -625,6 +620,8 @@ pub struct ReadHandle { pub(crate) machine: Machine, pub(crate) gc: GarbageCollector, pub(crate) blob: Arc, + watch: StateWatch, + pub(crate) reader_id: LeasedReaderId, pub(crate) read_schemas: Schemas, pub(crate) schema_cache: SchemaCache, @@ -673,6 +670,7 @@ where machine: machine.clone(), gc: gc.clone(), blob, + watch: machine.applier.watch(), reader_id: reader_id.clone(), read_schemas, schema_cache, @@ -681,9 +679,11 @@ where leased_seqnos: leased_seqnos.clone(), unexpired_state: Some(UnexpiredReadHandleState { expire_fn, - _heartbeat_tasks: JoinHandle::abort_on_drop( - machine.start_reader_heartbeat_task(reader_id, gc), - ), + _heartbeat_tasks: JoinHandle::abort_on_drop(machine.start_reader_heartbeat_task( + reader_id, + gc, + leased_seqnos, + )), }), } } @@ -827,7 +827,7 @@ where let blob = Arc::clone(&self.blob); let metrics = Arc::clone(&self.metrics); let desc = batch.desc.clone(); - let lease = self.lease_seqno(); + let lease = self.lease_seqno().await; for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) { yield LeasedBatchPart { metrics: Arc::clone(&self.metrics), @@ -845,12 +845,18 @@ where /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being /// "leased out" to a `LeasedBatchPart`, and cannot be garbage /// collected until its lease has been returned. - fn lease_seqno(&mut self) -> Lease { + async fn lease_seqno(&mut self) -> Lease { let current_seqno = self.machine.seqno(); - self.leased_seqnos.modify(|s| { + let lease = self.leased_seqnos.modify(|s| { s.observe_seqno(current_seqno); s.lease_seqno() - }) + }); + // The seqno we've leased may be the seqno observed by our heartbeat task, which could be + // ahead of the last state we saw. Ensure we only observe states in the future of our hold. + // (Since these are backed by the same state in the same process, this should all be pretty + // fast.) + self.watch.wait_for_seqno_ge(lease.seqno()).await; + lease } /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the @@ -1083,7 +1089,7 @@ where should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool, ) -> Result, Since> { let batches = self.machine.snapshot(&as_of).await?; - let lease = self.lease_seqno(); + let lease = self.lease_seqno().await; Self::read_batches_consolidated( &self.cfg, From a96044db38a7106236b796e92834ea5ea9dfda81 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 19 Dec 2025 17:08:58 -0500 Subject: [PATCH 4/8] Always downgrade to the shared seqno Should be pretty recent! 
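The idea: rather than passing `Option<SeqNo>` and letting the state transition fall back to the current seqno, the reader always supplies a concrete seqno — the oldest still-live lease if one exists, otherwise the most recent seqno it has observed (which the heartbeat task keeps fresh). A standalone sketch of that fallback, with hypothetical simplified stand-ins for `SeqNo`, `Lease`, and `LeaseMetadata` (the crate's `Lease` exposes a `count()` method rather than a raw `Arc`):

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct SeqNo(u64);

// A lease is alive while any copy besides the map's own clone still exists.
#[derive(Clone)]
struct Lease(Arc<SeqNo>);

struct LeaseMetadata {
    recent_seqno: SeqNo,
    leases: BTreeMap<SeqNo, Lease>,
}

impl LeaseMetadata {
    fn observe_seqno(&mut self, seqno: SeqNo) {
        self.recent_seqno = seqno.max(self.recent_seqno);
    }

    fn outstanding_seqno(&mut self) -> SeqNo {
        while let Some(first) = self.leases.first_entry() {
            if Arc::strong_count(&first.get().0) <= 1 {
                // Only the map's copy is left: the part holding this lease was dropped.
                first.remove();
            } else {
                return *first.key();
            }
        }
        // No live leases: fall back to the newest seqno this reader has observed,
        // so the hold keeps advancing instead of being reported as absent.
        self.recent_seqno
    }
}

fn main() {
    let mut meta = LeaseMetadata {
        recent_seqno: SeqNo(7),
        leases: BTreeMap::new(),
    };
    let held = Lease(Arc::new(SeqNo(7)));
    meta.leases.insert(SeqNo(7), held.clone());
    meta.observe_seqno(SeqNo(10));
    assert_eq!(meta.outstanding_seqno(), SeqNo(7)); // pinned by the live lease
    drop(held);
    assert_eq!(meta.outstanding_seqno(), SeqNo(10)); // falls back to recent_seqno
}
```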
--- src/persist-client/src/internal/machine.rs | 4 +-- src/persist-client/src/internal/state.rs | 35 ++++++++++------------ src/persist-client/src/read.rs | 16 ++++++---- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 4ffada05d700c..da9888c818a74 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -620,7 +620,7 @@ where pub async fn downgrade_since( &self, reader_id: &LeasedReaderId, - outstanding_seqno: Option, + outstanding_seqno: SeqNo, new_since: &Antichain, heartbeat_timestamp_ms: u64, ) -> (SeqNo, Since, RoutineMaintenance) { @@ -1615,7 +1615,7 @@ pub mod datadriven { args: DirectiveArgs<'_>, ) -> Result { let since = args.expect_antichain("since"); - let seqno = args.optional("seqno"); + let seqno = args.optional("seqno").unwrap_or(datadriven.machine.seqno()); let reader_id = args.expect("reader_id"); let (_, since, routine) = datadriven .machine diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 193982966d80a..626d6fcbf03f0 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1877,7 +1877,7 @@ where &mut self, reader_id: &LeasedReaderId, seqno: SeqNo, - outstanding_seqno: Option, + outstanding_seqno: SeqNo, new_since: &Antichain, heartbeat_timestamp_ms: u64, ) -> ControlFlow>, Since> { @@ -1905,18 +1905,15 @@ where reader_state.last_heartbeat_timestamp_ms, ); - let seqno = match outstanding_seqno { - Some(outstanding_seqno) => { - assert!( - outstanding_seqno >= reader_state.seqno, - "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \ + let seqno = { + assert!( + outstanding_seqno >= reader_state.seqno, + "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \ is behind current reader_state ({:?})", - outstanding_seqno, - reader_state.seqno, - ); - std::cmp::min(outstanding_seqno, seqno) - } - None => seqno, + outstanding_seqno, + reader_state.seqno, + ); + std::cmp::min(outstanding_seqno, seqno) }; reader_state.seqno = seqno; @@ -3231,7 +3228,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(2), now() ), @@ -3243,7 +3240,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(2), now() ), @@ -3255,7 +3252,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(1), now() ), @@ -3280,7 +3277,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader2, seqno, - None, + seqno, &Antichain::from_elem(3), now() ), @@ -3292,7 +3289,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, seqno, - None, + seqno, &Antichain::from_elem(5), now() ), @@ -3324,7 +3321,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader3, seqno, - None, + seqno, &Antichain::from_elem(10), now() ), @@ -3624,7 +3621,7 @@ pub(crate) mod tests { state.collections.downgrade_since( &reader, SeqNo::minimum(), - None, + SeqNo::minimum(), &Antichain::from_elem(2), now() ), diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index f0af8737b1e47..2f80efc45393d 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -581,15 +581,15 @@ impl LeaseMetadata { lease.clone() } - pub fn outstanding_seqno(&mut self) -> Option { + pub fn 
outstanding_seqno(&mut self) -> SeqNo { while let Some(first) = self.leases.first_entry() { if first.get().count() <= 1 { first.remove(); } else { - return Some(*first.key()); + return *first.key(); } } - None + self.recent_seqno } } @@ -700,8 +700,12 @@ where &self.since } - fn outstanding_seqno(&mut self) -> Option { - self.leased_seqnos.modify(|s| s.outstanding_seqno()) + fn outstanding_seqno(&mut self) -> SeqNo { + let current_seqno = self.machine.seqno(); + self.leased_seqnos.modify(|s| { + s.observe_seqno(current_seqno); + s.outstanding_seqno() + }) } /// Forwards the since frontier of this handle, giving up the ability to @@ -1611,7 +1615,7 @@ mod tests { // We should expect the SeqNo to be downgraded if this part's SeqNo // is no longer leased to any other parts, either. - let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno); + let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno; let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since(); if expect_downgrade { From 62d498f61295981254d03f647efaebfd2b5bacd3 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 19 Dec 2025 17:28:11 -0500 Subject: [PATCH 5/8] Use the main downgrade-since method in the background thread --- src/persist-client/src/internal/datadriven.rs | 3 -- src/persist-client/src/internal/machine.rs | 45 ++++++------------- src/persist-client/src/internal/metrics.rs | 2 - src/persist-client/src/internal/state.rs | 27 ----------- src/persist-client/src/read.rs | 18 ++++++-- 5 files changed, 28 insertions(+), 67 deletions(-) diff --git a/src/persist-client/src/internal/datadriven.rs b/src/persist-client/src/internal/datadriven.rs index 99fccd6cbf7f8..37f40072cb973 100644 --- a/src/persist-client/src/internal/datadriven.rs +++ b/src/persist-client/src/internal/datadriven.rs @@ -245,9 +245,6 @@ mod tests { "fetch-batch" => machine_dd::fetch_batch(&state, args).await, "finalize" => machine_dd::finalize(&mut state, args).await, "gc" => machine_dd::gc(&mut state, args).await, - "heartbeat-leased-reader" => { - machine_dd::heartbeat_leased_reader(&state, args).await - } "is-finalized" => machine_dd::is_finalized(&state, args), "listen-through" => { machine_dd::listen_through(&mut state, args).await diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index da9888c818a74..20244c5662d03 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -31,6 +31,7 @@ use mz_persist_types::schema::SchemaId; use mz_persist_types::{Codec, Codec64, Opaque}; use semver::Version; use timely::PartialOrder; +use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; use tracing::{Instrument, debug, info, trace_span, warn}; @@ -663,20 +664,6 @@ where } } - pub async fn heartbeat_leased_reader( - &self, - reader_id: &LeasedReaderId, - heartbeat_timestamp_ms: u64, - ) -> (SeqNo, bool, RoutineMaintenance) { - let metrics = Arc::clone(&self.applier.metrics); - let (seqno, existed, maintenance) = self - .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| { - state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms) - }) - .await; - (seqno, existed, maintenance) - } - pub async fn expire_leased_reader( &self, reader_id: &LeasedReaderId, @@ -1199,14 +1186,14 @@ impl Machine where K: Debug + Codec, V: Debug + Codec, - T: Timestamp + Lattice + Codec64 + Sync, + T: Timestamp + Lattice + TotalOrder + Codec64 + Sync, D: Monoid + 
Codec64 + Send + Sync, { pub fn start_reader_heartbeat_task( self, reader_id: LeasedReaderId, gc: GarbageCollector, - leased_seqnos: AwaitableState, + leased_seqnos: AwaitableState>, ) -> JoinHandle<()> { let metrics = Arc::clone(&self.applier.metrics); let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id); @@ -1221,7 +1208,7 @@ where machine: Self, reader_id: LeasedReaderId, gc: GarbageCollector, - leased_seqnos: AwaitableState, + leased_seqnos: AwaitableState>, ) { let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2; loop { @@ -1239,8 +1226,14 @@ where } let before_heartbeat = Instant::now(); - let (seqno, existed, maintenance) = machine - .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)()) + let heartbeat_ms = (machine.applier.cfg.now)(); + let current_seqno = machine.seqno(); + let (outstanding_seqno, new_since) = leased_seqnos.modify(|s| { + s.observe_seqno(current_seqno); + (s.outstanding_seqno(), s.since().clone()) + }); + let (seqno, actual_since, maintenance) = machine + .downgrade_since(&reader_id, outstanding_seqno, &new_since, heartbeat_ms) .await; leased_seqnos.modify(|s| s.observe_seqno(seqno)); maintenance.start_performing(&machine, &gc); @@ -1255,7 +1248,7 @@ where ); } - if !existed { + if actual_since.0.is_empty() { // If the read handle was intentionally expired, this task // *should* be aborted before it observes the expiration. So if // we get here, this task somehow failed to keep the read lease @@ -2318,18 +2311,6 @@ pub mod datadriven { )) } - pub async fn heartbeat_leased_reader( - datadriven: &MachineState, - args: DirectiveArgs<'_>, - ) -> Result { - let reader_id = args.expect("reader_id"); - let _ = datadriven - .machine - .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)()) - .await; - Ok(format!("{} ok\n", datadriven.machine.seqno())) - } - pub async fn expire_critical_reader( datadriven: &mut MachineState, args: DirectiveArgs<'_>, diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index fc8f570b81c58..3a9500eae508d 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -421,7 +421,6 @@ impl MetricsVecs { )), compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"), downgrade_since: self.cmd_metrics("downgrade_since"), - heartbeat_reader: self.cmd_metrics("heartbeat_reader"), expire_reader: self.cmd_metrics("expire_reader"), expire_writer: self.cmd_metrics("expire_writer"), merge_res: self.cmd_metrics("merge_res"), @@ -630,7 +629,6 @@ pub struct CmdsMetrics { pub(crate) compare_and_append_noop: IntCounter, pub(crate) compare_and_downgrade_since: CmdMetrics, pub(crate) downgrade_since: CmdMetrics, - pub(crate) heartbeat_reader: CmdMetrics, pub(crate) expire_reader: CmdMetrics, pub(crate) expire_writer: CmdMetrics, pub(crate) merge_res: CmdMetrics, diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 626d6fcbf03f0..1105ceec58e75 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1976,33 +1976,6 @@ where } } - pub fn heartbeat_leased_reader( - &mut self, - reader_id: &LeasedReaderId, - heartbeat_timestamp_ms: u64, - ) -> ControlFlow, bool> { - // We expire all readers if the upper and since both advance to the - // empty antichain. Gracefully handle this. At the same time, - // short-circuit the cmd application so we don't needlessly create new - // SeqNos. 
- if self.is_tombstone() { - return Break(NoOpStateTransition(false)); - } - - match self.leased_readers.get_mut(reader_id) { - Some(reader_state) => { - reader_state.last_heartbeat_timestamp_ms = std::cmp::max( - heartbeat_timestamp_ms, - reader_state.last_heartbeat_timestamp_ms, - ); - Continue(true) - } - // No-op, but we still commit the state change so that this gets - // linearized (maybe we're looking at old state). - None => Continue(false), - } - } - pub fn expire_leased_reader( &mut self, reader_id: &LeasedReaderId, diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 2f80efc45393d..2e9c4f97168ec 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -560,14 +560,23 @@ impl AwaitableState { } #[derive(Debug)] -pub(crate) struct LeaseMetadata { +pub(crate) struct LeaseMetadata { + /// The frontier we should hold the time-based lease back to. + held_since: Antichain, recent_seqno: SeqNo, /// The set of active leases. We hold back the seqno to the minimum lease or /// the recent_seqno, whichever is earlier. leases: BTreeMap, } -impl LeaseMetadata { +impl LeaseMetadata +where + T: Timestamp + TotalOrder + Lattice + Codec64 + Sync, +{ + pub fn downgrade_since(&mut self, since: &Antichain) { + self.held_since.join_assign(since); + } + pub fn observe_seqno(&mut self, seqno: SeqNo) { self.recent_seqno = seqno.max(self.recent_seqno); } @@ -628,7 +637,7 @@ pub struct ReadHandle { since: Antichain, pub(crate) last_heartbeat: EpochMillis, - pub(crate) leased_seqnos: AwaitableState, + pub(crate) leased_seqnos: AwaitableState>, pub(crate) unexpired_state: Option, } @@ -661,6 +670,7 @@ where let schema_cache = machine.applier.schema_cache(); let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone()); let leased_seqnos = AwaitableState::new(LeaseMetadata { + held_since: state.since.clone(), recent_seqno: state.seqno, leases: Default::default(), }); @@ -730,6 +740,8 @@ where .await; self.since = current_reader_since.0; + self.leased_seqnos + .modify(|s| s.downgrade_since(&self.since)); // A heartbeat is just any downgrade_since traffic, so update the // internal rate limiter here to play nicely with `maybe_heartbeat`. self.last_heartbeat = heartbeat_ts; From e036f6a4135f7287d0a0ef3bcd8030ce937e39cb Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 19 Dec 2025 18:17:30 -0500 Subject: [PATCH 6/8] Do all downgrades in the background task --- src/persist-client/src/internal/machine.rs | 22 +++++++-- src/persist-client/src/lib.rs | 1 - src/persist-client/src/read.rs | 55 +++++++++++----------- 3 files changed, 47 insertions(+), 31 deletions(-) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 20244c5662d03..1c657252d5711 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -14,12 +14,13 @@ use std::ops::ControlFlow::{self, Break, Continue}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; +use differential_dataflow::Hashable; use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; use futures::FutureExt; use futures::future::{self, BoxFuture}; use mz_dyncfg::{Config, ConfigSet}; -use mz_ore::cast::CastFrom; +use mz_ore::cast::{CastFrom, CastLossy}; use mz_ore::error::ErrorExt; #[allow(unused_imports)] // False positive. 
use mz_ore::fmt::FormatBuffer; @@ -1210,10 +1211,24 @@ where gc: GarbageCollector, leased_seqnos: AwaitableState>, ) { - let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2; + let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4; + // Jitter the first tick to avoid a thundering herd when many readers are started around + // the same instant, like during deploys. + let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX); + let mut interval = tokio::time::interval_at( + tokio::time::Instant::now() + sleep_duration.mul_f64(jitter), + sleep_duration, + ); loop { let before_sleep = Instant::now(); - tokio::time::sleep(sleep_duration).await; + let _woke_by_tick = tokio::select! { + _tick = interval.tick() => { + true + } + _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => { + false + } + }; let elapsed_since_before_sleeping = before_sleep.elapsed(); if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) { @@ -1230,6 +1245,7 @@ where let current_seqno = machine.seqno(); let (outstanding_seqno, new_since) = leased_seqnos.modify(|s| { s.observe_seqno(current_seqno); + s.request_sync = false; (s.outstanding_seqno(), s.since().clone()) }); let (seqno, actual_since, maintenance) = machine diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index d335717368ecd..428a79cf7b352 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -365,7 +365,6 @@ impl PersistClient { reader_id, schemas, reader_state, - heartbeat_ts, ) .await; diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 2e9c4f97168ec..99dac9137c4ec 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -24,7 +24,6 @@ use futures_util::{StreamExt, stream}; use mz_dyncfg::Config; use mz_ore::halt; use mz_ore::instrument; -use mz_ore::now::EpochMillis; use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt}; use mz_persist::location::{Blob, SeqNo}; use mz_persist_types::columnar::{ColumnDecoder, Schema}; @@ -495,6 +494,11 @@ where } } +/// A concurrent state - one which allows reading, writing, and waiting for changes made by +/// another concurrent writer. +/// +/// This is morally similar to a mutex with a condvar, but allowing asynchronous waits and with +/// access methods that make it a little trickier to accidentally hold a lock across a yield point. pub(crate) struct AwaitableState { state: Arc>, /// NB: we can't wrap the [Notify] in the lock since the signature of [Notify::notified] @@ -525,6 +529,7 @@ impl AwaitableState { } } + #[allow(unused)] pub fn read(&self, read_fn: impl FnOnce(&T) -> A) -> A { let guard = self.state.read().expect("not poisoned"); let state = &*guard; @@ -551,12 +556,18 @@ impl AwaitableState { } // Grab the notified future while holding the guard. This ensures that we will see any // future modifications to this state, even if they happen before the first poll. - self.notify.notified() + let notified = self.notify.notified(); + drop(guard); + notified }; notified.await; } } + + pub async fn wait_while(&self, mut wait_fn: impl FnMut(&T) -> bool) { + self.wait_for(|s| (!wait_fn(s)).then_some(())).await + } } #[derive(Debug)] @@ -567,6 +578,7 @@ pub(crate) struct LeaseMetadata { /// The set of active leases. We hold back the seqno to the minimum lease or /// the recent_seqno, whichever is earlier. 
leases: BTreeMap, + pub request_sync: bool, } impl LeaseMetadata @@ -636,7 +648,6 @@ pub struct ReadHandle { pub(crate) schema_cache: SchemaCache, since: Antichain, - pub(crate) last_heartbeat: EpochMillis, pub(crate) leased_seqnos: AwaitableState>, pub(crate) unexpired_state: Option, } @@ -665,7 +676,6 @@ where reader_id: LeasedReaderId, read_schemas: Schemas, state: LeasedReaderState, - last_heartbeat: EpochMillis, ) -> Self { let schema_cache = machine.applier.schema_cache(); let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone()); @@ -673,6 +683,7 @@ where held_since: state.since.clone(), recent_seqno: state.seqno, leases: Default::default(), + request_sync: false, }); ReadHandle { cfg, @@ -685,7 +696,6 @@ where read_schemas, schema_cache, since: state.since, - last_heartbeat, leased_seqnos: leased_seqnos.clone(), unexpired_state: Some(UnexpiredReadHandleState { expire_fn, @@ -710,6 +720,7 @@ where &self.since } + #[cfg(test)] fn outstanding_seqno(&mut self) -> SeqNo { let current_seqno = self.machine.seqno(); self.leased_seqnos.modify(|s| { @@ -730,22 +741,14 @@ where /// timestamp, making the call a no-op). #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))] pub async fn downgrade_since(&mut self, new_since: &Antichain) { - // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`. - let outstanding_seqno = self.outstanding_seqno(); - - let heartbeat_ts = (self.cfg.now)(); - let (_seqno, current_reader_since, maintenance) = self - .machine - .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts) - .await; - - self.since = current_reader_since.0; + self.since = new_since.clone(); + self.leased_seqnos.modify(|s| { + s.downgrade_since(new_since); + s.request_sync = true; + }); self.leased_seqnos - .modify(|s| s.downgrade_since(&self.since)); - // A heartbeat is just any downgrade_since traffic, so update the - // internal rate limiter here to play nicely with `maybe_heartbeat`. - self.last_heartbeat = heartbeat_ts; - maintenance.start_performing(&self.machine, &self.gc); + .wait_while(|s| PartialOrder::less_than(&s.held_since, new_since)) + .await; } /// Returns an ongoing subscription of updates to a shard. @@ -906,7 +909,6 @@ where new_reader_id, self.read_schemas.clone(), reader_state, - heartbeat_ts, ) .await; new_reader @@ -921,13 +923,12 @@ where /// This is an internally rate limited helper, designed to allow users to /// call it as frequently as they like. Call this or [Self::downgrade_since], /// on some interval that is "frequent" compared to the read lease duration. + #[allow(clippy::unused_async)] pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain) { - let min_elapsed = self.heartbeat_duration(); - let elapsed_since_last_heartbeat = - Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat)); - if elapsed_since_last_heartbeat >= min_elapsed { - self.downgrade_since(new_since).await; - } + self.since = new_since.clone(); + self.leased_seqnos.modify(|s| { + s.downgrade_since(new_since); + }); } /// Politely expires this reader, releasing its lease. 
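After this commit, the foreground handle only records its desired since and flips `request_sync`; the background task is the sole caller of `machine.downgrade_since`, waking on whichever comes first: its regular lease-renewal tick or the foreground's request. A standalone sketch of that wake-up discipline (hypothetical names; a plain `Notify` stands in here for the shared `request_sync` flag):

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let request_sync = Arc::new(Notify::new());
    let requester = Arc::clone(&request_sync);

    let background = tokio::spawn(async move {
        // Stand-in for READER_LEASE_DURATION / 4.
        let mut interval = tokio::time::interval(Duration::from_millis(50));
        let mut early_wakeups = 0u32;
        for _ in 0..10 {
            tokio::select! {
                _ = interval.tick() => {
                    // Periodic path: renew the lease even if nothing changed.
                }
                _ = request_sync.notified() => {
                    // Foreground path: a downgrade_since wants its hold applied promptly.
                    early_wakeups += 1;
                }
            }
            // Either way, this is the single place a real task would call
            // `machine.downgrade_since(..)` and record the applied since/seqno.
        }
        early_wakeups
    });

    // Foreground handle: request a few out-of-band syncs instead of writing
    // to shard state itself.
    for _ in 0..3 {
        tokio::time::sleep(Duration::from_millis(5)).await;
        requester.notify_one();
    }

    let early_wakeups = background.await.expect("task finished");
    println!("early wake-ups: {early_wakeups}"); // typically >= 1
}
```

The design consequence is that a single writer (the background task) updates the reader's state in the shard, replacing the earlier arrangement where the handle's downgrades and the heartbeat task both issued their own state transitions.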
From 3fee16c0675bceb10d5d86bdd294444f76f5ea0d Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 22 Dec 2025 17:39:09 -0500 Subject: [PATCH 7/8] Shift expiry to the background task --- src/persist-client/src/internal/machine.rs | 42 +++++++++---- src/persist-client/src/lib.rs | 2 +- src/persist-client/src/read.rs | 73 +++++----------------- 3 files changed, 47 insertions(+), 70 deletions(-) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 1c657252d5711..3109ed5c55314 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -1219,6 +1219,7 @@ where tokio::time::Instant::now() + sleep_duration.mul_f64(jitter), sleep_duration, ); + let mut held_since = leased_seqnos.read(|s| s.since().clone()); loop { let before_sleep = Instant::now(); let _woke_by_tick = tokio::select! { @@ -1243,16 +1244,32 @@ where let before_heartbeat = Instant::now(); let heartbeat_ms = (machine.applier.cfg.now)(); let current_seqno = machine.seqno(); - let (outstanding_seqno, new_since) = leased_seqnos.modify(|s| { - s.observe_seqno(current_seqno); - s.request_sync = false; - (s.outstanding_seqno(), s.since().clone()) + let result = leased_seqnos.modify(|s| { + if s.expired { + Err(()) + } else { + s.observe_seqno(current_seqno); + s.request_sync = false; + held_since.join_assign(s.since()); + Ok(s.outstanding_seqno()) + } }); - let (seqno, actual_since, maintenance) = machine - .downgrade_since(&reader_id, outstanding_seqno, &new_since, heartbeat_ms) - .await; - leased_seqnos.modify(|s| s.observe_seqno(seqno)); - maintenance.start_performing(&machine, &gc); + let actual_since = match result { + Ok(held_seqno) => { + let (seqno, actual_since, maintenance) = machine + .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms) + .await; + leased_seqnos.modify(|s| s.observe_seqno(seqno)); + maintenance.start_performing(&machine, &gc); + actual_since + } + Err(()) => { + let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await; + leased_seqnos.modify(|s| s.observe_seqno(seqno)); + maintenance.start_performing(&machine, &gc); + break; + } + }; let elapsed_since_heartbeat = before_heartbeat.elapsed(); if elapsed_since_heartbeat > Duration::from_secs(60) { @@ -1264,7 +1281,7 @@ where ); } - if actual_since.0.is_empty() { + if PartialOrder::less_than(&held_since, &actual_since.0) { // If the read handle was intentionally expired, this task // *should* be aborted before it observes the expiration. So if // we get here, this task somehow failed to keep the read lease @@ -2583,10 +2600,13 @@ pub mod tests { let client = new_test_client(&dyncfgs).await; // set a low rollup threshold so GC/truncation is more aggressive client.cfg.set_config(&ROLLUP_THRESHOLD, 5); - let (mut write, _) = client + let (mut write, read) = client .expect_open::(ShardId::new()) .await; + // Ensure the reader is not holding back the since. + read.expire().await; + // Write a bunch of batches. This should result in a bounded number of // live entries in consensus. 
const NUM_BATCHES: u64 = 100; diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 428a79cf7b352..d446a95c64f46 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -2010,7 +2010,7 @@ mod tests { .take() .expect("handle should have unexpired state"); read.expire().await; - read_unexpired_state._heartbeat_tasks.await + read_unexpired_state.heartbeat_task.await } /// Verify that shard finalization works with empty shards, shards that have diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 99dac9137c4ec..e85df276686f6 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -33,9 +33,8 @@ use serde::{Deserialize, Serialize}; use timely::PartialOrder; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; -use tokio::runtime::Handle; use tokio::sync::Notify; -use tracing::{Instrument, debug_span, warn}; +use tracing::warn; use uuid::Uuid; use crate::batch::BLOB_TARGET_SIZE; @@ -43,7 +42,7 @@ use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters}; use crate::fetch::FetchConfig; use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part}; use crate::internal::encoding::Schemas; -use crate::internal::machine::{ExpireFn, Machine}; +use crate::internal::machine::Machine; use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics}; use crate::internal::state::{HollowBatch, LeasedReaderState}; use crate::internal::watch::StateWatch; @@ -578,6 +577,7 @@ pub(crate) struct LeaseMetadata { /// The set of active leases. We hold back the seqno to the minimum lease or /// the recent_seqno, whichever is earlier. leases: BTreeMap, + pub expired: bool, pub request_sync: bool, } @@ -678,11 +678,11 @@ where state: LeasedReaderState, ) -> Self { let schema_cache = machine.applier.schema_cache(); - let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone()); let leased_seqnos = AwaitableState::new(LeaseMetadata { held_since: state.since.clone(), recent_seqno: state.seqno, leases: Default::default(), + expired: false, request_sync: false, }); ReadHandle { @@ -698,12 +698,7 @@ where since: state.since, leased_seqnos: leased_seqnos.clone(), unexpired_state: Some(UnexpiredReadHandleState { - expire_fn, - _heartbeat_tasks: JoinHandle::abort_on_drop(machine.start_reader_heartbeat_task( - reader_id, - gc, - leased_seqnos, - )), + heartbeat_task: machine.start_reader_heartbeat_task(reader_id, gc, leased_seqnos), }), } } @@ -941,27 +936,14 @@ where /// happens. #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))] pub async fn expire(mut self) { - // We drop the unexpired state before expiring the reader to ensure the - // heartbeat tasks can never observe the expired state. This doesn't - // matter for correctness, but avoids confusing log output if the - // heartbeat task were to discover that its lease has been expired. 
+ self.leased_seqnos.modify(|s| { + s.expired = true; + s.request_sync = true; + }); let Some(unexpired_state) = self.unexpired_state.take() else { return; }; - unexpired_state.expire_fn.0().await; - } - - fn expire_fn( - machine: Machine, - gc: GarbageCollector, - reader_id: LeasedReaderId, - ) -> ExpireFn { - ExpireFn(Box::new(move || { - Box::pin(async move { - let (_, maintenance) = machine.expire_leased_reader(&reader_id).await; - maintenance.start_performing(&machine, &gc); - }) - })) + unexpired_state.heartbeat_task.await; } /// Test helper for a [Self::listen] call that is expected to succeed. @@ -977,8 +959,7 @@ where /// State for a read handle that has not been explicitly expired. #[derive(Debug)] pub(crate) struct UnexpiredReadHandleState { - expire_fn: ExpireFn, - pub(crate) _heartbeat_tasks: AbortOnDropHandle<()>, + pub(crate) heartbeat_task: JoinHandle<()>, } /// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor]. @@ -1319,34 +1300,10 @@ where impl Drop for ReadHandle { fn drop(&mut self) { - // We drop the unexpired state before expiring the reader to ensure the - // heartbeat tasks can never observe the expired state. This doesn't - // matter for correctness, but avoids confusing log output if the - // heartbeat task were to discover that its lease has been expired. - let Some(unexpired_state) = self.unexpired_state.take() else { - return; - }; - - let handle = match Handle::try_current() { - Ok(x) => x, - Err(_) => { - warn!( - "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout", - self.reader_id - ); - return; - } - }; - // Spawn a best-effort task to expire this read handle. It's fine if - // this doesn't run to completion, we'd just have to wait out the lease - // before the shard-global since is unblocked. - // - // Intentionally create the span outside the task to set the parent. - let expire_span = debug_span!("drop::expire"); - handle.spawn_named( - || format!("ReadHandle::expire ({})", self.reader_id), - unexpired_state.expire_fn.0().instrument(expire_span), - ); + self.leased_seqnos.modify(|s| { + s.expired = true; + s.request_sync = true; + }); } } From 19d2c0ccf49f55146625594b9687de7bd5ffa79e Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 22 Dec 2025 19:11:34 -0500 Subject: [PATCH 8/8] Move the heartbeat task to read.rs This is a bit more read-handle business than machine business, in my opinion! --- src/persist-client/src/internal/machine.rs | 128 +----------------- src/persist-client/src/read.rs | 143 ++++++++++++++++++++- 2 files changed, 141 insertions(+), 130 deletions(-) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 3109ed5c55314..8fc02564c9df7 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -14,17 +14,15 @@ use std::ops::ControlFlow::{self, Break, Continue}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; -use differential_dataflow::Hashable; use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; use futures::FutureExt; use futures::future::{self, BoxFuture}; use mz_dyncfg::{Config, ConfigSet}; -use mz_ore::cast::{CastFrom, CastLossy}; +use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; #[allow(unused_imports)] // False positive. 
use mz_ore::fmt::FormatBuffer; -use mz_ore::task::JoinHandle; use mz_ore::{assert_none, soft_assert_no_log}; use mz_persist::location::{ExternalError, Indeterminate, SeqNo}; use mz_persist::retry::Retry; @@ -32,9 +30,8 @@ use mz_persist_types::schema::SchemaId; use mz_persist_types::{Codec, Codec64, Opaque}; use semver::Version; use timely::PartialOrder; -use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; -use tracing::{Instrument, debug, info, trace_span, warn}; +use tracing::{Instrument, debug, info, trace_span}; use crate::async_runtime::IsolatedRuntime; use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES; @@ -44,7 +41,6 @@ use crate::critical::CriticalReaderId; use crate::error::{CodecMismatch, InvalidUsage}; use crate::internal::apply::Applier; use crate::internal::compact::CompactReq; -use crate::internal::gc::GarbageCollector; use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance}; use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics}; use crate::internal::paths::PartialRollupKey; @@ -56,7 +52,7 @@ use crate::internal::state::{ use crate::internal::state_versions::StateVersions; use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; use crate::internal::watch::StateWatch; -use crate::read::{AwaitableState, LeaseMetadata, LeasedReaderId, READER_LEASE_DURATION}; +use crate::read::LeasedReaderId; use crate::rpc::PubSubSender; use crate::schema::CaESchema; use crate::write::WriterId; @@ -1183,124 +1179,6 @@ impl CompareAndAppendRes { } } -impl Machine -where - K: Debug + Codec, - V: Debug + Codec, - T: Timestamp + Lattice + TotalOrder + Codec64 + Sync, - D: Monoid + Codec64 + Send + Sync, -{ - pub fn start_reader_heartbeat_task( - self, - reader_id: LeasedReaderId, - gc: GarbageCollector, - leased_seqnos: AwaitableState>, - ) -> JoinHandle<()> { - let metrics = Arc::clone(&self.applier.metrics); - let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id); - mz_ore::task::spawn(|| name, { - metrics.tasks.heartbeat_read.instrument_task(async move { - Self::reader_heartbeat_task(self, reader_id, gc, leased_seqnos).await - }) - }) - } - - async fn reader_heartbeat_task( - machine: Self, - reader_id: LeasedReaderId, - gc: GarbageCollector, - leased_seqnos: AwaitableState>, - ) { - let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4; - // Jitter the first tick to avoid a thundering herd when many readers are started around - // the same instant, like during deploys. - let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX); - let mut interval = tokio::time::interval_at( - tokio::time::Instant::now() + sleep_duration.mul_f64(jitter), - sleep_duration, - ); - let mut held_since = leased_seqnos.read(|s| s.since().clone()); - loop { - let before_sleep = Instant::now(); - let _woke_by_tick = tokio::select! 
{ - _tick = interval.tick() => { - true - } - _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => { - false - } - }; - - let elapsed_since_before_sleeping = before_sleep.elapsed(); - if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) { - warn!( - "reader ({}) of shard ({}) went {}s between heartbeats", - reader_id, - machine.shard_id(), - elapsed_since_before_sleeping.as_secs_f64() - ); - } - - let before_heartbeat = Instant::now(); - let heartbeat_ms = (machine.applier.cfg.now)(); - let current_seqno = machine.seqno(); - let result = leased_seqnos.modify(|s| { - if s.expired { - Err(()) - } else { - s.observe_seqno(current_seqno); - s.request_sync = false; - held_since.join_assign(s.since()); - Ok(s.outstanding_seqno()) - } - }); - let actual_since = match result { - Ok(held_seqno) => { - let (seqno, actual_since, maintenance) = machine - .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms) - .await; - leased_seqnos.modify(|s| s.observe_seqno(seqno)); - maintenance.start_performing(&machine, &gc); - actual_since - } - Err(()) => { - let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await; - leased_seqnos.modify(|s| s.observe_seqno(seqno)); - maintenance.start_performing(&machine, &gc); - break; - } - }; - - let elapsed_since_heartbeat = before_heartbeat.elapsed(); - if elapsed_since_heartbeat > Duration::from_secs(60) { - warn!( - "reader ({}) of shard ({}) heartbeat call took {}s", - reader_id, - machine.shard_id(), - elapsed_since_heartbeat.as_secs_f64(), - ); - } - - if PartialOrder::less_than(&held_since, &actual_since.0) { - // If the read handle was intentionally expired, this task - // *should* be aborted before it observes the expiration. So if - // we get here, this task somehow failed to keep the read lease - // alive. Warn loudly, because there's now a live read handle to - // an expired shard that will panic if used, but don't panic, - // just in case there is some edge case that results in this - // task observing the intentional expiration of a read handle. - warn!( - "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \ - while read handle is live", - reader_id, - machine.shard_id(), - ); - return; - } - } - } -} - pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config = Config::new( "persist_next_listen_batch_retryer_fixed_sleep", Duration::from_millis(1200), // pubsub is on by default! 
diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index e85df276686f6..36f6894d04259 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -14,17 +14,19 @@ use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::future::Future; use std::sync::{Arc, RwLock}; -use std::time::Duration; +use std::time::{Duration, Instant}; +use differential_dataflow::Hashable; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::difference::Monoid; use differential_dataflow::lattice::Lattice; use futures::Stream; use futures_util::{StreamExt, stream}; use mz_dyncfg::Config; +use mz_ore::cast::CastLossy; use mz_ore::halt; use mz_ore::instrument; -use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt}; +use mz_ore::task::JoinHandle; use mz_persist::location::{Blob, SeqNo}; use mz_persist_types::columnar::{ColumnDecoder, Schema}; use mz_persist_types::{Codec, Codec64}; @@ -573,12 +575,19 @@ impl AwaitableState { pub(crate) struct LeaseMetadata { /// The frontier we should hold the time-based lease back to. held_since: Antichain, + /// The since hold we've actually committed to state. Should always be <= + /// the since hold. + applied_since: Antichain, + /// The largest seqno we've observed in the state. recent_seqno: SeqNo, /// The set of active leases. We hold back the seqno to the minimum lease or /// the recent_seqno, whichever is earlier. leases: BTreeMap, - pub expired: bool, - pub request_sync: bool, + /// True iff this state is expired. + expired: bool, + /// Used to trigger the background task to heartbeat state, instead of waiting for + /// the next tick. + request_sync: bool, } impl LeaseMetadata @@ -667,6 +676,7 @@ where T: Timestamp + TotalOrder + Lattice + Codec64 + Sync, D: Monoid + Codec64 + Send + Sync, { + #[allow(clippy::unused_async)] pub(crate) async fn new( cfg: PersistConfig, metrics: Arc, @@ -680,6 +690,7 @@ where let schema_cache = machine.applier.schema_cache(); let leased_seqnos = AwaitableState::new(LeaseMetadata { held_since: state.since.clone(), + applied_since: state.since.clone(), recent_seqno: state.seqno, leases: Default::default(), expired: false, @@ -698,11 +709,133 @@ where since: state.since, leased_seqnos: leased_seqnos.clone(), unexpired_state: Some(UnexpiredReadHandleState { - heartbeat_task: machine.start_reader_heartbeat_task(reader_id, gc, leased_seqnos), + heartbeat_task: Self::start_reader_heartbeat_task( + machine, + reader_id, + gc, + leased_seqnos, + ), }), } } + fn start_reader_heartbeat_task( + machine: Machine, + reader_id: LeasedReaderId, + gc: GarbageCollector, + leased_seqnos: AwaitableState>, + ) -> JoinHandle<()> { + let metrics = Arc::clone(&machine.applier.metrics); + let name = format!( + "persist::heartbeat_read({},{})", + machine.shard_id(), + reader_id + ); + mz_ore::task::spawn(|| name, { + metrics.tasks.heartbeat_read.instrument_task(async move { + Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await + }) + }) + } + + async fn reader_heartbeat_task( + machine: Machine, + reader_id: LeasedReaderId, + gc: GarbageCollector, + leased_seqnos: AwaitableState>, + ) { + let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4; + // Jitter the first tick to avoid a thundering herd when many readers are started around + // the same instant, like during deploys. 
+ let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX); + let mut interval = tokio::time::interval_at( + tokio::time::Instant::now() + sleep_duration.mul_f64(jitter), + sleep_duration, + ); + let mut held_since = leased_seqnos.read(|s| (&s.held_since).clone()); + loop { + let before_sleep = Instant::now(); + let _woke_by_tick = tokio::select! { + _tick = interval.tick() => { + true + } + _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => { + false + } + }; + + let elapsed_since_before_sleeping = before_sleep.elapsed(); + if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) { + warn!( + "reader ({}) of shard ({}) went {}s between heartbeats", + reader_id, + machine.shard_id(), + elapsed_since_before_sleeping.as_secs_f64() + ); + } + + let before_heartbeat = Instant::now(); + let heartbeat_ms = (machine.applier.cfg.now)(); + let current_seqno = machine.seqno(); + let result = leased_seqnos.modify(|s| { + if s.expired { + Err(()) + } else { + s.observe_seqno(current_seqno); + s.request_sync = false; + held_since.join_assign(&s.held_since); + Ok(s.outstanding_seqno()) + } + }); + let actual_since = match result { + Ok(held_seqno) => { + let (seqno, actual_since, maintenance) = machine + .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms) + .await; + leased_seqnos.modify(|s| { + s.applied_since.clone_from(&actual_since.0); + s.observe_seqno(seqno) + }); + maintenance.start_performing(&machine, &gc); + actual_since + } + Err(()) => { + let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await; + leased_seqnos.modify(|s| s.observe_seqno(seqno)); + maintenance.start_performing(&machine, &gc); + break; + } + }; + + let elapsed_since_heartbeat = before_heartbeat.elapsed(); + if elapsed_since_heartbeat > Duration::from_secs(60) { + warn!( + "reader ({}) of shard ({}) heartbeat call took {}s", + reader_id, + machine.shard_id(), + elapsed_since_heartbeat.as_secs_f64(), + ); + } + + if PartialOrder::less_than(&held_since, &actual_since.0) { + // If the read handle was intentionally expired, this task + // *should* be aborted before it observes the expiration. So if + // we get here, this task somehow failed to keep the read lease + // alive. Warn loudly, because there's now a live read handle to + // an expired shard that will panic if used, but don't panic, + // just in case there is some edge case that results in this + // task observing the intentional expiration of a read handle. + warn!( + "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \ + while read handle is live", + reader_id, + machine.shard_id(), + ); + return; + } + } + } + /// This handle's shard id. pub fn shard_id(&self) -> ShardId { self.machine.shard_id()