diff --git a/docs/source/api/monarch.config.rst b/docs/source/api/monarch.config.rst index 4419a34fd..3ba4ce39b 100644 --- a/docs/source/api/monarch.config.rst +++ b/docs/source/api/monarch.config.rst @@ -159,6 +159,62 @@ Timeouts spawn operation fails. This prevents hung or stuck process creation from waiting indefinitely. +``process_exit_timeout`` + Timeout for waiting on process exit during shutdown. + + - **Type**: ``str`` (duration format) + - **Default**: ``"10s"`` + - **Environment**: ``HYPERACTOR_PROCESS_EXIT_TIMEOUT`` + +``stop_actor_timeout`` + Timeout for gracefully stopping actors. + + - **Type**: ``str`` (duration format) + - **Default**: ``"10s"`` + - **Environment**: ``HYPERACTOR_STOP_ACTOR_TIMEOUT`` + +``cleanup_timeout`` + Timeout for cleanup operations during shutdown. + + - **Type**: ``str`` (duration format) + - **Default**: ``"3s"`` + - **Environment**: ``HYPERACTOR_CLEANUP_TIMEOUT`` + +``actor_spawn_max_idle`` + Maximum idle time between updates while spawning actors in a proc mesh. + + - **Type**: ``str`` (duration format) + - **Default**: ``"30s"`` + - **Environment**: ``HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE`` + +``get_actor_state_max_idle`` + Maximum idle time for actor state queries. + + - **Type**: ``str`` (duration format) + - **Default**: ``"1m"`` + - **Environment**: ``HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE`` + +``proc_stop_max_idle`` + Maximum idle time between updates while stopping procs. + + - **Type**: ``str`` (duration format) + - **Default**: ``"30s"`` + - **Environment**: ``HYPERACTOR_MESH_PROC_STOP_MAX_IDLE`` + +``get_proc_state_max_idle`` + Maximum idle time for proc state queries. + + - **Type**: ``str`` (duration format) + - **Default**: ``"1m"`` + - **Environment**: ``HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE`` + +``mesh_terminate_timeout`` + Timeout per child during graceful mesh termination. + + - **Type**: ``str`` (duration format) + - **Default**: ``"10s"`` + - **Environment**: ``HYPERACTOR_MESH_TERMINATE_TIMEOUT`` + Logging ------- @@ -212,6 +268,198 @@ Logging with configured(tail_log_lines=100): mesh = this_host().spawn_procs(per_host={"workers": 4}) +``read_log_buffer`` + Buffer size for reading logs (in bytes). + + - **Type**: ``int`` + - **Default**: ``100`` + - **Environment**: ``HYPERACTOR_READ_LOG_BUFFER`` + +``force_file_log`` + Force file-based logging regardless of environment. + + - **Type**: ``bool`` + - **Default**: ``False`` + - **Environment**: ``HYPERACTOR_FORCE_FILE_LOG`` + +``prefix_with_rank`` + Prefix log lines with rank information. + + - **Type**: ``bool`` + - **Default**: ``True`` + - **Environment**: ``HYPERACTOR_PREFIX_WITH_RANK`` + + +Message Handling +---------------- + +``message_ack_time_interval`` + Time interval for message acknowledgments. + + - **Type**: ``str`` (duration format) + - **Default**: ``"500ms"`` + - **Environment**: ``HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL`` + +``message_ack_every_n_messages`` + Acknowledge every N messages. + + - **Type**: ``int`` + - **Default**: ``1000`` + - **Environment**: ``HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES`` + +``message_ttl_default`` + Default message time-to-live (number of hops). + + - **Type**: ``int`` + - **Default**: ``64`` + - **Environment**: ``HYPERACTOR_MESSAGE_TTL_DEFAULT`` + +``split_max_buffer_size`` + Maximum buffer size for message splitting (number of fragments). + + - **Type**: ``int`` + - **Default**: ``5`` + - **Environment**: ``HYPERACTOR_SPLIT_MAX_BUFFER_SIZE`` + +``split_max_buffer_age`` + Maximum age for split message buffers. + + - **Type**: ``str`` (duration format) + - **Default**: ``"50ms"`` + - **Environment**: ``HYPERACTOR_SPLIT_MAX_BUFFER_AGE`` + +``channel_net_rx_buffer_full_check_interval`` + Network receive buffer check interval. + + - **Type**: ``str`` (duration format) + - **Default**: ``"5s"`` + - **Environment**: ``HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL`` + +``message_latency_sampling_rate`` + Sampling rate for message latency tracking (0.0 to 1.0). + + - **Type**: ``float`` + - **Default**: ``0.01`` + - **Environment**: ``HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE`` + + A value of ``0.01`` means 1% of messages are sampled. Use ``1.0`` for + 100% sampling (all messages) or ``0.0`` to disable sampling. + +``enable_client_seq_assignment`` + Enable client-side sequence assignment for messages. + + - **Type**: ``bool`` + - **Default**: ``False`` + - **Environment**: ``HYPERACTOR_ENABLE_CLIENT_SEQ_ASSIGNMENT`` + + +Message Encoding +---------------- + +``default_encoding`` + Default message encoding format. + + - **Type**: ``str`` + - **Default**: ``"serde_multipart"`` + - **Environment**: ``HYPERACTOR_DEFAULT_ENCODING`` + + Supported values: + + - ``"bincode"`` - Binary encoding + - ``"serde_json"`` - JSON encoding + - ``"serde_multipart"`` - Multipart encoding (default) + + +Mesh Bootstrap +-------------- + +``mesh_bootstrap_enable_pdeathsig`` + Enable parent-death signal for spawned processes. + + - **Type**: ``bool`` + - **Default**: ``True`` + - **Environment**: ``HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG`` + + When ``True``, child processes receive SIGTERM if their parent dies, + preventing orphaned processes. + +``mesh_terminate_concurrency`` + Maximum concurrent terminations during mesh shutdown. + + - **Type**: ``int`` + - **Default**: ``16`` + - **Environment**: ``HYPERACTOR_MESH_TERMINATE_CONCURRENCY`` + + +Runtime and Buffering +---------------------- + +``shared_asyncio_runtime`` + Share asyncio runtime across actors. + + - **Type**: ``bool`` + - **Default**: ``False`` + - **Environment**: ``MONARCH_HYPERACTOR_SHARED_ASYNCIO_RUNTIME`` + +``small_write_threshold`` + Threshold below which writes are copied (in bytes). + + - **Type**: ``int`` + - **Default**: ``256`` + - **Environment**: ``MONARCH_HYPERACTOR_SMALL_WRITE_THRESHOLD`` + + Writes smaller than this threshold are copied into a contiguous buffer. + Writes at or above this size are stored as zero-copy references. + + +Mesh Configuration +------------------ + +``max_cast_dimension_size`` + Maximum dimension size for cast operations. + + - **Type**: ``int`` + - **Default**: ``usize::MAX`` (platform-dependent) + - **Environment**: ``HYPERACTOR_MESH_MAX_CAST_DIMENSION_SIZE`` + + +Remote Allocation +----------------- + +``remote_allocator_heartbeat_interval`` + Heartbeat interval for remote allocator. + + - **Type**: ``str`` (duration format) + - **Default**: ``"5m"`` + - **Environment**: ``HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL`` + +``remote_alloc_bind_to_inaddr_any`` + Bind remote allocators to INADDR_ANY (0.0.0.0). + + - **Type**: ``bool`` + - **Default**: ``False`` + - **Environment**: ``HYPERACTOR_REMOTE_ALLOC_BIND_TO_INADDR_ANY`` + +``remote_alloc_bootstrap_addr`` + Bootstrap address for remote allocators. + + - **Type**: ``str`` + - **Default**: None (no default) + - **Environment**: ``HYPERACTOR_REMOTE_ALLOC_BOOTSTRAP_ADDR`` + + Example: ``"tcp://127.0.0.1:9000"`` + +``remote_alloc_allowed_port_range`` + Allowed port range for remote allocators. + + - **Type**: ``str`` or ``tuple[int, int]`` + - **Default**: None (no default) + - **Environment**: ``HYPERACTOR_REMOTE_ALLOC_ALLOWED_PORT_RANGE`` + + Can be specified as a string (``"8000..9000"``) or tuple (``(8000, + 9000)``). + + Validation and Error Handling ----------------------------- @@ -219,12 +467,14 @@ Validation and Error Handling * Unknown keys raise ``ValueError``. * Type mismatches raise ``TypeError`` (for example, passing a string instead - of ``ChannelTransport`` for ``default_transport`` or a non-bool to logging - flags). + of ``ChannelTransport`` for ``default_transport``, a non-bool to logging + flags, or an integer instead of a string for duration parameters). +* Invalid values raise ``TypeError`` (for example, invalid encoding names, + invalid port ranges, or malformed duration strings). * Duration strings must follow `humantime `_ syntax; - invalid strings or non-string values trigger ``TypeError`` with a message - that highlights the bad value. + invalid strings trigger ``TypeError`` with a message that highlights the + bad value. Normalization ~~~~~~~~~~~~~ diff --git a/hyperactor/src/config.rs b/hyperactor/src/config.rs index 3de17fa99..ff628edcd 100644 --- a/hyperactor/src/config.rs +++ b/hyperactor/src/config.rs @@ -37,49 +37,49 @@ declare_attrs! { /// Timeout used by allocator for stopping a proc. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_PROCESS_EXIT_TIMEOUT".to_string()), - py_name: None, + py_name: Some("process_exit_timeout".to_string()), }) pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10); /// Message acknowledgment interval @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESSAGE_ACK_TIME_INTERVAL".to_string()), - py_name: None, + py_name: Some("message_ack_time_interval".to_string()), }) pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500); /// Number of messages after which to send an acknowledgment @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESSAGE_ACK_EVERY_N_MESSAGES".to_string()), - py_name: None, + py_name: Some("message_ack_every_n_messages".to_string()), }) pub attr MESSAGE_ACK_EVERY_N_MESSAGES: u64 = 1000; /// Default hop Time-To-Live for message envelopes. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESSAGE_TTL_DEFAULT".to_string()), - py_name: None, + py_name: Some("message_ttl_default".to_string()), }) pub attr MESSAGE_TTL_DEFAULT : u8 = 64; /// Maximum buffer size for split port messages @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE".to_string()), - py_name: None, + py_name: Some("split_max_buffer_size".to_string()), }) pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5; /// The maximum time an update can be buffered before being reduced. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_SPLIT_MAX_BUFFER_AGE".to_string()), - py_name: None, + py_name: Some("split_max_buffer_age".to_string()), }) pub attr SPLIT_MAX_BUFFER_AGE: Duration = Duration::from_millis(50); /// Timeout used by proc mesh for stopping an actor. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_STOP_ACTOR_TIMEOUT".to_string()), - py_name: None, + py_name: Some("stop_actor_timeout".to_string()), }) pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(10); @@ -87,7 +87,7 @@ declare_attrs! { /// Should be less than the timeout for STOP_ACTOR_TIMEOUT. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_CLEANUP_TIMEOUT".to_string()), - py_name: None, + py_name: Some("cleanup_timeout".to_string()), }) pub attr CLEANUP_TIMEOUT: Duration = Duration::from_secs(3); @@ -96,21 +96,21 @@ declare_attrs! { /// deprecation. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL".to_string()), - py_name: None, + py_name: Some("remote_allocator_heartbeat_interval".to_string()), }) pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_mins(5); /// The default encoding to be used. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_DEFAULT_ENCODING".to_string()), - py_name: None, + py_name: Some("default_encoding".to_string()), }) pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart; /// How often to check for full MPSC channel on NetRx. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()), - py_name: None, + py_name: Some("channel_net_rx_buffer_full_check_interval".to_string()), }) pub attr CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL: Duration = Duration::from_secs(5); @@ -118,11 +118,15 @@ declare_attrs! { /// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE".to_string()), - py_name: None, + py_name: Some("message_latency_sampling_rate".to_string()), }) pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01; /// Whether to enable client sequence assignment. + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_ENABLE_CLIENT_SEQ_ASSIGNMENT".to_string()), + py_name: Some("enable_client_seq_assignment".to_string()), + }) pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false; /// Timeout for [`Host::spawn`] to await proc readiness. diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index 030574385..a4c3eb340 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -70,7 +70,7 @@ declare_attrs! { /// us to bind the port to the host's public IP address. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_REMOTE_ALLOC_BIND_TO_INADDR_ANY".to_string()), - py_name: None, + py_name: Some("remote_alloc_bind_to_inaddr_any".to_string()), }) pub attr REMOTE_ALLOC_BIND_TO_INADDR_ANY: bool = false; @@ -89,7 +89,7 @@ declare_attrs! { // TODO: remove this env var, and make it part of alloc spec instead. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_REMOTE_ALLOC_BOOTSTRAP_ADDR".to_string()), - py_name: None, + py_name: Some("remote_alloc_bootstrap_addr".to_string()), }) pub attr REMOTE_ALLOC_BOOTSTRAP_ADDR: String; @@ -104,7 +104,7 @@ declare_attrs! { /// ports are allowed to be used. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_REMOTE_ALLOC_ALLOWED_PORT_RANGE".to_string()), - py_name: None, + py_name: Some("remote_alloc_allowed_port_range".to_string()), }) pub attr REMOTE_ALLOC_ALLOWED_PORT_RANGE: Range; } diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index 182159c4b..f55e7b520 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -144,7 +144,7 @@ declare_attrs! { /// "false")`. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_BOOTSTRAP_ENABLE_PDEATHSIG".to_string()), - py_name: None, + py_name: Some("mesh_bootstrap_enable_pdeathsig".to_string()), }) pub attr MESH_BOOTSTRAP_ENABLE_PDEATHSIG: bool = true; @@ -154,7 +154,7 @@ declare_attrs! { /// file descriptor load). @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_TERMINATE_CONCURRENCY".to_string()), - py_name: None, + py_name: Some("mesh_terminate_concurrency".to_string()), }) pub attr MESH_TERMINATE_CONCURRENCY: usize = 16; @@ -163,7 +163,7 @@ declare_attrs! { /// the child to exit before escalating to SIGKILL. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_TERMINATE_TIMEOUT".to_string()), - py_name: None, + py_name: Some("mesh_terminate_timeout".to_string()), }) pub attr MESH_TERMINATE_TIMEOUT: Duration = Duration::from_secs(10); } diff --git a/hyperactor_mesh/src/config.rs b/hyperactor_mesh/src/config.rs index 6db024027..94c1a93f7 100644 --- a/hyperactor_mesh/src/config.rs +++ b/hyperactor_mesh/src/config.rs @@ -23,7 +23,7 @@ declare_attrs! { /// the limit so no dimension needs to be folded. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_MAX_CAST_DIMENSION_SIZE".to_string()), - py_name: None, + py_name: Some("max_cast_dimension_size".to_string()), }) pub attr MAX_CAST_DIMENSION_SIZE: usize = usize::MAX; } diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index 587bd402a..c54bfe365 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -74,21 +74,21 @@ declare_attrs! { /// After pause lines will be flushed and reading will resume. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_READ_LOG_BUFFER".to_string()), - py_name: None, + py_name: Some("read_log_buffer".to_string()), }) pub attr READ_LOG_BUFFER: usize = 100; /// If enabled, local logs are also written to a file and aggregated @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_FORCE_FILE_LOG".to_string()), - py_name: None, + py_name: Some("force_file_log".to_string()), }) pub attr FORCE_FILE_LOG: bool = false; /// Prefixes logs with rank @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_PREFIX_WITH_RANK".to_string()), - py_name: None, + py_name: Some("prefix_with_rank".to_string()), }) pub attr PREFIX_WITH_RANK: bool = true; } diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index a70376094..ea2f0e19a 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -82,13 +82,13 @@ declare_attrs! { /// meshes. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_PROC_STOP_MAX_IDLE".to_string()), - py_name: None, + py_name: Some("proc_stop_max_idle".to_string()), }) pub attr PROC_STOP_MAX_IDLE: Duration = Duration::from_secs(30); @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_GET_PROC_STATE_MAX_IDLE".to_string()), - py_name: None, + py_name: Some("get_proc_state_max_idle".to_string()), }) pub attr GET_PROC_STATE_MAX_IDLE: Duration = Duration::from_mins(1); } diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index c51a186bb..c0bf8859f 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -84,13 +84,13 @@ declare_attrs! { /// meshes. @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()), - py_name: None, + py_name: Some("actor_spawn_max_idle".to_string()), }) pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); @meta(CONFIG = ConfigAttr { env_name: Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()), - py_name: None, + py_name: Some("get_actor_state_max_idle".to_string()), }) pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_mins(1); } diff --git a/monarch_hyperactor/src/buffers.rs b/monarch_hyperactor/src/buffers.rs index 4b869a6b5..b8669c084 100644 --- a/monarch_hyperactor/src/buffers.rs +++ b/monarch_hyperactor/src/buffers.rs @@ -31,7 +31,7 @@ declare_attrs! { /// Writes >= this size are stored as zero-copy references. @meta(CONFIG = ConfigAttr { env_name: Some("MONARCH_HYPERACTOR_SMALL_WRITE_THRESHOLD".to_string()), - py_name: None, + py_name: Some("small_write_threshold".to_string()), }) pub attr SMALL_WRITE_THRESHOLD: usize = 256; } diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index 84daf8282..503d68e4e 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -38,6 +38,125 @@ use pyo3::prelude::*; use crate::channel::PyBindSpec; +/// Python wrapper for Encoding enum. +/// +/// This type bridges between Python strings (e.g., "bincode", +/// "serde_json", "serde_multipart") and Rust's +/// `hyperactor::data::Encoding` enum. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PyEncoding(pub hyperactor::data::Encoding); + +impl From for hyperactor::data::Encoding { + fn from(e: PyEncoding) -> Self { + e.0 + } +} + +impl From for PyEncoding { + fn from(e: hyperactor::data::Encoding) -> Self { + PyEncoding(e) + } +} + +impl<'py> FromPyObject<'py> for PyEncoding { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let s: String = ob.extract()?; + let encoding = s.parse::().map_err(|e| { + PyValueError::new_err(format!( + "Invalid encoding '{}': {}. Valid values: bincode, serde_json, serde_multipart", + s, e + )) + })?; + Ok(PyEncoding(encoding)) + } +} + +impl<'py> IntoPyObject<'py> for PyEncoding { + type Target = PyAny; + type Output = Bound<'py, Self::Target>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let formatted = self.0.to_string(); + formatted.into_bound_py_any(py) + } +} + +/// Python wrapper for Range, using tuple or string format. +/// +/// This type bridges between Python and Rust's +/// `std::ops::Range`. +/// Accepts either: +/// - Tuple: `(8000, 9000)` +/// - String: `"8000..9000"` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PyPortRange(pub std::ops::Range); + +impl From for std::ops::Range { + fn from(r: PyPortRange) -> Self { + r.0 + } +} + +impl From> for PyPortRange { + fn from(r: std::ops::Range) -> Self { + PyPortRange(r) + } +} + +impl<'py> FromPyObject<'py> for PyPortRange { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + // Try tuple format first: (start, end) + if let Ok((start, end)) = ob.extract::<(u16, u16)>() { + if start >= end { + return Err(PyValueError::new_err(format!( + "Invalid port range ({}, {}): start must be less than end", + start, end + ))); + } + return Ok(PyPortRange(start..end)); + } + + // Fall back to string format: "start..end" + let s: String = ob.extract().map_err(|_| { + PyTypeError::new_err( + "Port range must be either a tuple (start, end) or string 'start..end'", + ) + })?; + let parts: Vec<&str> = s.split("..").collect(); + if parts.len() != 2 { + return Err(PyValueError::new_err(format!( + "Invalid port range format '{}': expected 'start..end'", + s + ))); + } + let start = parts[0].parse::().map_err(|e| { + PyValueError::new_err(format!("Invalid start port '{}': {}", parts[0], e)) + })?; + let end = parts[1].parse::().map_err(|e| { + PyValueError::new_err(format!("Invalid end port '{}': {}", parts[1], e)) + })?; + if start >= end { + return Err(PyValueError::new_err(format!( + "Invalid port range '{}': start must be less than end", + s + ))); + } + Ok(PyPortRange(start..end)) + } +} + +impl<'py> IntoPyObject<'py> for PyPortRange { + type Target = PyAny; + type Output = Bound<'py, Self::Target>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + let formatted = format!("{}..{}", self.0.start, self.0.end); + formatted.into_bound_py_any(py) + } +} + /// Python wrapper for Duration, using humantime format strings. /// /// This type bridges between Python strings (e.g., "30s", "5m") and @@ -82,6 +201,10 @@ impl<'py> IntoPyObject<'py> for PyDuration { // Declare monarch-specific configuration keys declare_attrs! { /// Use a single asyncio runtime for all Python actors, rather than one per actor + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_SHARED_ASYNCIO_RUNTIME".to_string()), + py_name: Some("shared_asyncio_runtime".to_string()), + }) pub attr SHARED_ASYNCIO_RUNTIME: bool = false; } @@ -118,8 +241,9 @@ static KEY_BY_NAME: std::sync::LazyLock` and use it to get/set values in the global configl +/// Map from typehash to an info struct that can be used to downcast +/// an `ErasedKey` to a concrete `Key` and use it to get/set values +/// in the global configl static TYPEHASH_TO_INFO: std::sync::LazyLock> = std::sync::LazyLock::new(|| { inventory::iter::() @@ -163,8 +287,8 @@ where /// /// This mirrors [`get_global_config_py`] but restricts the lookup to /// the `Source::Runtime` layer (ignoring -/// TestOverride/Env/File/defaults). If the key has a runtime -/// override, it is cloned as `T`, converted to `P`, then to a +/// TestOverride/Env/File/ClientOverride/defaults). If the key has a +/// runtime override, it is cloned as `T`, converted to `P`, then to a /// `PyObject`; otherwise `Ok(None)` is returned. fn get_runtime_config_py<'py, P, T>( py: Python<'py>, @@ -190,7 +314,7 @@ where /// This is the write-path for the "Python configuration layer": it /// takes a typed key/value and merges it into `Source::Runtime` via /// `create_or_merge`. No other layers -/// (Env/File/TestOverride/Defaults) are affected. +/// (Env/File/TestOverride/ClientOverride/Defaults) are affected. fn set_runtime_config_py( key: &'static dyn ErasedKey, value: T, @@ -285,14 +409,13 @@ struct PythonConfigTypeInfo { // `TYPEHASH_TO_INFO` via `inventory::iter()`. inventory::collect!(PythonConfigTypeInfo); -/// Macro to declare that keys of this type can be configured -/// from python using `monarch.configure(...)`. For types -/// like `String` that are convertible directly to/from PyObjects, -/// you can just use `declare_py_config_type!(String)`. For types -/// that must first be converted to/from a rust python wrapper -/// (e.g., keys with type `BindSpec` must use `PyBindSpec` -/// as an intermediate step), the usage is -/// `declare_py_config_type!(PyBindSpec as BindSpec)`. +/// Macro to declare that keys of this type can be configured from +/// python using `monarch.configure(...)`. For types like `String` +/// that are convertible directly to/from PyObjects, you can just use +/// `declare_py_config_type!(String)`. For types that must first be +/// converted to/from a rust python wrapper (e.g., keys with type +/// `BindSpec` must use `PyBindSpec` as an intermediate step), the +/// usage is `declare_py_config_type!(PyBindSpec as BindSpec)`. macro_rules! declare_py_config_type { ($($ty:ty),+ $(,)?) => { hyperactor::paste! { @@ -344,6 +467,8 @@ macro_rules! declare_py_config_type { declare_py_config_type!(PyBindSpec as BindSpec); declare_py_config_type!(PyDuration as Duration); +declare_py_config_type!(PyEncoding as hyperactor::data::Encoding); +declare_py_config_type!(PyPortRange as std::ops::Range::); declare_py_config_type!( i8, i16, i32, i64, u8, u16, u32, u64, usize, f32, f64, bool, String ); @@ -514,3 +639,222 @@ pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use pyo3::prelude::*; + use pyo3::types::PyDict; + use pyo3::types::PyString; + use pyo3::types::PyTuple; + + use super::*; + + #[test] + fn test_pyduration_parse_valid_formats() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + // Test various valid duration formats + let s = PyString::new(py, "30s"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_secs(30)); + + let s = PyString::new(py, "5m"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_mins(5)); + + let s = PyString::new(py, "1h"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_secs(3600)); + + let s = PyString::new(py, "500ms"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_millis(500)); + + let s = PyString::new(py, "1m 30s"); + let d: PyDuration = s.extract().unwrap(); + assert_eq!(d.0, Duration::from_secs(90)); + }); + } + + #[test] + fn test_pyduration_parse_invalid_format() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let s = PyString::new(py, "invalid"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("Invalid duration format")); + }); + } + + #[test] + fn test_pyduration_roundtrip() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let original = Duration::from_secs(42); + let py_duration = PyDuration(original); + let py_obj = py_duration.into_pyobject(py).unwrap(); + let back: PyDuration = py_obj.extract().unwrap(); + assert_eq!(back.0, original); + }); + } + + #[test] + fn test_pyencoding_parse_valid_values() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let s = PyString::new(py, "bincode"); + let e: PyEncoding = s.extract().unwrap(); + assert_eq!(e.0, hyperactor::data::Encoding::Bincode); + + let s = PyString::new(py, "serde_json"); + let e: PyEncoding = s.extract().unwrap(); + assert_eq!(e.0, hyperactor::data::Encoding::Json); + + let s = PyString::new(py, "serde_multipart"); + let e: PyEncoding = s.extract().unwrap(); + assert_eq!(e.0, hyperactor::data::Encoding::Multipart); + }); + } + + #[test] + fn test_pyencoding_parse_invalid_value() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let s = PyString::new(py, "invalid_encoding"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("Invalid encoding")); + assert!(err_msg.contains("Valid values")); + }); + } + + #[test] + fn test_pyencoding_roundtrip() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let original = hyperactor::data::Encoding::Multipart; + let py_encoding = PyEncoding(original); + let py_obj = py_encoding.into_pyobject(py).unwrap(); + let back: PyEncoding = py_obj.extract().unwrap(); + assert_eq!(back.0, original); + }); + } + + #[test] + fn test_pyportrange_parse_tuple_format() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let tuple = PyTuple::new(py, [8000u16, 9000u16]).unwrap(); + let r: PyPortRange = tuple.extract().unwrap(); + assert_eq!(r.0.start, 8000); + assert_eq!(r.0.end, 9000); + }); + } + + #[test] + fn test_pyportrange_parse_string_format() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let s = PyString::new(py, "8000..9000"); + let r: PyPortRange = s.extract().unwrap(); + assert_eq!(r.0.start, 8000); + assert_eq!(r.0.end, 9000); + }); + } + + #[test] + fn test_pyportrange_reject_invalid_string_format() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + // Missing ".." + let s = PyString::new(py, "8000-9000"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + + // Not numbers + let s = PyString::new(py, "abc..def"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + + // Too many parts + let s = PyString::new(py, "8000..9000..10000"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + }); + } + + #[test] + fn test_pyportrange_reject_invalid_ranges() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + // start >= end (tuple format) + let tuple = PyTuple::new(py, [9000u16, 8000u16]).unwrap(); + let result: PyResult = tuple.extract(); + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("start must be less than end")); + + // start == end (tuple format) + let tuple = PyTuple::new(py, [8000u16, 8000u16]).unwrap(); + let result: PyResult = tuple.extract(); + assert!(result.is_err()); + + // start >= end (string format) + let s = PyString::new(py, "9000..8000"); + let result: PyResult = s.extract(); + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("start must be less than end")); + }); + } + + #[test] + fn test_pyportrange_roundtrip() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + let original = 8000..9000; + let py_range = PyPortRange(original.clone()); + let py_obj = py_range.into_pyobject(py).unwrap(); + let s: String = py_obj.extract().unwrap(); + assert_eq!(s, "8000..9000"); + + // Parse back + let back: PyPortRange = py_obj.extract().unwrap(); + assert_eq!(back.0, original); + }); + } + + #[test] + fn test_pyportrange_accepts_both_formats() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { + // Create a dict with both formats + let dict = PyDict::new(py); + dict.set_item("tuple_format", (8000u16, 9000u16)).unwrap(); + dict.set_item("string_format", "8000..9000").unwrap(); + + // Both should parse to the same range + let r1: PyPortRange = dict + .get_item("tuple_format") + .unwrap() + .unwrap() + .extract() + .unwrap(); + let r2: PyPortRange = dict + .get_item("string_format") + .unwrap() + .unwrap() + .extract() + .unwrap(); + + assert_eq!(r1.0, r2.0); + assert_eq!(r1.0.start, 8000); + assert_eq!(r1.0.end, 9000); + }); + } +} diff --git a/python/monarch/_rust_bindings/monarch_hyperactor/config.pyi b/python/monarch/_rust_bindings/monarch_hyperactor/config.pyi index b384b5cc8..1f1f4f55f 100644 --- a/python/monarch/_rust_bindings/monarch_hyperactor/config.pyi +++ b/python/monarch/_rust_bindings/monarch_hyperactor/config.pyi @@ -40,36 +40,123 @@ def configure( message_delivery_timeout: str = ..., host_spawn_ready_timeout: str = ..., mesh_proc_spawn_max_idle: str = ..., + process_exit_timeout: str = ..., + message_ack_time_interval: str = ..., + message_ack_every_n_messages: int = ..., + message_ttl_default: int = ..., + split_max_buffer_size: int = ..., + split_max_buffer_age: str = ..., + stop_actor_timeout: str = ..., + cleanup_timeout: str = ..., + remote_allocator_heartbeat_interval: str = ..., + default_encoding: str = ..., + channel_net_rx_buffer_full_check_interval: str = ..., + message_latency_sampling_rate: float = ..., + enable_client_seq_assignment: bool = ..., + mesh_bootstrap_enable_pdeathsig: bool = ..., + mesh_terminate_concurrency: int = ..., + mesh_terminate_timeout: str = ..., + shared_asyncio_runtime: bool = ..., + small_write_threshold: int = ..., + max_cast_dimension_size: int = ..., + remote_alloc_bind_to_inaddr_any: bool = ..., + remote_alloc_bootstrap_addr: str = ..., + remote_alloc_allowed_port_range: str | tuple[int, int] = ..., + read_log_buffer: int = ..., + force_file_log: bool = ..., + prefix_with_rank: bool = ..., + actor_spawn_max_idle: str = ..., + get_actor_state_max_idle: str = ..., + proc_stop_max_idle: str = ..., + get_proc_state_max_idle: str = ..., **kwargs: object, ) -> None: """Configure Hyperactor runtime defaults for this process. - This updates the **runtime** configuration layer from Python, - setting the default channel transport and optional logging - behaviour (forwarding, file capture, and how many lines to tail), - plus any additional CONFIG-marked keys passed via **kwargs. + This updates the **Runtime** configuration layer from Python, + setting transports, logging behavior, timeouts, and other runtime + parameters. + + All duration parameters accept humantime strings like "30s", "5m", + "2h", or "1h 30m". + + For complete parameter documentation, see the Python wrapper + `monarch.config.configure()` which provides the same interface + with detailed descriptions of all 37 configuration parameters + organized into logical categories (transport, logging, message + handling, mesh bootstrap, allocation, proc/host mesh timeouts, + etc.). Args: - default_transport: Default channel transport for communication. Can be: - - A ChannelTransport enum value (e.g., ChannelTransport.Unix) - - A explicit address string in the ZMQ-style URL format (e.g., "tcp://127.0.0.1:8080") - enable_log_forwarding: Whether to forward logs from actors - enable_file_capture: Whether to capture file output + default_transport: Default channel transport. Can be + ChannelTransport enum (e.g., ChannelTransport.Unix) or + explicit address string in ZMQ-style URL format (e.g., + "tcp://127.0.0.1:8080") + enable_log_forwarding: Forward logs from actors + enable_file_capture: Capture file output tail_log_lines: Number of log lines to tail codec_max_frame_length: Maximum frame length for codec (bytes) - message_delivery_timeout: Timeout for message delivery (e.g., "30s", "5m") - host_spawn_ready_timeout: Timeout for host spawn readiness (e.g., "30s") - mesh_proc_spawn_max_idle: Maximum idle time for spawning procs (e.g., "30s") - **kwargs: Additional configuration keys - - Duration values should use humantime format strings: - - "30s" for 30 seconds - - "5m" for 5 minutes - - "2h" for 2 hours - - "1h 30m" for 1 hour 30 minutes + message_delivery_timeout: Timeout for message delivery + (humantime) + host_spawn_ready_timeout: Timeout for host spawn readiness + (humantime) + mesh_proc_spawn_max_idle: Maximum idle time for spawning procs + (humantime) + process_exit_timeout: Timeout for process exit (humantime) + message_ack_time_interval: Time interval for message + acknowledgments (humantime) + message_ack_every_n_messages: Acknowledge every N messages + message_ttl_default: Default message time-to-live + split_max_buffer_size: Maximum buffer size for message splitting + (bytes) + split_max_buffer_age: Maximum age for split message buffers + (humantime) + stop_actor_timeout: Timeout for stopping actors (humantime) + cleanup_timeout: Timeout for cleanup operations (humantime) + remote_allocator_heartbeat_interval: Heartbeat interval for + remote allocator (humantime) + default_encoding: Default message encoding ("bincode", + "serde_json", or "serde_multipart") + channel_net_rx_buffer_full_check_interval: Network receive buffer + check interval (humantime) + message_latency_sampling_rate: Sampling rate for message latency + (0.0 to 1.0) + enable_client_seq_assignment: Enable client-side sequence + assignment + mesh_bootstrap_enable_pdeathsig: Enable parent-death signal for + spawned processes + mesh_terminate_concurrency: Maximum concurrent terminations + during shutdown + mesh_terminate_timeout: Timeout per child during graceful + termination (humantime) + shared_asyncio_runtime: Share asyncio runtime across actors + small_write_threshold: Threshold below which writes are copied + (bytes) + max_cast_dimension_size: Maximum dimension size for cast + operations + remote_alloc_bind_to_inaddr_any: Bind remote allocators to + INADDR_ANY + remote_alloc_bootstrap_addr: Bootstrap address for remote + allocators + remote_alloc_allowed_port_range: Allowed port range as + "start..end" or (start, end) tuple + read_log_buffer: Buffer size for reading logs (bytes) + force_file_log: Force file-based logging regardless of + environment + prefix_with_rank: Prefix log lines with rank information + actor_spawn_max_idle: Maximum idle time while spawning actors + (humantime) + get_actor_state_max_idle: Maximum idle time for actor state + queries (humantime) + proc_stop_max_idle: Maximum idle time while stopping procs + (humantime) + get_proc_state_max_idle: Maximum idle time for proc state queries + (humantime) + **kwargs: Reserved for future configuration keys Historically this API is named ``configure(...)``; conceptually it acts as "set runtime config for this process". + """ ... diff --git a/python/monarch/config/__init__.py b/python/monarch/config/__init__.py index 9ca3de06a..1998f9410 100644 --- a/python/monarch/config/__init__.py +++ b/python/monarch/config/__init__.py @@ -44,25 +44,109 @@ def configure( message_delivery_timeout: str | None = None, host_spawn_ready_timeout: str | None = None, mesh_proc_spawn_max_idle: str | None = None, + process_exit_timeout: str | None = None, + message_ack_time_interval: str | None = None, + message_ack_every_n_messages: int | None = None, + message_ttl_default: int | None = None, + split_max_buffer_size: int | None = None, + split_max_buffer_age: str | None = None, + stop_actor_timeout: str | None = None, + cleanup_timeout: str | None = None, + remote_allocator_heartbeat_interval: str | None = None, + default_encoding: str | None = None, + channel_net_rx_buffer_full_check_interval: str | None = None, + message_latency_sampling_rate: float | None = None, + enable_client_seq_assignment: bool | None = None, + mesh_bootstrap_enable_pdeathsig: bool | None = None, + mesh_terminate_concurrency: int | None = None, + mesh_terminate_timeout: str | None = None, + shared_asyncio_runtime: bool | None = None, + small_write_threshold: int | None = None, + max_cast_dimension_size: int | None = None, + remote_alloc_bind_to_inaddr_any: bool | None = None, + remote_alloc_bootstrap_addr: str | None = None, + remote_alloc_allowed_port_range: str | tuple[int, int] | None = None, + read_log_buffer: int | None = None, + force_file_log: bool | None = None, + prefix_with_rank: bool | None = None, + actor_spawn_max_idle: str | None = None, + get_actor_state_max_idle: str | None = None, + proc_stop_max_idle: str | None = None, + get_proc_state_max_idle: str | None = None, **kwargs: object, ) -> None: """Configure Hyperactor runtime defaults for this process. This updates the **Runtime** configuration layer from Python, setting - transports, logging behavior, and any other CONFIG-marked keys supplied - via ``**kwargs``. Duration values should be humantime strings (``"30s"``, - ``"5m"``, ``"1h 30m"``). + transports, logging behavior, timeouts, and other runtime parameters. + + All duration parameters accept humantime strings like ``"30s"``, ``"5m"``, + ``"2h"``, or ``"1h 30m"``. Args: - default_transport: Default channel transport for actor communication. - enable_log_forwarding: Forward child stdout/stderr through the mesh. - enable_file_capture: Persist child stdout/stderr to per-host files. - tail_log_lines: Number of log lines to retain in memory. - codec_max_frame_length: Maximum serialized message size in bytes. - message_delivery_timeout: Max delivery time (humantime string). - host_spawn_ready_timeout: Max host bootstrapping time (humantime). - mesh_proc_spawn_max_idle: Max idle time while spawning procs. - **kwargs: Additional configuration keys exposed by rust bindings. + Transport configuration: + default_transport: Default channel transport for actor communication. + Can be a ChannelTransport enum or explicit address string. + + Basic logging behavior: + enable_log_forwarding: Forward child stdout/stderr through the mesh. + enable_file_capture: Persist child stdout/stderr to per-host files. + tail_log_lines: Number of log lines to retain in memory. + + Message encoding and delivery: + codec_max_frame_length: Maximum serialized message size in bytes. + message_delivery_timeout: Max delivery time (humantime). + + Core mesh timeouts: + host_spawn_ready_timeout: Max host bootstrapping time (humantime). + mesh_proc_spawn_max_idle: Max idle time while spawning procs (humantime). + + Hyperactor timeouts and message handling: + process_exit_timeout: Timeout for process exit (humantime). + message_ack_time_interval: Time interval for message acknowledgments (humantime). + message_ack_every_n_messages: Acknowledge every N messages. + message_ttl_default: Default message time-to-live. + split_max_buffer_size: Maximum buffer size for message splitting (bytes). + split_max_buffer_age: Maximum age for split message buffers (humantime). + stop_actor_timeout: Timeout for stopping actors (humantime). + cleanup_timeout: Timeout for cleanup operations (humantime). + remote_allocator_heartbeat_interval: Heartbeat interval for remote allocator (humantime). + default_encoding: Default message encoding ("bincode", "serde_json", or "serde_multipart"). + channel_net_rx_buffer_full_check_interval: Network receive buffer check interval (humantime). + message_latency_sampling_rate: Sampling rate for message latency tracking (0.0 to 1.0). + enable_client_seq_assignment: Enable client-side sequence assignment. + + Mesh bootstrap configuration: + mesh_bootstrap_enable_pdeathsig: Enable parent-death signal for spawned processes. + mesh_terminate_concurrency: Maximum concurrent terminations during shutdown. + mesh_terminate_timeout: Timeout per child during graceful termination (humantime). + + Runtime and buffering: + shared_asyncio_runtime: Share asyncio runtime across actors. + small_write_threshold: Threshold below which writes are copied (bytes). + + Mesh configuration: + max_cast_dimension_size: Maximum dimension size for cast operations. + + Remote allocation: + remote_alloc_bind_to_inaddr_any: Bind remote allocators to INADDR_ANY. + remote_alloc_bootstrap_addr: Bootstrap address for remote allocators. + remote_alloc_allowed_port_range: Allowed port range as "start..end" or (start, end) tuple. + + Logging configuration: + read_log_buffer: Buffer size for reading logs (bytes). + force_file_log: Force file-based logging regardless of environment. + prefix_with_rank: Prefix log lines with rank information. + + Proc mesh timeouts: + actor_spawn_max_idle: Maximum idle time while spawning actors (humantime). + get_actor_state_max_idle: Maximum idle time for actor state queries (humantime). + + Host mesh timeouts: + proc_stop_max_idle: Maximum idle time while stopping procs (humantime). + get_proc_state_max_idle: Maximum idle time for proc state queries (humantime). + + **kwargs: Reserved for future configuration keys exposed by Rust bindings. """ params: Dict[str, Any] = dict(kwargs) @@ -82,6 +166,69 @@ def configure( params["host_spawn_ready_timeout"] = host_spawn_ready_timeout if mesh_proc_spawn_max_idle is not None: params["mesh_proc_spawn_max_idle"] = mesh_proc_spawn_max_idle + if process_exit_timeout is not None: + params["process_exit_timeout"] = process_exit_timeout + if message_ack_time_interval is not None: + params["message_ack_time_interval"] = message_ack_time_interval + if message_ack_every_n_messages is not None: + params["message_ack_every_n_messages"] = message_ack_every_n_messages + if message_ttl_default is not None: + params["message_ttl_default"] = message_ttl_default + if split_max_buffer_size is not None: + params["split_max_buffer_size"] = split_max_buffer_size + if split_max_buffer_age is not None: + params["split_max_buffer_age"] = split_max_buffer_age + if stop_actor_timeout is not None: + params["stop_actor_timeout"] = stop_actor_timeout + if cleanup_timeout is not None: + params["cleanup_timeout"] = cleanup_timeout + if remote_allocator_heartbeat_interval is not None: + params["remote_allocator_heartbeat_interval"] = ( + remote_allocator_heartbeat_interval + ) + if default_encoding is not None: + params["default_encoding"] = default_encoding + if channel_net_rx_buffer_full_check_interval is not None: + params["channel_net_rx_buffer_full_check_interval"] = ( + channel_net_rx_buffer_full_check_interval + ) + if message_latency_sampling_rate is not None: + params["message_latency_sampling_rate"] = message_latency_sampling_rate + if enable_client_seq_assignment is not None: + params["enable_client_seq_assignment"] = enable_client_seq_assignment + if mesh_bootstrap_enable_pdeathsig is not None: + params["mesh_bootstrap_enable_pdeathsig"] = mesh_bootstrap_enable_pdeathsig + if mesh_terminate_concurrency is not None: + params["mesh_terminate_concurrency"] = mesh_terminate_concurrency + if mesh_terminate_timeout is not None: + params["mesh_terminate_timeout"] = mesh_terminate_timeout + if shared_asyncio_runtime is not None: + params["shared_asyncio_runtime"] = shared_asyncio_runtime + if small_write_threshold is not None: + params["small_write_threshold"] = small_write_threshold + if max_cast_dimension_size is not None: + params["max_cast_dimension_size"] = max_cast_dimension_size + # Forward new alloc config keys + if remote_alloc_bind_to_inaddr_any is not None: + params["remote_alloc_bind_to_inaddr_any"] = remote_alloc_bind_to_inaddr_any + if remote_alloc_bootstrap_addr is not None: + params["remote_alloc_bootstrap_addr"] = remote_alloc_bootstrap_addr + if remote_alloc_allowed_port_range is not None: + params["remote_alloc_allowed_port_range"] = remote_alloc_allowed_port_range + if read_log_buffer is not None: + params["read_log_buffer"] = read_log_buffer + if force_file_log is not None: + params["force_file_log"] = force_file_log + if prefix_with_rank is not None: + params["prefix_with_rank"] = prefix_with_rank + if actor_spawn_max_idle is not None: + params["actor_spawn_max_idle"] = actor_spawn_max_idle + if get_actor_state_max_idle is not None: + params["get_actor_state_max_idle"] = get_actor_state_max_idle + if proc_stop_max_idle is not None: + params["proc_stop_max_idle"] = proc_stop_max_idle + if get_proc_state_max_idle is not None: + params["get_proc_state_max_idle"] = get_proc_state_max_idle _configure(**params) diff --git a/python/tests/test_config.py b/python/tests/test_config.py index 6080464e2..52f245f2c 100644 --- a/python/tests/test_config.py +++ b/python/tests/test_config.py @@ -13,7 +13,7 @@ from monarch._rust_bindings.monarch_hyperactor.channel import BindSpec, ChannelTransport from monarch._rust_bindings.monarch_hyperactor.supervision import SupervisionError -from monarch.actor import Actor, endpoint, this_proc +from monarch.actor import Actor, endpoint, this_host from monarch.config import configured, get_global_config @@ -94,7 +94,6 @@ def test_get_set_multiple() -> None: # This test tries to allocate too much memory for the GitHub actions # environment. @pytest.mark.oss_skip -@pytest.mark.skip(reason="local procs now bypass channels -- they dispatch directly") def test_codec_max_frame_length_exceeds_default() -> None: """Test that sending 10 chunks of 1GiB fails with default 10 GiB limit.""" @@ -117,8 +116,10 @@ def process_chunks(self, chunks): with override_fault_hook(): # Try to send 10 chunks of 1GiB each with default 10 GiB limit - # This should fail due to serialization overhead - proc = this_proc() + # This should fail due to serialization overhead. + # Spawn in separate proc so messages are serialized via Unix + # sockets + proc = this_host().spawn_procs() # Create 10 chunks, 1GiB each (total 10GiB) chunks = [bytes(oneGiB) for _ in range(10)] @@ -132,7 +133,6 @@ def process_chunks(self, chunks): # This test tries to allocate too much memory for the GitHub actions # environment. @pytest.mark.oss_skip -@pytest.mark.skip(reason="local procs now bypass channels -- they dispatch directly") def test_codec_max_frame_length_with_increased_limit() -> None: """Test that we can successfully send 10 chunks of 1GiB each with 100 GiB limit.""" @@ -156,7 +156,9 @@ def process_chunks(self, chunks): # Set the frame limit to confidently handle 10GiB with configured(codec_max_frame_length=oneHundredGiB): - proc = this_proc() + # Spawn in separate proc so messages are serialized via Unix + # sockets + proc = this_host().spawn_procs() # Create 10 chunks, 1GiB each (total 10GiB) chunks = [bytes(oneGiB) for _ in range(10)] @@ -256,3 +258,323 @@ def test_duration_config_multiple() -> None: assert config["mesh_proc_spawn_max_idle"] == "30s" assert not config["enable_log_forwarding"] assert config["tail_log_lines"] == 0 + + +# ============================================================================ +# Systematic tests for all 29 new config parameters added +# ============================================================================ + + +@pytest.mark.parametrize( + "param_name,test_value,expected_value,default_value", + [ + # Hyperactor timeouts and message handling + ("process_exit_timeout", "20s", "20s", "10s"), + ("message_ack_time_interval", "2s", "2s", "500ms"), + ("split_max_buffer_age", "100ms", "100ms", "50ms"), + ("stop_actor_timeout", "15s", "15s", "10s"), + ("cleanup_timeout", "25s", "25s", "3s"), + ("remote_allocator_heartbeat_interval", "10m", "10m", "5m"), + ("channel_net_rx_buffer_full_check_interval", "200ms", "200ms", "5s"), + # Mesh bootstrap config + ("mesh_terminate_timeout", "20s", "20s", "10s"), + # Proc mesh timeouts + ("actor_spawn_max_idle", "45s", "45s", "30s"), + ("get_actor_state_max_idle", "90s", "1m 30s", "1m"), + # Host mesh timeouts + ("proc_stop_max_idle", "45s", "45s", "30s"), + ("get_proc_state_max_idle", "90s", "1m 30s", "1m"), + ], +) +def test_new_duration_params(param_name, test_value, expected_value, default_value): + """Test all new duration configuration parameters.""" + # Verify default value + config = get_global_config() + assert config[param_name] == default_value + + # Set new value and verify + with configured(**{param_name: test_value}) as config: + assert config[param_name] == expected_value + + # Verify restoration to default + config = get_global_config() + assert config[param_name] == default_value + + +@pytest.mark.parametrize( + "param_name,test_value,default_value", + [ + # Hyperactor message handling + ("message_ack_every_n_messages", 500, 1000), + ("message_ttl_default", 20, 64), + ("split_max_buffer_size", 2048, 5), + # Mesh bootstrap config + ("mesh_terminate_concurrency", 32, 16), + # Runtime and buffering + ("small_write_threshold", 512, 256), + # Mesh config (usize::MAX doesn't have a fixed value, skip default check) + # Logging config + ("read_log_buffer", 16384, 100), + ], +) +def test_new_integer_params(param_name, test_value, default_value): + """Test all new integer configuration parameters.""" + # Verify default value + config = get_global_config() + assert config[param_name] == default_value + + # Set new value and verify + with configured(**{param_name: test_value}) as config: + assert config[param_name] == test_value + + # Verify restoration to default + config = get_global_config() + assert config[param_name] == default_value + + +@pytest.mark.parametrize( + "param_name,default_value", + [ + # Hyperactor message handling + ("enable_client_seq_assignment", False), + # Mesh bootstrap config + ("mesh_bootstrap_enable_pdeathsig", True), + # Runtime and buffering + ("shared_asyncio_runtime", False), + # Remote allocation + ("remote_alloc_bind_to_inaddr_any", False), + # Logging config + ("force_file_log", False), + ("prefix_with_rank", True), + ], +) +def test_new_boolean_params(param_name, default_value): + """Test all new boolean configuration parameters.""" + # Verify default value + config = get_global_config() + assert config[param_name] == default_value + + # Set to opposite value and verify + with configured(**{param_name: not default_value}) as config: + assert config[param_name] == (not default_value) + + # Verify restoration to default + config = get_global_config() + assert config[param_name] == default_value + + +def test_new_float_param_message_latency_sampling_rate(): + """Test message_latency_sampling_rate float parameter.""" + # Verify default value (0.01, using approx for f32 precision) + config = get_global_config() + assert config["message_latency_sampling_rate"] == pytest.approx(0.01, rel=1e-5) + + # Test various valid sampling rates + test_values = [0.0, 0.1, 0.5, 0.99, 1.0] + for rate in test_values: + with configured(message_latency_sampling_rate=rate) as config: + assert config["message_latency_sampling_rate"] == pytest.approx( + rate, rel=1e-5 + ) + + # Verify restoration + config = get_global_config() + assert config["message_latency_sampling_rate"] == pytest.approx(0.01, rel=1e-5) + + +def test_new_encoding_param(): + """Test default_encoding string parameter with valid encodings.""" + # Verify default value + config = get_global_config() + assert config["default_encoding"] == "serde_multipart" + + # Test all valid encodings + valid_encodings = ["bincode", "serde_json", "serde_multipart"] + for encoding in valid_encodings: + with configured(default_encoding=encoding) as config: + assert config["default_encoding"] == encoding + + # Verify restoration + config = get_global_config() + assert config["default_encoding"] == "serde_multipart" + + +def test_new_encoding_param_invalid(): + """Test that invalid encoding values raise errors.""" + with pytest.raises(TypeError, match="invalid value"): + with configured(default_encoding="invalid_encoding"): + pass + + with pytest.raises(TypeError, match="invalid value"): + with configured(default_encoding="xml"): + pass + + +def test_new_bootstrap_addr_param(): + """Test remote_alloc_bootstrap_addr string parameter.""" + # Note: This attribute has no default value, only test setting it + test_addr = "tcp://127.0.0.1:9000" + with configured(remote_alloc_bootstrap_addr=test_addr) as config: + assert config["remote_alloc_bootstrap_addr"] == test_addr + + +def test_new_port_range_param_tuple(): + """Test remote_alloc_allowed_port_range with tuple format.""" + # Note: This attribute has no default value, only test setting it + with configured(remote_alloc_allowed_port_range=(8000, 9000)) as config: + assert config["remote_alloc_allowed_port_range"] == "8000..9000" + + +def test_new_port_range_param_string(): + """Test remote_alloc_allowed_port_range with string format.""" + # Test string format + with configured(remote_alloc_allowed_port_range="8000..9000") as config: + assert config["remote_alloc_allowed_port_range"] == "8000..9000" + + # Test edge cases + with configured(remote_alloc_allowed_port_range="1..65535") as config: + assert config["remote_alloc_allowed_port_range"] == "1..65535" + + +def test_new_port_range_param_invalid(): + """Test that invalid port ranges raise errors.""" + # Invalid tuple: start >= end + with pytest.raises(TypeError, match="invalid value"): + with configured(remote_alloc_allowed_port_range=(9000, 8000)): + pass + + # Invalid string format + with pytest.raises(TypeError, match="invalid value"): + with configured(remote_alloc_allowed_port_range="invalid"): + pass + + with pytest.raises(TypeError, match="invalid value"): + with configured(remote_alloc_allowed_port_range="8000-9000"): # wrong separator + pass + + +def test_all_new_params_together(): + """Test setting all 29 new config parameters simultaneously.""" + with configured( + # Hyperactor timeouts and message handling + process_exit_timeout="20s", + message_ack_time_interval="2s", + message_ack_every_n_messages=500, + message_ttl_default=20, + split_max_buffer_size=2048, + split_max_buffer_age="100ms", + stop_actor_timeout="15s", + cleanup_timeout="25s", + remote_allocator_heartbeat_interval="10s", + default_encoding="serde_json", + channel_net_rx_buffer_full_check_interval="200ms", + message_latency_sampling_rate=0.5, + enable_client_seq_assignment=True, + # Mesh bootstrap config + mesh_bootstrap_enable_pdeathsig=False, + mesh_terminate_concurrency=16, + mesh_terminate_timeout="20s", + # Runtime and buffering + shared_asyncio_runtime=True, + small_write_threshold=512, + # Mesh config + max_cast_dimension_size=2048, + # Remote allocation + remote_alloc_bind_to_inaddr_any=True, + remote_alloc_bootstrap_addr="tcp://127.0.0.1:9000", + remote_alloc_allowed_port_range=(8000, 9000), + # Logging config + read_log_buffer=16384, + force_file_log=True, + prefix_with_rank=True, + # Proc mesh timeouts + actor_spawn_max_idle="45s", + get_actor_state_max_idle="90s", + # Host mesh timeouts + proc_stop_max_idle="45s", + get_proc_state_max_idle="90s", + ) as config: + # Verify all values are set correctly + assert config["process_exit_timeout"] == "20s" + assert config["message_ack_time_interval"] == "2s" + assert config["message_ack_every_n_messages"] == 500 + assert config["message_ttl_default"] == 20 + assert config["split_max_buffer_size"] == 2048 + assert config["split_max_buffer_age"] == "100ms" + assert config["stop_actor_timeout"] == "15s" + assert config["cleanup_timeout"] == "25s" + assert config["remote_allocator_heartbeat_interval"] == "10s" + assert config["default_encoding"] == "serde_json" + assert config["channel_net_rx_buffer_full_check_interval"] == "200ms" + assert config["message_latency_sampling_rate"] == pytest.approx(0.5, rel=1e-5) + assert config["enable_client_seq_assignment"] is True + assert config["mesh_bootstrap_enable_pdeathsig"] is False + assert config["mesh_terminate_concurrency"] == 16 + assert config["mesh_terminate_timeout"] == "20s" + assert config["shared_asyncio_runtime"] is True + assert config["small_write_threshold"] == 512 + assert config["max_cast_dimension_size"] == 2048 + assert config["remote_alloc_bind_to_inaddr_any"] is True + assert config["remote_alloc_bootstrap_addr"] == "tcp://127.0.0.1:9000" + assert config["remote_alloc_allowed_port_range"] == "8000..9000" + assert config["read_log_buffer"] == 16384 + assert config["force_file_log"] is True + assert config["prefix_with_rank"] is True + assert config["actor_spawn_max_idle"] == "45s" + assert config["get_actor_state_max_idle"] == "1m 30s" + assert config["proc_stop_max_idle"] == "45s" + assert config["get_proc_state_max_idle"] == "1m 30s" + + # Verify all values are restored to defaults + config = get_global_config() + assert config["process_exit_timeout"] == "10s" + assert config["message_ack_time_interval"] == "500ms" + assert config["message_ack_every_n_messages"] == 1000 + assert config["message_ttl_default"] == 64 + assert config["split_max_buffer_size"] == 5 + assert config["split_max_buffer_age"] == "50ms" + assert config["stop_actor_timeout"] == "10s" + assert config["cleanup_timeout"] == "3s" + assert config["remote_allocator_heartbeat_interval"] == "5m" + assert config["default_encoding"] == "serde_multipart" + assert config["channel_net_rx_buffer_full_check_interval"] == "5s" + assert config["message_latency_sampling_rate"] == pytest.approx(0.01, rel=1e-5) + assert config["enable_client_seq_assignment"] is False + assert config["mesh_bootstrap_enable_pdeathsig"] is True + assert config["mesh_terminate_concurrency"] == 16 + assert config["mesh_terminate_timeout"] == "10s" + assert config["shared_asyncio_runtime"] is False + assert config["small_write_threshold"] == 256 + # max_cast_dimension_size is usize::MAX, skip checking it + assert config["remote_alloc_bind_to_inaddr_any"] is False + # remote_alloc_bootstrap_addr and remote_alloc_allowed_port_range have no defaults + assert config["read_log_buffer"] == 100 + assert config["force_file_log"] is False + assert config["prefix_with_rank"] is True + assert config["actor_spawn_max_idle"] == "30s" + assert config["get_actor_state_max_idle"] == "1m" + assert config["proc_stop_max_idle"] == "30s" + assert config["get_proc_state_max_idle"] == "1m" + + +def test_new_params_type_errors(): + """Test that type errors are raised for incorrect parameter types.""" + # Duration param with wrong type + with pytest.raises(TypeError): + with configured(process_exit_timeout=30): # type: ignore + pass + + # Integer param with wrong type + with pytest.raises(TypeError): + with configured(message_ack_every_n_messages="100"): # type: ignore + pass + + # Boolean param with wrong type + with pytest.raises(TypeError): + with configured(enable_client_seq_assignment="true"): # type: ignore + pass + + # Float param with wrong type + with pytest.raises(TypeError): + with configured(message_latency_sampling_rate="0.5"): # type: ignore + pass