1 change: 0 additions & 1 deletion src/persist-client/src/fetch.rs
@@ -472,7 +472,6 @@ impl Lease {
}

/// Returns the inner [SeqNo] of this [Lease].
#[cfg(test)]
pub fn seqno(&self) -> SeqNo {
*self.0
}
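The `#[cfg(test)]` gate on `Lease::seqno` comes off in this hunk, which suggests non-test callers now need to read the SeqNo a lease pins, plausibly so a read handle can compute the outstanding SeqNo it must now pass to `downgrade_since` (see the machine.rs and state.rs hunks below). The following is a minimal, self-contained sketch of that idea; the `SeqNo`/`Lease` stand-ins and the `outstanding_seqno` helper are illustrative assumptions, not code from this change.

```rust
use std::sync::Arc;

/// Stand-in for persist's SeqNo: a monotonically increasing state version.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SeqNo(u64);

/// Stand-in for a Lease: a shared handle that pins one SeqNo.
#[derive(Clone)]
struct Lease(Arc<SeqNo>);

impl Lease {
    /// Returns the inner SeqNo of this Lease (no longer test-only).
    fn seqno(&self) -> SeqNo {
        *self.0
    }
}

/// Hypothetical helper: the oldest SeqNo still pinned by an outstanding
/// lease, falling back to the handle's current SeqNo when no leases are held.
fn outstanding_seqno(leases: &[Lease], current: SeqNo) -> SeqNo {
    leases.iter().map(|l| l.seqno()).min().unwrap_or(current)
}

fn main() {
    let leases = vec![Lease(Arc::new(SeqNo(7))), Lease(Arc::new(SeqNo(9)))];
    assert_eq!(outstanding_seqno(&leases, SeqNo(12)), SeqNo(7));
    assert_eq!(outstanding_seqno(&[], SeqNo(12)), SeqNo(12));
}
```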
3 changes: 0 additions & 3 deletions src/persist-client/src/internal/datadriven.rs
@@ -245,9 +245,6 @@ mod tests {
"fetch-batch" => machine_dd::fetch_batch(&state, args).await,
"finalize" => machine_dd::finalize(&mut state, args).await,
"gc" => machine_dd::gc(&mut state, args).await,
"heartbeat-leased-reader" => {
machine_dd::heartbeat_leased_reader(&state, args).await
}
"is-finalized" => machine_dd::is_finalized(&state, args),
"listen-through" => {
machine_dd::listen_through(&mut state, args).await
154 changes: 8 additions & 146 deletions src/persist-client/src/internal/machine.rs
@@ -23,7 +23,6 @@ use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
#[allow(unused_imports)] // False positive.
use mz_ore::fmt::FormatBuffer;
use mz_ore::task::JoinHandle;
use mz_ore::{assert_none, soft_assert_no_log};
use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
use mz_persist::retry::Retry;
@@ -32,7 +31,7 @@ use mz_persist_types::{Codec, Codec64, Opaque};
use semver::Version;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, info, trace_span, warn};
use tracing::{Instrument, debug, info, trace_span};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
@@ -42,7 +41,6 @@ use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
use crate::internal::paths::PartialRollupKey;
@@ -54,7 +52,7 @@ use crate::internal::state::{
use crate::internal::state_versions::StateVersions;
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::internal::watch::StateWatch;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::read::LeasedReaderId;
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::WriterId;
@@ -620,7 +618,7 @@
pub async fn downgrade_since(
&self,
reader_id: &LeasedReaderId,
outstanding_seqno: Option<SeqNo>,
outstanding_seqno: SeqNo,
new_since: &Antichain<T>,
heartbeat_timestamp_ms: u64,
) -> (SeqNo, Since<T>, RoutineMaintenance) {
@@ -663,20 +661,6 @@
}
}

pub async fn heartbeat_leased_reader(
&self,
reader_id: &LeasedReaderId,
heartbeat_timestamp_ms: u64,
) -> (SeqNo, bool, RoutineMaintenance) {
let metrics = Arc::clone(&self.applier.metrics);
let (seqno, existed, maintenance) = self
.apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
})
.await;
(seqno, existed, maintenance)
}

pub async fn expire_leased_reader(
&self,
reader_id: &LeasedReaderId,
@@ -1195,119 +1179,6 @@ impl<T: Debug> CompareAndAppendRes<T> {
}
}

impl<K, V, T, D> Machine<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Monoid + Codec64 + Send + Sync,
{
#[allow(clippy::unused_async)]
pub async fn start_reader_heartbeat_tasks(
self,
reader_id: LeasedReaderId,
gc: GarbageCollector<K, V, T, D>,
) -> Vec<JoinHandle<()>> {
let mut ret = Vec::new();
let metrics = Arc::clone(&self.applier.metrics);

// TODO: In response to a production incident, this runs the heartbeat
// task on both the in-context tokio runtime and persist's isolated
// runtime. We think we were seeing tasks (including this one) get stuck
// indefinitely in tokio while waiting for a runtime worker. This could
// happen if some other task in that runtime never yields. It's possible
// that one of the two runtimes is healthy while the other isn't (this
// was inconclusive in the incident debugging), and the heartbeat task
// is fairly lightweight, so run a copy in each in case that helps.
//
// The real fix here is to find the misbehaving task and fix it. Remove
// this duplication when that happens.
let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
ret.push(mz_ore::task::spawn(|| name, {
let machine = self.clone();
let reader_id = reader_id.clone();
let gc = gc.clone();
metrics
.tasks
.heartbeat_read
.instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
}));

let isolated_runtime = Arc::clone(&self.isolated_runtime);
let name = format!(
"persist::heartbeat_read_isolated({},{})",
self.shard_id(),
reader_id
);
ret.push(
isolated_runtime.spawn_named(
|| name,
metrics
.tasks
.heartbeat_read
.instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
),
);

ret
}

async fn reader_heartbeat_task(
machine: Self,
reader_id: LeasedReaderId,
gc: GarbageCollector<K, V, T, D>,
) {
let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
loop {
let before_sleep = Instant::now();
tokio::time::sleep(sleep_duration).await;

let elapsed_since_before_sleeping = before_sleep.elapsed();
if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
warn!(
"reader ({}) of shard ({}) went {}s between heartbeats",
reader_id,
machine.shard_id(),
elapsed_since_before_sleeping.as_secs_f64()
);
}

let before_heartbeat = Instant::now();
let (_seqno, existed, maintenance) = machine
.heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
.await;
maintenance.start_performing(&machine, &gc);

let elapsed_since_heartbeat = before_heartbeat.elapsed();
if elapsed_since_heartbeat > Duration::from_secs(60) {
warn!(
"reader ({}) of shard ({}) heartbeat call took {}s",
reader_id,
machine.shard_id(),
elapsed_since_heartbeat.as_secs_f64(),
);
}

if !existed {
// If the read handle was intentionally expired, this task
// *should* be aborted before it observes the expiration. So if
// we get here, this task somehow failed to keep the read lease
// alive. Warn loudly, because there's now a live read handle to
// an expired shard that will panic if used, but don't panic,
// just in case there is some edge case that results in this
// task observing the intentional expiration of a read handle.
warn!(
"heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
while read handle is live",
reader_id,
machine.shard_id(),
);
return;
}
}
}
}

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
"persist_next_listen_batch_retryer_fixed_sleep",
Duration::from_millis(1200), // pubsub is on by default!
@@ -1648,7 +1519,7 @@ pub mod datadriven {
args: DirectiveArgs<'_>,
) -> Result<String, anyhow::Error> {
let since = args.expect_antichain("since");
let seqno = args.optional("seqno");
let seqno = args.optional("seqno").unwrap_or(datadriven.machine.seqno());
let reader_id = args.expect("reader_id");
let (_, since, routine) = datadriven
.machine
@@ -2351,18 +2222,6 @@ pub mod datadriven {
))
}

pub async fn heartbeat_leased_reader(
datadriven: &MachineState,
args: DirectiveArgs<'_>,
) -> Result<String, anyhow::Error> {
let reader_id = args.expect("reader_id");
let _ = datadriven
.machine
.heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
.await;
Ok(format!("{} ok\n", datadriven.machine.seqno()))
}

pub async fn expire_critical_reader(
datadriven: &mut MachineState,
args: DirectiveArgs<'_>,
@@ -2619,10 +2478,13 @@ pub mod tests {
let client = new_test_client(&dyncfgs).await;
// set a low rollup threshold so GC/truncation is more aggressive
client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
let (mut write, _) = client
let (mut write, read) = client
.expect_open::<String, (), u64, i64>(ShardId::new())
.await;

// Ensure the reader is not holding back the since.
read.expire().await;

// Write a bunch of batches. This should result in a bounded number of
// live entries in consensus.
const NUM_BATCHES: u64 = 100;
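A note on the `downgrade_since` signature change in this file: `outstanding_seqno` goes from `Option<SeqNo>` to `SeqNo`, and the datadriven directive now defaults a missing `seqno` arg to the machine's current SeqNo via `args.optional("seqno").unwrap_or(datadriven.machine.seqno())`. The sketch below models that calling-convention shift under simplified, hypothetical types; it is not the crate's actual API.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SeqNo(u64);

// Before (modeled): a caller with no outstanding lease passed None and the
// machine fell back to the current SeqNo internally.
fn downgrade_since_old(current: SeqNo, outstanding: Option<SeqNo>) -> SeqNo {
    outstanding.unwrap_or(current)
}

// After (modeled): the fallback moves to the caller, so the machine always
// receives a concrete SeqNo to hold.
fn downgrade_since_new(outstanding: SeqNo) -> SeqNo {
    outstanding
}

fn main() {
    let current = SeqNo(42);
    // Old convention: None meant "just use the current SeqNo".
    assert_eq!(downgrade_since_old(current, None), SeqNo(42));
    // New convention: the caller resolves the fallback up front,
    // mirroring `args.optional("seqno").unwrap_or(machine.seqno())`.
    let outstanding = Option::<SeqNo>::None.unwrap_or(current);
    assert_eq!(downgrade_since_new(outstanding), SeqNo(42));
}
```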
2 changes: 0 additions & 2 deletions src/persist-client/src/internal/metrics.rs
@@ -421,7 +421,6 @@ impl MetricsVecs {
)),
compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
downgrade_since: self.cmd_metrics("downgrade_since"),
heartbeat_reader: self.cmd_metrics("heartbeat_reader"),
expire_reader: self.cmd_metrics("expire_reader"),
expire_writer: self.cmd_metrics("expire_writer"),
merge_res: self.cmd_metrics("merge_res"),
@@ -630,7 +629,6 @@ pub struct CmdsMetrics {
pub(crate) compare_and_append_noop: IntCounter,
pub(crate) compare_and_downgrade_since: CmdMetrics,
pub(crate) downgrade_since: CmdMetrics,
pub(crate) heartbeat_reader: CmdMetrics,
pub(crate) expire_reader: CmdMetrics,
pub(crate) expire_writer: CmdMetrics,
pub(crate) merge_res: CmdMetrics,
62 changes: 16 additions & 46 deletions src/persist-client/src/internal/state.rs
@@ -1877,7 +1877,7 @@
&mut self,
reader_id: &LeasedReaderId,
seqno: SeqNo,
outstanding_seqno: Option<SeqNo>,
outstanding_seqno: SeqNo,
new_since: &Antichain<T>,
heartbeat_timestamp_ms: u64,
) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
@@ -1905,18 +1905,15 @@
reader_state.last_heartbeat_timestamp_ms,
);

let seqno = match outstanding_seqno {
Some(outstanding_seqno) => {
assert!(
outstanding_seqno >= reader_state.seqno,
"SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
is behind current reader_state ({:?})",
outstanding_seqno,
reader_state.seqno,
);
std::cmp::min(outstanding_seqno, seqno)
}
None => seqno,
};
let seqno = {
assert!(
outstanding_seqno >= reader_state.seqno,
"SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
is behind current reader_state ({:?})",
outstanding_seqno,
reader_state.seqno,
);
std::cmp::min(outstanding_seqno, seqno)
};

reader_state.seqno = seqno;
@@ -1979,33 +1976,6 @@
}
}

pub fn heartbeat_leased_reader(
&mut self,
reader_id: &LeasedReaderId,
heartbeat_timestamp_ms: u64,
) -> ControlFlow<NoOpStateTransition<bool>, bool> {
// We expire all readers if the upper and since both advance to the
// empty antichain. Gracefully handle this. At the same time,
// short-circuit the cmd application so we don't needlessly create new
// SeqNos.
if self.is_tombstone() {
return Break(NoOpStateTransition(false));
}

match self.leased_readers.get_mut(reader_id) {
Some(reader_state) => {
reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
heartbeat_timestamp_ms,
reader_state.last_heartbeat_timestamp_ms,
);
Continue(true)
}
// No-op, but we still commit the state change so that this gets
// linearized (maybe we're looking at old state).
None => Continue(false),
}
}

pub fn expire_leased_reader(
&mut self,
reader_id: &LeasedReaderId,
@@ -3231,7 +3201,7 @@ pub(crate) mod tests {
state.collections.downgrade_since(
&reader,
seqno,
None,
seqno,
&Antichain::from_elem(2),
now()
),
@@ -3243,7 +3213,7 @@
state.collections.downgrade_since(
&reader,
seqno,
None,
seqno,
&Antichain::from_elem(2),
now()
),
@@ -3255,7 +3225,7 @@
state.collections.downgrade_since(
&reader,
seqno,
None,
seqno,
&Antichain::from_elem(1),
now()
),
@@ -3280,7 +3250,7 @@
state.collections.downgrade_since(
&reader2,
seqno,
None,
seqno,
&Antichain::from_elem(3),
now()
),
@@ -3292,7 +3262,7 @@
state.collections.downgrade_since(
&reader,
seqno,
None,
seqno,
&Antichain::from_elem(5),
now()
),
@@ -3324,7 +3294,7 @@
state.collections.downgrade_since(
&reader3,
seqno,
None,
seqno,
&Antichain::from_elem(10),
now()
),
@@ -3624,7 +3594,7 @@
state.collections.downgrade_since(
&reader,
SeqNo::minimum(),
None,
SeqNo::minimum(),
&Antichain::from_elem(2),
now()
),
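To summarize the state-side logic after this change, here is a self-contained model (not the crate's code) of how `downgrade_since` handles the now always-present outstanding SeqNo: it refreshes the reader's heartbeat timestamp (presumably what allows the separate `heartbeat_leased_reader` command and its background task to be removed), asserts the outstanding SeqNo has not fallen behind the reader's previously leased SeqNo, and pins the reader to the older of the outstanding and current SeqNos. Types and names are simplified stand-ins.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SeqNo(u64);

struct LeasedReaderState {
    seqno: SeqNo,
    last_heartbeat_timestamp_ms: u64,
}

/// Modeled seqno/heartbeat handling of `downgrade_since` after this change.
fn downgrade_since_model(
    reader: &mut LeasedReaderState,
    current_seqno: SeqNo,
    outstanding_seqno: SeqNo,
    heartbeat_timestamp_ms: u64,
) {
    // Downgrading also acts as a heartbeat for the reader's lease.
    reader.last_heartbeat_timestamp_ms =
        reader.last_heartbeat_timestamp_ms.max(heartbeat_timestamp_ms);
    // The oldest leased SeqNo may never move backwards.
    assert!(
        outstanding_seqno >= reader.seqno,
        "SeqNos cannot go backward; oldest leased SeqNo ({:?}) is behind reader state ({:?})",
        outstanding_seqno,
        reader.seqno,
    );
    // Hold back state GC to the oldest SeqNo still needed by this reader.
    reader.seqno = outstanding_seqno.min(current_seqno);
}

fn main() {
    let mut reader = LeasedReaderState { seqno: SeqNo(5), last_heartbeat_timestamp_ms: 100 };
    downgrade_since_model(&mut reader, SeqNo(10), SeqNo(7), 200);
    assert_eq!(reader.seqno, SeqNo(7));
    assert_eq!(reader.last_heartbeat_timestamp_ms, 200);
}
```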