Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 186 additions & 8 deletions hyperactor/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,65 @@ impl AttrValue for ChannelTransport {
}
}

/// Specifies how to bind a channel server.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Named)]
pub enum BindSpec {
/// Bind to any available address for the given transport.
Any(ChannelTransport),

/// Bind to a specific channel address.
Addr(ChannelAddr),
}

impl BindSpec {
/// Return an "any" address for this bind spec.
pub fn any(&self) -> ChannelAddr {
match self {
BindSpec::Any(transport) => ChannelAddr::any(transport.clone()),
BindSpec::Addr(addr) => addr.clone(),
}
}
}

impl From<ChannelTransport> for BindSpec {
fn from(transport: ChannelTransport) -> Self {
BindSpec::Any(transport)
}
}

impl From<ChannelAddr> for BindSpec {
fn from(addr: ChannelAddr) -> Self {
BindSpec::Addr(addr)
}
}

impl fmt::Display for BindSpec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Any(transport) => write!(f, "any({})", transport),
Self::Addr(addr) => write!(f, "addr({})", addr),
}
}
}

impl AttrValue for BindSpec {
fn display(&self) -> String {
self.to_string()
}

fn parse(s: &str) -> Result<Self, anyhow::Error> {
if let Some(inner) = s.strip_prefix("addr(").and_then(|s| s.strip_suffix(")")) {
let addr = ChannelAddr::from_str(inner)?;
Ok(BindSpec::Addr(addr))
} else if let Some(inner) = s.strip_prefix("any(").and_then(|s| s.strip_suffix(")")) {
let transport = ChannelTransport::from_str(inner)?;
Ok(BindSpec::Any(transport))
} else {
Err(anyhow::anyhow!("invalid bind spec: {}", s))
}
}
}

/// The type of (TCP) hostnames.
pub type Hostname = String;

Expand Down Expand Up @@ -505,6 +564,27 @@ pub enum ChannelAddr {
/// A unix domain socket address. Supports both absolute path names as
/// well as "abstract" names per https://manpages.debian.org/unstable/manpages/unix.7.en.html#Abstract_sockets
Unix(net::unix::SocketAddr),

/// A pair of addresses, one for the client and one for the server:
/// - The client should dial to the `dial_to` address.
/// - The server should bind to the `bind_to` address.
///
/// The user is responsible for ensuring the traffic to the `dial_to` address
/// is routed to the `bind_to` address.
///
/// This is useful for scenarios where the network is configured in a way,
/// that the bound address is not directly accessible from the client.
///
/// For example, in AWS, the client could be provided with the public IP
/// address, yet the server is bound to a private IP address or simply
/// INADDR_ANY. Traffic to the public IP address is mapped to the private
/// IP address through network address translation (NAT).
Alias {
/// The address to which the client should dial to.
dial_to: Box<ChannelAddr>,
/// The address to which the server should bind to.
bind_to: Box<ChannelAddr>,
},
}

impl From<SocketAddr> for ChannelAddr {
Expand Down Expand Up @@ -602,6 +682,9 @@ impl ChannelAddr {
Self::Local(_) => ChannelTransport::Local,
Self::Sim(addr) => ChannelTransport::Sim(Box::new(addr.transport())),
Self::Unix(_) => ChannelTransport::Unix,
// bind_to's transport is what is actually used in communication.
// Therefore we use its transport to represent the Alias.
Self::Alias { bind_to, .. } => bind_to.transport(),
}
}
}
Expand All @@ -614,6 +697,9 @@ impl fmt::Display for ChannelAddr {
Self::Local(index) => write!(f, "local:{}", index),
Self::Sim(sim_addr) => write!(f, "sim:{}", sim_addr),
Self::Unix(addr) => write!(f, "unix:{}", addr),
Self::Alias { dial_to, bind_to } => {
write!(f, "alias:dial_to={};bind_to={}", dial_to, bind_to)
}
}
}
}
Expand All @@ -634,6 +720,11 @@ impl FromStr for ChannelAddr {
Some(("metatls", rest)) => net::meta::parse(rest).map_err(|e| e.into()),
Some(("sim", rest)) => sim::parse(rest).map_err(|e| e.into()),
Some(("unix", rest)) => Ok(Self::Unix(net::unix::SocketAddr::from_str(rest)?)),
Some(("alias", _)) => Err(anyhow::anyhow!(
"detect possible alias address, but we currently do not support \
parsing alias' string representation since we only want to \
support parsing its zmq url format."
)),
Some((r#type, _)) => Err(anyhow::anyhow!("no such channel type: {type}")),
None => Err(anyhow::anyhow!("no channel type specified")),
}
Expand All @@ -647,7 +738,38 @@ impl ChannelAddr {
/// - inproc://endpoint-name (equivalent to local)
/// - ipc://path (equivalent to unix)
/// - metatls://hostname:port or metatls://*:port
/// - Alias format: dial_to_url@bind_to_url (e.g., tcp://host:port@tcp://host:port)
/// Note: Alias format is currently only supported for TCP addresses
pub fn from_zmq_url(address: &str) -> Result<Self, anyhow::Error> {
// Check for Alias format: dial_to_url@bind_to_url
// The @ character separates two valid ZMQ URLs
if let Some(at_pos) = address.find('@') {
let dial_to_str = &address[..at_pos];
let bind_to_str = &address[at_pos + 1..];

// Validate that both addresses use TCP scheme
if !dial_to_str.starts_with("tcp://") {
return Err(anyhow::anyhow!(
"alias format is only supported for TCP addresses, got dial_to: {}",
dial_to_str
));
}
if !bind_to_str.starts_with("tcp://") {
return Err(anyhow::anyhow!(
"alias format is only supported for TCP addresses, got bind_to: {}",
bind_to_str
));
}

let dial_to = Self::from_zmq_url(dial_to_str)?;
let bind_to = Self::from_zmq_url(bind_to_str)?;

return Ok(Self::Alias {
dial_to: Box::new(dial_to),
bind_to: Box::new(bind_to),
});
}

// Try ZMQ-style URL format first (scheme://...)
let (scheme, address) = address.split_once("://").ok_or_else(|| {
anyhow::anyhow!("address must be in url form scheme://endppoint {}", address)
Expand Down Expand Up @@ -850,6 +972,7 @@ pub fn dial<M: RemoteMessage>(addr: ChannelAddr) -> Result<ChannelTx<M>, Channel
ChannelAddr::MetaTls(meta_addr) => ChannelTxKind::MetaTls(net::meta::dial(meta_addr)?),
ChannelAddr::Sim(sim_addr) => ChannelTxKind::Sim(sim::dial::<M>(sim_addr)?),
ChannelAddr::Unix(path) => ChannelTxKind::Unix(net::unix::dial(path)),
ChannelAddr::Alias { dial_to, .. } => dial(*dial_to)?.inner,
};
Ok(ChannelTx { inner })
}
Expand All @@ -862,6 +985,19 @@ pub fn serve<M: RemoteMessage>(
addr: ChannelAddr,
) -> Result<(ChannelAddr, ChannelRx<M>), ChannelError> {
let caller = Location::caller();
serve_inner(addr).map(|(addr, inner)| {
tracing::debug!(
name = "serve",
%addr,
%caller,
);
(addr, ChannelRx { inner })
})
}

fn serve_inner<M: RemoteMessage>(
addr: ChannelAddr,
) -> Result<(ChannelAddr, ChannelRxKind<M>), ChannelError> {
match addr {
ChannelAddr::Tcp(addr) => {
let (addr, rx) = net::tcp::serve::<M>(addr)?;
Expand All @@ -887,15 +1023,15 @@ pub fn serve<M: RemoteMessage>(
"invalid local addr: {}",
a
))),
ChannelAddr::Alias { dial_to, bind_to } => {
let (bound_addr, rx) = serve_inner::<M>(*bind_to)?;
let alias_addr = ChannelAddr::Alias {
dial_to,
bind_to: Box::new(bound_addr),
};
Ok((alias_addr, rx))
}
}
.map(|(addr, inner)| {
tracing::debug!(
name = "serve",
%addr,
%caller,
);
(addr, ChannelRx { inner })
})
}

/// Serve on the local address. The server is turned down
Expand Down Expand Up @@ -1066,6 +1202,48 @@ mod tests {
assert!(ChannelAddr::from_zmq_url("inproc://not-a-number").is_err());
}

#[test]
fn test_zmq_style_alias_channel_addr() {
// Test Alias format: dial_to_url@bind_to_url
// The format is: dial_to_url@bind_to_url where both are valid ZMQ URLs
// Note: Alias format is only supported for TCP addresses

// Test Alias with tcp on both sides
let alias_addr = ChannelAddr::from_zmq_url("tcp://127.0.0.1:9000@tcp://[::]:8800").unwrap();
match alias_addr {
ChannelAddr::Alias { dial_to, bind_to } => {
assert_eq!(
*dial_to,
ChannelAddr::Tcp("127.0.0.1:9000".parse().unwrap())
);
assert_eq!(*bind_to, ChannelAddr::Tcp("[::]:8800".parse().unwrap()));
}
_ => panic!("Expected Alias"),
}

// Test error: alias with non-tcp dial_to (not supported)
assert!(
ChannelAddr::from_zmq_url("metatls://example.com:443@tcp://127.0.0.1:8080").is_err()
);

// Test error: alias with non-tcp bind_to (not supported)
assert!(
ChannelAddr::from_zmq_url("tcp://127.0.0.1:8080@metatls://example.com:443").is_err()
);

// Test error: invalid dial_to URL in Alias
assert!(ChannelAddr::from_zmq_url("invalid://scheme@tcp://127.0.0.1:8080").is_err());

// Test error: invalid bind_to URL in Alias
assert!(ChannelAddr::from_zmq_url("tcp://127.0.0.1:8080@invalid://scheme").is_err());

// Test error: missing port in dial_to
assert!(ChannelAddr::from_zmq_url("tcp://host@tcp://127.0.0.1:8080").is_err());

// Test error: missing port in bind_to
assert!(ChannelAddr::from_zmq_url("tcp://127.0.0.1:8080@tcp://example.com").is_err());
}

#[tokio::test]
async fn test_multiple_connections() {
for addr in ChannelTransport::all().map(ChannelAddr::any) {
Expand Down
7 changes: 7 additions & 0 deletions hyperactor_mesh/proptest-regressions/actor_mesh.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 7da0353e3138258986cf2598af0836656a1f7c3399a9ffa18ca93cf983b3e64c # shrinks to extent = Extent { inner: ExtentData { labels: ["d/0"], sizes: [1] } }
5 changes: 4 additions & 1 deletion hyperactor_mesh/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ async fn halt<R>() -> R {
unreachable!()
}

/// Bootstrap a host in this process, returning a handle to the mesh agent:
/// Bootstrap a host in this process, returning a handle to the mesh agent.
///
/// To obtain the local proc, use `GetLocalProc` on the returned host mesh agent,
/// then use `GetProc` on the returned proc mesh agent.
///
/// - `addr`: the listening address of the host; this is used to bind the frontend address;
/// - `command`: optional bootstrap command to spawn procs, otherwise [`BootstrapProcManager::current`];
Expand Down
20 changes: 17 additions & 3 deletions hyperactor_mesh/src/proc_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use hyperactor::WorldId;
use hyperactor::actor::ActorStatus;
use hyperactor::actor::remote::Remote;
use hyperactor::channel;
use hyperactor::channel::BindSpec;
use hyperactor::channel::ChannelAddr;
use hyperactor::channel::ChannelTransport;
use hyperactor::context;
Expand Down Expand Up @@ -96,11 +97,24 @@ declare_attrs! {
env_name: Some("HYPERACTOR_MESH_DEFAULT_TRANSPORT".to_string()),
py_name: Some("default_transport".to_string()),
})
pub attr DEFAULT_TRANSPORT: ChannelTransport = ChannelTransport::Unix;
pub attr DEFAULT_TRANSPORT: BindSpec = BindSpec::Any(ChannelTransport::Unix);
}

/// Get the default transport type to use across the application.
/// Temporary: used to support the legacy allocator-based V1 bootstrap. Should
/// be removed once we fully migrate to simple bootstrap.
///
/// Get the default transport to use across the application. Panic if BindSpec::Addr
/// is set as default transport. Since we expect BindSpec::Addr to be used only
/// with simple bootstrap, we should not see this panic in production.
pub fn default_transport() -> ChannelTransport {
match default_bind_spec() {
BindSpec::Any(transport) => transport,
BindSpec::Addr(addr) => panic!("default_bind_spec() returned BindSpec::Addr({addr})"),
}
}

/// Get the default bind spec to use across the application.
pub fn default_bind_spec() -> BindSpec {
global::get_cloned(DEFAULT_TRANSPORT)
}

Expand Down Expand Up @@ -187,7 +201,7 @@ pub fn global_root_client() -> &'static Instance<GlobalClientActor> {
)> = OnceLock::new();
&GLOBAL_INSTANCE.get_or_init(|| {
let client_proc = Proc::direct_with_default(
ChannelAddr::any(default_transport()),
default_bind_spec().any(),
"mesh_root_client_proc".into(),
router::global().clone().boxed(),
)
Expand Down
20 changes: 20 additions & 0 deletions hyperactor_mesh/src/proc_mesh/mesh_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,26 @@ impl Handler<NewClientInstance> for ProcMeshAgent {
}
}

/// A handler to get a clone of the proc managed by this agent.
/// This is used to obtain the local proc from a host mesh.
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
pub struct GetProc {
#[reply]
pub proc: PortHandle<Proc>,
}

#[async_trait]
impl Handler<GetProc> for ProcMeshAgent {
async fn handle(
&mut self,
_cx: &Context<Self>,
GetProc { proc }: GetProc,
) -> anyhow::Result<()> {
proc.send(self.proc.clone())?;
Ok(())
}
}

/// A mailbox sender that initially queues messages, and then relays them to
/// an underlying sender once configured.
#[derive(Clone)]
Expand Down
6 changes: 5 additions & 1 deletion hyperactor_mesh/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use host_mesh::HostMeshRef;
use hyperactor::ActorId;
use hyperactor::ActorRef;
use hyperactor::Named;
use hyperactor::ProcId;
use hyperactor::host::HostError;
use hyperactor::mailbox::MailboxSenderError;
use hyperactor::reference;
Expand Down Expand Up @@ -149,6 +150,9 @@ pub enum Error {
#[error("error spawning controller actor for mesh {0}: {1}")]
ControllerActorSpawnError(Name, anyhow::Error),

#[error("proc {0} must be direct-addressable")]
RankedProc(ProcId),

#[error("error: {0} does not exist")]
NotExist(Name),

Expand Down Expand Up @@ -253,7 +257,7 @@ impl Name {
}

/// Create a Reserved `Name` with no uuid. Only for use by system actors.
pub(crate) fn new_reserved(name: impl Into<String>) -> Result<Self> {
pub fn new_reserved(name: impl Into<String>) -> Result<Self> {
Ok(Self::new_with_uuid(name, None)?)
}

Expand Down
Loading
Loading