diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs
index 574ec8a7f83f0..35ef6800bc8c4 100644
--- a/src/persist-client/src/fetch.rs
+++ b/src/persist-client/src/fetch.rs
@@ -472,7 +472,6 @@ impl Lease {
     }
 
     /// Returns the inner [SeqNo] of this [Lease].
-    #[cfg(test)]
     pub fn seqno(&self) -> SeqNo {
         *self.0
     }
diff --git a/src/persist-client/src/internal/datadriven.rs b/src/persist-client/src/internal/datadriven.rs
index 99fccd6cbf7f8..37f40072cb973 100644
--- a/src/persist-client/src/internal/datadriven.rs
+++ b/src/persist-client/src/internal/datadriven.rs
@@ -245,9 +245,6 @@ mod tests {
             "fetch-batch" => machine_dd::fetch_batch(&state, args).await,
             "finalize" => machine_dd::finalize(&mut state, args).await,
             "gc" => machine_dd::gc(&mut state, args).await,
-            "heartbeat-leased-reader" => {
-                machine_dd::heartbeat_leased_reader(&state, args).await
-            }
             "is-finalized" => machine_dd::is_finalized(&state, args),
             "listen-through" => {
                 machine_dd::listen_through(&mut state, args).await
diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index 33e5544ec2f39..8fc02564c9df7 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -23,7 +23,6 @@ use mz_ore::cast::CastFrom;
 use mz_ore::error::ErrorExt;
 #[allow(unused_imports)] // False positive.
 use mz_ore::fmt::FormatBuffer;
-use mz_ore::task::JoinHandle;
 use mz_ore::{assert_none, soft_assert_no_log};
 use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
 use mz_persist::retry::Retry;
@@ -32,7 +31,7 @@ use mz_persist_types::{Codec, Codec64, Opaque};
 use semver::Version;
 use timely::PartialOrder;
 use timely::progress::{Antichain, Timestamp};
-use tracing::{Instrument, debug, info, trace_span, warn};
+use tracing::{Instrument, debug, info, trace_span};
 
 use crate::async_runtime::IsolatedRuntime;
 use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
@@ -42,7 +41,6 @@ use crate::critical::CriticalReaderId;
 use crate::error::{CodecMismatch, InvalidUsage};
 use crate::internal::apply::Applier;
 use crate::internal::compact::CompactReq;
-use crate::internal::gc::GarbageCollector;
 use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
 use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
 use crate::internal::paths::PartialRollupKey;
@@ -54,7 +52,7 @@ use crate::internal::state::{
 use crate::internal::state_versions::StateVersions;
 use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
 use crate::internal::watch::StateWatch;
-use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
+use crate::read::LeasedReaderId;
 use crate::rpc::PubSubSender;
 use crate::schema::CaESchema;
 use crate::write::WriterId;
@@ -620,7 +618,7 @@ where
     pub async fn downgrade_since(
         &self,
         reader_id: &LeasedReaderId,
-        outstanding_seqno: Option<SeqNo>,
+        outstanding_seqno: SeqNo,
         new_since: &Antichain<T>,
         heartbeat_timestamp_ms: u64,
     ) -> (SeqNo, Since<T>, RoutineMaintenance) {
@@ -663,20 +661,6 @@ where
         }
     }
 
-    pub async fn heartbeat_leased_reader(
-        &self,
-        reader_id: &LeasedReaderId,
-        heartbeat_timestamp_ms: u64,
-    ) -> (SeqNo, bool, RoutineMaintenance) {
-        let metrics = Arc::clone(&self.applier.metrics);
-        let (seqno, existed, maintenance) = self
-            .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
-                state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
-            })
-            .await;
-        (seqno, existed, maintenance)
-    }
-
     pub async fn expire_leased_reader(
         &self,
         reader_id: &LeasedReaderId,
@@ -1195,119 +1179,6 @@ impl CompareAndAppendRes {
     }
 }
 
-impl<K, V, T, D> Machine<K, V, T, D>
-where
-    K: Debug + Codec,
-    V: Debug + Codec,
-    T: Timestamp + Lattice + Codec64 + Sync,
-    D: Monoid + Codec64 + Send + Sync,
-{
-    #[allow(clippy::unused_async)]
-    pub async fn start_reader_heartbeat_tasks(
-        self,
-        reader_id: LeasedReaderId,
-        gc: GarbageCollector<K, V, T, D>,
-    ) -> Vec<JoinHandle<()>> {
-        let mut ret = Vec::new();
-        let metrics = Arc::clone(&self.applier.metrics);
-
-        // TODO: In response to a production incident, this runs the heartbeat
-        // task on both the in-context tokio runtime and persist's isolated
-        // runtime. We think we were seeing tasks (including this one) get stuck
-        // indefinitely in tokio while waiting for a runtime worker. This could
-        // happen if some other task in that runtime never yields. It's possible
-        // that one of the two runtimes is healthy while the other isn't (this
-        // was inconclusive in the incident debugging), and the heartbeat task
-        // is fairly lightweight, so run a copy in each in case that helps.
-        //
-        // The real fix here is to find the misbehaving task and fix it. Remove
-        // this duplication when that happens.
-        let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
-        ret.push(mz_ore::task::spawn(|| name, {
-            let machine = self.clone();
-            let reader_id = reader_id.clone();
-            let gc = gc.clone();
-            metrics
-                .tasks
-                .heartbeat_read
-                .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
-        }));
-
-        let isolated_runtime = Arc::clone(&self.isolated_runtime);
-        let name = format!(
-            "persist::heartbeat_read_isolated({},{})",
-            self.shard_id(),
-            reader_id
-        );
-        ret.push(
-            isolated_runtime.spawn_named(
-                || name,
-                metrics
-                    .tasks
-                    .heartbeat_read
-                    .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
-            ),
-        );
-
-        ret
-    }
-
-    async fn reader_heartbeat_task(
-        machine: Self,
-        reader_id: LeasedReaderId,
-        gc: GarbageCollector<K, V, T, D>,
-    ) {
-        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
-        loop {
-            let before_sleep = Instant::now();
-            tokio::time::sleep(sleep_duration).await;
-
-            let elapsed_since_before_sleeping = before_sleep.elapsed();
-            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
-                warn!(
-                    "reader ({}) of shard ({}) went {}s between heartbeats",
-                    reader_id,
-                    machine.shard_id(),
-                    elapsed_since_before_sleeping.as_secs_f64()
-                );
-            }
-
-            let before_heartbeat = Instant::now();
-            let (_seqno, existed, maintenance) = machine
-                .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
-                .await;
-            maintenance.start_performing(&machine, &gc);
-
-            let elapsed_since_heartbeat = before_heartbeat.elapsed();
-            if elapsed_since_heartbeat > Duration::from_secs(60) {
-                warn!(
-                    "reader ({}) of shard ({}) heartbeat call took {}s",
-                    reader_id,
-                    machine.shard_id(),
-                    elapsed_since_heartbeat.as_secs_f64(),
-                );
-            }
-
-            if !existed {
-                // If the read handle was intentionally expired, this task
-                // *should* be aborted before it observes the expiration. So if
-                // we get here, this task somehow failed to keep the read lease
-                // alive. Warn loudly, because there's now a live read handle to
-                // an expired shard that will panic if used, but don't panic,
-                // just in case there is some edge case that results in this
-                // task observing the intentional expiration of a read handle.
-                warn!(
-                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
-                    while read handle is live",
-                    reader_id,
-                    machine.shard_id(),
-                );
-                return;
-            }
-        }
-    }
-}
-
 pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
     "persist_next_listen_batch_retryer_fixed_sleep",
     Duration::from_millis(1200), // pubsub is on by default!
@@ -1648,7 +1519,7 @@ pub mod datadriven {
         args: DirectiveArgs<'_>,
     ) -> Result<String, anyhow::Error> {
         let since = args.expect_antichain("since");
-        let seqno = args.optional("seqno");
+        let seqno = args.optional("seqno").unwrap_or(datadriven.machine.seqno());
         let reader_id = args.expect("reader_id");
         let (_, since, routine) = datadriven
             .machine
@@ -2351,18 +2222,6 @@ pub mod datadriven {
         ))
     }
 
-    pub async fn heartbeat_leased_reader(
-        datadriven: &MachineState,
-        args: DirectiveArgs<'_>,
-    ) -> Result<String, anyhow::Error> {
-        let reader_id = args.expect("reader_id");
-        let _ = datadriven
-            .machine
-            .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
-            .await;
-        Ok(format!("{} ok\n", datadriven.machine.seqno()))
-    }
-
     pub async fn expire_critical_reader(
         datadriven: &mut MachineState,
         args: DirectiveArgs<'_>,
@@ -2619,10 +2478,13 @@ pub mod tests {
         let client = new_test_client(&dyncfgs).await;
         // set a low rollup threshold so GC/truncation is more aggressive
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
-        let (mut write, _) = client
+        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
             .await;
 
+        // Ensure the reader is not holding back the since.
+        read.expire().await;
+
         // Write a bunch of batches. This should result in a bounded number of
         // live entries in consensus.
         const NUM_BATCHES: u64 = 100;
diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs
index fc8f570b81c58..3a9500eae508d 100644
--- a/src/persist-client/src/internal/metrics.rs
+++ b/src/persist-client/src/internal/metrics.rs
@@ -421,7 +421,6 @@ impl MetricsVecs {
             )),
             compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
             downgrade_since: self.cmd_metrics("downgrade_since"),
-            heartbeat_reader: self.cmd_metrics("heartbeat_reader"),
             expire_reader: self.cmd_metrics("expire_reader"),
             expire_writer: self.cmd_metrics("expire_writer"),
             merge_res: self.cmd_metrics("merge_res"),
@@ -630,7 +629,6 @@ pub struct CmdsMetrics {
     pub(crate) compare_and_append_noop: IntCounter,
     pub(crate) compare_and_downgrade_since: CmdMetrics,
     pub(crate) downgrade_since: CmdMetrics,
-    pub(crate) heartbeat_reader: CmdMetrics,
     pub(crate) expire_reader: CmdMetrics,
     pub(crate) expire_writer: CmdMetrics,
     pub(crate) merge_res: CmdMetrics,
diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs
index 193982966d80a..1105ceec58e75 100644
--- a/src/persist-client/src/internal/state.rs
+++ b/src/persist-client/src/internal/state.rs
@@ -1877,7 +1877,7 @@ where
         &mut self,
         reader_id: &LeasedReaderId,
         seqno: SeqNo,
-        outstanding_seqno: Option<SeqNo>,
+        outstanding_seqno: SeqNo,
         new_since: &Antichain<T>,
         heartbeat_timestamp_ms: u64,
     ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
@@ -1905,18 +1905,15 @@ where
             reader_state.last_heartbeat_timestamp_ms,
         );
 
-        let seqno = match outstanding_seqno {
-            Some(outstanding_seqno) => {
-                assert!(
-                    outstanding_seqno >= reader_state.seqno,
-                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
+        let seqno = {
+            assert!(
+                outstanding_seqno >= reader_state.seqno,
+                "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
                     is behind current reader_state ({:?})",
-                    outstanding_seqno,
-                    reader_state.seqno,
-                );
-                std::cmp::min(outstanding_seqno, seqno)
-            }
-            None => seqno,
+                outstanding_seqno,
+                reader_state.seqno,
+            );
+            std::cmp::min(outstanding_seqno, seqno)
         };
 
         reader_state.seqno = seqno;
@@ -1979,33 +1976,6 @@ where
         }
     }
 
-    pub fn heartbeat_leased_reader(
-        &mut self,
-        reader_id: &LeasedReaderId,
-        heartbeat_timestamp_ms: u64,
-    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
-        // We expire all readers if the upper and since both advance to the
-        // empty antichain. Gracefully handle this. At the same time,
-        // short-circuit the cmd application so we don't needlessly create new
-        // SeqNos.
-        if self.is_tombstone() {
-            return Break(NoOpStateTransition(false));
-        }
-
-        match self.leased_readers.get_mut(reader_id) {
-            Some(reader_state) => {
-                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
-                    heartbeat_timestamp_ms,
-                    reader_state.last_heartbeat_timestamp_ms,
-                );
-                Continue(true)
-            }
-            // No-op, but we still commit the state change so that this gets
-            // linearized (maybe we're looking at old state).
-            None => Continue(false),
-        }
-    }
-
     pub fn expire_leased_reader(
         &mut self,
         reader_id: &LeasedReaderId,
@@ -3231,7 +3201,7 @@ pub(crate) mod tests {
             state.collections.downgrade_since(
                 &reader,
                 seqno,
-                None,
+                seqno,
                 &Antichain::from_elem(2),
                 now()
             ),
@@ -3243,7 +3213,7 @@ pub(crate) mod tests {
             state.collections.downgrade_since(
                 &reader,
                 seqno,
-                None,
+                seqno,
                 &Antichain::from_elem(2),
                 now()
             ),
@@ -3255,7 +3225,7 @@ pub(crate) mod tests {
             state.collections.downgrade_since(
                 &reader,
                 seqno,
-                None,
+                seqno,
                 &Antichain::from_elem(1),
                 now()
             ),
@@ -3280,7 +3250,7 @@ pub(crate) mod tests {
             state.collections.downgrade_since(
                 &reader2,
                 seqno,
-                None,
+                seqno,
                 &Antichain::from_elem(3),
                 now()
             ),
@@ -3292,7 +3262,7 @@ pub(crate) mod tests {
             state.collections.downgrade_since(
                 &reader,
                 seqno,
-                None,
+                seqno,
                 &Antichain::from_elem(5),
                 now()
             ),
@@ -3324,7 +3294,7 @@ pub(crate) mod tests {
             state.collections.downgrade_since(
                 &reader3,
                 seqno,
-                None,
+                seqno,
                 &Antichain::from_elem(10),
                 now()
             ),
@@ -3624,7 +3594,7 @@ pub(crate) mod tests {
             state.collections.downgrade_since(
                 &reader,
                 SeqNo::minimum(),
-                None,
+                SeqNo::minimum(),
                 &Antichain::from_elem(2),
                 now()
             ),
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 1ffe69be17230..d446a95c64f46 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -364,8 +364,7 @@ impl PersistClient {
             Arc::clone(&self.blob),
             reader_id,
             schemas,
-            reader_state.since,
-            heartbeat_ts,
+            reader_state,
         )
         .await;
 
@@ -920,7 +919,6 @@ impl PersistClient {
 #[cfg(test)]
 mod tests {
     use std::future::Future;
-    use std::mem;
     use std::pin::Pin;
     use std::task::Context;
     use std::time::Duration;
@@ -2007,14 +2005,12 @@ mod tests {
             .expect("client construction failed")
             .expect_open::<(), (), u64, i64>(ShardId::new())
             .await;
-        let mut read_unexpired_state = read
+        let read_unexpired_state = read
             .unexpired_state
             .take()
             .expect("handle should have unexpired state");
         read.expire().await;
-        for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) {
-            let () = read_heartbeat_task.await;
-        }
+        read_unexpired_state.heartbeat_task.await
     }
 
     /// Verify that shard finalization works with empty shards, shards that have
diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs
index 21ee955db815a..36f6894d04259 100644
--- a/src/persist-client/src/read.rs
+++ b/src/persist-client/src/read.rs
@@ -10,24 +10,23 @@
 //! Read capabilities and handles
 
 use async_stream::stream;
-use std::backtrace::Backtrace;
 use std::collections::BTreeMap;
-use std::fmt::Debug;
+use std::fmt::{Debug, Formatter};
 use std::future::Future;
-use std::sync::Arc;
-use std::time::Duration;
+use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant};
 
+use differential_dataflow::Hashable;
 use differential_dataflow::consolidation::consolidate_updates;
 use differential_dataflow::difference::Monoid;
 use differential_dataflow::lattice::Lattice;
-use differential_dataflow::trace::Description;
 use futures::Stream;
 use futures_util::{StreamExt, stream};
 use mz_dyncfg::Config;
+use mz_ore::cast::CastLossy;
 use mz_ore::halt;
 use mz_ore::instrument;
-use mz_ore::now::EpochMillis;
-use mz_ore::task::{AbortOnDropHandle, JoinHandle, RuntimeExt};
+use mz_ore::task::JoinHandle;
 use mz_persist::location::{Blob, SeqNo};
 use mz_persist_types::columnar::{ColumnDecoder, Schema};
 use mz_persist_types::{Codec, Codec64};
@@ -36,8 +35,8 @@ use serde::{Deserialize, Serialize};
 use timely::PartialOrder;
 use timely::order::TotalOrder;
 use timely::progress::{Antichain, Timestamp};
-use tokio::runtime::Handle;
-use tracing::{Instrument, debug_span, warn};
+use tokio::sync::Notify;
+use tracing::warn;
 use uuid::Uuid;
 
 use crate::batch::BLOB_TARGET_SIZE;
@@ -45,9 +44,9 @@ use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
 use crate::fetch::FetchConfig;
 use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
 use crate::internal::encoding::Schemas;
-use crate::internal::machine::{ExpireFn, Machine};
+use crate::internal::machine::Machine;
 use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
-use crate::internal::state::{BatchPart, HollowBatch};
+use crate::internal::state::{HollowBatch, LeasedReaderState};
 use crate::internal::watch::StateWatch;
 use crate::iter::{Consolidator, StructuredSort};
 use crate::schema::SchemaCache;
@@ -228,8 +227,6 @@ pub enum ListenEvent<T, D> {
 #[derive(Debug)]
 pub struct Listen<K: Codec, V: Codec, T, D> {
     handle: ReadHandle<K, V, T, D>,
-    watch: StateWatch<K, V, T, D>,
-
     as_of: Antichain<T>,
     since: Antichain<T>,
     frontier: Antichain<T>,
@@ -258,11 +255,8 @@ where
         // (initially as_of although the frontier is inclusive and the as_of
         // isn't). Be a good citizen and downgrade early.
         handle.downgrade_since(&since).await;
-
-        let watch = handle.machine.applier.watch();
         Ok(Listen {
             handle,
-            watch,
             since,
             frontier: as_of.clone(),
             as_of,
@@ -290,7 +284,7 @@ where
         let min_elapsed = self.handle.heartbeat_duration();
         let next_batch = self.handle.machine.next_listen_batch(
             &self.frontier,
-            &mut self.watch,
+            &mut self.handle.watch,
             Some(&self.handle.reader_id),
             retry,
         );
@@ -501,6 +495,134 @@ where
     }
 }
 
+/// A concurrent state - one which allows reading, writing, and waiting for changes made by
+/// another concurrent writer.
+///
+/// This is morally similar to a mutex with a condvar, but allowing asynchronous waits and with
+/// access methods that make it a little trickier to accidentally hold a lock across a yield point.
+pub(crate) struct AwaitableState<T> {
+    state: Arc<RwLock<T>>,
+    /// NB: we can't wrap the [Notify] in the lock since the signature of [Notify::notified]
+    /// doesn't allow it, but this is only accessed while holding the lock.
+    notify: Arc<Notify>,
+}
+
+impl<T: Debug> Debug for AwaitableState<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        self.state.read().fmt(f)
+    }
+}
+
+impl<T> Clone for AwaitableState<T> {
+    fn clone(&self) -> Self {
+        Self {
+            state: Arc::clone(&self.state),
+            notify: Arc::clone(&self.notify),
+        }
+    }
+}
+
+impl<T> AwaitableState<T> {
+    pub fn new(value: T) -> Self {
+        Self {
+            state: Arc::new(RwLock::new(value)),
+            notify: Arc::new(Notify::new()),
+        }
+    }
+
+    #[allow(unused)]
+    pub fn read<A>(&self, read_fn: impl FnOnce(&T) -> A) -> A {
+        let guard = self.state.read().expect("not poisoned");
+        let state = &*guard;
+        read_fn(state)
+    }
+
+    pub fn modify<A>(&self, write_fn: impl FnOnce(&mut T) -> A) -> A {
+        let mut guard = self.state.write().expect("not poisoned");
+        let state = &mut *guard;
+        let result = write_fn(state);
+        // Notify everyone while holding the guard. This guarantees that all waiters will observe
+        // the just-updated state.
+        self.notify.notify_waiters();
+        result
+    }
+
+    pub async fn wait_for<A>(&self, mut wait_fn: impl FnMut(&T) -> Option<A>) -> A {
+        loop {
+            let notified = {
+                let guard = self.state.read().expect("not poisoned");
+                let state = &*guard;
+                if let Some(result) = wait_fn(state) {
+                    return result;
+                }
+                // Grab the notified future while holding the guard. This ensures that we will see
+                // any future modifications to this state, even if they happen before the first poll.
+                let notified = self.notify.notified();
+                drop(guard);
+                notified
+            };
+
+            notified.await;
+        }
+    }
+
+    pub async fn wait_while(&self, mut wait_fn: impl FnMut(&T) -> bool) {
+        self.wait_for(|s| (!wait_fn(s)).then_some(())).await
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct LeaseMetadata<T> {
+    /// The frontier we should hold the time-based lease back to.
+    held_since: Antichain<T>,
+    /// The since hold we've actually committed to state. Should always be <=
+    /// `held_since`.
+    applied_since: Antichain<T>,
+    /// The largest seqno we've observed in the state.
+    recent_seqno: SeqNo,
+    /// The set of active leases. We hold back the seqno to the minimum lease or
+    /// the recent_seqno, whichever is earlier.
+    leases: BTreeMap<SeqNo, Lease>,
+    /// True iff this state is expired.
+    expired: bool,
+    /// Used to trigger the background task to heartbeat state, instead of waiting for
+    /// the next tick.
+    request_sync: bool,
+}
+
+impl<T> LeaseMetadata<T>
+where
+    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
+{
+    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
+        self.held_since.join_assign(since);
+    }
+
+    pub fn observe_seqno(&mut self, seqno: SeqNo) {
+        self.recent_seqno = seqno.max(self.recent_seqno);
+    }
+
+    pub fn lease_seqno(&mut self) -> Lease {
+        let seqno = self.recent_seqno;
+        let lease = self
+            .leases
+            .entry(seqno)
+            .or_insert_with(|| Lease::new(seqno));
+        lease.clone()
+    }
+
+    pub fn outstanding_seqno(&mut self) -> SeqNo {
+        while let Some(first) = self.leases.first_entry() {
+            if first.get().count() <= 1 {
+                first.remove();
+            } else {
+                return *first.key();
+            }
+        }
+        self.recent_seqno
+    }
+}
+
 /// A "capability" granting the ability to read the state of some shard at times
 /// greater or equal to `self.since()`.
 ///
@@ -528,13 +650,14 @@ pub struct ReadHandle<K: Codec, V: Codec, T, D> {
     pub(crate) machine: Machine<K, V, T, D>,
     pub(crate) gc: GarbageCollector<K, V, T, D>,
     pub(crate) blob: Arc<dyn Blob>,
+    watch: StateWatch<K, V, T, D>,
+
     pub(crate) reader_id: LeasedReaderId,
     pub(crate) read_schemas: Schemas<K, V>,
     pub(crate) schema_cache: SchemaCache<K, V, T, D>,
 
     since: Antichain<T>,
-    pub(crate) last_heartbeat: EpochMillis,
-    pub(crate) leased_seqnos: BTreeMap<SeqNo, Lease>,
+    pub(crate) leased_seqnos: AwaitableState<LeaseMetadata<T>>,
 
     pub(crate) unexpired_state: Option<UnexpiredReadHandleState>,
 }
@@ -553,6 +676,7 @@ where
     T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
     D: Monoid + Codec64 + Send + Sync,
 {
+    #[allow(clippy::unused_async)]
     pub(crate) async fn new(
         cfg: PersistConfig,
         metrics: Arc<Metrics>,
@@ -561,35 +685,157 @@ where
         machine: Machine<K, V, T, D>,
         gc: GarbageCollector<K, V, T, D>,
         blob: Arc<dyn Blob>,
         reader_id: LeasedReaderId,
         read_schemas: Schemas<K, V>,
-        since: Antichain<T>,
-        last_heartbeat: EpochMillis,
+        state: LeasedReaderState<T>,
     ) -> Self {
         let schema_cache = machine.applier.schema_cache();
-        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), reader_id.clone());
+        let leased_seqnos = AwaitableState::new(LeaseMetadata {
+            held_since: state.since.clone(),
+            applied_since: state.since.clone(),
+            recent_seqno: state.seqno,
+            leases: Default::default(),
+            expired: false,
+            request_sync: false,
+        });
         ReadHandle {
             cfg,
             metrics: Arc::clone(&metrics),
             machine: machine.clone(),
             gc: gc.clone(),
             blob,
+            watch: machine.applier.watch(),
             reader_id: reader_id.clone(),
             read_schemas,
             schema_cache,
-            since,
-            last_heartbeat,
-            leased_seqnos: BTreeMap::new(),
+            since: state.since,
+            leased_seqnos: leased_seqnos.clone(),
             unexpired_state: Some(UnexpiredReadHandleState {
-                expire_fn,
-                _heartbeat_tasks: machine
-                    .start_reader_heartbeat_tasks(reader_id, gc)
-                    .await
-                    .into_iter()
-                    .map(JoinHandle::abort_on_drop)
-                    .collect(),
+                heartbeat_task: Self::start_reader_heartbeat_task(
+                    machine,
+                    reader_id,
+                    gc,
+                    leased_seqnos,
+                ),
             }),
         }
     }
 
+    fn start_reader_heartbeat_task(
+        machine: Machine<K, V, T, D>,
+        reader_id: LeasedReaderId,
+        gc: GarbageCollector<K, V, T, D>,
+        leased_seqnos: AwaitableState<LeaseMetadata<T>>,
+    ) -> JoinHandle<()> {
+        let metrics = Arc::clone(&machine.applier.metrics);
+        let name = format!(
+            "persist::heartbeat_read({},{})",
+            machine.shard_id(),
+            reader_id
+        );
+        mz_ore::task::spawn(|| name, {
+            metrics.tasks.heartbeat_read.instrument_task(async move {
+                Self::reader_heartbeat_task(machine, reader_id, gc, leased_seqnos).await
+            })
+        })
+    }
+
+    async fn reader_heartbeat_task(
+        machine: Machine<K, V, T, D>,
+        reader_id: LeasedReaderId,
+        gc: GarbageCollector<K, V, T, D>,
+        leased_seqnos: AwaitableState<LeaseMetadata<T>>,
+    ) {
+        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 4;
+        // Jitter the first tick to avoid a thundering herd when many readers are started around
+        // the same instant, like during deploys.
+        let jitter: f64 = f64::cast_lossy(reader_id.hashed()) / f64::cast_lossy(u64::MAX);
+        let mut interval = tokio::time::interval_at(
+            tokio::time::Instant::now() + sleep_duration.mul_f64(jitter),
+            sleep_duration,
+        );
+        let mut held_since = leased_seqnos.read(|s| s.held_since.clone());
+        loop {
+            let before_sleep = Instant::now();
+            let _woke_by_tick = tokio::select! {
+                _tick = interval.tick() => {
+                    true
+                }
+                _whatever = leased_seqnos.wait_while(|s| !s.request_sync) => {
+                    false
+                }
+            };
+
+            let elapsed_since_before_sleeping = before_sleep.elapsed();
+            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
+                warn!(
+                    "reader ({}) of shard ({}) went {}s between heartbeats",
+                    reader_id,
+                    machine.shard_id(),
+                    elapsed_since_before_sleeping.as_secs_f64()
+                );
+            }
+
+            let before_heartbeat = Instant::now();
+            let heartbeat_ms = (machine.applier.cfg.now)();
+            let current_seqno = machine.seqno();
+            let result = leased_seqnos.modify(|s| {
+                if s.expired {
+                    Err(())
+                } else {
+                    s.observe_seqno(current_seqno);
+                    s.request_sync = false;
+                    held_since.join_assign(&s.held_since);
+                    Ok(s.outstanding_seqno())
+                }
+            });
+            let actual_since = match result {
+                Ok(held_seqno) => {
+                    let (seqno, actual_since, maintenance) = machine
+                        .downgrade_since(&reader_id, held_seqno, &held_since, heartbeat_ms)
+                        .await;
+                    leased_seqnos.modify(|s| {
+                        s.applied_since.clone_from(&actual_since.0);
+                        s.observe_seqno(seqno)
+                    });
+                    maintenance.start_performing(&machine, &gc);
+                    actual_since
+                }
+                Err(()) => {
+                    let (seqno, maintenance) = machine.expire_leased_reader(&reader_id).await;
+                    leased_seqnos.modify(|s| s.observe_seqno(seqno));
+                    maintenance.start_performing(&machine, &gc);
+                    break;
+                }
+            };
+
+            let elapsed_since_heartbeat = before_heartbeat.elapsed();
+            if elapsed_since_heartbeat > Duration::from_secs(60) {
+                warn!(
+                    "reader ({}) of shard ({}) heartbeat call took {}s",
+                    reader_id,
+                    machine.shard_id(),
+                    elapsed_since_heartbeat.as_secs_f64(),
+                );
+            }
+
+            if PartialOrder::less_than(&held_since, &actual_since.0) {
+                // If the read handle was intentionally expired, this task
+                // *should* be aborted before it observes the expiration. So if
+                // we get here, this task somehow failed to keep the read lease
+                // alive. Warn loudly, because there's now a live read handle to
+                // an expired shard that will panic if used, but don't panic,
+                // just in case there is some edge case that results in this
+                // task observing the intentional expiration of a read handle.
+                warn!(
+                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
+                    while read handle is live",
+                    reader_id,
+                    machine.shard_id(),
+                );
+                return;
+            }
+        }
+    }
+
     /// This handle's shard id.
     pub fn shard_id(&self) -> ShardId {
         self.machine.shard_id()
@@ -602,15 +848,13 @@ where
         &self.since
     }
 
-    fn outstanding_seqno(&mut self) -> Option<SeqNo> {
-        while let Some(first) = self.leased_seqnos.first_entry() {
-            if first.get().count() <= 1 {
-                first.remove();
-            } else {
-                return Some(*first.key());
-            }
-        }
-        None
+    #[cfg(test)]
+    fn outstanding_seqno(&mut self) -> SeqNo {
+        let current_seqno = self.machine.seqno();
+        self.leased_seqnos.modify(|s| {
+            s.observe_seqno(current_seqno);
+            s.outstanding_seqno()
+        })
     }
 
     /// Forwards the since frontier of this handle, giving up the ability to
@@ -625,41 +869,14 @@ where
     /// timestamp, making the call a no-op).
     #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
     pub async fn downgrade_since(&mut self, new_since: &Antichain<T>) {
-        // Guaranteed to be the smallest/oldest outstanding lease on a `SeqNo`.
-        let outstanding_seqno = self.outstanding_seqno();
-
-        let heartbeat_ts = (self.cfg.now)();
-        let (_seqno, current_reader_since, maintenance) = self
-            .machine
-            .downgrade_since(&self.reader_id, outstanding_seqno, new_since, heartbeat_ts)
+        self.since = new_since.clone();
+        self.leased_seqnos.modify(|s| {
+            s.downgrade_since(new_since);
+            s.request_sync = true;
+        });
+        self.leased_seqnos
+            .wait_while(|s| PartialOrder::less_than(&s.applied_since, new_since))
             .await;
-
-        // Debugging for database-issues#4590.
-        if let Some(outstanding_seqno) = outstanding_seqno {
-            let seqnos_held = _seqno.0.saturating_sub(outstanding_seqno.0);
-            // We get just over 1 seqno-per-second on average for a shard in
-            // prod, so this is about an hour.
-            const SEQNOS_HELD_THRESHOLD: u64 = 60 * 60;
-            if seqnos_held >= SEQNOS_HELD_THRESHOLD {
-                tracing::info!(
-                    "{} reader {} holding an unexpected number of seqnos {} vs {}: {:?}. bt: {:?}",
-                    self.machine.shard_id(),
-                    self.reader_id,
-                    outstanding_seqno,
-                    _seqno,
-                    self.leased_seqnos.keys().take(10).collect::<Vec<_>>(),
-                    // The Debug impl of backtrace is less aesthetic, but will put the trace
-                    // on a single line and play more nicely with our Honeycomb quota
-                    Backtrace::force_capture(),
-                );
-            }
-        }
-
-        self.since = current_reader_since.0;
-        // A heartbeat is just any downgrade_since traffic, so update the
-        // internal rate limiter here to play nicely with `maybe_heartbeat`.
-        self.last_heartbeat = heartbeat_ts;
-        maintenance.start_performing(&self.machine, &self.gc);
     }
 
     /// Returns an ongoing subscription of updates to a shard.
@@ -748,23 +965,6 @@ where
         Ok(Subscribe::new(snapshot_parts, listen))
     }
 
-    fn lease_batch_part(
-        &mut self,
-        desc: Description<T>,
-        part: BatchPart<T>,
-        filter: FetchBatchFilter<T>,
-    ) -> LeasedBatchPart<T> {
-        LeasedBatchPart {
-            metrics: Arc::clone(&self.metrics),
-            shard_id: self.machine.shard_id(),
-            filter,
-            desc,
-            part,
-            lease: self.lease_seqno(),
-            filter_pushdown_audit: false,
-        }
-    }
-
     fn lease_batch_parts(
         &mut self,
         batch: HollowBatch<T>,
@@ -774,8 +974,17 @@ where
         let blob = Arc::clone(&self.blob);
         let metrics = Arc::clone(&self.metrics);
         let desc = batch.desc.clone();
+        let lease = self.lease_seqno().await;
         for await part in batch.part_stream(self.shard_id(), &*blob, &*metrics) {
-            yield self.lease_batch_part(desc.clone(), part.expect("leased part").into_owned(), filter.clone())
+            yield LeasedBatchPart {
+                metrics: Arc::clone(&self.metrics),
+                shard_id: self.machine.shard_id(),
+                filter: filter.clone(),
+                desc: desc.clone(),
+                part: part.expect("leased part").into_owned(),
+                lease: lease.clone(),
+                filter_pushdown_audit: false,
+            }
         }
     }
 
@@ -783,13 +992,18 @@ where
     /// Tracks that the `ReadHandle`'s machine's current `SeqNo` is being
     /// "leased out" to a `LeasedBatchPart`, and cannot be garbage
     /// collected until its lease has been returned.
-    fn lease_seqno(&mut self) -> Lease {
-        let seqno = self.machine.seqno();
-        let lease = self
-            .leased_seqnos
-            .entry(seqno)
-            .or_insert_with(|| Lease::new(seqno));
-        lease.clone()
+    async fn lease_seqno(&mut self) -> Lease {
+        let current_seqno = self.machine.seqno();
+        let lease = self.leased_seqnos.modify(|s| {
+            s.observe_seqno(current_seqno);
+            s.lease_seqno()
+        });
+        // The seqno we've leased may be the seqno observed by our heartbeat task, which could be
+        // ahead of the last state we saw. Ensure we only observe states in the future of our hold.
+        // (Since these are backed by the same state in the same process, this should all be pretty
+        // fast.)
+        self.watch.wait_for_seqno_ge(lease.seqno()).await;
+        lease
     }
 
     /// Returns an independent [ReadHandle] with a new [LeasedReaderId] but the
@@ -822,8 +1036,7 @@ where
             Arc::clone(&self.blob),
             new_reader_id,
             self.read_schemas.clone(),
-            reader_state.since,
-            heartbeat_ts,
+            reader_state,
        )
        .await;
        new_reader
@@ -838,13 +1051,12 @@ where
     /// This is an internally rate limited helper, designed to allow users to
     /// call it as frequently as they like. Call this or [Self::downgrade_since],
     /// on some interval that is "frequent" compared to the read lease duration.
+    #[allow(clippy::unused_async)]
     pub async fn maybe_downgrade_since(&mut self, new_since: &Antichain<T>) {
-        let min_elapsed = self.heartbeat_duration();
-        let elapsed_since_last_heartbeat =
-            Duration::from_millis((self.cfg.now)().saturating_sub(self.last_heartbeat));
-        if elapsed_since_last_heartbeat >= min_elapsed {
-            self.downgrade_since(new_since).await;
-        }
+        self.since = new_since.clone();
+        self.leased_seqnos.modify(|s| {
+            s.downgrade_since(new_since);
+        });
     }
 
     /// Politely expires this reader, releasing its lease.
@@ -857,27 +1069,14 @@ where
     /// happens.
     #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
     pub async fn expire(mut self) {
-        // We drop the unexpired state before expiring the reader to ensure the
-        // heartbeat tasks can never observe the expired state. This doesn't
-        // matter for correctness, but avoids confusing log output if the
-        // heartbeat task were to discover that its lease has been expired.
+        self.leased_seqnos.modify(|s| {
+            s.expired = true;
+            s.request_sync = true;
+        });
         let Some(unexpired_state) = self.unexpired_state.take() else {
             return;
         };
-        unexpired_state.expire_fn.0().await;
-    }
-
-    fn expire_fn(
-        machine: Machine<K, V, T, D>,
-        gc: GarbageCollector<K, V, T, D>,
-        reader_id: LeasedReaderId,
-    ) -> ExpireFn {
-        ExpireFn(Box::new(move || {
-            Box::pin(async move {
-                let (_, maintenance) = machine.expire_leased_reader(&reader_id).await;
-                maintenance.start_performing(&machine, &gc);
-            })
-        }))
+        unexpired_state.heartbeat_task.await;
     }
 
     /// Test helper for a [Self::listen] call that is expected to succeed.
@@ -893,8 +1092,7 @@ where
 /// State for a read handle that has not been explicitly expired.
 #[derive(Debug)]
 pub(crate) struct UnexpiredReadHandleState {
-    expire_fn: ExpireFn,
-    pub(crate) _heartbeat_tasks: Vec<AbortOnDropHandle<()>>,
+    pub(crate) heartbeat_task: JoinHandle<()>,
 }
 
 /// An incremental cursor through a particular shard, returned from [ReadHandle::snapshot_cursor].
@@ -1022,7 +1220,7 @@ where
         should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
     ) -> Result<Cursor<K, V, T, D>, Since<T>> {
         let batches = self.machine.snapshot(&as_of).await?;
-        let lease = self.lease_seqno();
+        let lease = self.lease_seqno().await;
 
         Self::read_batches_consolidated(
             &self.cfg,
@@ -1235,34 +1433,10 @@ where
 impl<K, V, T, D> Drop for ReadHandle<K, V, T, D> {
     fn drop(&mut self) {
-        // We drop the unexpired state before expiring the reader to ensure the
-        // heartbeat tasks can never observe the expired state. This doesn't
-        // matter for correctness, but avoids confusing log output if the
-        // heartbeat task were to discover that its lease has been expired.
-        let Some(unexpired_state) = self.unexpired_state.take() else {
-            return;
-        };
-
-        let handle = match Handle::try_current() {
-            Ok(x) => x,
-            Err(_) => {
-                warn!(
-                    "ReadHandle {} dropped without being explicitly expired, falling back to lease timeout",
-                    self.reader_id
-                );
-                return;
-            }
-        };
-        // Spawn a best-effort task to expire this read handle. It's fine if
-        // this doesn't run to completion, we'd just have to wait out the lease
-        // before the shard-global since is unblocked.
-        //
-        // Intentionally create the span outside the task to set the parent.
-        let expire_span = debug_span!("drop::expire");
-        handle.spawn_named(
-            || format!("ReadHandle::expire ({})", self.reader_id),
-            unexpired_state.expire_fn.0().instrument(expire_span),
-        );
+        self.leased_seqnos.modify(|s| {
+            s.expired = true;
+            s.request_sync = true;
+        });
     }
 }
 
@@ -1544,7 +1718,7 @@ mod tests {
         // We should expect the SeqNo to be downgraded if this part's SeqNo
         // is no longer leased to any other parts, either.
-        let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > Some(part_seqno);
+        let expect_downgrade = subscribe.listen.handle.outstanding_seqno() > part_seqno;
 
         let new_seqno_since = subscribe.listen.handle.machine.applier.seqno_since();
         if expect_downgrade {
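Reviewer note: the `AwaitableState` type added in read.rs is the load-bearing piece of this change. `ReadHandle::downgrade_since` and `maybe_downgrade_since` now only record intent locally, and the single background heartbeat task commits since downgrades, lease renewals, and expiry to durable state. For readers unfamiliar with the `RwLock` + `Notify` "async condvar" pattern it uses, here is a minimal standalone sketch. All names (`Shared`, the counter example) are illustrative rather than part of this diff, and it leans on the documented tokio guarantee that a `Notified` future receives `notify_waiters()` wakeups from the moment it is created, even before its first poll.

```rust
use std::sync::{Arc, RwLock};

use tokio::sync::Notify;

/// Minimal sketch of the `AwaitableState` pattern: a piece of shared state
/// plus a `Notify`, so writers can wake readers waiting on a predicate.
#[derive(Clone)]
struct Shared<T> {
    state: Arc<RwLock<T>>,
    notify: Arc<Notify>,
}

impl<T> Shared<T> {
    fn new(value: T) -> Self {
        Self {
            state: Arc::new(RwLock::new(value)),
            notify: Arc::new(Notify::new()),
        }
    }

    /// Mutate the state, then wake all current waiters so they re-check
    /// their predicates against the updated value.
    fn modify<A>(&self, f: impl FnOnce(&mut T) -> A) -> A {
        let mut guard = self.state.write().expect("not poisoned");
        let result = f(&mut *guard);
        self.notify.notify_waiters();
        result
    }

    /// Wait until `pred` returns `Some`, re-checking after every notification.
    async fn wait_for<A>(&self, mut pred: impl FnMut(&T) -> Option<A>) -> A {
        loop {
            let notified = {
                let guard = self.state.read().expect("not poisoned");
                if let Some(a) = pred(&*guard) {
                    return a;
                }
                // Create the `Notified` future before releasing the lock: a
                // `modify` that slips in between this predicate check and the
                // `.await` below will still wake this waiter.
                self.notify.notified()
            };
            notified.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let counter = Shared::new(0u64);
    let bg = counter.clone();
    tokio::spawn(async move {
        for _ in 0..10 {
            bg.modify(|n| *n += 1);
            tokio::task::yield_now().await;
        }
    });
    // Resolves once the background task has bumped the counter to 10.
    let seen = counter.wait_for(|n| (*n >= 10).then_some(*n)).await;
    println!("observed {seen}");
}
```

The detail that makes this race-free is the same one the in-diff comments call out: `notify_waiters` only wakes waiters that already exist, so the `Notified` future must be created while the state lock is still held, and the state is only ever mutated through `modify`, which notifies under that same lock.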