diff --git a/hyperactor/src/config.rs b/hyperactor/src/config.rs index 3de17fa99..ff628edcd 100644 --- a/hyperactor/src/config.rs +++ b/hyperactor/src/config.rs @@ -37,49 +37,49 @@ declare_attrs! { /// Timeout used by allocator for stopping a proc. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()), - py_name: None, + py_name: Some("process_exit_timeout".to_string()), }) pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10); /// Message acknowledgment interval @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()), - py_name: None, + py_name: Some("message_ack_time_interval".to_string()), }) pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500); /// Number of messages after which to send an acknowledgment @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()), - py_name: None, + py_name: Some("message_ack_every_n_messages".to_string()), }) pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000; /// Default hop Time-To-Live for message envelopes. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()), - py_name: None, + py_name: Some("message_ttl_default".to_string()), }) pub attr MESSAGE_TTL_DEFAULT : u8 = 64; /// Maximum buffer size for split port messages @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()), - py_name: None, + py_name: Some("split_max_buffer_size".to_string()), }) pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5; /// The maximum time an update can be buffered before being reduced. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()), - py_name: None, + py_name: Some("split_max_buffer_age".to_string()), }) pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50); /// Timeout used by proc mesh for stopping an actor. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()), - py_name: None, + py_name: Some("stop_actor_timeout".to_string()), }) pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10); @@ -87,7 +87,7 @@ declare_attrs! { /// Should be less than the timeout for STOP_ACTOR_TIMEOUT. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()), - py_name: None, + py_name: Some("cleanup_timeout".to_string()), }) pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3); @@ -96,21 +96,21 @@ declare_attrs! { /// deprecation. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()), - py_name: None, + py_name: Some("remote_allocator_heartbeat_interval".to_string()), }) pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_mins(5); /// The default encoding to be used. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_DEFAULT_ENCODING".to_string()), - py_name: None, + py_name: Some("default_encoding".to_string()), }) pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart; /// How often to check for full MPSC channel on NetRx. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()), - py_name: None, + py_name: Some("channel_net_rx_buffer_full_check_interval".to_string()), }) pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5); @@ -118,11 +118,15 @@ declare_attrs! { /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()), - py_name: None, + py_name: Some("message_latency_sampling_rate".to_string()), }) pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01; /// Whether to enable client sequence assignment. + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_ENABLE_CLIENT_SEQ_ASSIGNMENT".to_string()), + py_name: Some("enable_client_seq_assignment".to_string()), + }) pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false; /// Timeout for [`Host::spawn`] to await proc readiness. diff --git a/hyperactor_config/src/global.rs b/hyperactor_config/src/global.rs index 2346c1955..f7e77255c 100644 --- a/hyperactor_config/src/global.rs +++ b/hyperactor_config/src/global.rs @@ -10,7 +10,8 @@ //! //! This module provides the process-wide configuration store and APIs //! to access it. Configuration values are resolved via a **layered -//! model**: `TestOverride → Runtime → Env → File → Default`. +//! model**: `TestOverride → Env → Runtime → File → ClientOverride → +//! Default`. //! //! - Reads (`get`, `get_cloned`) consult layers in that order, falling //! back to defaults if no explicit value is set. @@ -21,14 +22,14 @@ //! that are removed automatically when the guard drops. //! - In normal operation, a parent process can capture its effective //! config via `attrs()` and pass that snapshot to a child during -//! bootstrap. The child installs it as a `Runtime` layer so the -//! parent's values take precedence over Env/File/Defaults. +//! bootstrap. The child installs it as a `ClientOverride` layer. +//! Note that Env and Runtime layers will take precedence over this +//! inherited configuration. //! //! This design provides flexibility (easy test overrides, runtime //! updates, YAML/Env baselines) while ensuring type safety and //! predictable resolution order. //! -//! //! # Testing //! //! Tests can override global configuration using [`lock`]. This @@ -62,7 +63,7 @@ use crate::from_yaml; /// Configuration source layers in priority order. /// -/// Resolution order is always: **TestOverride -> Runtime -> Env +/// Resolution order is always: **TestOverride -> Env -> Runtime /// -> File -> ClientOverride -> Default**. /// /// Smaller `priority()` number = higher precedence. @@ -74,11 +75,11 @@ pub enum Source { /// Values loaded from configuration files (e.g., YAML). File, /// Values read from environment variables at process startup. - /// Higher priority than File and ClientOverride, but lower than - /// Runtime/TestOverride. + /// Higher priority than Runtime, File, and ClientOverride, but + /// lower than TestOverride. Env, - /// Values set programmatically at runtime. Highest stable - /// priority layer; only overridden by TestOverride. + /// Values set programmatically at runtime. High-priority layer + /// but overridden by Env and TestOverride. Runtime, /// Ephemeral values inserted by tests via /// `ConfigLock::override_key`. Always wins over all other @@ -88,13 +89,14 @@ pub enum Source { /// Return the numeric priority for a source. /// -/// Smaller number = higher precedence. Matches the documented -/// order: TestOverride (0) -> Runtime (1) -> Env (2) -> File (3) -> ClientOverride (4). +/// Smaller number = higher precedence. Matches the documented order: +/// TestOverride (0) -> Env (1) -> Runtime (2) -> File (3) -> +/// ClientOverride (4). fn priority(s: Source) -> u8 { match s { Source::TestOverride => 0, - Source::Runtime => 1, - Source::Env => 2, + Source::Env => 1, + Source::Runtime => 2, Source::File => 3, Source::ClientOverride => 4, } @@ -105,9 +107,9 @@ fn priority(s: Source) -> u8 { /// `Layers` wraps a vector of [`Layer`]s, always kept sorted by /// [`priority`] (lowest number = highest precedence). /// -/// Resolution (`get`, `get_cloned`, `attrs`) consults `ordered` -/// from front to back, returning the first value found for each -/// key and falling back to defaults if none are set in any layer. +/// Resolution (`get`, `get_cloned`, `attrs`) consults `ordered` from +/// front to back, returning the first value found for each key and +/// falling back to defaults if none are set in any layer. struct Layers { /// Kept sorted by `priority` (lowest number first = highest /// priority). @@ -116,18 +118,17 @@ struct Layers { /// A single configuration layer in the global configuration model. /// -/// Layers are consulted in priority order (`TestOverride → Runtime → -/// Env → File → Default`) when resolving configuration values. Each -/// variant holds an [`Attrs`] map of key/value pairs. +/// Layers are consulted in priority order (`TestOverride → Env → +/// Runtime → File → ClientOverride → Default`) when resolving +/// configuration values. Each variant holds an [`Attrs`] map of +/// key/value pairs. /// /// The `TestOverride` variant additionally maintains per-key override -/// stacks to support nested and out-of-order test overrides. These -/// stacks are currently placeholders for a future refactor; for now, -/// only the `attrs` field is used. +/// stacks to support nested and out-of-order test overrides. /// /// Variants: -/// - [`Layer::ClientOverride`] - Values set by the config snapshot sent from the client -/// during proc bootstrap. +/// - [`Layer::ClientOverride`] - Values set by the config snapshot +/// sent from the client during proc bootstrap. /// - [`Layer::File`] — Values loaded from configuration files. /// - [`Layer::Env`] — Values sourced from process environment /// variables. @@ -136,7 +137,8 @@ struct Layers { /// under [`ConfigLock`]. /// /// Layers are stored in [`Layers::ordered`], kept sorted by their -/// effective [`Source`] priority (`TestOverride` first, `Default` last). +/// effective [`Source`] priority (`TestOverride` first, `Default` +/// last). enum Layer { /// Values set by the config snapshot sent from the client /// during proc bootstrap. @@ -149,15 +151,16 @@ enum Layer { /// installed at startup via [`init_from_env`]. Env(Attrs), - /// Values set programmatically at runtime. Stable high-priority - /// layer used by parent/child bootstrap and dynamic updates. + /// Values set programmatically at runtime. High-priority layer + /// used for dynamic updates (e.g., Python `configure()` API), but + /// overridden by Env and TestOverride layers. Runtime(Attrs), /// Ephemeral values inserted during tests via /// [`ConfigLock::override_key`]. Always takes precedence over all - /// other layers. Currently holds both the active `attrs` map and - /// a per-key `stacks` table (used to support nested or - /// out-of-order test overrides in future refactors). + /// other layers. Holds both the active `attrs` map and a per-key + /// `stacks` table to support nested and out-of-order test + /// overrides. TestOverride { attrs: Attrs, stacks: HashMap<&'static str, OverrideStack>, @@ -179,10 +182,6 @@ enum Layer { /// first override was applied (or `None` if it did not exist). /// - `frames`: The stack of active override frames, with the top /// being the last element in the vector. -/// -/// The full stack mechanism is not yet active; it is introduced -/// incrementally to prepare for robust out-of-order test override -/// restoration. struct OverrideStack { /// The name of the process environment variable associated with /// this configuration key, if any. @@ -244,9 +243,9 @@ struct OverrideFrame { /// Return the [`Source`] corresponding to a given [`Layer`]. /// /// This provides a uniform way to retrieve a layer's logical source -/// (File, Env, Runtime, or TestOverride) regardless of its internal -/// representation. Used for sorting layers by priority and for -/// source-based lookups or removals. +/// (File, Env, Runtime, TestOverride, or ClientOverride) regardless +/// of its internal representation. Used for sorting layers by +/// priority and for source-based lookups or removals. fn layer_source(l: &Layer) -> Source { match l { Layer::File(_) => Source::File, @@ -301,24 +300,24 @@ fn test_override_index(layers: &Layers) -> Option { /// Global layered configuration store. /// -/// This is the single authoritative store for configuration in -/// the process. It is always present, protected by an `RwLock`, -/// and holds a [`Layers`] struct containing all active sources. +/// This is the single authoritative store for configuration in the +/// process. It is always present, protected by an `RwLock`, and holds +/// a [`Layers`] struct containing all active sources. /// /// On startup it is seeded with a single [`Source::Env`] layer /// (values loaded from process environment variables). Additional /// layers can be installed later via [`set`] or cleared with -/// [`clear`]. Reads (`get`, `get_cloned`, `attrs`) consult the -/// layers in priority order. +/// [`clear`]. Reads (`get`, `get_cloned`, `attrs`) consult the layers +/// in priority order. /// -/// In tests, a [`Source::TestOverride`] layer is pushed on demand -/// by [`ConfigLock::override_key`]. This layer always takes -/// precedence and is automatically removed when the guard drops. +/// In tests, a [`Source::TestOverride`] layer is pushed on demand by +/// [`ConfigLock::override_key`]. This layer always takes precedence +/// and is automatically removed when the guard drops. /// -/// In normal operation, a parent process may capture its config -/// with [`attrs`] and pass it to a child during bootstrap. The -/// child installs this snapshot as its [`Source::Runtime`] layer, -/// ensuring the parent's values override Env/File/Defaults. +/// In normal operation, a parent process may capture its config with +/// [`attrs`] and pass it to a child during bootstrap. The child +/// installs this snapshot as its [`Source::ClientOverride`] layer, +/// which has the lowest precedence among explicit layers. static LAYERS: LazyLock>> = LazyLock::new(|| { let env = from_env(); let layers = Layers { @@ -339,17 +338,15 @@ static OVERRIDE_TOKEN_SEQ: AtomicU64 = AtomicU64::new(1); /// Acquire the global configuration lock. /// -/// This lock serializes all mutations of the global -/// configuration, ensuring they cannot clobber each other. It -/// returns a [`ConfigLock`] guard, which must be held for the -/// duration of any mutation (e.g. inserting or overriding -/// values). +/// This lock serializes all mutations of the global configuration, +/// ensuring they cannot clobber each other. It returns a +/// [`ConfigLock`] guard, which must be held for the duration of any +/// mutation (e.g. inserting or overriding values). /// -/// Most commonly used in tests, where it provides exclusive -/// access to push a [`Source::TestOverride`] layer via -/// [`ConfigLock::override_key`]. The override layer is -/// automatically removed when the guard drops, restoring the -/// original state. +/// Most commonly used in tests, where it provides exclusive access to +/// push a [`Source::TestOverride`] layer via +/// [`ConfigLock::override_key`]. The override layer is automatically +/// removed when the guard drops, restoring the original state. /// /// # Example /// ```rust,ignore @@ -371,7 +368,7 @@ pub fn lock() -> ConfigLock { /// `CONFIG.env_name` (from `@meta(CONFIG = ConfigAttr { … })`) to /// determine its mapping. The resulting values are installed as the /// [`Source::Env`] layer. Keys without a corresponding environment -/// variable fall back to defaults or higher-priority sources. +/// variable fall back to lower-priority sources or defaults. /// /// Typically invoked once at process startup to overlay config values /// from the environment. Repeated calls replace the existing Env @@ -382,15 +379,13 @@ pub fn init_from_env() { /// Initialize the global configuration from a YAML file. /// -/// Loads values from the specified YAML file and installs them as -/// the [`Source::File`] layer. This is the lowest-priority -/// explicit source: values from Env, Runtime, or TestOverride -/// layers always take precedence. Keys not present in the file -/// fall back to their defaults or higher-priority sources. +/// Loads values from the specified YAML file and installs them as the +/// [`Source::File`] layer. During resolution, File is consulted after +/// TestOverride, Env, and Runtime layers, and before ClientOverride +/// and defaults. /// -/// Typically invoked once at process startup to provide a -/// baseline configuration. Repeated calls replace the existing -/// File layer. +/// Typically invoked once at process startup to provide a baseline +/// configuration. Repeated calls replace the existing File layer. pub fn init_from_yaml>(path: P) -> Result<(), anyhow::Error> { let file = from_yaml(path)?; set(Source::File, file); @@ -399,9 +394,9 @@ pub fn init_from_yaml>(path: P) -> Result<(), anyhow::Error> { /// Get a key from the global configuration (Copy types). /// -/// Resolution order: TestOverride -> Runtime -> Env -> File -> -/// Default. Panics if the key has no default and is not set in -/// any layer. +/// Resolution order: TestOverride -> Env -> Runtime -> File -> +/// ClientOverride -> Default. Panics if the key has no default and is +/// not set in any layer. pub fn get(key: Key) -> T { let layers = LAYERS.read().unwrap(); for layer in &layers.ordered { @@ -432,9 +427,9 @@ pub fn override_or_global(overrides: &Attrs, key: Key) - /// Get a key by cloning the value. /// -/// Resolution order: TestOverride -> Runtime -> Env -> File -> -/// Default. Panics if the key has no default and is not set in -/// any layer. +/// Resolution order: TestOverride -> Env -> Runtime -> File -> +/// ClientOverride -> Default. Panics if the key has no default and +/// is not set in any layer. pub fn get_cloned(key: Key) -> T { try_get_cloned(key) .expect("key must have a default") @@ -443,9 +438,9 @@ pub fn get_cloned(key: Key) -> T { /// Try to get a key by cloning the value. /// -/// Resolution order: TestOverride -> Runtime -> Env -> File -> -/// Default. Returns None if the key has no default and is not set in -/// any layer. +/// Resolution order: TestOverride -> Env -> Runtime -> File -> +/// ClientOverride -> Default. Returns None if the key has no default +/// and is not set in any layer. pub fn try_get_cloned(key: Key) -> Option { let layers = LAYERS.read().unwrap(); for layer in &layers.ordered { @@ -477,16 +472,16 @@ fn make_layer(source: Source, attrs: Attrs) -> Layer { /// Insert or replace a configuration layer for the given source. /// -/// If a layer with the same [`Source`] already exists, its -/// contents are replaced with the provided `attrs`. Otherwise a -/// new layer is added. After insertion, layers are re-sorted so -/// that higher-priority sources (e.g. [`Source::TestOverride`], -/// [`Source::Runtime`]) appear before lower-priority ones -/// ([`Source::Env`], [`Source::File`]). +/// If a layer with the same [`Source`] already exists, its contents +/// are replaced with the provided `attrs`. Otherwise a new layer is +/// added. After insertion, layers are re-sorted so that +/// higher-priority sources (e.g. [`Source::TestOverride`], +/// [`Source::Env`]) appear before lower-priority ones +/// ([`Source::Runtime`], [`Source::File`]). /// /// This function is used by initialization routines (e.g. -/// `init_from_env`, `init_from_yaml`) and by tests when -/// overriding configuration values. +/// `init_from_env`, `init_from_yaml`) and by tests when overriding +/// configuration values. pub fn set(source: Source, attrs: Attrs) { let mut g = LAYERS.write().unwrap(); if let Some(l) = g.ordered.iter_mut().find(|l| layer_source(l) == source) { @@ -494,7 +489,7 @@ pub fn set(source: Source, attrs: Attrs) { } else { g.ordered.push(make_layer(source, attrs)); } - g.ordered.sort_by_key(|l| priority(layer_source(l))); // TestOverride < Runtime < Env < File < ClientOverride + g.ordered.sort_by_key(|l| priority(layer_source(l))); // TestOverride < Env < Runtime < File < ClientOverride } /// Insert or update a configuration layer for the given [`Source`]. @@ -521,16 +516,16 @@ pub fn create_or_merge(source: Source, attrs: Attrs) { } else { g.ordered.push(make_layer(source, attrs)); } - g.ordered.sort_by_key(|l| priority(layer_source(l))); // TestOverride < Runtime < Env < File < ClientOverride + g.ordered.sort_by_key(|l| priority(layer_source(l))); // TestOverride < Env < Runtime < File < ClientOverride } /// Remove the configuration layer for the given [`Source`], if /// present. /// -/// After this call, values from that source will no longer -/// contribute to resolution in [`get`], [`get_cloned`], or -/// [`attrs`]. Defaults and any remaining layers continue to apply -/// in their normal priority order. +/// After this call, values from that source will no longer contribute +/// to resolution in [`get`], [`get_cloned`], or [`attrs`]. Defaults +/// and any remaining layers continue to apply in their normal +/// priority order. pub fn clear(source: Source) { let mut g = LAYERS.write().unwrap(); g.ordered.retain(|l| layer_source(l) != source); @@ -540,8 +535,8 @@ pub fn clear(source: Source) { /// **(only keys marked with `@meta(CONFIG = ...)`)**. /// /// Resolution per key: -/// 1) First explicit value found in layers (TestOverride → -/// Runtime → Env → File). +/// 1) First explicit value found in layers (TestOverride → Env → +/// Runtime → File → ClientOverride). /// 2) Otherwise, the key's default (if any). /// /// Notes: @@ -552,8 +547,8 @@ pub fn attrs() -> Attrs { let layers = LAYERS.read().unwrap(); let mut merged = Attrs::new(); - // Iterate all declared keys (registered via `declare_attrs!` - // + inventory). + // Iterate all declared keys (registered via `declare_attrs!` + + // inventory). for info in inventory::iter::() { // Skip keys not marked as `CONFIG`. if info.meta.get(CONFIG).is_none() { @@ -571,8 +566,8 @@ pub fn attrs() -> Attrs { } } - // If no explicit value, materialize the default if there - // is one. + // If no explicit value, materialize the default if there is + // one. let boxed = match chosen { Some(b) => b, None => { @@ -622,12 +617,12 @@ pub fn runtime_attrs() -> Attrs { /// Reset the global configuration to only Defaults (for testing). /// -/// This clears all explicit layers (`File`, `Env`, `Runtime`, and -/// `TestOverride`). Subsequent lookups will resolve keys entirely -/// from their declared defaults. +/// This clears all explicit layers (`File`, `Env`, `Runtime`, +/// `ClientOverride`, and `TestOverride`). Subsequent lookups will +/// resolve keys entirely from their declared defaults. /// -/// Note: Should be called while holding [`global::lock`] in -/// tests, to ensure no concurrent modifications happen. +/// Note: Should be called while holding [`global::lock`] in tests, to +/// ensure no concurrent modifications happen. pub fn reset_to_defaults() { let mut g = LAYERS.write().unwrap(); g.ordered.clear(); @@ -729,16 +724,16 @@ impl ConfigLock { } /// When a [`ConfigLock`] is dropped, the special -/// [`Source::TestOverride`] layer (if present) is removed -/// entirely. This discards all temporary overrides created under -/// the lock, ensuring they cannot leak into subsequent tests or -/// callers. Other layers (`Runtime`, `Env`, `File`, and defaults) +/// [`Source::TestOverride`] layer (if present) is removed entirely. +/// This discards all temporary overrides created under the lock, +/// ensuring they cannot leak into subsequent tests or callers. Other +/// layers (`Runtime`, `Env`, `File`, `ClientOverride`, and defaults) /// are left untouched. /// -/// Note: individual values within the TestOverride layer may -/// already have been restored by [`ConfigValueGuard`]s as they -/// drop. This final removal guarantees no residual layer remains -/// once the lock itself is released. +/// Note: individual values within the TestOverride layer may already +/// have been restored by [`ConfigValueGuard`]s as they drop. This +/// final removal guarantees no residual layer remains once the lock +/// itself is released. impl Drop for ConfigLock { fn drop(&mut self) { let mut guard = LAYERS.write().unwrap(); @@ -748,7 +743,7 @@ impl Drop for ConfigLock { } } -/// A guard that restores a single configuration value when dropped +/// A guard that restores a single configuration value when dropped. pub struct ConfigValueGuard<'a, T: 'static> { key: crate::attrs::Key, token: u64, @@ -838,7 +833,7 @@ impl Drop for ConfigValueGuard<'_, T> { // No changes to attrs or env here. } } // else: token already handled; nothing to do - } // &must stack borrow ends here + } // &mut stack borrow ends here // If we emptied the stack for this key, restore env and drop // the stack entry. @@ -867,9 +862,9 @@ mod tests { use crate::ConfigAttr; use crate::attrs::declare_attrs; - // Test configuration keys used to exercise the layered config infrastructure. - // These mirror hyperactor's config keys but are declared locally to keep - // hyperactor_config independent. + // Test configuration keys used to exercise the layered config + // infrastructure. These mirror hyperactor's config keys but are + // declared locally to keep hyperactor_config independent. declare_attrs! { /// Maximum frame length for codec @@ -913,20 +908,29 @@ mod tests { py_name: None, }) pub attr MESSAGE_TTL_DEFAULT: u8 = 64; + + /// A test key with no environment variable mapping + @meta(CONFIG = ConfigAttr { + env_name: None, + py_name: None, + }) + pub attr CONFIG_KEY_NO_ENV: u32 = 100; } #[test] fn test_global_config() { let config = lock(); - // Reset global config to defaults to avoid interference from other tests + // Reset global config to defaults to avoid interference from + // other tests reset_to_defaults(); assert_eq!(get(CODEC_MAX_FRAME_LENGTH), CODEC_MAX_FRAME_LENGTH_DEFAULT); { let _guard = config.override_key(CODEC_MAX_FRAME_LENGTH, 1024); assert_eq!(get(CODEC_MAX_FRAME_LENGTH), 1024); - // The configuration will be automatically restored when _guard goes out of scope + // The configuration will be automatically restored when + // _guard goes out of scope } assert_eq!(get(CODEC_MAX_FRAME_LENGTH), CODEC_MAX_FRAME_LENGTH_DEFAULT); @@ -936,7 +940,8 @@ mod tests { fn test_overrides() { let config = lock(); - // Reset global config to defaults to avoid interference from other tests + // Reset global config to defaults to avoid interference from + // other tests reset_to_defaults(); // Test the new lock/override API for individual config values @@ -1047,18 +1052,18 @@ mod tests { env[MESSAGE_DELIVERY_TIMEOUT] = Duration::from_secs(40); set(Source::Env, env); - // Runtime beats both. + // Runtime layer (but Env beats it). let mut rt = Attrs::new(); rt[MESSAGE_DELIVERY_TIMEOUT] = Duration::from_secs(50); set(Source::Runtime, rt); - assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(50)); + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(40)); - // Clearing Runtime should reveal Env again. - clear(Source::Runtime); + // Clearing Env should reveal Runtime. + clear(Source::Env); - // With the Runtime layer gone, Env still wins over File. - assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(40)); + // With the Env layer gone, Runtime wins over File. + assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(50)); } #[test] @@ -1083,7 +1088,7 @@ mod tests { } #[test] - fn test_parent_child_snapshot_as_runtime_layer() { + fn test_parent_child_snapshot_as_clientoverride_layer() { let _lock = lock(); reset_to_defaults(); @@ -1095,12 +1100,13 @@ mod tests { let parent_snap = attrs(); // "Child" process: start clean, install parent snapshot as - // Runtime. + // ClientOverride. reset_to_defaults(); - set(Source::Runtime, parent_snap); + set(Source::ClientOverride, parent_snap); - // Child should observe parent's effective value (as highest - // stable layer). + // Child should observe parent's effective value from the + // ClientOverride layer (since child has no Env/Runtime/File + // layers set). assert_eq!(get(MESSAGE_ACK_EVERY_N_MESSAGES), 12345); } @@ -1151,8 +1157,8 @@ mod tests { rt[SPLIT_MAX_BUFFER_SIZE] = 9; set(Source::Runtime, rt); - // Sanity: highest wins. - assert_eq!(get(SPLIT_MAX_BUFFER_SIZE), 9); + // Sanity: Env wins over Runtime and File. + assert_eq!(get(SPLIT_MAX_BUFFER_SIZE), 8); // Reset clears all explicit layers; defaults apply. reset_to_defaults(); @@ -1332,9 +1338,9 @@ mod tests { #[test] fn test_priority_order() { use Source::*; - assert!(priority(TestOverride) < priority(Runtime)); - assert!(priority(Runtime) < priority(Env)); - assert!(priority(Env) < priority(File)); + assert!(priority(TestOverride) < priority(Env)); + assert!(priority(Env) < priority(Runtime)); + assert!(priority(Runtime) < priority(File)); assert!(priority(File) < priority(ClientOverride)); } @@ -1370,4 +1376,202 @@ mod tests { assert_eq!(get(MESSAGE_TTL_DEFAULT), 42); } + + #[test] + fn test_clientoverride_precedence_loses_to_all_other_layers() { + let _lock = lock(); + reset_to_defaults(); + + // ClientOverride sets a baseline value. + let mut client = Attrs::new(); + client[MESSAGE_TTL_DEFAULT] = 10; + set(Source::ClientOverride, client); + assert_eq!(get(MESSAGE_TTL_DEFAULT), 10); + + // File should beat ClientOverride. + let mut file = Attrs::new(); + file[MESSAGE_TTL_DEFAULT] = 20; + set(Source::File, file); + assert_eq!(get(MESSAGE_TTL_DEFAULT), 20); + + // Runtime should beat both File and ClientOverride. + let mut runtime = Attrs::new(); + runtime[MESSAGE_TTL_DEFAULT] = 30; + set(Source::Runtime, runtime); + assert_eq!(get(MESSAGE_TTL_DEFAULT), 30); + + // Env should beat Runtime, File, and ClientOverride. + let mut env = Attrs::new(); + env[MESSAGE_TTL_DEFAULT] = 40; + set(Source::Env, env); + assert_eq!(get(MESSAGE_TTL_DEFAULT), 40); + + // Clear higher layers one by one to verify fallback. + clear(Source::Env); + assert_eq!(get(MESSAGE_TTL_DEFAULT), 30); // Runtime + + clear(Source::Runtime); + assert_eq!(get(MESSAGE_TTL_DEFAULT), 20); // File + + clear(Source::File); + assert_eq!(get(MESSAGE_TTL_DEFAULT), 10); // ClientOverride + } + + #[test] + fn test_create_or_merge_clientoverride() { + let _lock = lock(); + reset_to_defaults(); + + // Seed ClientOverride with one key. + let mut client = Attrs::new(); + client[MESSAGE_TTL_DEFAULT] = 10; + set(Source::ClientOverride, client); + + // Merge in a different key. + let mut update = Attrs::new(); + update[MESSAGE_ACK_EVERY_N_MESSAGES] = 123; + create_or_merge(Source::ClientOverride, update); + + // Both keys should now be visible. + assert_eq!(get(MESSAGE_TTL_DEFAULT), 10); + assert_eq!(get(MESSAGE_ACK_EVERY_N_MESSAGES), 123); + } + + #[test] + fn test_override_or_global_returns_override_when_present() { + let _lock = lock(); + reset_to_defaults(); + + // Set a global value via Env. + let mut env = Attrs::new(); + env[MESSAGE_TTL_DEFAULT] = 99; + set(Source::Env, env); + + // Create an override Attrs with a different value. + let mut overrides = Attrs::new(); + overrides[MESSAGE_TTL_DEFAULT] = 42; + + // Should return the override value, not global. + assert_eq!(override_or_global(&overrides, MESSAGE_TTL_DEFAULT), 42); + } + + #[test] + fn test_override_or_global_returns_global_when_not_present() { + let _lock = lock(); + reset_to_defaults(); + + // Set a global value via Env. + let mut env = Attrs::new(); + env[MESSAGE_TTL_DEFAULT] = 99; + set(Source::Env, env); + + // Empty overrides. + let overrides = Attrs::new(); + + // Should return the global value. + assert_eq!(override_or_global(&overrides, MESSAGE_TTL_DEFAULT), 99); + } + + #[test] + fn test_runtime_attrs_returns_only_runtime_layer() { + let _lock = lock(); + reset_to_defaults(); + + // Set values in multiple layers. + let mut file = Attrs::new(); + file[MESSAGE_TTL_DEFAULT] = 10; + set(Source::File, file); + + let mut env = Attrs::new(); + env[SPLIT_MAX_BUFFER_SIZE] = 20; + set(Source::Env, env); + + let mut runtime = Attrs::new(); + runtime[MESSAGE_ACK_EVERY_N_MESSAGES] = 123; + set(Source::Runtime, runtime); + + // runtime_attrs() should return only Runtime layer contents. + let rt = runtime_attrs(); + + // Should have the Runtime key. + assert_eq!(rt[MESSAGE_ACK_EVERY_N_MESSAGES], 123); + + // Should NOT have File or Env keys. + assert!(!rt.contains_key(MESSAGE_TTL_DEFAULT)); + assert!(!rt.contains_key(SPLIT_MAX_BUFFER_SIZE)); + } + + #[test] + fn test_override_key_without_env_name_does_not_mirror_to_env() { + let lock = lock(); + reset_to_defaults(); + + // Verify default value. + assert_eq!(get(CONFIG_KEY_NO_ENV), 100); + + // Override the key (which has no env_name). + let _guard = lock.override_key(CONFIG_KEY_NO_ENV, 999); + + // Should see the override value. + assert_eq!(get(CONFIG_KEY_NO_ENV), 999); + + // No env var should have been set (test doesn't crash, + // behavior is clean). This test mainly ensures no panic + // occurs during override/restore. + + drop(_guard); + + // Should restore to default. + assert_eq!(get(CONFIG_KEY_NO_ENV), 100); + } + + #[test] + fn test_multiple_different_keys_overridden_simultaneously() { + let lock = lock(); + reset_to_defaults(); + + // SAFETY: single-threaded test. + unsafe { + std::env::remove_var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH"); + std::env::remove_var("HYPERACTOR_MESSAGE_TTL_DEFAULT"); + } + + // Override multiple different keys at once. + let guard1 = lock.override_key(CODEC_MAX_FRAME_LENGTH, 1111); + let guard2 = lock.override_key(MESSAGE_TTL_DEFAULT, 42); + let guard3 = lock.override_key(CHANNEL_MULTIPART, false); + + // All should reflect their override values. + assert_eq!(get(CODEC_MAX_FRAME_LENGTH), 1111); + assert_eq!(get(MESSAGE_TTL_DEFAULT), 42); + assert_eq!(get(CHANNEL_MULTIPART), false); + + // Env vars should be mirrored. + assert_eq!( + std::env::var("HYPERACTOR_CODEC_MAX_FRAME_LENGTH").unwrap(), + "1111" + ); + assert_eq!( + std::env::var("HYPERACTOR_MESSAGE_TTL_DEFAULT").unwrap(), + "42" + ); + + // Drop guards in arbitrary order. + drop(guard2); // Drop MESSAGE_TTL_DEFAULT first + + // MESSAGE_TTL_DEFAULT should restore, others should remain. + assert_eq!(get(MESSAGE_TTL_DEFAULT), MESSAGE_TTL_DEFAULT_DEFAULT); + assert_eq!(get(CODEC_MAX_FRAME_LENGTH), 1111); + assert_eq!(get(CHANNEL_MULTIPART), false); + + // Env for MESSAGE_TTL_DEFAULT should be cleared. + assert!(std::env::var("HYPERACTOR_MESSAGE_TTL_DEFAULT").is_err()); + + drop(guard1); + drop(guard3); + + // All should be restored. + assert_eq!(get(CODEC_MAX_FRAME_LENGTH), CODEC_MAX_FRAME_LENGTH_DEFAULT); + assert_eq!(get(CHANNEL_MULTIPART), CHANNEL_MULTIPART_DEFAULT); + } } diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index 030574385..a4c3eb340 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -70,7 +70,7 @@ declare_attrs! { /// us to bind the port to the host's public IP address. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_REMOTE_ALLOC_BIND_TO_INADDR_ANY".to_string()), - py_name: None, + py_name: Some("remote_alloc_bind_to_inaddr_any".to_string()), }) pub attr REMOTE_ALLOC_BIND_TO_INADDR_ANY: bool = false; @@ -89,7 +89,7 @@ declare_attrs! { // TODO: remove this env var, and make it part of alloc spec instead. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_REMOTE_ALLOC_BOOTSTRAP_ADDR".to_string()), - py_name: None, + py_name: Some("remote_alloc_bootstrap_addr".to_string()), }) pub attr REMOTE_ALLOC_BOOTSTRAP_ADDR: String; @@ -104,7 +104,7 @@ declare_attrs! { /// ports are allowed to be used. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_REMOTE_ALLOC_ALLOWED_PORT_RANGE".to_string()), - py_name: None, + py_name: Some("remote_alloc_allowed_port_range".to_string()), }) pub attr REMOTE_ALLOC_ALLOWED_PORT_RANGE: Range; } diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index 182159c4b..f55e7b520 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -144,7 +144,7 @@ declare_attrs! { /// "false")`. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()), - py_name: None, + py_name: Some("mesh_bootstrap_enable_pdeathsig".to_string()), }) pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true; @@ -154,7 +154,7 @@ declare_attrs! { /// file descriptor load). @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()), - py_name: None, + py_name: Some("mesh_terminate_concurrency".to_string()), }) pub attr MESH_TERMINATE_CONCURRENCY: usize = 16; @@ -163,7 +163,7 @@ declare_attrs! { /// the child to exit before escalating to SIGKILL. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()), - py_name: None, + py_name: Some("mesh_terminate_timeout".to_string()), }) pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10); } diff --git a/hyperactor_mesh/src/config.rs b/hyperactor_mesh/src/config.rs index 6db024027..94c1a93f7 100644 --- a/hyperactor_mesh/src/config.rs +++ b/hyperactor_mesh/src/config.rs @@ -23,7 +23,7 @@ declare_attrs! { /// the limit so no dimension needs to be folded. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_MAX_CAST_DIMENSION_SIZE".to_string()), - py_name: None, + py_name: Some("max_cast_dimension_size".to_string()), }) pub attr MAX_CAST_DIMENSION_SIZE: usize = usize::MAX; } diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index 587bd402a..c54bfe365 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -74,21 +74,21 @@ declare_attrs! { /// After pause lines will be flushed and reading will resume. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_READ_LOG_BUFFER".to_string()), - py_name: None, + py_name: Some("read_log_buffer".to_string()), }) pub attr READ_LOG_BUFFER: usize = 100; /// If enabled, local logs are also written to a file and aggregated @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_FORCE_FILE_LOG".to_string()), - py_name: None, + py_name: Some("force_file_log".to_string()), }) pub attr FORCE_FILE_LOG: bool = false; /// Prefixes logs with rank @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()), - py_name: None, + py_name: Some("prefix_with_rank".to_string()), }) pub attr PREFIX_WITH_RANK: bool = true; } diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 6e2cd7cbe..ea2f0e19a 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -82,13 +82,13 @@ declare_attrs! { /// meshes. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()), - py_name: None, + py_name: Some("proc_stop_max_idle".to_string()), }) pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30); @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()), - py_name: None, + py_name: Some("get_proc_state_max_idle".to_string()), }) pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1); } @@ -1640,6 +1640,15 @@ mod tests { Duration::from_mins(1), ); + // Unset env vars that were mirrored by TestOverride, so child + // processes don't inherit them. This allows Runtime layer to + // override ClientOverride. SAFETY: Single-threaded test under + // global config lock. + unsafe { + std::env::remove_var("HYPERACTOR_HOST_SPAWN_READY_TIMEOUT"); + std::env::remove_var("HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT"); + } + let instance = testing::instance().await; let proc_meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await; diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index c51a186bb..c0bf8859f 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -84,13 +84,13 @@ declare_attrs! { /// meshes. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()), - py_name: None, + py_name: Some("actor_spawn_max_idle".to_string()), }) pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()), - py_name: None, + py_name: Some("get_actor_state_max_idle".to_string()), }) pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_mins(1); } diff --git a/monarch_hyperactor/src/buffers.rs b/monarch_hyperactor/src/buffers.rs index 4b869a6b5..b8669c084 100644 --- a/monarch_hyperactor/src/buffers.rs +++ b/monarch_hyperactor/src/buffers.rs @@ -31,7 +31,7 @@ declare_attrs! { /// Writes >= this size are stored as zero-copy references. @meta(CONFIG = ConfigAttr { env_name: Some("MONARCH_HYPERACTOR_SMALL_WRITE_THRESHOLD".to_string()), - py_name: None, + py_name: Some("small_write_threshold".to_string()), }) pub attr SMALL_WRITE_THRESHOLD: usize = 256; } diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index 84daf8282..503d68e4e 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -38,6 +38,125 @@ use pyo3::prelude::*; use crate::channel::PyBindSpec; +/// Python wrapper for Encoding enum. +/// +/// This type bridges between Python strings (e.g., "bincode", +/// "serde_json", "serde_multipart") and Rust's +/// `hyperactor::data::Encoding` enum. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PyEncoding(pub hyperactor::data::Encoding); + +impl From for hyperactor::data::Encoding { + fn from(e: PyEncoding) -> Self { + e.0 + } +} + +impl From for PyEncoding { + fn from(e: hyperactor::data::Encoding) -> Self { + PyEncoding(e) + } +} + +impl<'py> FromPyObject<'py> for PyEncoding { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let s: String = ob.extract()?; + let encoding = s.parse::().map_err(|e| { + PyValueError::new_err(format!( + "Invalid encoding '{}': {}. Valid values: bincode, serde_json, serde_multipart", + s, e + )) + })?; + Ok(PyEncoding(encoding)) + } +} + +impl<'py> IntoPyObject<'py> for PyEncoding { + type Target = PyAny; + type Output = Bound<'py, Self::Target>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let formatted = self.0.to_string(); + formatted.into_bound_py_any(py) + } +} + +/// Python wrapper for Range, using tuple or string format. +/// +/// This type bridges between Python and Rust's +/// `std::ops::Range`. +/// Accepts either: +/// - Tuple: `(8000, 9000)` +/// - String: `"8000..9000"` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PyPortRange(pub std::ops::Range); + +impl From for std::ops::Range { + fn from(r: PyPortRange) -> Self { + r.0 + } +} + +impl From> for PyPortRange { + fn from(r: std::ops::Range) -> Self { + PyPortRange(r) + } +} + +impl<'py> FromPyObject<'py> for PyPortRange { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + // Try tuple format first: (start, end) + if let Ok((start, end)) = ob.extract::<(u16, u16)>() { + if start >= end { + return Err(PyValueError::new_err(format!( + "Invalid port range ({}, {}): start must be less than end", + start, end + ))); + } + return Ok(PyPortRange(start..end)); + } + + // Fall back to string format: "start..end" + let s: String = ob.extract().map_err(|_| { + PyTypeError::new_err( + "Port range must be either a tuple (start, end) or string 'start..end'", + ) + })?; + let parts: Vec<&str> = s.split("..").collect(); + if parts.len() != 2 { + return Err(PyValueError::new_err(format!( + "Invalid port range format '{}': expected 'start..end'", + s + ))); + } + let start = parts[0].parse::().map_err(|e| { + PyValueError::new_err(format!("Invalid start port '{}': {}", parts[0], e)) + })?; + let end = parts[1].parse::().map_err(|e| { + PyValueError::new_err(format!("Invalid end port '{}': {}", parts[1], e)) + })?; + if start >= end { + return Err(PyValueError::new_err(format!( + "Invalid port range '{}': start must be less than end", + s + ))); + } + Ok(PyPortRange(start..end)) + } +} + +impl<'py> IntoPyObject<'py> for PyPortRange { + type Target = PyAny; + type Output = Bound<'py, Self::Target>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let formatted = format!("{}..{}", self.0.start, self.0.end); + formatted.into_bound_py_any(py) + } +} + /// Python wrapper for Duration, using humantime format strings. /// /// This type bridges between Python strings (e.g., "30s", "5m") and @@ -82,6 +201,10 @@ impl<'py> IntoPyObject<'py> for PyDuration { // Declare monarch-specific configuration keys declare_attrs! { /// Use a single asyncio runtime for all Python actors, rather than one per actor + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_SHARED_ASYNCIO_RUNTIME".to_string()), + py_name: Some("shared_asyncio_runtime".to_string()), + }) pub attr SHARED_ASYNCIO_RUNTIME: bool = false; } @@ -118,8 +241,9 @@ static KEY_BY_NAME: std::sync::LazyLock` and use it to get/set values in the global configl +/// Map from typehash to an info struct that can be used to downcast +/// an `ErasedKey` to a concrete `Key` and use it to get/set values +/// in the global configl static TYPEHASH_TO_INFO: std::sync::LazyLock> = std::sync::LazyLock::new(|| { inventory::iter::() @@ -163,8 +287,8 @@ where /// /// This mirrors [`get_global_config_py`] but restricts the lookup to /// the `Source::Runtime` layer (ignoring -/// TestOverride/Env/File/defaults). If the key has a runtime -/// override, it is cloned as `T`, converted to `P`, then to a +/// TestOverride/Env/File/ClientOverride/defaults). If the key has a +/// runtime override, it is cloned as `T`, converted to `P`, then to a /// `PyObject`; otherwise `Ok(None)` is returned. fn get_runtime_config_py<'py, P, T>( py: Python<'py>, @@ -190,7 +314,7 @@ where /// This is the write-path for the "Python configuration layer": it /// takes a typed key/value and merges it into `Source::Runtime` via /// `create_or_merge`. No other layers -/// (Env/File/TestOverride/Defaults) are affected. +/// (Env/File/TestOverride/ClientOverride/Defaults) are affected. fn set_runtime_config_py( key: &'static dyn ErasedKey, value: T, @@ -285,14 +409,13 @@ struct PythonConfigTypeInfo { // `TYPEHASH_TO_INFO` via `inventory::iter()`. inventory::collect!(PythonConfigTypeInfo); -/// Macro to declare that keys of this type can be configured -/// from python using `monarch.configure(...)`. For types -/// like `String` that are convertible directly to/from PyObjects, -/// you can just use `declare_py_config_type!(String)`. For types -/// that must first be converted to/from a rust python wrapper -/// (e.g., keys with type `BindSpec` must use `PyBindSpec` -/// as an intermediate step), the usage is -/// `declare_py_config_type!(PyBindSpec as BindSpec)`. +/// Macro to declare that keys of this type can be configured from +/// python using `monarch.configure(...)`. For types like `String` +/// that are convertible directly to/from PyObjects, you can just use +/// `declare_py_config_type!(String)`. For types that must first be +/// converted to/from a rust python wrapper (e.g., keys with type +/// `BindSpec` must use `PyBindSpec` as an intermediate step), the +/// usage is `declare_py_config_type!(PyBindSpec as BindSpec)`. macro_rules! declare_py_config_type { ($($ty:ty),+ $(,)?) => { hyperactor::paste! { @@ -344,6 +467,8 @@ macro_rules! declare_py_config_type { declare_py_config_type!(PyBindSpec as BindSpec); declare_py_config_type!(PyDuration as Duration); +declare_py_config_type!(PyEncoding as hyperactor::data::Encoding); +declare_py_config_type!(PyPortRange as std::ops::Range::); declare_py_config_type!( i8, i16, i32, i64, u8, u16, u32, u64, usize, f32, f64, bool, String ); @@ -514,3 +639,222 @@ pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use pyo3::prelude::*; + use pyo3::types::PyDict; + use pyo3::types::PyString; + use pyo3::types::PyTuple; + + use super::*; + + #[test] + fn test_pyduration_parse_valid_formats() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + // Test various valid duration formats + let s = PyString::new(py, "30s"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_secs(30)); + + let s = PyString::new(py, "5m"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_mins(5)); + + let s = PyString::new(py, "1h"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_secs(3600)); + + let s = PyString::new(py, "500ms"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_millis(500)); + + let s = PyString::new(py, "1m 30s"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_secs(90)); + }); + } + + #[test] + fn test_pyduration_parse_invalid_format() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let s = PyString::new(py, "invalid"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("Invalid duration format")); + }); + } + + #[test] + fn test_pyduration_roundtrip() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let original = Duration::from_secs(42); + let py_duration = PyDuration(original); + let py_obj = py_duration.into_pyobject(py).unwrap(); + let back: PyDuration = py_obj.extract().unwrap(); + assert_eq!(back.0, original); + }); + } + + #[test] + fn test_pyencoding_parse_valid_values() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let s = PyString::new(py, "bincode"); + let e: PyEncoding = s.extract().unwrap(); + assert_eq!(e.0, hyperactor::data::Encoding::Bincode); + + let s = PyString::new(py, "serde_json"); + let e: PyEncoding = s.extract().unwrap(); + assert_eq!(e.0, hyperactor::data::Encoding::Json); + + let s = PyString::new(py, "serde_multipart"); + let e: PyEncoding = s.extract().unwrap(); + assert_eq!(e.0, hyperactor::data::Encoding::Multipart); + }); + } + + #[test] + fn test_pyencoding_parse_invalid_value() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let s = PyString::new(py, "invalid_encoding"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("Invalid encoding")); + assert!(err_msg.contains("Valid values")); + }); + } + + #[test] + fn test_pyencoding_roundtrip() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let original = hyperactor::data::Encoding::Multipart; + let py_encoding = PyEncoding(original); + let py_obj = py_encoding.into_pyobject(py).unwrap(); + let back: PyEncoding = py_obj.extract().unwrap(); + assert_eq!(back.0, original); + }); + } + + #[test] + fn test_pyportrange_parse_tuple_format() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let tuple = PyTuple::new(py, [8000u16, 9000u16]).unwrap(); + let r: PyPortRange = tuple.extract().unwrap(); + assert_eq!(r.0.start, 8000); + assert_eq!(r.0.end, 9000); + }); + } + + #[test] + fn test_pyportrange_parse_string_format() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let s = PyString::new(py, "8000..9000"); + let r: PyPortRange = s.extract().unwrap(); + assert_eq!(r.0.start, 8000); + assert_eq!(r.0.end, 9000); + }); + } + + #[test] + fn test_pyportrange_reject_invalid_string_format() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + // Missing ".." + let s = PyString::new(py, "8000-9000"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + + // Not numbers + let s = PyString::new(py, "abc..def"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + + // Too many parts + let s = PyString::new(py, "8000..9000..10000"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + }); + } + + #[test] + fn test_pyportrange_reject_invalid_ranges() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + // start >= end (tuple format) + let tuple = PyTuple::new(py, [9000u16, 8000u16]).unwrap(); + let result: PyResult = tuple.extract(); + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("start must be less than end")); + + // start == end (tuple format) + let tuple = PyTuple::new(py, [8000u16, 8000u16]).unwrap(); + let result: PyResult = tuple.extract(); + assert!(result.is_err()); + + // start >= end (string format) + let s = PyString::new(py, "9000..8000"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("start must be less than end")); + }); + } + + #[test] + fn test_pyportrange_roundtrip() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let original = 8000..9000; + let py_range = PyPortRange(original.clone()); + let py_obj = py_range.into_pyobject(py).unwrap(); + let s: String = py_obj.extract().unwrap(); + assert_eq!(s, "8000..9000"); + + // Parse back + let back: PyPortRange = py_obj.extract().unwrap(); + assert_eq!(back.0, original); + }); + } + + #[test] + fn test_pyportrange_accepts_both_formats() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + // Create a dict with both formats + let dict = PyDict::new(py); + dict.set_item("tuple_format", (8000u16, 9000u16)).unwrap(); + dict.set_item("string_format", "8000..9000").unwrap(); + + // Both should parse to the same range + let r1: PyPortRange = dict + .get_item("tuple_format") + .unwrap() + .unwrap() + .extract() + .unwrap(); + let r2: PyPortRange = dict + .get_item("string_format") + .unwrap() + .unwrap() + .extract() + .unwrap(); + + assert_eq!(r1.0, r2.0); + assert_eq!(r1.0.start, 8000); + assert_eq!(r1.0.end, 9000); + }); + } +} diff --git a/python/monarch/_rust_bindings/monarch_hyperactor/config.pyi b/python/monarch/_rust_bindings/monarch_hyperactor/config.pyi index b384b5cc8..50ce0ae01 100644 --- a/python/monarch/_rust_bindings/monarch_hyperactor/config.pyi +++ b/python/monarch/_rust_bindings/monarch_hyperactor/config.pyi @@ -40,36 +40,123 @@ def configure( message_delivery_timeout: str = ..., host_spawn_ready_timeout: str = ..., mesh_proc_spawn_max_idle: str = ..., + process_exit_timeout: str = ..., + message_ack_time_interval: str = ..., + message_ack_every_n_messages: int = ..., + message_ttl_default: int = ..., + split_max_buffer_size: int = ..., + split_max_buffer_age: str = ..., + stop_actor_timeout: str = ..., + cleanup_timeout: str = ..., + remote_allocator_heartbeat_interval: str = ..., + default_encoding: str = ..., + channel_net_rx_buffer_full_check_interval: str = ..., + message_latency_sampling_rate: float = ..., + enable_client_seq_assignment: bool = ..., + mesh_bootstrap_enable_pdeathsig: bool = ..., + mesh_terminate_concurrency: int = ..., + mesh_terminate_timeout: str = ..., + shared_asyncio_runtime: bool = ..., + small_write_threshold: int = ..., + max_cast_dimension_size: int = ..., + remote_alloc_bind_to_inaddr_any: bool = ..., + remote_alloc_bootstrap_addr: str = ..., + remote_alloc_allowed_port_range: str | tuple[int, int] = ..., + read_log_buffer: int = ..., + force_file_log: bool = ..., + prefix_with_rank: bool = ..., + actor_spawn_max_idle: str = ..., + get_actor_state_max_idle: str = ..., + proc_stop_max_idle: str = ..., + get_proc_state_max_idle: str = ..., **kwargs: object, ) -> None: """Configure Hyperactor runtime defaults for this process. - This updates the **runtime** configuration layer from Python, - setting the default channel transport and optional logging - behaviour (forwarding, file capture, and how many lines to tail), - plus any additional CONFIG-marked keys passed via **kwargs. + This updates the **Runtime** configuration layer from Python, + setting transports, logging behavior, timeouts, and other runtime + parameters. + + All duration parameters accept humantime strings like "30s", "5m", + "2h", or "1h 30m". + + For complete parameter documentation, see the Python wrapper + `monarch.config.configure()` which provides the same interface + with detailed descriptions of all 39 configuration parameters + organized into logical categories (transport, logging, message + handling, mesh bootstrap, allocation, proc/host mesh timeouts, + etc.). Args: - default_transport: Default channel transport for communication. Can be: - - A ChannelTransport enum value (e.g., ChannelTransport.Unix) - - A explicit address string in the ZMQ-style URL format (e.g., "tcp://127.0.0.1:8080") - enable_log_forwarding: Whether to forward logs from actors - enable_file_capture: Whether to capture file output + default_transport: Default channel transport. Can be + ChannelTransport enum (e.g., ChannelTransport.Unix) or + explicit address string in ZMQ-style URL format (e.g., + "tcp://127.0.0.1:8080") + enable_log_forwarding: Forward logs from actors + enable_file_capture: Capture file output tail_log_lines: Number of log lines to tail codec_max_frame_length: Maximum frame length for codec (bytes) - message_delivery_timeout: Timeout for message delivery (e.g., "30s", "5m") - host_spawn_ready_timeout: Timeout for host spawn readiness (e.g., "30s") - mesh_proc_spawn_max_idle: Maximum idle time for spawning procs (e.g., "30s") - **kwargs: Additional configuration keys - - Duration values should use humantime format strings: - - "30s" for 30 seconds - - "5m" for 5 minutes - - "2h" for 2 hours - - "1h 30m" for 1 hour 30 minutes + message_delivery_timeout: Timeout for message delivery + (humantime) + host_spawn_ready_timeout: Timeout for host spawn readiness + (humantime) + mesh_proc_spawn_max_idle: Maximum idle time for spawning procs + (humantime) + process_exit_timeout: Timeout for process exit (humantime) + message_ack_time_interval: Time interval for message + acknowledgments (humantime) + message_ack_every_n_messages: Acknowledge every N messages + message_ttl_default: Default message time-to-live + split_max_buffer_size: Maximum buffer size for message splitting + (bytes) + split_max_buffer_age: Maximum age for split message buffers + (humantime) + stop_actor_timeout: Timeout for stopping actors (humantime) + cleanup_timeout: Timeout for cleanup operations (humantime) + remote_allocator_heartbeat_interval: Heartbeat interval for + remote allocator (humantime) + default_encoding: Default message encoding ("bincode", + "serde_json", or "serde_multipart") + channel_net_rx_buffer_full_check_interval: Network receive buffer + check interval (humantime) + message_latency_sampling_rate: Sampling rate for message latency + (0.0 to 1.0) + enable_client_seq_assignment: Enable client-side sequence + assignment + mesh_bootstrap_enable_pdeathsig: Enable parent-death signal for + spawned processes + mesh_terminate_concurrency: Maximum concurrent terminations + during shutdown + mesh_terminate_timeout: Timeout per child during graceful + termination (humantime) + shared_asyncio_runtime: Share asyncio runtime across actors + small_write_threshold: Threshold below which writes are copied + (bytes) + max_cast_dimension_size: Maximum dimension size for cast + operations + remote_alloc_bind_to_inaddr_any: Bind remote allocators to + INADDR_ANY + remote_alloc_bootstrap_addr: Bootstrap address for remote + allocators + remote_alloc_allowed_port_range: Allowed port range as + "start..end" or (start, end) tuple + read_log_buffer: Buffer size for reading logs (bytes) + force_file_log: Force file-based logging regardless of + environment + prefix_with_rank: Prefix log lines with rank information + actor_spawn_max_idle: Maximum idle time while spawning actors + (humantime) + get_actor_state_max_idle: Maximum idle time for actor state + queries (humantime) + proc_stop_max_idle: Maximum idle time while stopping procs + (humantime) + get_proc_state_max_idle: Maximum idle time for proc state queries + (humantime) + **kwargs: Reserved for future configuration keys Historically this API is named ``configure(...)``; conceptually it acts as "set runtime config for this process". + """ ... diff --git a/python/monarch/config/__init__.py b/python/monarch/config/__init__.py index 9ca3de06a..1998f9410 100644 --- a/python/monarch/config/__init__.py +++ b/python/monarch/config/__init__.py @@ -44,25 +44,109 @@ def configure( message_delivery_timeout: str | None = None, host_spawn_ready_timeout: str | None = None, mesh_proc_spawn_max_idle: str | None = None, + process_exit_timeout: str | None = None, + message_ack_time_interval: str | None = None, + message_ack_every_n_messages: int | None = None, + message_ttl_default: int | None = None, + split_max_buffer_size: int | None = None, + split_max_buffer_age: str | None = None, + stop_actor_timeout: str | None = None, + cleanup_timeout: str | None = None, + remote_allocator_heartbeat_interval: str | None = None, + default_encoding: str | None = None, + channel_net_rx_buffer_full_check_interval: str | None = None, + message_latency_sampling_rate: float | None = None, + enable_client_seq_assignment: bool | None = None, + mesh_bootstrap_enable_pdeathsig: bool | None = None, + mesh_terminate_concurrency: int | None = None, + mesh_terminate_timeout: str | None = None, + shared_asyncio_runtime: bool | None = None, + small_write_threshold: int | None = None, + max_cast_dimension_size: int | None = None, + remote_alloc_bind_to_inaddr_any: bool | None = None, + remote_alloc_bootstrap_addr: str | None = None, + remote_alloc_allowed_port_range: str | tuple[int, int] | None = None, + read_log_buffer: int | None = None, + force_file_log: bool | None = None, + prefix_with_rank: bool | None = None, + actor_spawn_max_idle: str | None = None, + get_actor_state_max_idle: str | None = None, + proc_stop_max_idle: str | None = None, + get_proc_state_max_idle: str | None = None, **kwargs: object, ) -> None: """Configure Hyperactor runtime defaults for this process. This updates the **Runtime** configuration layer from Python, setting - transports, logging behavior, and any other CONFIG-marked keys supplied - via ``**kwargs``. Duration values should be humantime strings (``"30s"``, - ``"5m"``, ``"1h 30m"``). + transports, logging behavior, timeouts, and other runtime parameters. + + All duration parameters accept humantime strings like ``"30s"``, ``"5m"``, + ``"2h"``, or ``"1h 30m"``. Args: - default_transport: Default channel transport for actor communication. - enable_log_forwarding: Forward child stdout/stderr through the mesh. - enable_file_capture: Persist child stdout/stderr to per-host files. - tail_log_lines: Number of log lines to retain in memory. - codec_max_frame_length: Maximum serialized message size in bytes. - message_delivery_timeout: Max delivery time (humantime string). - host_spawn_ready_timeout: Max host bootstrapping time (humantime). - mesh_proc_spawn_max_idle: Max idle time while spawning procs. - **kwargs: Additional configuration keys exposed by rust bindings. + Transport configuration: + default_transport: Default channel transport for actor communication. + Can be a ChannelTransport enum or explicit address string. + + Basic logging behavior: + enable_log_forwarding: Forward child stdout/stderr through the mesh. + enable_file_capture: Persist child stdout/stderr to per-host files. + tail_log_lines: Number of log lines to retain in memory. + + Message encoding and delivery: + codec_max_frame_length: Maximum serialized message size in bytes. + message_delivery_timeout: Max delivery time (humantime). + + Core mesh timeouts: + host_spawn_ready_timeout: Max host bootstrapping time (humantime). + mesh_proc_spawn_max_idle: Max idle time while spawning procs (humantime). + + Hyperactor timeouts and message handling: + process_exit_timeout: Timeout for process exit (humantime). + message_ack_time_interval: Time interval for message acknowledgments (humantime). + message_ack_every_n_messages: Acknowledge every N messages. + message_ttl_default: Default message time-to-live. + split_max_buffer_size: Maximum buffer size for message splitting (bytes). + split_max_buffer_age: Maximum age for split message buffers (humantime). + stop_actor_timeout: Timeout for stopping actors (humantime). + cleanup_timeout: Timeout for cleanup operations (humantime). + remote_allocator_heartbeat_interval: Heartbeat interval for remote allocator (humantime). + default_encoding: Default message encoding ("bincode", "serde_json", or "serde_multipart"). + channel_net_rx_buffer_full_check_interval: Network receive buffer check interval (humantime). + message_latency_sampling_rate: Sampling rate for message latency tracking (0.0 to 1.0). + enable_client_seq_assignment: Enable client-side sequence assignment. + + Mesh bootstrap configuration: + mesh_bootstrap_enable_pdeathsig: Enable parent-death signal for spawned processes. + mesh_terminate_concurrency: Maximum concurrent terminations during shutdown. + mesh_terminate_timeout: Timeout per child during graceful termination (humantime). + + Runtime and buffering: + shared_asyncio_runtime: Share asyncio runtime across actors. + small_write_threshold: Threshold below which writes are copied (bytes). + + Mesh configuration: + max_cast_dimension_size: Maximum dimension size for cast operations. + + Remote allocation: + remote_alloc_bind_to_inaddr_any: Bind remote allocators to INADDR_ANY. + remote_alloc_bootstrap_addr: Bootstrap address for remote allocators. + remote_alloc_allowed_port_range: Allowed port range as "start..end" or (start, end) tuple. + + Logging configuration: + read_log_buffer: Buffer size for reading logs (bytes). + force_file_log: Force file-based logging regardless of environment. + prefix_with_rank: Prefix log lines with rank information. + + Proc mesh timeouts: + actor_spawn_max_idle: Maximum idle time while spawning actors (humantime). + get_actor_state_max_idle: Maximum idle time for actor state queries (humantime). + + Host mesh timeouts: + proc_stop_max_idle: Maximum idle time while stopping procs (humantime). + get_proc_state_max_idle: Maximum idle time for proc state queries (humantime). + + **kwargs: Reserved for future configuration keys exposed by Rust bindings. """ params: Dict[str, Any] = dict(kwargs) @@ -82,6 +166,69 @@ def configure( params["host_spawn_ready_timeout"] = host_spawn_ready_timeout if mesh_proc_spawn_max_idle is not None: params["mesh_proc_spawn_max_idle"] = mesh_proc_spawn_max_idle + if process_exit_timeout is not None: + params["process_exit_timeout"] = process_exit_timeout + if message_ack_time_interval is not None: + params["message_ack_time_interval"] = message_ack_time_interval + if message_ack_every_n_messages is not None: + params["message_ack_every_n_messages"] = message_ack_every_n_messages + if message_ttl_default is not None: + params["message_ttl_default"] = message_ttl_default + if split_max_buffer_size is not None: + params["split_max_buffer_size"] = split_max_buffer_size + if split_max_buffer_age is not None: + params["split_max_buffer_age"] = split_max_buffer_age + if stop_actor_timeout is not None: + params["stop_actor_timeout"] = stop_actor_timeout + if cleanup_timeout is not None: + params["cleanup_timeout"] = cleanup_timeout + if remote_allocator_heartbeat_interval is not None: + params["remote_allocator_heartbeat_interval"] = ( + remote_allocator_heartbeat_interval + ) + if default_encoding is not None: + params["default_encoding"] = default_encoding + if channel_net_rx_buffer_full_check_interval is not None: + params["channel_net_rx_buffer_full_check_interval"] = ( + channel_net_rx_buffer_full_check_interval + ) + if message_latency_sampling_rate is not None: + params["message_latency_sampling_rate"] = message_latency_sampling_rate + if enable_client_seq_assignment is not None: + params["enable_client_seq_assignment"] = enable_client_seq_assignment + if mesh_bootstrap_enable_pdeathsig is not None: + params["mesh_bootstrap_enable_pdeathsig"] = mesh_bootstrap_enable_pdeathsig + if mesh_terminate_concurrency is not None: + params["mesh_terminate_concurrency"] = mesh_terminate_concurrency + if mesh_terminate_timeout is not None: + params["mesh_terminate_timeout"] = mesh_terminate_timeout + if shared_asyncio_runtime is not None: + params["shared_asyncio_runtime"] = shared_asyncio_runtime + if small_write_threshold is not None: + params["small_write_threshold"] = small_write_threshold + if max_cast_dimension_size is not None: + params["max_cast_dimension_size"] = max_cast_dimension_size + # Forward new alloc config keys + if remote_alloc_bind_to_inaddr_any is not None: + params["remote_alloc_bind_to_inaddr_any"] = remote_alloc_bind_to_inaddr_any + if remote_alloc_bootstrap_addr is not None: + params["remote_alloc_bootstrap_addr"] = remote_alloc_bootstrap_addr + if remote_alloc_allowed_port_range is not None: + params["remote_alloc_allowed_port_range"] = remote_alloc_allowed_port_range + if read_log_buffer is not None: + params["read_log_buffer"] = read_log_buffer + if force_file_log is not None: + params["force_file_log"] = force_file_log + if prefix_with_rank is not None: + params["prefix_with_rank"] = prefix_with_rank + if actor_spawn_max_idle is not None: + params["actor_spawn_max_idle"] = actor_spawn_max_idle + if get_actor_state_max_idle is not None: + params["get_actor_state_max_idle"] = get_actor_state_max_idle + if proc_stop_max_idle is not None: + params["proc_stop_max_idle"] = proc_stop_max_idle + if get_proc_state_max_idle is not None: + params["get_proc_state_max_idle"] = get_proc_state_max_idle _configure(**params) diff --git a/python/tests/test_config.py b/python/tests/test_config.py index 6080464e2..6c5faab68 100644 --- a/python/tests/test_config.py +++ b/python/tests/test_config.py @@ -256,3 +256,323 @@ def test_duration_config_multiple() -> None: assert config["mesh_proc_spawn_max_idle"] == "30s" assert not config["enable_log_forwarding"] assert config["tail_log_lines"] == 0 + + +# ============================================================================ +# Systematic tests for all 29 new config parameters added +# ============================================================================ + + +@pytest.mark.parametrize( + "param_name,test_value,expected_value,default_value", + [ + # Hyperactor timeouts and message handling + ("process_exit_timeout", "20s", "20s", "10s"), + ("message_ack_time_interval", "2s", "2s", "500ms"), + ("split_max_buffer_age", "100ms", "100ms", "50ms"), + ("stop_actor_timeout", "15s", "15s", "10s"), + ("cleanup_timeout", "25s", "25s", "3s"), + ("remote_allocator_heartbeat_interval", "10m", "10m", "5m"), + ("channel_net_rx_buffer_full_check_interval", "200ms", "200ms", "5s"), + # Mesh bootstrap config + ("mesh_terminate_timeout", "20s", "20s", "10s"), + # Proc mesh timeouts + ("actor_spawn_max_idle", "45s", "45s", "30s"), + ("get_actor_state_max_idle", "90s", "1m 30s", "1m"), + # Host mesh timeouts + ("proc_stop_max_idle", "45s", "45s", "30s"), + ("get_proc_state_max_idle", "90s", "1m 30s", "1m"), + ], +) +def test_new_duration_params(param_name, test_value, expected_value, default_value): + """Test all new duration configuration parameters.""" + # Verify default value + config = get_global_config() + assert config[param_name] == default_value + + # Set new value and verify + with configured(**{param_name: test_value}) as config: + assert config[param_name] == expected_value + + # Verify restoration to default + config = get_global_config() + assert config[param_name] == default_value + + +@pytest.mark.parametrize( + "param_name,test_value,default_value", + [ + # Hyperactor message handling + ("message_ack_every_n_messages", 500, 1000), + ("message_ttl_default", 20, 64), + ("split_max_buffer_size", 2048, 5), + # Mesh bootstrap config + ("mesh_terminate_concurrency", 32, 16), + # Runtime and buffering + ("small_write_threshold", 512, 256), + # Mesh config (usize::MAX doesn't have a fixed value, skip default check) + # Logging config + ("read_log_buffer", 16384, 100), + ], +) +def test_new_integer_params(param_name, test_value, default_value): + """Test all new integer configuration parameters.""" + # Verify default value + config = get_global_config() + assert config[param_name] == default_value + + # Set new value and verify + with configured(**{param_name: test_value}) as config: + assert config[param_name] == test_value + + # Verify restoration to default + config = get_global_config() + assert config[param_name] == default_value + + +@pytest.mark.parametrize( + "param_name,default_value", + [ + # Hyperactor message handling + ("enable_client_seq_assignment", False), + # Mesh bootstrap config + ("mesh_bootstrap_enable_pdeathsig", True), + # Runtime and buffering + ("shared_asyncio_runtime", False), + # Remote allocation + ("remote_alloc_bind_to_inaddr_any", False), + # Logging config + ("force_file_log", False), + ("prefix_with_rank", True), + ], +) +def test_new_boolean_params(param_name, default_value): + """Test all new boolean configuration parameters.""" + # Verify default value + config = get_global_config() + assert config[param_name] == default_value + + # Set to opposite value and verify + with configured(**{param_name: not default_value}) as config: + assert config[param_name] == (not default_value) + + # Verify restoration to default + config = get_global_config() + assert config[param_name] == default_value + + +def test_new_float_param_message_latency_sampling_rate(): + """Test message_latency_sampling_rate float parameter.""" + # Verify default value (0.01, using approx for f32 precision) + config = get_global_config() + assert config["message_latency_sampling_rate"] == pytest.approx(0.01, rel=1e-5) + + # Test various valid sampling rates + test_values = [0.0, 0.1, 0.5, 0.99, 1.0] + for rate in test_values: + with configured(message_latency_sampling_rate=rate) as config: + assert config["message_latency_sampling_rate"] == pytest.approx( + rate, rel=1e-5 + ) + + # Verify restoration + config = get_global_config() + assert config["message_latency_sampling_rate"] == pytest.approx(0.01, rel=1e-5) + + +def test_new_encoding_param(): + """Test default_encoding string parameter with valid encodings.""" + # Verify default value + config = get_global_config() + assert config["default_encoding"] == "serde_multipart" + + # Test all valid encodings + valid_encodings = ["bincode", "serde_json", "serde_multipart"] + for encoding in valid_encodings: + with configured(default_encoding=encoding) as config: + assert config["default_encoding"] == encoding + + # Verify restoration + config = get_global_config() + assert config["default_encoding"] == "serde_multipart" + + +def test_new_encoding_param_invalid(): + """Test that invalid encoding values raise errors.""" + with pytest.raises(TypeError, match="invalid value"): + with configured(default_encoding="invalid_encoding"): + pass + + with pytest.raises(TypeError, match="invalid value"): + with configured(default_encoding="xml"): + pass + + +def test_new_bootstrap_addr_param(): + """Test remote_alloc_bootstrap_addr string parameter.""" + # Note: This attribute has no default value, only test setting it + test_addr = "tcp://127.0.0.1:9000" + with configured(remote_alloc_bootstrap_addr=test_addr) as config: + assert config["remote_alloc_bootstrap_addr"] == test_addr + + +def test_new_port_range_param_tuple(): + """Test remote_alloc_allowed_port_range with tuple format.""" + # Note: This attribute has no default value, only test setting it + with configured(remote_alloc_allowed_port_range=(8000, 9000)) as config: + assert config["remote_alloc_allowed_port_range"] == "8000..9000" + + +def test_new_port_range_param_string(): + """Test remote_alloc_allowed_port_range with string format.""" + # Test string format + with configured(remote_alloc_allowed_port_range="8000..9000") as config: + assert config["remote_alloc_allowed_port_range"] == "8000..9000" + + # Test edge cases + with configured(remote_alloc_allowed_port_range="1..65535") as config: + assert config["remote_alloc_allowed_port_range"] == "1..65535" + + +def test_new_port_range_param_invalid(): + """Test that invalid port ranges raise errors.""" + # Invalid tuple: start >= end + with pytest.raises(TypeError, match="invalid value"): + with configured(remote_alloc_allowed_port_range=(9000, 8000)): + pass + + # Invalid string format + with pytest.raises(TypeError, match="invalid value"): + with configured(remote_alloc_allowed_port_range="invalid"): + pass + + with pytest.raises(TypeError, match="invalid value"): + with configured(remote_alloc_allowed_port_range="8000-9000"): # wrong separator + pass + + +def test_all_new_params_together(): + """Test setting all 29 new config parameters simultaneously.""" + with configured( + # Hyperactor timeouts and message handling + process_exit_timeout="20s", + message_ack_time_interval="2s", + message_ack_every_n_messages=500, + message_ttl_default=20, + split_max_buffer_size=2048, + split_max_buffer_age="100ms", + stop_actor_timeout="15s", + cleanup_timeout="25s", + remote_allocator_heartbeat_interval="10s", + default_encoding="serde_json", + channel_net_rx_buffer_full_check_interval="200ms", + message_latency_sampling_rate=0.5, + enable_client_seq_assignment=True, + # Mesh bootstrap config + mesh_bootstrap_enable_pdeathsig=False, + mesh_terminate_concurrency=16, + mesh_terminate_timeout="20s", + # Runtime and buffering + shared_asyncio_runtime=True, + small_write_threshold=512, + # Mesh config + max_cast_dimension_size=2048, + # Remote allocation + remote_alloc_bind_to_inaddr_any=True, + remote_alloc_bootstrap_addr="tcp://127.0.0.1:9000", + remote_alloc_allowed_port_range=(8000, 9000), + # Logging config + read_log_buffer=16384, + force_file_log=True, + prefix_with_rank=True, + # Proc mesh timeouts + actor_spawn_max_idle="45s", + get_actor_state_max_idle="90s", + # Host mesh timeouts + proc_stop_max_idle="45s", + get_proc_state_max_idle="90s", + ) as config: + # Verify all values are set correctly + assert config["process_exit_timeout"] == "20s" + assert config["message_ack_time_interval"] == "2s" + assert config["message_ack_every_n_messages"] == 500 + assert config["message_ttl_default"] == 20 + assert config["split_max_buffer_size"] == 2048 + assert config["split_max_buffer_age"] == "100ms" + assert config["stop_actor_timeout"] == "15s" + assert config["cleanup_timeout"] == "25s" + assert config["remote_allocator_heartbeat_interval"] == "10s" + assert config["default_encoding"] == "serde_json" + assert config["channel_net_rx_buffer_full_check_interval"] == "200ms" + assert config["message_latency_sampling_rate"] == pytest.approx(0.5, rel=1e-5) + assert config["enable_client_seq_assignment"] is True + assert config["mesh_bootstrap_enable_pdeathsig"] is False + assert config["mesh_terminate_concurrency"] == 16 + assert config["mesh_terminate_timeout"] == "20s" + assert config["shared_asyncio_runtime"] is True + assert config["small_write_threshold"] == 512 + assert config["max_cast_dimension_size"] == 2048 + assert config["remote_alloc_bind_to_inaddr_any"] is True + assert config["remote_alloc_bootstrap_addr"] == "tcp://127.0.0.1:9000" + assert config["remote_alloc_allowed_port_range"] == "8000..9000" + assert config["read_log_buffer"] == 16384 + assert config["force_file_log"] is True + assert config["prefix_with_rank"] is True + assert config["actor_spawn_max_idle"] == "45s" + assert config["get_actor_state_max_idle"] == "1m 30s" + assert config["proc_stop_max_idle"] == "45s" + assert config["get_proc_state_max_idle"] == "1m 30s" + + # Verify all values are restored to defaults + config = get_global_config() + assert config["process_exit_timeout"] == "10s" + assert config["message_ack_time_interval"] == "500ms" + assert config["message_ack_every_n_messages"] == 1000 + assert config["message_ttl_default"] == 64 + assert config["split_max_buffer_size"] == 5 + assert config["split_max_buffer_age"] == "50ms" + assert config["stop_actor_timeout"] == "10s" + assert config["cleanup_timeout"] == "3s" + assert config["remote_allocator_heartbeat_interval"] == "5m" + assert config["default_encoding"] == "serde_multipart" + assert config["channel_net_rx_buffer_full_check_interval"] == "5s" + assert config["message_latency_sampling_rate"] == pytest.approx(0.01, rel=1e-5) + assert config["enable_client_seq_assignment"] is False + assert config["mesh_bootstrap_enable_pdeathsig"] is True + assert config["mesh_terminate_concurrency"] == 16 + assert config["mesh_terminate_timeout"] == "10s" + assert config["shared_asyncio_runtime"] is False + assert config["small_write_threshold"] == 256 + # max_cast_dimension_size is usize::MAX, skip checking it + assert config["remote_alloc_bind_to_inaddr_any"] is False + # remote_alloc_bootstrap_addr and remote_alloc_allowed_port_range have no defaults + assert config["read_log_buffer"] == 100 + assert config["force_file_log"] is False + assert config["prefix_with_rank"] is True + assert config["actor_spawn_max_idle"] == "30s" + assert config["get_actor_state_max_idle"] == "1m" + assert config["proc_stop_max_idle"] == "30s" + assert config["get_proc_state_max_idle"] == "1m" + + +def test_new_params_type_errors(): + """Test that type errors are raised for incorrect parameter types.""" + # Duration param with wrong type + with pytest.raises(TypeError): + with configured(process_exit_timeout=30): # type: ignore + pass + + # Integer param with wrong type + with pytest.raises(TypeError): + with configured(message_ack_every_n_messages="100"): # type: ignore + pass + + # Boolean param with wrong type + with pytest.raises(TypeError): + with configured(enable_client_seq_assignment="true"): # type: ignore + pass + + # Float param with wrong type + with pytest.raises(TypeError): + with configured(message_latency_sampling_rate="0.5"): # type: ignore + pass