diff --git a/Cargo.lock b/Cargo.lock index 98cf23c62043..503c944affab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,17 +213,6 @@ dependencies = [ "critical-section", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi 0.1.19", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -1786,15 +1775,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.2.6" @@ -1825,6 +1805,16 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" +[[package]] +name = "histogram" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e673d137229619d5c2c8903b6ed5852b43636c0017ff2e66b1aafb8ccf04b80b" +dependencies = [ + "serde", + "thiserror", +] + [[package]] name = "hmac" version = "0.12.1" @@ -3738,7 +3728,7 @@ name = "s3_scrubber" version = "0.1.0" dependencies = [ "anyhow", - "atty", + "async-stream", "aws-config", "aws-sdk-s3", "aws-smithy-http", @@ -3749,7 +3739,10 @@ dependencies = [ "clap", "crc32c", "either", + "futures-util", "hex", + "histogram", + "itertools", "pageserver", "rand", "reqwest", @@ -3759,6 +3752,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", + "tokio-stream", "tracing", "tracing-appender", "tracing-subscriber", diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 29bd6ce5986c..32d0d1bed286 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -3,6 +3,7 @@ //! Currently it only analyzes holes, which are regions within the layer range that the layer contains no updates for. In the future it might do more analysis (maybe key quantiles?) but it should never return sensitive data. use anyhow::Result; +use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use std::cmp::Ordering; use std::collections::BinaryHeap; use std::ops::Range; @@ -142,12 +143,12 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let mut total_delta_layers = 0usize; let mut total_image_layers = 0usize; let mut total_excess_layers = 0usize; - for tenant in fs::read_dir(storage_path.join("tenants"))? { + for tenant in fs::read_dir(storage_path.join(TENANTS_SEGMENT_NAME))? { let tenant = tenant?; if !tenant.file_type()?.is_dir() { continue; } - for timeline in fs::read_dir(tenant.path().join("timelines"))? { + for timeline in fs::read_dir(tenant.path().join(TIMELINES_SEGMENT_NAME))? 
{ let timeline = timeline?; if !timeline.file_type()?.is_dir() { continue; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 608b3cecd65c..ff2044653a79 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -5,6 +5,7 @@ use clap::Subcommand; use pageserver::tenant::block_io::BlockCursor; use pageserver::tenant::disk_btree::DiskBtreeReader; use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary}; +use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use pageserver::{page_cache, virtual_file}; use pageserver::{ repository::{Key, KEY_SIZE}, @@ -80,13 +81,13 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> { pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { match cmd { LayerCmd::List { path } => { - for tenant in fs::read_dir(path.join("tenants"))? { + for tenant in fs::read_dir(path.join(TENANTS_SEGMENT_NAME))? { let tenant = tenant?; if !tenant.file_type()?.is_dir() { continue; } println!("tenant {}", tenant.file_name().to_string_lossy()); - for timeline in fs::read_dir(tenant.path().join("timelines"))? { + for timeline in fs::read_dir(tenant.path().join(TIMELINES_SEGMENT_NAME))? { let timeline = timeline?; if !timeline.file_type()?.is_dir() { continue; @@ -101,9 +102,9 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { timeline, } => { let timeline_path = path - .join("tenants") + .join(TENANTS_SEGMENT_NAME) .join(tenant) - .join("timelines") + .join(TIMELINES_SEGMENT_NAME) .join(timeline); let mut idx = 0; for layer in fs::read_dir(timeline_path)? { diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 5394f17398e7..ab485c969d04 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -32,7 +32,8 @@ use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig; use crate::tenant::config::TenantConf; use crate::tenant::config::TenantConfOpt; use crate::tenant::{ - TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME, + TENANTS_SEGMENT_NAME, TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME, + TIMELINES_SEGMENT_NAME, }; use crate::{ IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, @@ -563,7 +564,7 @@ impl PageServerConf { // pub fn tenants_path(&self) -> PathBuf { - self.workdir.join("tenants") + self.workdir.join(TENANTS_SEGMENT_NAME) } pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 855301cd1d1b..90491bda3f7c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -141,6 +141,9 @@ pub use crate::tenant::metadata::save_metadata; // re-export for use in walreceiver pub use crate::tenant::timeline::WalReceiverInfo; +/// The "tenants" part of `tenants//timelines...` +pub const TENANTS_SEGMENT_NAME: &str = "tenants"; + /// Parts of the `.neon/tenants//timelines/` directory prefix. pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 48d43a4e638f..51a0ddc750b9 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -11,6 +11,7 @@ //! src/backend/storage/file/fd.c //! 
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; +use crate::tenant::TENANTS_SEGMENT_NAME; use once_cell::sync::OnceCell; use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}; @@ -235,7 +236,7 @@ impl VirtualFile { let parts = path_str.split('/').collect::>(); let tenant_id; let timeline_id; - if parts.len() > 5 && parts[parts.len() - 5] == "tenants" { + if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME { tenant_id = parts[parts.len() - 4].to_string(); timeline_id = parts[parts.len() - 2].to_string(); } else { diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml index 47668eb4aafb..82cd105d4f1c 100644 --- a/s3_scrubber/Cargo.toml +++ b/s3_scrubber/Cargo.toml @@ -22,6 +22,10 @@ serde_json.workspace = true serde_with.workspace = true workspace_hack.workspace = true utils.workspace = true +async-stream.workspace = true +tokio-stream.workspace = true +futures-util.workspace = true +itertools.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } chrono = { workspace = true, default-features = false, features = ["clock", "serde"] } @@ -30,10 +34,8 @@ aws-config = { workspace = true, default-features = false, features = ["rustls", pageserver = {path="../pageserver"} - tracing.workspace = true tracing-subscriber.workspace = true clap.workspace = true - -atty = "0.2" -tracing-appender = "0.2" \ No newline at end of file +tracing-appender = "0.2" +histogram = "0.7" \ No newline at end of file diff --git a/s3_scrubber/src/checks.rs b/s3_scrubber/src/checks.rs index c52a40ee9453..e86c381985fb 100644 --- a/s3_scrubber/src/checks.rs +++ b/s3_scrubber/src/checks.rs @@ -4,13 +4,12 @@ use std::time::Duration; use anyhow::Context; use aws_sdk_s3::Client; -use tokio::io::AsyncReadExt; use tokio::task::JoinSet; use tracing::{error, info, info_span, warn, Instrument}; use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectId}; use crate::delete_batch_producer::DeleteProducerStats; -use crate::{list_objects_with_retries, RootTarget, MAX_RETRIES}; +use crate::{download_object_with_retries, list_objects_with_retries, RootTarget, MAX_RETRIES}; use pageserver::tenant::storage_layer::LayerFileName; use pageserver::tenant::IndexPart; use utils::id::TenantTimelineId; @@ -92,9 +91,9 @@ pub async fn validate_pageserver_active_tenant_and_timelines( branch_checks.spawn( async move { let check_errors = branch_cleanup_and_check_errors( - id, + &id, &s3_root, - &s3_active_branch, + Some(&s3_active_branch), console_branch, s3_data, ) @@ -107,13 +106,13 @@ pub async fn validate_pageserver_active_tenant_and_timelines( } let mut total_stats = BranchCheckStats::default(); - while let Some((id, branch_check_errors)) = branch_checks + while let Some((id, analysis)) = branch_checks .join_next() .await .transpose() .context("branch check task join")? { - total_stats.add(id, branch_check_errors); + total_stats.add(id, analysis.errors); } Ok(total_stats) } @@ -160,35 +159,59 @@ impl BranchCheckStats { } } -async fn branch_cleanup_and_check_errors( - id: TenantTimelineId, +pub struct TimelineAnalysis { + /// Anomalies detected + pub errors: Vec, + + /// Healthy-but-noteworthy, like old-versioned structures that are readable but + /// worth reporting for awareness that we must not remove that old version decoding + /// yet. 
+ pub warnings: Vec, + + /// Keys not referenced in metadata: candidates for removal + pub garbage_keys: Vec, +} + +impl TimelineAnalysis { + fn new() -> Self { + Self { + errors: Vec::new(), + warnings: Vec::new(), + garbage_keys: Vec::new(), + } + } +} + +pub async fn branch_cleanup_and_check_errors( + id: &TenantTimelineId, s3_root: &RootTarget, - s3_active_branch: &BranchData, + s3_active_branch: Option<&BranchData>, console_branch: Option, s3_data: Option, -) -> Vec { - info!( - "Checking timeline for branch branch {:?}/{:?}", - s3_active_branch.project_id, s3_active_branch.id - ); - let mut branch_check_errors = Vec::new(); - - match console_branch { - Some(console_active_branch) => { - if console_active_branch.deleted { - branch_check_errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether if it got removed during the check", - s3_active_branch.id, s3_active_branch.project_id)) - } - }, - None => branch_check_errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether if it got removed during the check", +) -> TimelineAnalysis { + let mut result = TimelineAnalysis::new(); + + info!("Checking timeline {id}"); + + if let Some(s3_active_branch) = s3_active_branch { + info!( + "Checking console status for timeline for branch {:?}/{:?}", + s3_active_branch.project_id, s3_active_branch.id + ); + match console_branch { + Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check", + s3_active_branch.id, s3_active_branch.project_id)) + }, + None => { + result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check", s3_active_branch.id, s3_active_branch.project_id)) + } + }; } - let mut keys_to_remove = Vec::new(); - match s3_data { Some(s3_data) => { - keys_to_remove.extend(s3_data.keys_to_remove); + result.garbage_keys.extend(s3_data.keys_to_remove); match s3_data.blob_data { BlobDataParseResult::Parsed { @@ -196,16 +219,23 @@ async fn branch_cleanup_and_check_errors( mut s3_layers, } => { if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) { - branch_check_errors.push(format!( + result.errors.push(format!( "index_part.json version: {}", index_part.get_version() )) } + if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() { + result.warnings.push(format!( + "index_part.json version is not latest: {}", + index_part.get_version() + )) + } + if index_part.metadata.disk_consistent_lsn() != index_part.get_disk_consistent_lsn() { - branch_check_errors.push(format!( + result.errors.push(format!( "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})", index_part.metadata.disk_consistent_lsn(), index_part.get_disk_consistent_lsn(), @@ -220,13 +250,13 @@ async fn branch_cleanup_and_check_errors( for (layer, metadata) in index_part.layer_metadata { if metadata.file_size == 0 { - branch_check_errors.push(format!( + result.errors.push(format!( "index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(), )) } if !s3_layers.remove(&layer) { - branch_check_errors.push(format!( + result.errors.push(format!( "index_part.json contains a layer {} that is not present in S3", layer.file_name(), )) @@ -234,55 +264,66 @@ async fn branch_cleanup_and_check_errors( } if !s3_layers.is_empty() { - 
branch_check_errors.push(format!( + result.errors.push(format!( "index_part.json does not contain layers from S3: {:?}", s3_layers .iter() .map(|layer_name| layer_name.file_name()) .collect::>(), )); - keys_to_remove.extend(s3_layers.iter().map(|layer_name| { - let mut key = s3_root.timeline_root(id).prefix_in_bucket; - let delimiter = s3_root.delimiter(); - if !key.ends_with(delimiter) { - key.push_str(delimiter); - } - key.push_str(&layer_name.file_name()); - key - })); + result + .garbage_keys + .extend(s3_layers.iter().map(|layer_name| { + let mut key = s3_root.timeline_root(id).prefix_in_bucket; + let delimiter = s3_root.delimiter(); + if !key.ends_with(delimiter) { + key.push_str(delimiter); + } + key.push_str(&layer_name.file_name()); + key + })); } } - BlobDataParseResult::Incorrect(parse_errors) => branch_check_errors.extend( + BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend( parse_errors .into_iter() .map(|error| format!("parse error: {error}")), ), } } - None => branch_check_errors.push("Timeline has no data on S3 at all".to_string()), + None => result + .errors + .push("Timeline has no data on S3 at all".to_string()), } - if branch_check_errors.is_empty() { + if result.errors.is_empty() { info!("No check errors found"); } else { - warn!("Found check errors: {branch_check_errors:?}"); + warn!("Timeline metadata errors: {0:?}", result.errors); } - if !keys_to_remove.is_empty() { - error!("The following keys should be removed from S3: {keys_to_remove:?}") + if !result.warnings.is_empty() { + warn!("Timeline metadata warnings: {0:?}", result.warnings); } - branch_check_errors + if !result.garbage_keys.is_empty() { + error!( + "The following keys should be removed from S3: {0:?}", + result.garbage_keys + ) + } + + result } #[derive(Debug)] -struct S3TimelineBlobData { - blob_data: BlobDataParseResult, - keys_to_remove: Vec, +pub struct S3TimelineBlobData { + pub blob_data: BlobDataParseResult, + pub keys_to_remove: Vec, } #[derive(Debug)] -enum BlobDataParseResult { +pub enum BlobDataParseResult { Parsed { index_part: IndexPart, s3_layers: HashSet, @@ -290,7 +331,7 @@ enum BlobDataParseResult { Incorrect(Vec), } -async fn list_timeline_blobs( +pub async fn list_timeline_blobs( s3_client: &Client, id: TenantTimelineId, s3_root: &RootTarget, @@ -298,7 +339,7 @@ async fn list_timeline_blobs( let mut s3_layers = HashSet::new(); let mut index_part_object = None; - let timeline_dir_target = s3_root.timeline_root(id); + let timeline_dir_target = s3_root.timeline_root(&id); let mut continuation_token = None; let mut errors = Vec::new(); @@ -394,45 +435,3 @@ async fn list_timeline_blobs( keys_to_remove, }) } - -async fn download_object_with_retries( - s3_client: &Client, - bucket_name: &str, - key: &str, -) -> anyhow::Result> { - for _ in 0..MAX_RETRIES { - let mut body_buf = Vec::new(); - let response_stream = match s3_client - .get_object() - .bucket(bucket_name) - .key(key) - .send() - .await - { - Ok(response) => response, - Err(e) => { - error!("Failed to download object for key {key}: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - - match response_stream - .body - .into_async_read() - .read_to_end(&mut body_buf) - .await - { - Ok(bytes_read) => { - info!("Downloaded {bytes_read} bytes for object object with key {key}"); - return Ok(body_buf); - } - Err(e) => { - error!("Failed to stream object body for key {key}: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - } - - anyhow::bail!("Failed to download objects 
with key {key} {MAX_RETRIES} times") -} diff --git a/s3_scrubber/src/delete_batch_producer/timeline_batch.rs b/s3_scrubber/src/delete_batch_producer/timeline_batch.rs index 0a0b31ae8709..2ad522d3fb52 100644 --- a/s3_scrubber/src/delete_batch_producer/timeline_batch.rs +++ b/s3_scrubber/src/delete_batch_producer/timeline_batch.rs @@ -41,7 +41,7 @@ pub async fn schedule_cleanup_deleted_timelines( let new_stats = async move { let tenant_id_to_check = project_to_check.tenant; - let check_target = check_root.timelines_root(tenant_id_to_check); + let check_target = check_root.timelines_root(&tenant_id_to_check); let stats = super::process_s3_target_recursively( &check_s3_client, &check_target, diff --git a/s3_scrubber/src/lib.rs b/s3_scrubber/src/lib.rs index ea1338cf111a..f19a55efacb8 100644 --- a/s3_scrubber/src/lib.rs +++ b/s3_scrubber/src/lib.rs @@ -1,12 +1,15 @@ pub mod checks; pub mod cloud_admin_api; pub mod delete_batch_producer; +pub mod metadata_stream; mod s3_deletion; +pub mod scan_metadata; use std::env; use std::fmt::Display; use std::time::Duration; +use anyhow::Context; use aws_config::environment::EnvironmentVariableCredentialsProvider; use aws_config::imds::credentials::ImdsCredentialsProvider; use aws_config::meta::credentials::CredentialsProviderChain; @@ -14,7 +17,9 @@ use aws_config::sso::SsoCredentialsProvider; use aws_sdk_s3::config::Region; use aws_sdk_s3::{Client, Config}; +use reqwest::Url; pub use s3_deletion::S3Deleter; +use tokio::io::AsyncReadExt; use tracing::error; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -23,6 +28,8 @@ use utils::id::{TenantId, TenantTimelineId}; const MAX_RETRIES: usize = 20; const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN"; +pub const CLI_NAME: &str = "s3-scrubber"; + #[derive(Debug, Clone)] pub struct S3Target { pub bucket_name: String, @@ -69,19 +76,19 @@ impl RootTarget { } } - pub fn tenant_root(&self, tenant_id: TenantId) -> S3Target { + pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target { self.tenants_root().with_sub_segment(&tenant_id.to_string()) } - pub fn timelines_root(&self, tenant_id: TenantId) -> S3Target { + pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target { match self { Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"), Self::Safekeeper(_) => self.tenant_root(tenant_id), } } - pub fn timeline_root(&self, id: TenantTimelineId) -> S3Target { - self.timelines_root(id.tenant_id) + pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target { + self.timelines_root(&id.tenant_id) .with_sub_segment(&id.timeline_id.to_string()) } @@ -100,6 +107,55 @@ impl RootTarget { } } +pub struct BucketConfig { + pub region: String, + pub bucket: String, + + /// Use SSO if this is set, else rely on AWS_* environment vars + pub sso_account_id: Option, +} + +impl Display for BucketConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}/{}/{}", + self.sso_account_id.as_deref().unwrap_or(""), + self.region, + self.bucket + ) + } +} + +impl BucketConfig { + pub fn from_env() -> anyhow::Result { + let sso_account_id = env::var("SSO_ACCOUNT_ID").ok(); + let region = env::var("REGION").context("'REGION' param retrieval")?; + let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?; + + Ok(Self { + region, + bucket, + sso_account_id, + }) + } +} + +pub struct ConsoleConfig { + pub admin_api_url: Url, +} + +impl ConsoleConfig { + pub fn from_env() -> 
anyhow::Result { + let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL") + .context("'CLOUD_ADMIN_API_URL' param retrieval")? + .parse() + .context("'CLOUD_ADMIN_API_URL' param parsing")?; + + Ok(Self { admin_api_url }) + } +} + pub fn get_cloud_admin_api_token_or_exit() -> String { match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) { Ok(token) => token, @@ -114,23 +170,7 @@ pub fn get_cloud_admin_api_token_or_exit() -> String { } } -pub fn init_logging(binary_name: &str, dry_run: bool, node_kind: &str) -> WorkerGuard { - let file_name = if dry_run { - format!( - "{}_{}_{}__dry.log", - binary_name, - node_kind, - chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S") - ) - } else { - format!( - "{}_{}_{}.log", - binary_name, - node_kind, - chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S") - ) - }; - +pub fn init_logging(file_name: &str) -> WorkerGuard { let (file_writer, guard) = tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name)); @@ -140,7 +180,6 @@ pub fn init_logging(binary_name: &str, dry_run: bool, node_kind: &str) -> Worker .with_writer(file_writer); let stdout_logs = fmt::Layer::new() .with_target(false) - .with_ansi(atty::is(atty::Stream::Stdout)) .with_writer(std::io::stdout); tracing_subscriber::registry() .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) @@ -151,30 +190,43 @@ pub fn init_logging(binary_name: &str, dry_run: bool, node_kind: &str) -> Worker guard } -pub fn init_s3_client(account_id: String, bucket_region: Region) -> Client { +pub fn init_s3_client(account_id: Option, bucket_region: Region) -> Client { let credentials_provider = { // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" - CredentialsProviderChain::first_try("env", EnvironmentVariableCredentialsProvider::new()) - // uses sso - .or_else( + let chain = CredentialsProviderChain::first_try( + "env", + EnvironmentVariableCredentialsProvider::new(), + ); + + // Use SSO if we were given an account ID + match account_id { + Some(sso_account) => chain.or_else( "sso", SsoCredentialsProvider::builder() - .account_id(account_id) + .account_id(sso_account) .role_name("PowerUserAccess") .start_url("https://neondb.awsapps.com/start") .region(Region::from_static("eu-central-1")) .build(), - ) - // uses imds v2 - .or_else("imds", ImdsCredentialsProvider::builder().build()) + ), + None => chain, + } + .or_else( + // Finally try IMDS + "imds", + ImdsCredentialsProvider::builder().build(), + ) }; - let config = Config::builder() + let mut builder = Config::builder() .region(bucket_region) - .credentials_provider(credentials_provider) - .build(); + .credentials_provider(credentials_provider); + + if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") { + builder = builder.endpoint_url(endpoint) + } - Client::from_conf(config) + Client::from_conf(builder.build()) } async fn list_objects_with_retries( @@ -202,3 +254,45 @@ async fn list_objects_with_retries( anyhow::bail!("Failed to list objects {MAX_RETRIES} times") } + +async fn download_object_with_retries( + s3_client: &Client, + bucket_name: &str, + key: &str, +) -> anyhow::Result> { + for _ in 0..MAX_RETRIES { + let mut body_buf = Vec::new(); + let response_stream = match s3_client + .get_object() + .bucket(bucket_name) + .key(key) + .send() + .await + { + Ok(response) => response, + Err(e) => { + error!("Failed to download object for key {key}: {e}"); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + }; + + match response_stream + .body + .into_async_read() + .read_to_end(&mut body_buf) + .await 
+ { + Ok(bytes_read) => { + tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}"); + return Ok(body_buf); + } + Err(e) => { + error!("Failed to stream object body for key {key}: {e}"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + + anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times") +} diff --git a/s3_scrubber/src/main.rs b/s3_scrubber/src/main.rs index 7004bcad5167..3c60723f888b 100644 --- a/s3_scrubber/src/main.rs +++ b/s3_scrubber/src/main.rs @@ -1,19 +1,18 @@ use std::collections::HashMap; -use std::env; use std::fmt::Display; use std::num::NonZeroUsize; use std::sync::Arc; use anyhow::Context; use aws_sdk_s3::config::Region; -use reqwest::Url; use s3_scrubber::cloud_admin_api::CloudAdminApiClient; use s3_scrubber::delete_batch_producer::DeleteBatchProducer; +use s3_scrubber::scan_metadata::scan_metadata; use s3_scrubber::{ - checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, RootTarget, S3Deleter, - S3Target, TraversingDepth, + checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, BucketConfig, + ConsoleConfig, RootTarget, S3Deleter, S3Target, TraversingDepth, CLI_NAME, }; -use tracing::{info, info_span, warn}; +use tracing::{info, warn}; use clap::{Parser, Subcommand, ValueEnum}; @@ -59,48 +58,7 @@ enum Command { #[arg(short, long, default_value_t = false)] skip_validation: bool, }, -} - -struct BucketConfig { - region: String, - bucket: String, - sso_account_id: String, -} - -impl Display for BucketConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}/{}/{}", self.sso_account_id, self.region, self.bucket) - } -} - -impl BucketConfig { - fn from_env() -> anyhow::Result { - let sso_account_id = - env::var("SSO_ACCOUNT_ID").context("'SSO_ACCOUNT_ID' param retrieval")?; - let region = env::var("REGION").context("'REGION' param retrieval")?; - let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?; - - Ok(Self { - region, - bucket, - sso_account_id, - }) - } -} - -struct ConsoleConfig { - admin_api_url: Url, -} - -impl ConsoleConfig { - fn from_env() -> anyhow::Result { - let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL") - .context("'CLOUD_ADMIN_API_URL' param retrieval")? 
- .parse() - .context("'CLOUD_ADMIN_API_URL' param parsing")?; - - Ok(Self { admin_api_url }) - } + ScanMetadata {}, } async fn tidy( @@ -111,13 +69,24 @@ async fn tidy( depth: TraversingDepth, skip_validation: bool, ) -> anyhow::Result<()> { - let binary_name = env::args() - .next() - .context("binary name in not the first argument")?; - let dry_run = !cli.delete; - let _guard = init_logging(&binary_name, dry_run, node_kind.as_str()); - let _main_span = info_span!("tidy", binary = %binary_name, %dry_run).entered(); + let file_name = if dry_run { + format!( + "{}_{}_{}__dry.log", + CLI_NAME, + node_kind, + chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S") + ) + } else { + format!( + "{}_{}_{}.log", + CLI_NAME, + node_kind, + chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S") + ) + }; + + let _guard = init_logging(&file_name); if dry_run { info!("Dry run, not removing items for real"); @@ -247,7 +216,7 @@ async fn main() -> anyhow::Result<()> { let bucket_config = BucketConfig::from_env()?; - match &cli.command { + match cli.command { Command::Tidy { node_kind, depth, @@ -258,11 +227,25 @@ async fn main() -> anyhow::Result<()> { &cli, bucket_config, console_config, - *node_kind, - *depth, - *skip_validation, + node_kind, + depth, + skip_validation, ) .await } + Command::ScanMetadata {} => match scan_metadata(bucket_config).await { + Err(e) => { + tracing::error!("Failed: {e}"); + Err(e) + } + Ok(summary) => { + println!("{}", summary.summary_string()); + if summary.is_fatal() { + Err(anyhow::anyhow!("Fatal scrub errors detected")) + } else { + Ok(()) + } + } + }, } } diff --git a/s3_scrubber/src/metadata_stream.rs b/s3_scrubber/src/metadata_stream.rs new file mode 100644 index 000000000000..4e500a96cf71 --- /dev/null +++ b/s3_scrubber/src/metadata_stream.rs @@ -0,0 +1,106 @@ +use anyhow::Context; +use async_stream::{stream, try_stream}; +use aws_sdk_s3::Client; +use tokio_stream::Stream; + +use crate::{list_objects_with_retries, RootTarget, TenantId}; +use utils::id::{TenantTimelineId, TimelineId}; + +/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2 +pub fn stream_tenants<'a>( + s3_client: &'a Client, + target: &'a RootTarget, +) -> impl Stream> + 'a { + try_stream! { + let mut continuation_token = None; + loop { + let tenants_target = target.tenants_root(); + let fetch_response = + list_objects_with_retries(s3_client, tenants_target, continuation_token.clone()).await?; + + let new_entry_ids = fetch_response + .common_prefixes() + .unwrap_or_default() + .iter() + .filter_map(|prefix| prefix.prefix()) + .filter_map(|prefix| -> Option<&str> { + prefix + .strip_prefix(&tenants_target.prefix_in_bucket)? + .strip_suffix('/') + }).map(|entry_id_str| { + entry_id_str + .parse() + .with_context(|| format!("Incorrect entry id str: {entry_id_str}")) + }); + + for i in new_entry_ids { + yield i?; + } + + match fetch_response.next_continuation_token { + Some(new_token) => continuation_token = Some(new_token), + None => break, + } + } + } +} + +/// Given a TenantId, output a stream of the timelines within that tenant, discovered +/// using ListObjectsv2. The listing is done before the stream is built, so that this +/// function can be used to generate concurrency on a stream using buffer_unordered. 
+pub async fn stream_tenant_timelines<'a>( + s3_client: &'a Client, + target: &'a RootTarget, + tenant: TenantId, +) -> anyhow::Result> + 'a> { + let mut timeline_ids: Vec> = Vec::new(); + let mut continuation_token = None; + let timelines_target = target.timelines_root(&tenant); + + loop { + tracing::info!("Listing in {}", tenant); + let fetch_response = + list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone()) + .await; + let fetch_response = match fetch_response { + Err(e) => { + timeline_ids.push(Err(e)); + break; + } + Ok(r) => r, + }; + + let new_entry_ids = fetch_response + .common_prefixes() + .unwrap_or_default() + .iter() + .filter_map(|prefix| prefix.prefix()) + .filter_map(|prefix| -> Option<&str> { + prefix + .strip_prefix(&timelines_target.prefix_in_bucket)? + .strip_suffix('/') + }) + .map(|entry_id_str| { + entry_id_str + .parse::() + .with_context(|| format!("Incorrect entry id str: {entry_id_str}")) + }); + + for i in new_entry_ids { + timeline_ids.push(i); + } + + match fetch_response.next_continuation_token { + Some(new_token) => continuation_token = Some(new_token), + None => break, + } + } + + tracing::info!("Yielding for {}", tenant); + Ok(stream! { + for i in timeline_ids { + let id = i?; + yield Ok(TenantTimelineId::new(tenant, id)); + } + }) +} diff --git a/s3_scrubber/src/s3_deletion.rs b/s3_scrubber/src/s3_deletion.rs index 716443790b9a..a03cc65c8913 100644 --- a/s3_scrubber/src/s3_deletion.rs +++ b/s3_scrubber/src/s3_deletion.rs @@ -164,7 +164,7 @@ async fn delete_tenants_batch( s3_target, s3_client, dry_run, - |root_target, tenant_to_delete| root_target.tenant_root(tenant_to_delete), + |root_target, tenant_to_delete| root_target.tenant_root(&tenant_to_delete), ) .await?; @@ -215,7 +215,7 @@ async fn delete_timelines_batch( s3_target, s3_client, dry_run, - |root_target, timeline_to_delete| root_target.timeline_root(timeline_to_delete), + |root_target, timeline_to_delete| root_target.timeline_root(&timeline_to_delete), ) .await?; @@ -386,7 +386,7 @@ async fn ensure_tenant_batch_deleted( for &tenant_id in batch { let fetch_response = - list_objects_with_retries(s3_client, &s3_target.tenant_root(tenant_id), None).await?; + list_objects_with_retries(s3_client, &s3_target.tenant_root(&tenant_id), None).await?; if fetch_response.is_truncated() || fetch_response.contents().is_some() @@ -415,7 +415,7 @@ async fn ensure_timeline_batch_deleted( for &id in batch { let fetch_response = - list_objects_with_retries(s3_client, &s3_target.timeline_root(id), None).await?; + list_objects_with_retries(s3_client, &s3_target.timeline_root(&id), None).await?; if fetch_response.is_truncated() || fetch_response.contents().is_some() diff --git a/s3_scrubber/src/scan_metadata.rs b/s3_scrubber/src/scan_metadata.rs new file mode 100644 index 000000000000..f75d7645a88b --- /dev/null +++ b/s3_scrubber/src/scan_metadata.rs @@ -0,0 +1,234 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use crate::checks::{ + branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData, + TimelineAnalysis, +}; +use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; +use crate::{init_logging, init_s3_client, BucketConfig, RootTarget, S3Target, CLI_NAME}; +use aws_sdk_s3::Client; +use aws_types::region::Region; +use futures_util::{pin_mut, StreamExt, TryStreamExt}; +use histogram::Histogram; +use pageserver::tenant::{IndexPart, TENANTS_SEGMENT_NAME}; +use utils::id::TenantTimelineId; + +pub struct MetadataSummary { 
+ count: usize, + with_errors: HashSet, + with_warnings: HashSet, + with_garbage: HashSet, + indices_by_version: HashMap, + + layer_count: MinMaxHisto, + timeline_size_bytes: MinMaxHisto, + layer_size_bytes: MinMaxHisto, +} + +/// A histogram plus minimum and maximum tracking +struct MinMaxHisto { + histo: Histogram, + min: u64, + max: u64, +} + +impl MinMaxHisto { + fn new() -> Self { + Self { + histo: histogram::Histogram::builder() + .build() + .expect("Bad histogram params"), + min: u64::MAX, + max: 0, + } + } + + fn sample(&mut self, v: u64) -> Result<(), histogram::Error> { + self.min = std::cmp::min(self.min, v); + self.max = std::cmp::max(self.max, v); + let r = self.histo.increment(v, 1); + + if r.is_err() { + tracing::warn!("Bad histogram sample: {v}"); + } + + r + } + + fn oneline(&self) -> String { + let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) { + Ok(p) => p, + Err(e) => return format!("No data: {}", e), + }; + + let percentiles: Vec = percentiles + .iter() + .map(|p| p.bucket().low() + p.bucket().high() / 2) + .collect(); + + format!( + "min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}", + self.min, + percentiles[0], + percentiles[1], + percentiles[2], + percentiles[3], + percentiles[4], + self.max, + ) + } +} + +impl MetadataSummary { + fn new() -> Self { + Self { + count: 0, + with_errors: HashSet::new(), + with_warnings: HashSet::new(), + with_garbage: HashSet::new(), + indices_by_version: HashMap::new(), + layer_count: MinMaxHisto::new(), + timeline_size_bytes: MinMaxHisto::new(), + layer_size_bytes: MinMaxHisto::new(), + } + } + + fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> { + self.layer_count + .sample(index_part.layer_metadata.len() as u64)?; + let mut total_size: u64 = 0; + for meta in index_part.layer_metadata.values() { + total_size += meta.file_size; + self.layer_size_bytes.sample(meta.file_size)?; + } + self.timeline_size_bytes.sample(total_size)?; + + Ok(()) + } + + fn update_data(&mut self, data: &S3TimelineBlobData) { + self.count += 1; + if let BlobDataParseResult::Parsed { + index_part, + s3_layers: _, + } = &data.blob_data + { + *self + .indices_by_version + .entry(index_part.get_version()) + .or_insert(0) += 1; + + if let Err(e) = self.update_histograms(index_part) { + // Value out of range? Warn that the results are untrustworthy + tracing::warn!( + "Error updating histograms, summary stats may be wrong: {}", + e + ); + } + } + } + + fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) { + if !analysis.errors.is_empty() { + self.with_errors.insert(*id); + } + + if !analysis.warnings.is_empty() { + self.with_warnings.insert(*id); + } + } + + /// Long-form output for printing at end of a scan + pub fn summary_string(&self) -> String { + let version_summary: String = itertools::join( + self.indices_by_version + .iter() + .map(|(k, v)| format!("{k}: {v}")), + ", ", + ); + + format!( + "Timelines: {0} +With errors: {1} +With warnings: {2} +With garbage: {3} +Index versions: {version_summary} +Timeline size bytes: {4} +Layer size bytes: {5} +Timeline layer count: {6} +", + self.count, + self.with_errors.len(), + self.with_warnings.len(), + self.with_garbage.len(), + self.timeline_size_bytes.oneline(), + self.layer_size_bytes.oneline(), + self.layer_count.oneline(), + ) + } + + pub fn is_fatal(&self) -> bool { + !self.with_errors.is_empty() + } +} + +/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics. 
+pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result { + let file_name = format!( + "{}_scan_metadata_{}_{}.log", + CLI_NAME, + bucket_config.bucket, + chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S") + ); + + let _guard = init_logging(&file_name); + + let s3_client = Arc::new(init_s3_client( + bucket_config.sso_account_id, + Region::new(bucket_config.region), + )); + let delimiter = "/"; + let target = RootTarget::Pageserver(S3Target { + bucket_name: bucket_config.bucket.to_string(), + prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(delimiter), + delimiter: delimiter.to_string(), + }); + + let tenants = stream_tenants(&s3_client, &target); + + // How many tenants to process in parallel. We need to be mindful of pageservers + // accessing the same per tenant prefixes, so use a lower setting than pageservers. + const CONCURRENCY: usize = 32; + + // Generate a stream of TenantTimelineId + let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t)); + let timelines = timelines.try_buffer_unordered(CONCURRENCY); + let timelines = timelines.try_flatten(); + + // Generate a stream of S3TimelineBlobData + async fn report_on_timeline( + s3_client: &Client, + target: &RootTarget, + ttid: TenantTimelineId, + ) -> anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> { + let data = list_timeline_blobs(s3_client, ttid, target).await?; + Ok((ttid, data)) + } + let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid)); + let timelines = timelines.try_buffer_unordered(CONCURRENCY); + + let mut summary = MetadataSummary::new(); + pin_mut!(timelines); + while let Some(i) = timelines.next().await { + let (ttid, data) = i?; + summary.update_data(&data); + + let analysis = + branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data)).await; + + summary.update_analysis(&ttid, &analysis); + } + + Ok(summary) +} diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 3936736e5660..4ea1d7970578 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -415,6 +415,7 @@ def __init__( pg_distrib_dir: Path, pg_version: PgVersion, test_name: str, + test_output_dir: Path, remote_storage: Optional[RemoteStorage] = None, remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER, pageserver_config_override: Optional[str] = None, @@ -455,6 +456,8 @@ def __init__( self.preserve_database_files = preserve_database_files self.initial_tenant = initial_tenant or TenantId.generate() self.initial_timeline = initial_timeline or TimelineId.generate() + self.scrub_on_exit = False + self.test_output_dir = test_output_dir assert test_name.startswith( "test_" @@ -489,6 +492,23 @@ def init_start(self, initial_tenant_conf: Optional[Dict[str, str]] = None) -> Ne return env + def enable_scrub_on_exit(self): + """ + Call this if you would like the fixture to automatically run + s3_scrubber at the end of the test, as a bidirectional test + that the scrubber is working properly, and that the code within + the test didn't produce any invalid remote state. + """ + + if not isinstance(self.remote_storage, S3Storage): + # The scrubber can't talk to e.g. LocalFS -- it needs + # an HTTP endpoint (mock is fine) to connect to. 
+ raise RuntimeError( + "Cannot scrub with remote_storage={self.remote_storage}, require an S3 endpoint" + ) + + self.scrub_on_exit = True + def enable_remote_storage( self, remote_storage_kind: RemoteStorageKind, @@ -721,11 +741,20 @@ def __exit__( self.env.pageserver.stop(immediate=True) cleanup_error = None + + if self.scrub_on_exit: + try: + S3Scrubber(self.test_output_dir, self).scan_metadata() + except Exception as e: + log.error(f"Error during remote storage scrub: {e}") + cleanup_error = e + try: self.cleanup_remote_storage() except Exception as e: log.error(f"Error during remote storage cleanup: {e}") - cleanup_error = e + if cleanup_error is not None: + cleanup_error = e try: self.cleanup_local_storage() @@ -929,6 +958,7 @@ def _shared_simple_env( default_broker: NeonBroker, run_id: uuid.UUID, top_output_dir: Path, + test_output_dir: Path, neon_binpath: Path, pg_distrib_dir: Path, pg_version: PgVersion, @@ -957,6 +987,7 @@ def _shared_simple_env( run_id=run_id, preserve_database_files=pytestconfig.getoption("--preserve-database-files"), test_name=request.node.name, + test_output_dir=test_output_dir, ) as builder: env = builder.init_start() @@ -984,7 +1015,7 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]: @pytest.fixture(scope="function") def neon_env_builder( pytestconfig: Config, - test_output_dir: str, + test_output_dir: Path, port_distributor: PortDistributor, mock_s3_server: MockS3Server, neon_binpath: Path, @@ -1022,6 +1053,7 @@ def neon_env_builder( run_id=run_id, preserve_database_files=pytestconfig.getoption("--preserve-database-files"), test_name=request.node.name, + test_output_dir=test_output_dir, ) as builder: yield builder @@ -1728,7 +1760,10 @@ def run_capture( self._fixpath(command) log.info(f"Running command '{' '.join(command)}'") env = self._build_env(env) - return subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs) + base_path, _, _ = subprocess_capture( + self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs + ) + return base_path @pytest.fixture(scope="function") @@ -2734,6 +2769,41 @@ def get_metrics(self) -> SafekeeperMetrics: return metrics +class S3Scrubber: + def __init__(self, log_dir: Path, env: NeonEnvBuilder): + self.env = env + self.log_dir = log_dir + + def scrubber_cli(self, args, timeout): + assert isinstance(self.env.remote_storage, S3Storage) + s3_storage = self.env.remote_storage + + env = { + "REGION": s3_storage.bucket_region, + "BUCKET": s3_storage.bucket_name, + } + env.update(s3_storage.access_env_vars()) + + if s3_storage.endpoint is not None: + env.update({"AWS_ENDPOINT_URL": s3_storage.endpoint}) + + base_args = [self.env.neon_binpath / "s3_scrubber"] + args = base_args + args + + (output_path, _, status_code) = subprocess_capture( + self.log_dir, args, echo_stderr=True, echo_stdout=True, env=env, check=False + ) + if status_code: + log.warning(f"Scrub command {args} failed") + log.warning(f"Scrub environment: {env}") + log.warning(f"Output at: {output_path}") + + raise RuntimeError("Remote storage scrub failed") + + def scan_metadata(self): + self.scrubber_cli(["scan-metadata"], timeout=30) + + def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path: """Compute the working directory for an individual test.""" test_name = request.node.name diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py index 320e658639c2..a68257bbace5 100644 --- a/test_runner/fixtures/remote_storage.py +++ 
b/test_runner/fixtures/remote_storage.py @@ -88,6 +88,19 @@ def available_s3_storages() -> List[RemoteStorageKind]: return remote_storages +def s3_storage() -> RemoteStorageKind: + """ + For tests that require a remote storage impl that exposes an S3 + endpoint, but don't want to parametrize over multiple storage types. + + Use real S3 if available, else use MockS3 + """ + if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None: + return RemoteStorageKind.REAL_S3 + else: + return RemoteStorageKind.MOCK_S3 + + @dataclass class LocalFsStorage: root: Path diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index d03d2e7595f4..46ab446f99c6 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -4,9 +4,10 @@ import re import subprocess import tarfile +import threading import time from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, TypeVar from urllib.parse import urlencode import allure @@ -26,34 +27,100 @@ def get_self_dir() -> Path: return Path(__file__).resolve().parent -def subprocess_capture(capture_dir: Path, cmd: List[str], **kwargs: Any) -> str: - """Run a process and capture its output +def subprocess_capture( + capture_dir: Path, + cmd: List[str], + *, + check=False, + echo_stderr=False, + echo_stdout=False, + capture_stdout=False, + **kwargs: Any, +) -> Tuple[str, Optional[str], int]: + """Run a process and bifurcate its output to files and the `log` logger - Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr" + stderr and stdout are always captured in files. They are also optionally + echoed to the log (echo_stderr, echo_stdout), and/or captured and returned + (capture_stdout). + + File output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr" where "cmd" is the name of the program and NNN is an incrementing counter. If those files already exist, we will overwrite them. - Returns basepath for files with captured output. + + Returns 3-tuple of: + - The base path for output files + - Captured stdout, or None + - The exit status of the process """ assert isinstance(cmd, list) - base = f"{os.path.basename(cmd[0])}_{global_counter()}" + base_cmd = os.path.basename(cmd[0]) + base = f"{base_cmd}_{global_counter()}" basepath = os.path.join(capture_dir, base) stdout_filename = f"{basepath}.stdout" stderr_filename = f"{basepath}.stderr" + # Since we will stream stdout and stderr concurrently, need to do it in a thread. 
+ class OutputHandler(threading.Thread): + def __init__(self, in_file, out_file, echo: bool, capture: bool): + super().__init__() + self.in_file = in_file + self.out_file = out_file + self.echo = echo + self.capture = capture + self.captured = "" + + def run(self): + for line in self.in_file: + # Only bother decoding if we are going to do something more than stream to a file + if self.echo or self.capture: + string = line.decode(encoding="utf-8", errors="replace") + + if self.echo: + log.info(string) + + if self.capture: + self.captured += string + + self.out_file.write(line) + + captured = None try: - with open(stdout_filename, "w") as stdout_f: - with open(stderr_filename, "w") as stderr_f: + with open(stdout_filename, "wb") as stdout_f: + with open(stderr_filename, "wb") as stderr_f: log.info(f'Capturing stdout to "{base}.stdout" and stderr to "{base}.stderr"') - subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f) + + p = subprocess.Popen( + cmd, + **kwargs, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout_handler = OutputHandler( + p.stdout, stdout_f, echo=echo_stdout, capture=capture_stdout + ) + stdout_handler.start() + stderr_handler = OutputHandler(p.stderr, stderr_f, echo=echo_stderr, capture=False) + stderr_handler.start() + + r = p.wait() + + stdout_handler.join() + stderr_handler.join() + + if check and r != 0: + raise subprocess.CalledProcessError(r, " ".join(cmd)) + + if capture_stdout: + captured = stdout_handler.captured finally: # Remove empty files if there is no output for filename in (stdout_filename, stderr_filename): if os.stat(filename).st_size == 0: os.remove(filename) - return basepath + return (basepath, captured, r) _global_counter = 0 diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 1e41ebd15b59..567200ba1f75 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -3,11 +3,15 @@ import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.remote_storage import s3_storage # Test restarting page server, while safekeeper and compute node keep # running. def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): + neon_env_builder.enable_remote_storage(remote_storage_kind=s3_storage()) + neon_env_builder.enable_scrub_on_exit() + env = neon_env_builder.init_start() env.neon_cli.create_branch("test_pageserver_restart") @@ -109,6 +113,9 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): # safekeeper and compute node keep running. @pytest.mark.timeout(540) def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder): + neon_env_builder.enable_remote_storage(remote_storage_kind=s3_storage()) + neon_env_builder.enable_scrub_on_exit() + env = neon_env_builder.init_start() # Use a tiny checkpoint distance, to create a lot of layers quickly.
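
For reference, the stream-of-streams concurrency pattern that `s3_scrubber/src/scan_metadata.rs` relies on can be reduced to the following self-contained Rust sketch. It is not part of the patch: placeholder integer ids stand in for the real S3 listings, and only `futures-util`, `anyhow`, and `tokio` (with the `macros` and `rt-multi-thread` features, as in the scrubber's Cargo.toml) are assumed.

// Sketch of the scan_metadata pipeline: an outer stream of tenants is mapped to
// per-tenant timeline streams, driven with bounded concurrency, and flattened into
// one stream of (tenant, timeline) ids. Integer ids stand in for real S3 listings.
use futures_util::{pin_mut, stream, stream::Stream, StreamExt, TryStreamExt};

fn stream_tenants() -> impl Stream<Item = anyhow::Result<u32>> {
    // Stand-in for listing tenant prefixes under pageserver/v1/tenants/ in S3.
    stream::iter((0..4u32).map(anyhow::Ok))
}

async fn stream_tenant_timelines(
    tenant: u32,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<(u32, u32)>>> {
    // In the real code the per-tenant ListObjectsV2 calls happen here, before the
    // stream is handed back, which is what lets try_buffer_unordered below run the
    // listings for several tenants concurrently.
    Ok(stream::iter(
        (0..3u32).map(move |timeline| anyhow::Ok((tenant, timeline))),
    ))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Bounded concurrency, mirroring the CONCURRENCY constant in scan_metadata.
    const CONCURRENCY: usize = 32;

    let timelines = stream_tenants()
        .map_ok(stream_tenant_timelines)   // stream of futures, each yielding a timeline stream
        .try_buffer_unordered(CONCURRENCY) // poll up to CONCURRENCY per-tenant listings at once
        .try_flatten();                    // single Stream of anyhow::Result<(tenant, timeline)>

    pin_mut!(timelines);
    while let Some(id) = timelines.next().await {
        let (tenant, timeline) = id?;
        println!("tenant {tenant} timeline {timeline}");
    }
    Ok(())
}

As the doc comment on `stream_tenant_timelines` in the patch notes, doing the listing inside the async fn (before returning the stream) rather than lazily inside the yielded stream is what allows `try_buffer_unordered` to parallelise the listing work itself.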