diff --git a/crates/pulsing-actor/src/behavior/reference.rs b/crates/pulsing-actor/src/behavior/reference.rs index eb2f34bc2..2c38d091c 100644 --- a/crates/pulsing-actor/src/behavior/reference.rs +++ b/crates/pulsing-actor/src/behavior/reference.rs @@ -1,5 +1,6 @@ use crate::actor::ActorRef; use crate::actor::ActorSystemRef; +use crate::error::{PulsingError, RuntimeError}; use serde::{de::DeserializeOwned, Serialize}; use std::marker::PhantomData; use std::sync::Arc; @@ -72,9 +73,13 @@ where fn resolve(&self) -> anyhow::Result { match &self.mode { ResolutionMode::Direct(inner) => Ok(inner.clone()), - ResolutionMode::Dynamic(system) => system - .local_actor_ref_by_name(&self.name) - .ok_or_else(|| anyhow::anyhow!("Actor not found: {}", self.name)), + ResolutionMode::Dynamic(system) => { + system.local_actor_ref_by_name(&self.name).ok_or_else(|| { + anyhow::Error::from(PulsingError::from(RuntimeError::actor_not_found( + self.name.clone(), + ))) + }) + } } } diff --git a/crates/pulsing-actor/src/error.rs b/crates/pulsing-actor/src/error.rs index 0d247c8bb..eb9f2bee8 100644 --- a/crates/pulsing-actor/src/error.rs +++ b/crates/pulsing-actor/src/error.rs @@ -1,108 +1,112 @@ //! Unified error types for the actor system. +//! +//! Error hierarchy (matches Python exception structure): +//! - PulsingError: Top-level error enum +//! - RuntimeError: Framework/system-level errors +//! - Actor system errors (NotFound, Stopped, etc.) +//! - Transport errors (ConnectionFailed, etc.) +//! - Cluster errors (NodeNotFound, etc.) +//! - Config errors (InvalidValue, etc.) +//! - I/O errors, Serialization errors +//! → Maps to Python: PulsingRuntimeError +//! - ActorError: User Actor execution errors +//! - Business errors (user input errors) +//! - System errors (internal errors from user code) +//! - Timeout errors (operation timeouts) +//! - Unsupported errors (unsupported operations) +//! 
→ Maps to Python: PulsingActorError (and subclasses) use thiserror::Error; /// Unified error type for the Pulsing actor system /// /// This enum encompasses all error categories in the system. -/// It implements `From` for each sub-error type for easy conversion. +/// Errors are divided into two main categories: +/// - RuntimeError: Framework/system-level errors +/// - ActorError: User Actor execution errors #[derive(Error, Debug)] pub enum PulsingError { - /// Actor-related errors + /// Runtime errors: Framework/system-level errors + #[error("Runtime error: {0}")] + Runtime(#[from] RuntimeError), + + /// Actor errors: User Actor execution errors #[error("Actor error: {0}")] Actor(#[from] ActorError), - - /// Transport layer errors - #[error("Transport error: {0}")] - Transport(#[from] TransportError), - - /// Cluster-related errors - #[error("Cluster error: {0}")] - Cluster(#[from] ClusterError), - - /// Configuration errors - #[error("Configuration error: {0}")] - Config(#[from] ConfigError), - - /// I/O errors - #[error("I/O error: {0}")] - Io(#[from] std::io::Error), - - /// Serialization/deserialization errors - #[error("Serialization error: {0}")] - Serialization(String), - - /// Timeout errors - #[error("Timeout: {0}")] - Timeout(String), - - /// Generic errors (for cases not covered by specific types) - #[error("{0}")] - Other(String), } impl PulsingError { - /// Create a generic error from a message - pub fn other(msg: impl Into) -> Self { - Self::Other(msg.into()) + /// Check if this is a runtime error + pub fn is_runtime(&self) -> bool { + matches!(self, Self::Runtime(_)) } - /// Create a timeout error - pub fn timeout(msg: impl Into) -> Self { - Self::Timeout(msg.into()) - } - - /// Create a serialization error - pub fn serialization(msg: impl Into) -> Self { - Self::Serialization(msg.into()) + /// Check if this is an actor error + pub fn is_actor(&self) -> bool { + matches!(self, Self::Actor(_)) } } impl From for PulsingError { fn from(err: 
anyhow::Error) -> Self { // Try to downcast to known error types + if let Some(runtime_err) = err.downcast_ref::() { + return Self::Runtime(runtime_err.clone()); + } if let Some(actor_err) = err.downcast_ref::() { return Self::Actor(actor_err.clone()); } - if let Some(transport_err) = err.downcast_ref::() { - return Self::Transport(transport_err.clone()); - } - if let Some(cluster_err) = err.downcast_ref::() { - return Self::Cluster(cluster_err.clone()); + // Try to downcast to PulsingError itself + if let Some(pulsing_err) = err.downcast_ref::() { + return pulsing_err.clone(); } - if let Some(config_err) = err.downcast_ref::() { - return Self::Config(config_err.clone()); + // Default to runtime error for unknown errors + Self::Runtime(RuntimeError::Other(err.to_string())) + } +} + +// Implement Clone for PulsingError to support downcast +impl Clone for PulsingError { + fn clone(&self) -> Self { + match self { + Self::Runtime(e) => Self::Runtime(e.clone()), + Self::Actor(e) => Self::Actor(e.clone()), } - Self::Other(err.to_string()) } } -/// Actor-related errors +/// Runtime errors: Framework/system-level errors +/// +/// These errors occur at the framework level and are not caused by user code. +/// Examples: transport failures, cluster issues, configuration errors, etc. 
#[derive(Error, Debug, Clone, PartialEq, Eq)] -pub enum ActorError { +pub enum RuntimeError { + // ========================================================================= + // Actor system errors (framework-level) + // ========================================================================= /// Actor not found by name or ID #[error("Actor not found: {name}")] - NotFound { name: String }, + ActorNotFound { name: String }, /// Actor already exists with the given name #[error("Actor already exists: {name}")] - AlreadyExists { name: String }, + ActorAlreadyExists { name: String }, /// Actor is not local to this node #[error("Actor is not local: {name}")] - NotLocal { name: String }, + ActorNotLocal { name: String }, /// Actor has stopped and cannot process messages #[error("Actor stopped: {name}")] - Stopped { name: String }, + ActorStopped { name: String }, /// Actor mailbox is full #[error("Actor mailbox full: {name}")] - MailboxFull { name: String }, + ActorMailboxFull { name: String }, /// Invalid actor path format #[error("Invalid actor path: {path}")] - InvalidPath { path: String }, + InvalidActorPath { path: String }, /// Message type mismatch #[error("Message type mismatch: expected {expected}, got {actual}")] @@ -110,41 +114,11 @@ pub enum ActorError { /// Actor spawn failed #[error("Failed to spawn actor: {reason}")] - SpawnFailed { reason: String }, -} - -impl ActorError { - /// Create a "not found" error - pub fn not_found(name: impl Into<String>) -> Self { - Self::NotFound { name: name.into() } - } - - /// Create an "already exists" error - pub fn already_exists(name: impl Into<String>) -> Self { - Self::AlreadyExists { name: name.into() } - } - - /// Create a "mailbox full" error - pub fn mailbox_full(name: impl Into<String>) -> Self { - Self::MailboxFull { name: name.into() } - } - - /// Create an "invalid path" error - pub fn invalid_path(path: impl Into<String>) -> Self { - Self::InvalidPath { path: path.into() } - } - - /// Create a "spawn failed" error - pub fn
spawn_failed(reason: impl Into) -> Self { - Self::SpawnFailed { - reason: reason.into(), - } - } -} + ActorSpawnFailed { reason: String }, -/// Transport layer errors -#[derive(Error, Debug, Clone, PartialEq, Eq)] -pub enum TransportError { + // ========================================================================= + // Transport errors + // ========================================================================= /// Connection failed #[error("Connection failed to {addr}: {reason}")] ConnectionFailed { addr: String, reason: String }, @@ -168,36 +142,13 @@ pub enum TransportError { /// Protocol error (HTTP/2) #[error("Protocol error: {reason}")] ProtocolError { reason: String }, -} - -impl TransportError { - /// Create a connection failed error - pub fn connection_failed(addr: impl Into, reason: impl Into) -> Self { - Self::ConnectionFailed { - addr: addr.into(), - reason: reason.into(), - } - } - - /// Create a request timeout error - pub fn request_timeout(timeout_ms: u64) -> Self { - Self::RequestTimeout { timeout_ms } - } - - /// Create a TLS error - pub fn tls_error(reason: impl Into) -> Self { - Self::TlsError { - reason: reason.into(), - } - } -} -/// Cluster-related errors -#[derive(Error, Debug, Clone, PartialEq, Eq)] -pub enum ClusterError { + // ========================================================================= + // Cluster errors + // ========================================================================= /// Cluster not initialized #[error("Cluster not initialized")] - NotInitialized, + ClusterNotInitialized, /// Node not found in cluster #[error("Node not found: {node_id}")] @@ -218,12 +169,134 @@ pub enum ClusterError { /// Gossip protocol error #[error("Gossip error: {reason}")] GossipError { reason: String }, + + // ========================================================================= + // Configuration errors + // ========================================================================= + /// Invalid configuration value + 
#[error("Invalid configuration: {field} = {value} ({reason})")] + InvalidConfigValue { + field: String, + value: String, + reason: String, + }, + + /// Missing required configuration + #[error("Missing required configuration: {field}")] + MissingRequiredConfig { field: String }, + + /// Conflicting configuration options + #[error("Conflicting configuration: {reason}")] + ConflictingConfig { reason: String }, + + /// Address parsing error + #[error("Invalid address '{addr}': {reason}")] + InvalidAddress { addr: String, reason: String }, + + // ========================================================================= + // Other runtime errors + // ========================================================================= + /// I/O errors + #[error("I/O error: {0}")] + Io(String), + + /// Serialization/deserialization errors + #[error("Serialization error: {0}")] + Serialization(String), + + /// Generic runtime errors + #[error("{0}")] + Other(String), } -impl ClusterError { - /// Create a "not initialized" error - pub fn not_initialized() -> Self { - Self::NotInitialized +impl RuntimeError { + // ========================================================================= + // Actor system error constructors + // ========================================================================= + + /// Create an "actor not found" error + pub fn actor_not_found(name: impl Into) -> Self { + Self::ActorNotFound { name: name.into() } + } + + /// Create an "actor already exists" error + pub fn actor_already_exists(name: impl Into) -> Self { + Self::ActorAlreadyExists { name: name.into() } + } + + /// Create an "actor not local" error + pub fn actor_not_local(name: impl Into) -> Self { + Self::ActorNotLocal { name: name.into() } + } + + /// Create an "actor stopped" error + pub fn actor_stopped(name: impl Into) -> Self { + Self::ActorStopped { name: name.into() } + } + + /// Create an "actor mailbox full" error + pub fn actor_mailbox_full(name: impl Into) -> Self { + 
Self::ActorMailboxFull { name: name.into() } + } + + /// Create an "invalid actor path" error + pub fn invalid_actor_path(path: impl Into) -> Self { + Self::InvalidActorPath { path: path.into() } + } + + /// Create a "message type mismatch" error + pub fn message_type_mismatch(expected: impl Into, actual: impl Into) -> Self { + Self::MessageTypeMismatch { + expected: expected.into(), + actual: actual.into(), + } + } + + /// Create an "actor spawn failed" error + pub fn actor_spawn_failed(reason: impl Into) -> Self { + Self::ActorSpawnFailed { + reason: reason.into(), + } + } + + // ========================================================================= + // Transport error constructors + // ========================================================================= + + /// Create a connection failed error + pub fn connection_failed(addr: impl Into, reason: impl Into) -> Self { + Self::ConnectionFailed { + addr: addr.into(), + reason: reason.into(), + } + } + + /// Create a request timeout error + pub fn request_timeout(timeout_ms: u64) -> Self { + Self::RequestTimeout { timeout_ms } + } + + /// Create a TLS error + pub fn tls_error(reason: impl Into) -> Self { + Self::TlsError { + reason: reason.into(), + } + } + + /// Create a protocol error + pub fn protocol_error(reason: impl Into) -> Self { + Self::ProtocolError { + reason: reason.into(), + } + } + + // ========================================================================= + // Cluster error constructors + // ========================================================================= + + /// Create a "cluster not initialized" error + pub fn cluster_not_initialized() -> Self { + Self::ClusterNotInitialized } /// Create a "node not found" error @@ -242,56 +315,34 @@ impl ClusterError { pub fn no_healthy_instances(path: impl Into) -> Self { Self::NoHealthyInstances { path: path.into() } } -} - -/// Configuration-related errors -#[derive(Error, Debug, Clone, PartialEq, Eq)] -pub enum ConfigError { - /// Invalid 
configuration value - #[error("Invalid configuration: {field} = {value} ({reason})")] - InvalidValue { - field: String, - value: String, - reason: String, - }, - - /// Missing required configuration - #[error("Missing required configuration: {field}")] - MissingRequired { field: String }, - /// Conflicting configuration options - #[error("Conflicting configuration: {reason}")] - Conflicting { reason: String }, - - /// Address parsing error - #[error("Invalid address '{addr}': {reason}")] - InvalidAddress { addr: String, reason: String }, -} + // ========================================================================= + // Config error constructors + // ========================================================================= -impl ConfigError { - /// Create an "invalid value" error - pub fn invalid_value( + /// Create an "invalid config value" error + pub fn invalid_config_value( field: impl Into, value: impl Into, reason: impl Into, ) -> Self { - Self::InvalidValue { + Self::InvalidConfigValue { field: field.into(), value: value.into(), reason: reason.into(), } } - /// Create a "missing required" error - pub fn missing_required(field: impl Into) -> Self { - Self::MissingRequired { + /// Create a "missing required config" error + pub fn missing_required_config(field: impl Into) -> Self { + Self::MissingRequiredConfig { field: field.into(), } } - /// Create a "conflicting" error - pub fn conflicting(reason: impl Into) -> Self { - Self::Conflicting { + /// Create a "conflicting config" error + pub fn conflicting_config(reason: impl Into) -> Self { + Self::ConflictingConfig { reason: reason.into(), } } @@ -303,8 +354,145 @@ impl ConfigError { reason: reason.into(), } } + + // ========================================================================= + // Other error constructors + // ========================================================================= + + /// Create a serialization error + pub fn serialization(msg: impl Into) -> Self { + 
Self::Serialization(msg.into()) + } + + /// Create a generic runtime error + pub fn other(msg: impl Into) -> Self { + Self::Other(msg.into()) + } + + /// Create an I/O error from std::io::Error + pub fn io(err: std::io::Error) -> Self { + Self::Io(err.to_string()) + } +} + +impl From for RuntimeError { + fn from(err: std::io::Error) -> Self { + Self::Io(err.to_string()) + } +} + +/// Actor errors: User Actor execution errors +/// +/// These errors are raised by user code during Actor execution. +/// They are distinct from RuntimeError which are framework-level errors. +#[derive(Error, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ActorError { + /// Business error: User input error, business logic error + /// These are recoverable and should be returned to the caller + #[error("Business error [{code}]: {message}")] + Business { + code: u32, + message: String, + #[serde(skip_serializing_if = "Option::is_none")] + details: Option, + }, + + /// System error: Internal error, resource error + /// May trigger Actor restart depending on recoverable flag + #[error("System error: {error}")] + System { error: String, recoverable: bool }, + + /// Timeout error: Operation timed out + /// Usually recoverable, can be retried + #[error("Timeout: operation '{operation}' timed out after {duration_ms}ms")] + Timeout { operation: String, duration_ms: u64 }, + + /// Unsupported operation + #[error("Unsupported operation: {operation}")] + Unsupported { operation: String }, +} + +impl ActorError { + /// Create a business error + pub fn business(code: u32, message: impl Into, details: Option) -> Self { + Self::Business { + code, + message: message.into(), + details, + } + } + + /// Create a system error + pub fn system(error: impl Into, recoverable: bool) -> Self { + Self::System { + error: error.into(), + recoverable, + } + } + + /// Create a timeout error + pub fn timeout(operation: impl Into, 
duration_ms: u64) -> Self { + Self::Timeout { + operation: operation.into(), + duration_ms, + } + } + + /// Create an unsupported operation error + pub fn unsupported(operation: impl Into) -> Self { + Self::Unsupported { + operation: operation.into(), + } + } + + /// Check if this error is recoverable + /// + /// - Business errors: always recoverable (return to caller) + /// - System errors: depends on recoverable flag + /// - Timeout errors: usually recoverable (can retry) + /// - Unsupported errors: not recoverable + pub fn is_recoverable(&self) -> bool { + match self { + Self::Business { .. } => true, + Self::System { recoverable, .. } => *recoverable, + Self::Timeout { .. } => true, + Self::Unsupported { .. } => false, + } + } + + /// Check if this is a business error + pub fn is_business(&self) -> bool { + matches!(self, Self::Business { .. }) + } + + /// Check if this is a system error + pub fn is_system(&self) -> bool { + matches!(self, Self::System { .. }) + } + + /// Check if this is a timeout error + pub fn is_timeout(&self) -> bool { + matches!(self, Self::Timeout { .. 
}) + } } +// ============================================================================= +// Legacy type aliases for backward compatibility +// ============================================================================= + +/// Legacy: TransportError (now part of RuntimeError) +#[deprecated(note = "Use RuntimeError instead")] +pub type TransportError = RuntimeError; + +/// Legacy: ClusterError (now part of RuntimeError) +#[deprecated(note = "Use RuntimeError instead")] +pub type ClusterError = RuntimeError; + +/// Legacy: ConfigError (now part of RuntimeError) +#[deprecated(note = "Use RuntimeError instead")] +pub type ConfigError = RuntimeError; + /// Convenience type alias for results using PulsingError pub type Result<T> = std::result::Result<T, PulsingError>; @@ -313,45 +501,37 @@ mod tests { use super::*; #[test] - fn test_actor_error_display() { - let err = ActorError::not_found("my-actor"); + fn test_runtime_error_display() { + let err = RuntimeError::actor_not_found("my-actor"); assert!(err.to_string().contains("my-actor")); - let err = ActorError::already_exists("existing-actor"); - assert!(err.to_string().contains("existing-actor")); - } - - #[test] - fn test_transport_error_display() { - let err = TransportError::connection_failed("127.0.0.1:8000", "connection refused"); + let err = RuntimeError::connection_failed("127.0.0.1:8000", "connection refused"); assert!(err.to_string().contains("127.0.0.1:8000")); assert!(err.to_string().contains("refused")); - - let err = TransportError::request_timeout(5000); - assert!(err.to_string().contains("5000")); } #[test] - fn test_cluster_error_display() { - let err = ClusterError::not_initialized(); - assert!(err.to_string().contains("not initialized")); + fn test_actor_error_display() { + let err = ActorError::business(400, "Invalid input", None); + assert!(err.to_string().contains("400")); + assert!(err.to_string().contains("Invalid input")); - let err = ClusterError::named_actor_not_found("services/echo"); -
assert!(err.to_string().contains("services/echo")); + let err = ActorError::system("Database error", true); + assert!(err.to_string().contains("Database error")); } #[test] - fn test_config_error_display() { - let err = ConfigError::invalid_value("mailbox_capacity", "0", "must be > 0"); - assert!(err.to_string().contains("mailbox_capacity")); + fn test_pulsing_error_from_runtime_error() { + let runtime_err = RuntimeError::actor_not_found("test"); + let pulsing_err: PulsingError = runtime_err.into(); - let err = ConfigError::conflicting("cannot be both head node and worker"); - assert!(err.to_string().contains("head node")); + assert!(matches!(pulsing_err, PulsingError::Runtime(_))); + assert!(pulsing_err.to_string().contains("test")); } #[test] fn test_pulsing_error_from_actor_error() { - let actor_err = ActorError::not_found("test"); + let actor_err = ActorError::business(400, "test", None); let pulsing_err: PulsingError = actor_err.into(); assert!(matches!(pulsing_err, PulsingError::Actor(_))); @@ -359,28 +539,25 @@ mod tests { } #[test] - fn test_pulsing_error_from_transport_error() { - let transport_err = TransportError::request_timeout(3000); - let pulsing_err: PulsingError = transport_err.into(); - - assert!(matches!(pulsing_err, PulsingError::Transport(_))); - assert!(pulsing_err.to_string().contains("3000")); - } - - #[test] - fn test_pulsing_error_helpers() { - let err = PulsingError::other("something went wrong"); - assert!(err.to_string().contains("wrong")); - - let err = PulsingError::timeout("operation timed out"); - assert!(err.to_string().contains("timed out")); + fn test_error_classification() { + let business_err = ActorError::business(400, "test", None); + assert!(business_err.is_recoverable()); + assert!(business_err.is_business()); + + let system_err = ActorError::system("error", true); + assert!(system_err.is_recoverable()); + assert!(system_err.is_system()); + + let timeout_err = ActorError::timeout("op", 1000); + 
assert!(timeout_err.is_recoverable()); + assert!(timeout_err.is_timeout()); } #[test] fn test_error_equality() { - let err1 = ActorError::not_found("test"); - let err2 = ActorError::not_found("test"); - let err3 = ActorError::not_found("other"); + let err1 = ActorError::business(400, "test", None); + let err2 = ActorError::business(400, "test", None); + let err3 = ActorError::business(400, "other", None); assert_eq!(err1, err2); assert_ne!(err1, err3); diff --git a/crates/pulsing-actor/src/system/handler.rs b/crates/pulsing-actor/src/system/handler.rs index a1416008b..8fc4b148f 100644 --- a/crates/pulsing-actor/src/system/handler.rs +++ b/crates/pulsing-actor/src/system/handler.rs @@ -4,6 +4,7 @@ use super::handle::LocalActorHandle; use crate::actor::{ActorId, ActorPath, Envelope, Message, NodeId}; use crate::cluster::backends::{RegisterActorRequest, UnregisterActorRequest}; use crate::cluster::{GossipBackend, GossipMessage, HeadNodeBackend, NamingBackend}; +use crate::error::{PulsingError, RuntimeError}; use crate::metrics::{metrics, SystemMetrics as PrometheusMetrics}; use crate::transport::Http2ServerHandler; use dashmap::DashMap; @@ -59,7 +60,9 @@ impl SystemMessageHandler { } } - Err(anyhow::anyhow!("Actor not found: {}", actor_name)) + Err(anyhow::Error::from(PulsingError::from( + RuntimeError::actor_not_found(actor_name.to_string()), + ))) } /// Dispatch a message to an actor (ask pattern) diff --git a/crates/pulsing-actor/src/system/resolve.rs b/crates/pulsing-actor/src/system/resolve.rs index 8df676e60..cce4959a5 100644 --- a/crates/pulsing-actor/src/system/resolve.rs +++ b/crates/pulsing-actor/src/system/resolve.rs @@ -7,6 +7,7 @@ use crate::actor::{ ActorAddress, ActorId, ActorPath, ActorRef, ActorResolver, IntoActorPath, NodeId, }; use crate::cluster::{MemberInfo, MemberStatus, NamedActorInfo}; +use crate::error::{PulsingError, RuntimeError}; use crate::policies::LoadBalancingPolicy; use crate::system::config::ResolveOptions; use 
crate::system::load_balancer::{MemberWorker, NodeLoadTracker}; @@ -48,7 +49,9 @@ impl ActorSystem { return Ok(ActorRef::remote(*id, member_info.addr, Arc::new(transport))); } - Err(anyhow::anyhow!("Actor not found: {}", id)) + Err(anyhow::Error::from(PulsingError::from( + RuntimeError::actor_not_found(id.to_string()), + ))) } /// Resolve a named actor by path (direct resolution) @@ -157,10 +160,11 @@ impl ActorSystem { .ok_or_else(|| anyhow::anyhow!("Named actor not found locally"))? .clone(); - let local_id = self - .actor_names - .get(&actor_name) - .ok_or_else(|| anyhow::anyhow!("Actor not found: {}", actor_name))?; + let local_id = self.actor_names.get(&actor_name).ok_or_else(|| { + anyhow::Error::from(PulsingError::from(RuntimeError::actor_not_found( + actor_name.clone(), + ))) + })?; let handle = self .local_actors diff --git a/crates/pulsing-actor/src/system/spawn.rs b/crates/pulsing-actor/src/system/spawn.rs index 313021a54..14adee7f6 100644 --- a/crates/pulsing-actor/src/system/spawn.rs +++ b/crates/pulsing-actor/src/system/spawn.rs @@ -7,6 +7,7 @@ //! All other spawn methods delegate to the builder. 
use crate::actor::{Actor, ActorContext, ActorId, ActorPath, ActorRef, ActorSystemRef, Mailbox}; +use crate::error::{PulsingError, RuntimeError}; use crate::system::config::SpawnOptions; use crate::system::handle::{ActorStats, LocalActorHandle}; use crate::system::runtime::run_supervision_loop; @@ -33,7 +34,9 @@ impl ActorSystem { // Check for name conflicts (only for named actors) if let Some(ref name) = name_str { if self.actor_names.contains_key(name) { - return Err(anyhow::anyhow!("Actor already exists: {}", name)); + return Err(anyhow::Error::from(PulsingError::from( + RuntimeError::actor_already_exists(name.clone()), + ))); } if self.named_actor_paths.contains_key(name) { return Err(anyhow::anyhow!("Named path already registered: {}", name)); diff --git a/crates/pulsing-actor/src/transport/http2/client.rs b/crates/pulsing-actor/src/transport/http2/client.rs index 058fd0523..5940caa1a 100644 --- a/crates/pulsing-actor/src/transport/http2/client.rs +++ b/crates/pulsing-actor/src/transport/http2/client.rs @@ -6,6 +6,7 @@ use super::retry::{RetryConfig, RetryExecutor}; use super::stream::{BinaryFrameParser, StreamFrame, StreamHandle}; use super::{headers, MessageMode, RequestType}; use crate::actor::{Message, MessageStream}; +use crate::error::RuntimeError; use crate::tracing::{TraceContext, TRACEPARENT_HEADER}; use bytes::Bytes; use futures::{Stream, StreamExt, TryStreamExt}; @@ -308,8 +309,13 @@ impl Http2Client { let tcp_stream = tokio::time::timeout(self.config.connect_timeout, TcpStream::connect(addr)) .await - .map_err(|_| anyhow::anyhow!("Connection timeout"))? - .map_err(|e| anyhow::anyhow!("Failed to connect: {}", e))?; + .map_err(|_| { + RuntimeError::connection_failed( + addr.to_string(), + "Connection timeout".to_string(), + ) + })? 
+ .map_err(|e| RuntimeError::connection_failed(addr.to_string(), e.to_string()))?; // Build HTTP/2 connection with streaming body type - with or without TLS type StreamingBody = @@ -373,7 +379,9 @@ impl Http2Client { .header(TRACEPARENT_HEADER, trace_ctx.to_traceparent()) .header("content-type", "application/octet-stream") .body(body) - .map_err(|e| anyhow::anyhow!("Failed to build request: {}", e))?; + .map_err(|e| { + RuntimeError::protocol_error(format!("Failed to build request: {}", e)) + })?; let send_future = sender.send_request(request); let response = tokio::time::timeout(self.config.stream_timeout, send_future) @@ -389,7 +397,9 @@ impl Http2Client { let (mut sender, conn): (http2::SendRequest, _) = http2::handshake(TokioExecutor::new(), io) .await - .map_err(|e| anyhow::anyhow!("HTTP/2 handshake failed: {}", e))?; + .map_err(|e| { + RuntimeError::protocol_error(format!("HTTP/2 handshake failed: {}", e)) + })?; // Spawn connection driver let cancel = self.cancel.clone(); @@ -446,14 +456,18 @@ impl Http2Client { .header(TRACEPARENT_HEADER, trace_ctx.to_traceparent()) .header("content-type", "application/octet-stream") .body(body) - .map_err(|e| anyhow::anyhow!("Failed to build request: {}", e))?; + .map_err(|e| RuntimeError::protocol_error(format!("Failed to build request: {}", e)))?; // Send request with timeout let send_future = sender.send_request(request); let response = tokio::time::timeout(self.config.stream_timeout, send_future) .await - .map_err(|_| anyhow::anyhow!("Streaming request timeout"))? - .map_err(|e| anyhow::anyhow!("Streaming request failed: {}", e))?; + .map_err(|_| { + RuntimeError::request_timeout(self.config.stream_timeout.as_millis() as u64) + })? 
+ .map_err(|e| { + RuntimeError::protocol_error(format!("Streaming request failed: {}", e)) + })?; Ok(response) } @@ -588,14 +602,16 @@ impl Http2Client { .header(TRACEPARENT_HEADER, trace_ctx.to_traceparent()) .header("content-type", "application/octet-stream") .body(Full::new(Bytes::from(payload))) - .map_err(|e| anyhow::anyhow!("Failed to build request: {}", e))?; + .map_err(|e| RuntimeError::protocol_error(format!("Failed to build request: {}", e)))?; // Send request with timeout let send_future = conn.sender.send_request(request); let response = tokio::time::timeout(self.config.request_timeout, send_future) .await - .map_err(|_| anyhow::anyhow!("Request timeout"))? - .map_err(|e| anyhow::anyhow!("Request failed: {}", e))?; + .map_err(|_| { + RuntimeError::request_timeout(self.config.request_timeout.as_millis() as u64) + })? + .map_err(|e| RuntimeError::protocol_error(format!("Request failed: {}", e)))?; Ok(response) } diff --git a/crates/pulsing-actor/src/transport/http2/mod.rs b/crates/pulsing-actor/src/transport/http2/mod.rs index 91ec9113c..25292d3d8 100644 --- a/crates/pulsing-actor/src/transport/http2/mod.rs +++ b/crates/pulsing-actor/src/transport/http2/mod.rs @@ -7,6 +7,8 @@ mod retry; mod server; mod stream; +use crate::error::RuntimeError; + #[cfg(feature = "tls")] mod tls; @@ -106,7 +108,7 @@ impl Http2Transport { ) -> anyhow::Result<()> { let path = format!("/actors/{}", actor_name); let Message::Single { msg_type, data } = msg else { - return Err(anyhow::anyhow!("Streaming not supported for tell")); + return Err(RuntimeError::protocol_error("Streaming not supported for tell").into()); }; self.client.tell(addr, &path, &msg_type, data).await @@ -120,7 +122,7 @@ impl Http2Transport { ) -> anyhow::Result<()> { let url_path = format!("/named/{}", path.as_str()); let Message::Single { msg_type, data } = msg else { - return Err(anyhow::anyhow!("Streaming not supported for tell")); + return Err(RuntimeError::protocol_error("Streaming not supported for 
tell").into()); }; self.client.tell(addr, &url_path, &msg_type, data).await @@ -350,10 +352,11 @@ impl RemoteTransport for Http2RemoteTransport { ) -> anyhow::Result> { // Check circuit breaker before making request if !self.circuit_breaker.can_execute() { - return Err(anyhow::anyhow!( - "Circuit breaker is open for {}", - self.remote_addr - )); + return Err(RuntimeError::ConnectionFailed { + addr: self.remote_addr.to_string(), + reason: "Circuit breaker is open".to_string(), + } + .into()); } let result = self @@ -374,10 +377,11 @@ impl RemoteTransport for Http2RemoteTransport { ) -> anyhow::Result<()> { // Check circuit breaker before making request if !self.circuit_breaker.can_execute() { - return Err(anyhow::anyhow!( - "Circuit breaker is open for {}", - self.remote_addr - )); + return Err(RuntimeError::ConnectionFailed { + addr: self.remote_addr.to_string(), + reason: "Circuit breaker is open".to_string(), + } + .into()); } let result = self @@ -399,10 +403,11 @@ impl RemoteTransport for Http2RemoteTransport { async fn send_message(&self, _actor_id: &ActorId, msg: Message) -> anyhow::Result { // Check circuit breaker before making request if !self.circuit_breaker.can_execute() { - return Err(anyhow::anyhow!( - "Circuit breaker is open for {}", - self.remote_addr - )); + return Err(RuntimeError::ConnectionFailed { + addr: self.remote_addr.to_string(), + reason: "Circuit breaker is open".to_string(), + } + .into()); } // Use unified send_message_full that handles both single and streaming diff --git a/crates/pulsing-py/src/actor.rs b/crates/pulsing-py/src/actor.rs index 461e4af86..40349461e 100644 --- a/crates/pulsing-py/src/actor.rs +++ b/crates/pulsing-py/src/actor.rs @@ -2,9 +2,10 @@ use futures::StreamExt; use pulsing_actor::actor::{ActorId, ActorPath, NodeId}; +use pulsing_actor::error::PulsingError; use pulsing_actor::prelude::*; use pulsing_actor::supervision::{BackoffStrategy, RestartPolicy, SupervisionSpec}; -use pyo3::exceptions::{PyException, 
PyRuntimeError, PyStopAsyncIteration, PyValueError}; +use pyo3::exceptions::{PyRuntimeError, PyStopAsyncIteration, PyValueError}; use pyo3::prelude::*; use pyo3::types::PyBytes; use std::net::SocketAddr; @@ -13,13 +14,27 @@ use std::sync::Mutex as StdMutex; use tokio::sync::mpsc; use tokio::sync::Mutex as TokioMutex; +use crate::errors::pulsing_error_to_py_err_direct; +use crate::python_error_converter::convert_python_exception_to_actor_error; use crate::python_executor::python_executor; /// Special message type identifier for pickle-encoded Python objects const SEALED_PY_MSG_TYPE: &str = "__sealed_py_message__"; +/// Convert error to Python exception +/// Prefer using pulsing_error_to_py_err_direct for PulsingError types fn to_pyerr(err: E) -> PyErr { - PyException::new_err(format!("{}", err)) + // Try to downcast to PulsingError + let err_str = err.to_string(); + + // For non-PulsingError types, use RuntimeError + // In practice, most errors from pulsing-actor should be PulsingError + PyRuntimeError::new_err(err_str) +} + +/// Convert PulsingError to Python exception +fn pulsing_to_pyerr(err: PulsingError) -> PyErr { + pulsing_error_to_py_err_direct(err) } /// Python wrapper for NodeId @@ -924,7 +939,7 @@ impl Actor for PythonActorWrapper { let is_sealed_msg = msg.msg_type() == SEALED_PY_MSG_TYPE; let py_msg = PyMessage::from_rust_message(msg); - let response = python_executor() + let response: Result = python_executor() .execute(move || { Python::with_gil(|py| -> PyResult { let receive_method = handler.getattr(py, "receive")?; @@ -939,7 +954,18 @@ impl Actor for PythonActorWrapper { py_msg.into_pyobject(py)?.into_any().unbind() }; - let result = receive_method.call1(py, (call_arg,))?; + let result = receive_method.call1(py, (call_arg,)); + + // Handle Python exceptions and convert to ActorError + let result = match result { + Ok(value) => value, + Err(py_err) => { + // Convert Python exception to ActorError + // We need to return this as an error in the Python 
execution context + // The error will be caught and converted at the Rust level + return Err(py_err); + } + }; let asyncio = py.import("asyncio")?; let is_coro = asyncio @@ -1023,8 +1049,22 @@ impl Actor for PythonActorWrapper { }) }) .await - .map_err(|e| anyhow::anyhow!("Python executor error: {:?}", e))? - .map_err(|e| anyhow::anyhow!("Python handler error: {:?}", e))?; + .map_err(|e| anyhow::anyhow!("Python executor error: {:?}", e))?; + + // Convert Python exceptions to ActorError + let response = match response { + Ok(resp) => resp, + Err(py_err) => { + // Convert Python exception to ActorError + Python::with_gil(|py| { + let actor_err = convert_python_exception_to_actor_error(py, &py_err)?; + // Convert ActorError to PulsingError and then to anyhow::Error + Err(anyhow::Error::from( + pulsing_actor::error::PulsingError::from(actor_err), + )) + }) + }?, + }; match response { PyActorResponse::Single(msg) => Ok(msg.to_message()), @@ -1133,7 +1173,9 @@ impl PyActorSystem { ) -> PyResult> { let config_inner = config.inner; pyo3_async_runtimes::tokio::future_into_py(py, async move { - let system = ActorSystem::new(config_inner).await.map_err(to_pyerr)?; + let system = ActorSystem::new(config_inner) + .await + .map_err(|e| pulsing_to_pyerr(PulsingError::from(e)))?; Ok(PyActorSystem { inner: system, event_loop, diff --git a/crates/pulsing-py/src/errors.rs b/crates/pulsing-py/src/errors.rs new file mode 100644 index 000000000..680bec7fc --- /dev/null +++ b/crates/pulsing-py/src/errors.rs @@ -0,0 +1,62 @@ +//! Python exception bindings for Pulsing errors +//! +//! This module converts Rust error types to Python exceptions. +//! Due to PyO3 abi3 limitations, we use PyRuntimeError as the base +//! and let Python layer re-raise as appropriate exception types. 
+ +use pulsing_actor::error::{PulsingError, RuntimeError}; +use pyo3::exceptions::PyRuntimeError; +use pyo3::prelude::*; + +/// Convert Rust PulsingError to appropriate Python exception +/// +/// This function prefixes error messages with error type markers so Python +/// layer can identify and re-raise as appropriate exception types. +pub fn pulsing_error_to_py_err(err: PulsingError) -> PyErr { + let err_msg = err.to_string(); + + match &err { + // Actor errors (user code errors) -> prefix with "ACTOR_ERROR:" + PulsingError::Actor(_actor_err) => { + PyRuntimeError::new_err(format!("ACTOR_ERROR:{}", err_msg)) + } + // Runtime errors (framework errors) -> prefix with "RUNTIME_ERROR:" + PulsingError::Runtime(runtime_err) => { + // Extract actor name if available for runtime errors + let actor_name = match runtime_err { + RuntimeError::ActorNotFound { name } => Some(name.clone()), + RuntimeError::ActorAlreadyExists { name } => Some(name.clone()), + RuntimeError::ActorNotLocal { name } => Some(name.clone()), + RuntimeError::ActorStopped { name } => Some(name.clone()), + RuntimeError::ActorMailboxFull { name } => Some(name.clone()), + RuntimeError::InvalidActorPath { path: _ } => None, + RuntimeError::MessageTypeMismatch { .. } => None, + RuntimeError::ActorSpawnFailed { .. } => None, + _ => None, + }; + + let full_msg = if let Some(ref name) = actor_name { + format!("RUNTIME_ERROR:{}:actor={}", err_msg, name) + } else { + format!("RUNTIME_ERROR:{}", err_msg) + }; + + PyRuntimeError::new_err(full_msg) + } + } +} + +/// Convert PulsingError to Python exception (preferred method) +pub fn pulsing_error_to_py_err_direct(err: PulsingError) -> PyErr { + pulsing_error_to_py_err(err) +} + +/// Add error classes to Python module +/// +/// Note: In abi3 mode, we can't create custom exception classes directly. +/// Exception classes are defined in Python (pulsing/exceptions.py). +/// This function is kept for API consistency. 
/// Register error classes on the Python module.
///
/// This is currently a no-op that always returns `Ok(())`: the module argument
/// is unused. It exists so this module exposes the same `add_to_module` entry
/// point as the other binding modules.
pub fn add_to_module(_m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Nothing to register here: the exception classes live in the Python layer.
    Ok(())
}
+ +use pulsing_actor::error::ActorError; +use pyo3::exceptions::{PyTimeoutError, PyTypeError, PyValueError}; +use pyo3::prelude::*; + +/// Convert Python exception (PyErr) to ActorError +/// +/// This function automatically classifies Python exceptions: +/// - ValueError, TypeError -> Business error +/// - TimeoutError -> Timeout error +/// - Other exceptions -> System error +pub fn convert_python_exception_to_actor_error( + py: Python, + err: &PyErr, +) -> anyhow::Result { + // Try to extract exception type and message + let err_type = err.get_type(py); + let type_name = err_type.name()?.to_string(); + let err_msg = err.to_string(); + + // Check for specific exception types + if err.is_instance_of::(py) { + // Timeout error + return Ok(ActorError::timeout("python_operation", 0)); + } + + if err.is_instance_of::(py) || err.is_instance_of::(py) { + // Business error: validation/type errors + return Ok(ActorError::business(400, err_msg, None)); + } + + // Check if it's a custom Pulsing exception + // Try to extract error details from exception attributes + let py_err_obj = err.value(py); + + // Check for PulsingBusinessError + if let Ok(code_attr) = py_err_obj.getattr("code") { + if let Ok(code) = code_attr.extract::() { + let message_attr = py_err_obj.getattr("message").ok(); + let message = message_attr + .and_then(|m| m.extract::().ok()) + .unwrap_or_else(|| err_msg.clone()); + + let details_attr = py_err_obj.getattr("details").ok(); + let details = details_attr.and_then(|d| d.extract::().ok()); + + return Ok(ActorError::business(code, message, details)); + } + } + + // Check for PulsingSystemError + if let Ok(error_attr) = py_err_obj.getattr("error") { + if let Ok(error_msg) = error_attr.extract::() { + let recoverable_attr = py_err_obj.getattr("recoverable").ok(); + let recoverable = recoverable_attr + .and_then(|r| r.extract::().ok()) + .unwrap_or(true); + + return Ok(ActorError::system(error_msg, recoverable)); + } + } + + // Check for PulsingTimeoutError (has 
both operation and duration_ms) + if let Ok(operation_attr) = py_err_obj.getattr("operation") { + if let Ok(operation) = operation_attr.extract::() { + let duration_attr = py_err_obj.getattr("duration_ms").ok(); + if let Some(duration_ms) = duration_attr.and_then(|d| d.extract::().ok()) { + // Has duration_ms -> Timeout error + return Ok(ActorError::timeout(operation, duration_ms)); + } + } + } + + // Check for PulsingUnsupportedError (by type name or operation attribute without duration_ms) + if type_name.contains("Unsupported") || type_name.contains("unsupported") { + if let Ok(operation_attr) = py_err_obj.getattr("operation") { + if let Ok(operation) = operation_attr.extract::() { + return Ok(ActorError::unsupported(operation)); + } + } + // Fallback: use error message as operation + return Ok(ActorError::unsupported(err_msg)); + } + + // Default: classify based on exception type name + match type_name.as_str() { + "TimeoutError" | "asyncio.TimeoutError" => Ok(ActorError::timeout("python_operation", 0)), + "ValueError" | "TypeError" | "KeyError" | "AttributeError" => { + // Business errors: user input errors + Ok(ActorError::business(400, err_msg, None)) + } + "RuntimeError" | "SystemError" | "OSError" | "IOError" => { + // System errors: internal errors + Ok(ActorError::system(err_msg, true)) + } + _ => { + // Unknown exception type: treat as system error + Ok(ActorError::system( + format!("{}: {}", type_name, err_msg), + true, + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_convert_timeout_error() { + Python::with_gil(|py| { + let err = PyTimeoutError::new_err("Operation timed out"); + let actor_err = convert_python_exception_to_actor_error(py, &err).unwrap(); + assert!(matches!(actor_err, ActorError::Timeout { .. 
})); + }); + } + + #[test] + fn test_convert_value_error() { + Python::with_gil(|py| { + let err = PyValueError::new_err("Invalid value"); + let actor_err = convert_python_exception_to_actor_error(py, &err).unwrap(); + assert!(matches!(actor_err, ActorError::Business { code: 400, .. })); + }); + } +} diff --git a/python/pulsing/__init__.py b/python/pulsing/__init__.py index fcf482e25..f0854a3eb 100644 --- a/python/pulsing/__init__.py +++ b/python/pulsing/__init__.py @@ -83,6 +83,17 @@ def incr(self): self.value += 1; return self.value PYTHON_ACTOR_SERVICE_NAME, ) +# Import exceptions +from pulsing.exceptions import ( + PulsingError, + PulsingRuntimeError, + PulsingActorError, + PulsingBusinessError, + PulsingSystemError, + PulsingTimeoutError, + PulsingUnsupportedError, +) + class ActorSystem: """ActorSystem wrapper with queue API @@ -274,4 +285,13 @@ async def refer(actorid: ActorId | str) -> ActorRef: "ActorProxy", "Message", "StreamMessage", + # Exceptions + "PulsingError", + "PulsingRuntimeError", + "PulsingActorError", + # Business-level exceptions (automatically converted to ActorError) + "PulsingBusinessError", + "PulsingSystemError", + "PulsingTimeoutError", + "PulsingUnsupportedError", ] diff --git a/python/pulsing/actor/__init__.py b/python/pulsing/actor/__init__.py index 15041bd29..7a6893260 100644 --- a/python/pulsing/actor/__init__.py +++ b/python/pulsing/actor/__init__.py @@ -110,7 +110,11 @@ async def shutdown() -> None: def get_system() -> ActorSystem: """Get the global actor system (must call init() first)""" if _global_system is None: - raise RuntimeError("Actor system not initialized. Call 'await init()' first.") + from pulsing.exceptions import PulsingRuntimeError + + raise PulsingRuntimeError( + "Actor system not initialized. Call 'await init()' first." 
def _convert_rust_error(err: RuntimeError) -> Exception:
    """Convert a Rust-raised RuntimeError into the matching Pulsing exception.

    The Rust layer prefixes error messages with a category marker:
    - "ACTOR_ERROR:"   -> PulsingActorError (or a specific subclass)
    - "RUNTIME_ERROR:" -> PulsingRuntimeError

    After "ACTOR_ERROR:" the message is matched against these shapes:
    - "Business error [code]: message"                 -> PulsingBusinessError
    - "System error: message"                          -> PulsingSystemError
    - "Timeout: operation 'op' timed out after Nms"    -> PulsingTimeoutError
    - "Unsupported operation: op"                      -> PulsingUnsupportedError
    Anything else becomes a generic PulsingActorError.

    Args:
        err: The exception whose ``str()`` carries the marker-prefixed message.

    Returns:
        A new Pulsing exception instance. The caller is responsible for
        raising it. A message with no known marker is wrapped unchanged in
        PulsingRuntimeError.
    """
    import re

    from pulsing.exceptions import (
        PulsingBusinessError,
        PulsingSystemError,
        PulsingTimeoutError,
        PulsingUnsupportedError,
    )

    actor_marker = "ACTOR_ERROR:"
    runtime_marker = "RUNTIME_ERROR:"
    system_prefix = "System error: "
    unsupported_prefix = "Unsupported operation: "

    err_msg = str(err)

    if err_msg.startswith(actor_marker):
        # Strip only the leading marker; the payload may legitimately contain
        # the same text again (e.g. a nested error message).
        msg = err_msg[len(actor_marker) :]

        # DOTALL keeps multi-line messages intact instead of cutting them at
        # the first newline.
        business = re.match(r"Business error \[(\d+)\]: (.+)", msg, re.DOTALL)
        if business:
            return PulsingBusinessError(int(business.group(1)), business.group(2))

        if msg.startswith(system_prefix):
            # The recoverable flag is not part of the message; default to True.
            return PulsingSystemError(msg[len(system_prefix) :], recoverable=True)

        timeout = re.match(
            r"Timeout: operation '([^']+)' timed out after (\d+)ms", msg
        )
        if timeout:
            return PulsingTimeoutError(timeout.group(1), int(timeout.group(2)))

        if msg.startswith(unsupported_prefix):
            return PulsingUnsupportedError(msg[len(unsupported_prefix) :])

        # Fallback: generic PulsingActorError
        return PulsingActorError(msg)

    if err_msg.startswith(runtime_marker):
        return PulsingRuntimeError(err_msg[len(runtime_marker) :])

    # Unknown format: wrap as a runtime error with the message untouched.
    return PulsingRuntimeError(err_msg)
resp: - raise RuntimeError(resp["__error__"]) + # Actor execution error + try: + raise PulsingActorError( + resp["__error__"], actor_name=str(self._ref.actor_id.id) + ) + except RuntimeError as e: + # If it's a Rust error, convert it + raise _convert_rust_error(e) from e return resp.get("__result__") elif isinstance(resp, Message): if resp.is_stream: return _SyncGeneratorStreamReader(resp) data = resp.to_json() if resp.msg_type == "Error": - raise RuntimeError(data.get("error", "Remote call failed")) + # Actor execution error + raise PulsingActorError( + data.get("error", "Remote call failed"), + actor_name=str(self._ref.actor_id.id), + ) return data.get("result") return resp @@ -182,7 +264,11 @@ async def _get_stream(self): # Not streaming, might be an error data = resp.to_json() if resp.msg_type == "Error": - raise RuntimeError(data.get("error", "Remote call failed")) + # Actor execution error + raise PulsingActorError( + data.get("error", "Remote call failed"), + actor_name=str(self._ref.actor_id.id), + ) # Wrap as single-value iterator self._stream_reader = _SingleValueIterator(data) else: @@ -207,7 +293,10 @@ async def __anext__(self): self._got_result = True raise StopAsyncIteration if "__error__" in item: - raise RuntimeError(item["__error__"]) + # Actor execution error + raise PulsingActorError( + item["__error__"], actor_name=str(self._ref.actor_id.id) + ) if "__yield__" in item: return item["__yield__"] return item @@ -264,7 +353,10 @@ async def __anext__(self): self._got_result = True raise StopAsyncIteration if "__error__" in item: - raise RuntimeError(item["__error__"]) + # Actor execution error + raise PulsingActorError( + item["__error__"], actor_name=str(self._ref.actor_id.id) + ) if "__yield__" in item: return item["__yield__"] return item @@ -606,7 +698,7 @@ def incr(self): self.value += 1; return self.value from . import _global_system if _global_system is None: - raise RuntimeError( + raise PulsingRuntimeError( "Actor system not initialized. 
Call 'await init()' first." ) @@ -747,7 +839,8 @@ async def remote( data = resp.to_json() if resp.msg_type == "Error": - raise RuntimeError(f"Remote create failed: {data.get('error')}") + # System error: actor creation failed + raise PulsingRuntimeError(f"Remote create failed: {data.get('error')}") # Build remote ActorRef from pulsing._core import ActorId @@ -905,7 +998,8 @@ async def list_actors(self) -> list[dict]: """List all actors on this node.""" data = await self._ask("ListActors") if data.get("type") == "Error": - raise RuntimeError(data.get("message")) + # System error: system message failed + raise PulsingRuntimeError(data.get("message")) return data.get("actors", []) async def get_metrics(self) -> dict: @@ -1025,7 +1119,8 @@ async def create_actor( ) data = resp.to_json() if resp.msg_type == "Error" or data.get("error"): - raise RuntimeError(data.get("error", "Unknown error")) + # System error: actor creation failed + raise PulsingRuntimeError(data.get("error", "Unknown error")) return data diff --git a/python/pulsing/exceptions.py b/python/pulsing/exceptions.py new file mode 100644 index 000000000..545812c3f --- /dev/null +++ b/python/pulsing/exceptions.py @@ -0,0 +1,182 @@ +"""Pulsing exception hierarchy. + +This module provides Python exceptions that correspond to Rust error types. +The exceptions are defined in Python but correspond to Rust error types defined +in crates/pulsing-actor/src/error.rs using thiserror. + +Errors are divided into two categories (matching Rust error structure): + +1. PulsingRuntimeError: Framework/system-level errors + Corresponds to: pulsing_actor::error::RuntimeError + + These are framework-level errors, not caused by user code: + - Actor system errors (NotFound, Stopped, etc.) + - Transport errors (ConnectionFailed, etc.) + - Cluster errors (NodeNotFound, etc.) + - Config errors (InvalidValue, etc.) + - I/O errors, Serialization errors + +2. 
PulsingActorError: User Actor execution errors + Corresponds to: pulsing_actor::error::ActorError + + These are errors raised by user code during Actor execution: + - Business errors (user input errors) → PulsingBusinessError + - System errors (internal errors from user code) → PulsingSystemError + - Timeout errors (operation timeouts) → PulsingTimeoutError + - Unsupported errors (unsupported operations) → PulsingUnsupportedError + +Note: Due to PyO3 abi3 limitations, we define exceptions in Python and +Rust code raises them using PyRuntimeError with message prefixes. +The Python layer can catch and re-raise as appropriate types. + +For Actor execution errors, use the specific exception types below which +will be automatically converted to Rust ActorError variants. +""" + + +class PulsingError(Exception): + """Base exception for all Pulsing errors. + + This corresponds to pulsing_actor::error::PulsingError in Rust. + """ + + pass + + +class PulsingRuntimeError(PulsingError): + """Framework/system-level errors. + + This corresponds to pulsing_actor::error::RuntimeError in Rust. + + These are framework-level errors, not caused by user code: + - Actor system errors (NotFound, Stopped, etc.) + - Transport errors (ConnectionFailed, etc.) + - Cluster errors (NodeNotFound, etc.) + - Config errors (InvalidValue, etc.) + - I/O errors + - Serialization errors + """ + + def __init__(self, message: str, cause: Exception | None = None): + super().__init__(message) + self.cause = cause + + +class PulsingActorError(PulsingError): + """User Actor execution errors. + + This corresponds to pulsing_actor::error::ActorError in Rust. + + These are errors raised by user code during Actor execution: + - Business errors (user input errors) + - System errors (internal errors from user code) + - Timeout errors (operation timeouts) + - Unsupported errors (unsupported operations) + + Note: Framework-level errors like "Actor not found" are RuntimeError, + not ActorError. 
+ """ + + def __init__( + self, + message: str, + actor_name: str | None = None, + cause: Exception | None = None, + ): + super().__init__(message) + self.actor_name = actor_name + self.cause = cause + + +# ============================================================================ +# Business-level error types (automatically converted to ActorError) +# ============================================================================ + + +class PulsingBusinessError(PulsingActorError): + """Business error: User input error, business logic error. + + These errors are recoverable and should be returned to the caller. + Automatically converted to ActorError::Business in Rust. + + Example: + @remote + class UserActor: + async def validate_age(self, age: int) -> bool: + if age < 18: + raise PulsingBusinessError(400, "Age must be >= 18", + details="User validation failed") + return True + """ + + def __init__(self, code: int, message: str, details: str | None = None): + self.code = code + self.message = message + self.details = details + super().__init__(f"[{code}] {message}", cause=None) + + +class PulsingSystemError(PulsingActorError): + """System error: Internal error, resource error. + + May trigger Actor restart depending on recoverable flag. + Automatically converted to ActorError::System in Rust. + + Example: + @remote + class DataProcessor: + async def process(self, data: str) -> str: + try: + return process_data(data) + except Exception as e: + raise PulsingSystemError(f"Processing failed: {e}", recoverable=True) + """ + + def __init__(self, error: str, recoverable: bool = True): + self.error = error + self.recoverable = recoverable + super().__init__(error, cause=None) + + +class PulsingTimeoutError(PulsingActorError): + """Timeout error: Operation timed out. + + Usually recoverable, can be retried. + Automatically converted to ActorError::Timeout in Rust. 
+ + Example: + @remote + class NetworkActor: + async def fetch(self, url: str) -> str: + try: + return await asyncio.wait_for(httpx.get(url), timeout=5.0) + except asyncio.TimeoutError: + raise PulsingTimeoutError("fetch", duration_ms=5000) + """ + + def __init__(self, operation: str, duration_ms: int = 0): + self.operation = operation + self.duration_ms = duration_ms + super().__init__( + f"Operation '{operation}' timed out after {duration_ms}ms", cause=None + ) + + +class PulsingUnsupportedError(PulsingActorError): + """Unsupported operation error. + + Not recoverable. Indicates that the requested operation is not supported. + Automatically converted to ActorError::Unsupported in Rust. + + Example: + @remote + class LegacyActor: + async def process(self, data: str) -> str: + if data.startswith("legacy:"): + raise PulsingUnsupportedError("process") + return process_data(data) + """ + + def __init__(self, operation: str): + self.operation = operation + super().__init__(f"Unsupported operation: {operation}", cause=None) diff --git a/tests/python/test_agent_runtime_lifecycle.py b/tests/python/test_agent_runtime_lifecycle.py index a58a152f8..0201f5309 100644 --- a/tests/python/test_agent_runtime_lifecycle.py +++ b/tests/python/test_agent_runtime_lifecycle.py @@ -64,7 +64,9 @@ async def test_basic_create_destroy(self): assert result == 10 # After runtime exits, global system should be cleaned up - with pytest.raises(RuntimeError, match="Actor system not initialized"): + from pulsing.exceptions import PulsingRuntimeError + + with pytest.raises(PulsingRuntimeError, match="Actor system not initialized"): get_system() @pytest.mark.asyncio @@ -77,7 +79,9 @@ async def test_repeated_create_destroy(self): assert result == i # Check system is cleaned up after each exit - with pytest.raises(RuntimeError): + from pulsing.exceptions import PulsingRuntimeError + + with pytest.raises(PulsingRuntimeError): get_system() @pytest.mark.asyncio @@ -163,7 +167,9 @@ async def 
test_multiple_actors_cleanup(self): assert results == list(range(10)) # After runtime exits, system should clean up all actors - with pytest.raises(RuntimeError): + from pulsing.exceptions import PulsingRuntimeError + + with pytest.raises(PulsingRuntimeError): get_system() @pytest.mark.asyncio @@ -197,7 +203,9 @@ async def test_exception_during_runtime(self): pass # Even with exception, system should be cleaned up - with pytest.raises(RuntimeError): + from pulsing.exceptions import PulsingRuntimeError + + with pytest.raises(PulsingRuntimeError): get_system() clear_agent_registry() @@ -341,7 +349,9 @@ async def test_empty_runtime(self): async with runtime(): pass - with pytest.raises(RuntimeError): + from pulsing.exceptions import PulsingRuntimeError + + with pytest.raises(PulsingRuntimeError): get_system() @pytest.mark.asyncio diff --git a/tests/python/test_remote_decorator.py b/tests/python/test_remote_decorator.py index 57083463b..f5be18b43 100644 --- a/tests/python/test_remote_decorator.py +++ b/tests/python/test_remote_decorator.py @@ -77,7 +77,9 @@ def will_fail(self): try: service = await ErrorService.spawn() - with pytest.raises(RuntimeError, match="Intentional error"): + from pulsing.exceptions import PulsingActorError + + with pytest.raises(PulsingActorError, match="Intentional error"): await service.will_fail() finally: @@ -100,7 +102,9 @@ async def will_fail(self): try: service = await AsyncErrorService.spawn() - with pytest.raises(RuntimeError, match="Async error"): + from pulsing.exceptions import PulsingActorError + + with pytest.raises(PulsingActorError, match="Async error"): await service.will_fail() finally: