Skip to content

Commit 36429b7

Browse files
pzhan9 and facebook-github-bot
authored and committed
make client this_proc() real (#2144)
Summary: This builds on the previous change to implement a *real* in-process host and local proc mesh. Thus, nothing is fake anymore in the v1 path: this_proc() behaves the same everywhere, and is managed in precisely the same way. This also means that we can do things like hand out references to the local host (or proc), and have remote actors spawn (procs, actors) on it! It will also simplify integration: since the client proc is now a host, it has a single front-end address that multiplexes all its procs (including the local one). We keep around the old codepaths for the benefit of v0; this can be significantly simplified again once we drop v0 support. Differential Revision: D89196041
1 parent 3f99569 commit 36429b7

File tree

16 files changed

+306
-95
lines changed

16 files changed

+306
-95
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Seeds for failure cases proptest has generated in the past. It is
2+
# automatically read and these particular cases re-run before any
3+
# novel cases are generated.
4+
#
5+
# It is recommended to check this file in to source control so that
6+
# everyone who runs the test benefits from these saved cases.
7+
cc 7da0353e3138258986cf2598af0836656a1f7c3399a9ffa18ca93cf983b3e64c # shrinks to extent = Extent { inner: ExtentData { labels: ["d/0"], sizes: [1] } }

hyperactor_mesh/src/bootstrap.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,10 @@ async fn halt<R>() -> R {
266266
unreachable!()
267267
}
268268

269-
/// Bootstrap a host in this process, returning a handle to the mesh agent:
269+
/// Bootstrap a host in this process, returning a handle to the mesh agent.
270+
///
271+
/// To obtain the local proc, use `GetLocalProc` on the returned host mesh agent,
272+
/// then use `GetProc` on the returned proc mesh agent.
270273
///
271274
/// - `addr`: the listening address of the host; this is used to bind the frontend address;
272275
/// - `command`: optional bootstrap command to spawn procs, otherwise [`BootstrapProcManager::current`];

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,26 @@ impl Handler<NewClientInstance> for ProcMeshAgent {
795795
}
796796
}
797797

798+
/// A handler to get a clone of the proc managed by this agent.
799+
/// This is used to obtain the local proc from a host mesh.
800+
#[derive(Debug, hyperactor::Handler, hyperactor::HandleClient)]
801+
pub struct GetProc {
802+
#[reply]
803+
pub proc: PortHandle<Proc>,
804+
}
805+
806+
#[async_trait]
807+
impl Handler<GetProc> for ProcMeshAgent {
808+
async fn handle(
809+
&mut self,
810+
_cx: &Context<Self>,
811+
GetProc { proc }: GetProc,
812+
) -> anyhow::Result<()> {
813+
proc.send(self.proc.clone())?;
814+
Ok(())
815+
}
816+
}
817+
798818
/// A mailbox sender that initially queues messages, and then relays them to
799819
/// an underlying sender once configured.
800820
#[derive(Clone)]

hyperactor_mesh/src/v1.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub use host_mesh::HostMeshRef;
2828
use hyperactor::ActorId;
2929
use hyperactor::ActorRef;
3030
use hyperactor::Named;
31+
use hyperactor::ProcId;
3132
use hyperactor::host::HostError;
3233
use hyperactor::mailbox::MailboxSenderError;
3334
use hyperactor::reference;
@@ -149,6 +150,9 @@ pub enum Error {
149150
#[error("error spawning controller actor for mesh {0}: {1}")]
150151
ControllerActorSpawnError(Name, anyhow::Error),
151152

153+
#[error("proc {0} must be direct-addressable")]
154+
RankedProc(ProcId),
155+
152156
#[error("error: {0} does not exist")]
153157
NotExist(Name),
154158

@@ -253,7 +257,7 @@ impl Name {
253257
}
254258

255259
/// Create a Reserved `Name` with no uuid. Only for use by system actors.
256-
pub(crate) fn new_reserved(name: impl Into<String>) -> Result<Self> {
260+
pub fn new_reserved(name: impl Into<String>) -> Result<Self> {
257261
Ok(Self::new_with_uuid(name, None)?)
258262
}
259263

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,18 @@ impl HostRef {
147147
}
148148
}
149149

150+
impl TryFrom<ActorRef<HostMeshAgent>> for HostRef {
151+
type Error = v1::Error;
152+
153+
fn try_from(value: ActorRef<HostMeshAgent>) -> Result<Self, v1::Error> {
154+
let proc_id = value.actor_id().proc_id();
155+
match proc_id.as_direct() {
156+
Some((addr, _)) => Ok(HostRef(addr.clone())),
157+
None => Err(v1::Error::RankedProc(proc_id.clone())),
158+
}
159+
}
160+
}
161+
150162
impl std::fmt::Display for HostRef {
151163
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152164
self.0.fmt(f)
@@ -734,6 +746,20 @@ impl HostMeshRef {
734746
}
735747
}
736748

749+
/// Create a new HostMeshRef from an arbitrary set of host mesh agents.
750+
pub fn from_host_agents(name: Name, agents: Vec<ActorRef<HostMeshAgent>>) -> v1::Result<Self> {
751+
Ok(Self {
752+
name,
753+
region: extent!(hosts = agents.len()).into(),
754+
ranks: Arc::new(
755+
agents
756+
.into_iter()
757+
.map(HostRef::try_from)
758+
.collect::<v1::Result<_>>()?,
759+
),
760+
})
761+
}
762+
737763
/// Spawn a ProcMesh onto this host mesh. The per_host extent specifies the shape
738764
/// of the procs to spawn on each host.
739765
///

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ pub struct ProcRef {
106106
}
107107

108108
impl ProcRef {
109-
pub(crate) fn new(proc_id: ProcId, create_rank: usize, agent: ActorRef<ProcMeshAgent>) -> Self {
109+
/// Create a new proc ref from the provided id, create rank and agent.
110+
pub fn new(proc_id: ProcId, create_rank: usize, agent: ActorRef<ProcMeshAgent>) -> Self {
110111
Self {
111112
proc_id,
112113
create_rank,
@@ -713,6 +714,20 @@ impl ProcMeshRef {
713714
})
714715
}
715716

717+
/// Create a singleton ProcMeshRef, given the provided ProcRef and name.
718+
/// This is used to create local singleton proc meshes that back `this_proc()`
719+
/// in Python client actors.
720+
pub fn new_singleton(name: Name, proc_ref: ProcRef) -> Self {
721+
Self {
722+
name,
723+
region: Extent::unity().into(),
724+
ranks: Arc::new(vec![proc_ref]),
725+
host_mesh: None,
726+
root_region: None,
727+
root_comm_actor: None,
728+
}
729+
}
730+
716731
pub(crate) fn root_comm_actor(&self) -> Option<&ActorRef<CommActor>> {
717732
self.root_comm_actor.as_ref()
718733
}

monarch_hyperactor/src/actor.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,8 @@ impl PythonActor {
520520
})
521521
}
522522

523+
/// Bootstrap the root client actor, creating a new proc for it.
524+
/// This is the legacy entry point that creates its own proc.
523525
pub(crate) fn bootstrap_client(py: Python<'_>) -> (&'static Instance<Self>, ActorHandle<Self>) {
524526
static ROOT_CLIENT_INSTANCE: OnceLock<Instance<PythonActor>> = OnceLock::new();
525527

@@ -530,8 +532,21 @@ impl PythonActor {
530532
)
531533
.unwrap();
532534

535+
Self::bootstrap_client_inner(py, client_proc, &ROOT_CLIENT_INSTANCE)
536+
}
537+
538+
/// Bootstrap the client proc, storing the root client instance in the given static.
539+
/// This is passed in because we require storage, as the instance is shared.
540+
/// This can be simplified when we remove v0.
541+
pub(crate) fn bootstrap_client_inner(
542+
py: Python<'_>,
543+
client_proc: Proc,
544+
root_client_instance: &'static OnceLock<Instance<PythonActor>>,
545+
) -> (&'static Instance<Self>, ActorHandle<Self>) {
533546
// Make this proc reachable through the global router, so that we can use the
534547
// same client in both direct-addressed and ranked-addressed modes.
548+
//
549+
// DEPRECATE after v0 removal
535550
router::global().bind(client_proc.proc_id().clone().into(), client_proc.clone());
536551

537552
let actor_mesh_mod = py
@@ -557,7 +572,7 @@ impl PythonActor {
557572
)
558573
.expect("root instance create");
559574

560-
ROOT_CLIENT_INSTANCE
575+
root_client_instance
561576
.set(client)
562577
.map_err(|_| "already initialized root client instance")
563578
.unwrap();
@@ -577,7 +592,7 @@ impl PythonActor {
577592
)
578593
.expect("initialize root client");
579594

580-
let instance = ROOT_CLIENT_INSTANCE.get().unwrap();
595+
let instance = root_client_instance.get().unwrap();
581596

582597
get_tokio_runtime().spawn(async move {
583598
let mut signal_rx = signal_rx;
@@ -626,7 +641,7 @@ impl PythonActor {
626641
instance.proc().handle_supervision_event(event);
627642
});
628643

629-
(ROOT_CLIENT_INSTANCE.get().unwrap(), handle)
644+
(root_client_instance.get().unwrap(), handle)
630645
}
631646
}
632647

monarch_hyperactor/src/context.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,17 @@ impl PyContext {
126126
rank: Extent::unity().point_of_rank(0).unwrap(),
127127
})
128128
}
129+
130+
/// Create a context from an existing instance.
131+
/// This is used when the root client was bootstrapped via bootstrap_host()
132+
/// instead of the default bootstrap_client().
133+
#[staticmethod]
134+
fn _from_instance(py: Python<'_>, instance: PyInstance) -> PyResult<PyContext> {
135+
Ok(PyContext {
136+
instance: instance.into_pyobject(py)?.into(),
137+
rank: Extent::unity().point_of_rank(0).unwrap(),
138+
})
139+
}
129140
}
130141

131142
impl PyContext {

monarch_hyperactor/src/v1/host_mesh.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,20 @@
99
use std::collections::HashMap;
1010
use std::ops::Deref;
1111
use std::path::PathBuf;
12+
use std::sync::OnceLock;
1213

14+
use hyperactor::Instance;
15+
use hyperactor::Proc;
1316
use hyperactor_mesh::bootstrap::BootstrapCommand;
17+
use hyperactor_mesh::bootstrap::host;
18+
use hyperactor_mesh::proc_mesh::default_transport;
19+
use hyperactor_mesh::proc_mesh::mesh_agent::GetProcClient;
1420
use hyperactor_mesh::shared_cell::SharedCell;
21+
use hyperactor_mesh::v1::ProcMeshRef;
1522
use hyperactor_mesh::v1::host_mesh::HostMesh;
1623
use hyperactor_mesh::v1::host_mesh::HostMeshRef;
24+
use hyperactor_mesh::v1::host_mesh::mesh_agent::GetLocalProcClient;
25+
use hyperactor_mesh::v1::proc_mesh::ProcRef;
1726
use ndslice::View;
1827
use ndslice::view::RankedSliceable;
1928
use pyo3::IntoPyObjectExt;
@@ -24,6 +33,7 @@ use pyo3::prelude::*;
2433
use pyo3::types::PyBytes;
2534
use pyo3::types::PyType;
2635

36+
use crate::actor::PythonActor;
2737
use crate::actor::to_py_error;
2838
use crate::alloc::PyAlloc;
2939
use crate::context::PyInstance;
@@ -240,6 +250,76 @@ impl PyHostMeshRefImpl {
240250
}
241251
}
242252

253+
/// Static storage for the root client instance when using host-based bootstrap.
254+
static ROOT_CLIENT_INSTANCE_FOR_HOST: OnceLock<Instance<PythonActor>> = OnceLock::new();
255+
256+
/// Bootstrap the client host and root client actor.
257+
///
258+
/// This creates a proper Host with BootstrapProcManager, spawns the root client
259+
/// actor on the Host's local_proc.
260+
///
261+
/// Returns a tuple of (HostMesh, ProcMesh, PyInstance) where:
262+
/// - PyHostMesh: the bootstrapped (local) host mesh; and
263+
/// - PyProcMesh: the local ProcMesh on this HostMesh; and
264+
/// - PyInstance: the root client actor instance, on the ProcMesh.
265+
///
266+
/// The HostMesh is served on the default transport.
267+
///
268+
/// This should be called only once, at process initialization
269+
#[pyfunction]
270+
fn bootstrap_host(bootstrap_cmd: Option<PyBootstrapCommand>) -> PyResult<PyPythonTask> {
271+
let bootstrap_cmd = match bootstrap_cmd {
272+
Some(cmd) => cmd.to_rust(),
273+
None => BootstrapCommand::current().map_err(|e| PyException::new_err(e.to_string()))?,
274+
};
275+
276+
PyPythonTask::new(async move {
277+
let host_mesh_agent = host(default_transport().any(), Some(bootstrap_cmd), None)
278+
.await
279+
.map_err(|e| PyException::new_err(e.to_string()))?;
280+
281+
let host_mesh_name = hyperactor_mesh::v1::Name::new_reserved("local").unwrap();
282+
let host_mesh = HostMeshRef::from_host_agents(host_mesh_name, vec![host_mesh_agent.bind()])
283+
.map_err(|e| PyException::new_err(e.to_string()))?;
284+
285+
// We require a temporary instance to make a call to the host/proc agent.
286+
let temp_proc = Proc::local();
287+
let (temp_instance, _) = temp_proc
288+
.instance("temp")
289+
.map_err(|e| PyException::new_err(e.to_string()))?;
290+
291+
let local_proc_agent = host_mesh_agent
292+
.get_local_proc(&temp_instance)
293+
.await
294+
.map_err(|e| PyException::new_err(e.to_string()))?;
295+
296+
let proc_mesh_name = hyperactor_mesh::v1::Name::new_reserved("local").unwrap();
297+
let proc_mesh = ProcMeshRef::new_singleton(
298+
proc_mesh_name,
299+
ProcRef::new(
300+
local_proc_agent.actor_id().proc_id().clone(),
301+
0,
302+
local_proc_agent.bind(),
303+
),
304+
);
305+
306+
let local_proc = local_proc_agent
307+
.get_proc(&temp_instance)
308+
.await
309+
.map_err(|e| PyException::new_err(e.to_string()))?;
310+
311+
let (instance, _handle) = Python::with_gil(|py| {
312+
PythonActor::bootstrap_client_inner(py, local_proc, &ROOT_CLIENT_INSTANCE_FOR_HOST)
313+
});
314+
315+
Ok((
316+
PyHostMesh::new_ref(host_mesh),
317+
PyProcMesh::new_ref(proc_mesh),
318+
PyInstance::from(instance),
319+
))
320+
})
321+
}
322+
243323
#[pyfunction]
244324
fn py_host_mesh_from_bytes(bytes: &Bound<'_, PyBytes>) -> PyResult<PyHostMesh> {
245325
let r: PyResult<HostMeshRef> = bincode::deserialize(bytes.as_bytes())
@@ -254,6 +334,14 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResul
254334
"monarch._rust_bindings.monarch_hyperactor.v1.host_mesh",
255335
)?;
256336
hyperactor_mod.add_function(f)?;
337+
338+
let f2 = wrap_pyfunction!(bootstrap_host, hyperactor_mod)?;
339+
f2.setattr(
340+
"__module__",
341+
"monarch._rust_bindings.monarch_hyperactor.v1.host_mesh",
342+
)?;
343+
hyperactor_mod.add_function(f2)?;
344+
257345
hyperactor_mod.add_class::<PyHostMesh>()?;
258346
hyperactor_mod.add_class::<PyBootstrapCommand>()?;
259347
Ok(())

python/monarch/_rust_bindings/monarch_hyperactor/v1/host_mesh.pyi

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,15 @@ class BootstrapCommand:
101101
...
102102

103103
def __repr__(self) -> str: ...
104+
105+
def bootstrap_host(
106+
bootstrap_cmd: BootstrapCommand | None,
107+
) -> PythonTask[(HostMesh, ProcMesh, Instance)]:
108+
"""
109+
Bootstrap a host mesh in this process, returning the host mesh,
110+
proc mesh, and client instance.
111+
112+
Arguments:
113+
- `bootstrap_cmd`: The bootstrap command used to bootstrap the host. If `None`, the current bootstrap command is used.
114+
"""
115+
...

0 commit comments

Comments
 (0)